Monitoring 10000 sensors (Assets) using StreamInsight Standing Queries

    Question

  • Hi, please look at one of the use cases for StreamInsight:

    Monitor 10000 sensors (assets), probably every minute (and sometimes more often than once a minute), for any issues with the assets.

    We have configured limits for each asset; for example, the boiler temperature should be between 40 °C and 90 °C. On any deviation from this range we need to raise an alarm immediately.

    Now, I have 10000 such assets (and in some cases it can be up to 0.1 million).

    I am exploring how to write standing queries to monitor these assets/sensors. One direct approach is to write 10000 standing queries (one query for each asset and its limit validation).

    Is there any other mechanism for writing standing queries, or is writing 10000 queries the only option? With 10000 queries there would be performance issues, because every time an event is queued it has to go through all 10000 standing queries.

    Any thoughts or comments on this? I could not find any samples for this kind of use case; all the samples have only a few standing queries.


    Venkat
    December 28, 2011, 7:31

Answer

  • When running this type of example in LINQPad, please ensure that you have a StreamInsight context selected (otherwise you won't have the right namespace and assemblies configured).  This sample breaks down into three stages:

    (1) Create a CepStream<T> using the IObservable<T> interface.  Since a CepStream<T> can be created from either an adapter or an IObservable, this part can happen in isolation.  I don't know which adapters you're using, but provided you have the appropriate transports available the steps to switch out the code are:

    • Create strongly typed classes for the sensor and metadata (as adapters need non-anonymous classes)
    • Create CepStream<T> for both by passing the appropriate variables to an adapter factory

    From this point on, the mergedStream and alarms query code is exactly the same.
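
    As a rough sketch of the first step, the strongly typed payload classes might look like the following. They mirror the fields of the anonymous types in the LINQPad sample; the class names SensorReading and SensorLimits are assumptions made for illustration and are not part of StreamInsight or of any particular adapter.

```csharp
using System;

// Hypothetical payload types standing in for the anonymous types used in the
// LINQPad sample. Class and property names are assumptions for illustration.
public class SensorReading
{
    public string Name { get; set; }    // sensor/asset identifier, used as the join key
    public DateTime Time { get; set; }  // when the reading was taken
    public double Value { get; set; }   // measured value, e.g. a temperature
}

public class SensorLimits
{
    public string Name { get; set; }    // sensor/asset the limits apply to
    public DateTime Time { get; set; }  // when these limits take effect
    public double Low { get; set; }     // lower threshold
    public double High { get; set; }    // upper threshold
}
```

    With types like these, the join and filter queries can refer to s.Name, m.Low and so on exactly as they do with the anonymous types.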

    (2) Joining the two streams and applying the filter query.  This part doesn't change.

    (3) Binding the query definition to an output; in this case we leverage the IObservable support to bind the output of the alarms query definition to the LINQPad output window.  To switch out the code simply call alarms.ToQuery() with the appropriate output adapter.

    As an aside, I'd highly recommend spending some time with the 101 Query Samples (which leverage the IObservable interfaces) to aid in getting more familiar with the StreamInsight query language and query patterns.

    • Marked as answer by KJian_, January 3, 2012, 7:42
    December 29, 2011, 19:15

All replies

  • Here's how I generally approach this problem:

    • Create two streams, one for the sensor readings, another for the sensor metadata (thresholds).
    • Use the reference data signal technique (described here) to pull in a slow changing stream from an external source
    • Write a single query that looks for threshold exceptions

    There are very few cases wherein writing a large number of standing queries is the right approach; if you see a problem that seems to point in that direction your best bet is to look for a way to create a generic query that works over a group of sub-streams.

    Here's an example:

    // Helper: builds a UTC timestamp at 13:<i>:00 on December 1, 2011.
    Func<int, DateTime> dt = (i) => new DateTime(2011, 12, 1, 13, i, 0, DateTimeKind.Utc);
    
    var metadata = new []
    {
    	new { Name = "A", Time = dt(0), Low = 40.0, High = 80.0 },
    	new { Name = "B", Time = dt(0), Low = 50.0, High = 70.0 },
    };
    
    var sensor = new []
    {
    	new { Name = "A", Time = dt(1), Value = 50.0 },
    	new { Name = "B", Time = dt(1), Value = 60.0 },
    	new { Name = "A", Time = dt(2), Value = 90.0 },
    	new { Name = "B", Time = dt(2), Value = 60.0 },
    	new { Name = "A", Time = dt(3), Value = 50.0 },
    	new { Name = "B", Time = dt(3), Value = 40.0 },	
    };
    
    var sensorStream = sensor.ToPointStream(Application,
    	e => PointEvent.CreateInsert(e.Time, e),
    	AdvanceTimeSettings.IncreasingStartTime);
    	
    var metadataStream = metadata.ToPointStream(Application,
    	e => PointEvent.CreateInsert(e.Time, e),
    	AdvanceTimeSettings.IncreasingStartTime);
    	
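    // Turn each metadata point event into a signal: stretch it to infinity, then
    // clip it at the next metadata event for the same sensor, so the latest
    // thresholds stay in effect until they are replaced.
    var metaSignal = metadataStream
    	.AlterEventDuration(e => TimeSpan.MaxValue)
    	.ClipEventDuration(metadataStream, (e1, e2) => e1.Name == e2.Name);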
    
    var mergedStream = 
    	from s in sensorStream 
    	join m in metaSignal
    	on s.Name equals m.Name
    	select new
    	{
    		Name = s.Name,
    		Time = s.Time,
    		Value = s.Value,
    		LowThreshold = m.Low,
    		HighThreshold = m.High
    	};
    
    var alarms = 
    	from e in mergedStream 
    	where e.Value <= e.LowThreshold || e.Value >= e.HighThreshold
    	select new
    	{
    		Name = e.Name,
    		Value = e.Value,
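    		// Same inclusive comparison as the where clause, so a value equal to the low threshold is reported as low.
    		Message = (e.Value <= e.LowThreshold) ? "Value below threshold" : "Value above threshold",
    		Threshold = (e.Value <= e.LowThreshold) ? e.LowThreshold : e.HighThreshold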
    	};
    
    alarms.Dump("alarms");
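
    To sanity-check what the query above should report, here is a minimal sketch of the same matching logic in plain LINQ to Objects, with no StreamInsight involved. It assumes that the join pairs each reading with the most recent limits for the same sensor that took effect at or before the reading; ThresholdSketch and FindAlarms are hypothetical names, and this is only an approximation of the temporal join, not how the engine evaluates it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain LINQ to Objects approximation; all names here are hypothetical.
public static class ThresholdSketch
{
    public static List<(string Name, double Value, string Message, double Threshold)> FindAlarms(
        IEnumerable<(string Name, DateTime Time, double Low, double High)> limits,
        IEnumerable<(string Name, DateTime Time, double Value)> readings)
    {
        var limitList = limits.ToList();
        var alarms = new List<(string Name, double Value, string Message, double Threshold)>();

        foreach (var r in readings.OrderBy(x => x.Time))
        {
            // Most recent limits for this sensor at or before the reading time.
            var active = limitList
                .Where(l => l.Name == r.Name && l.Time <= r.Time)
                .OrderByDescending(l => l.Time)
                .Take(1)
                .ToList();
            if (active.Count == 0) continue; // no limits known yet, nothing to compare

            var m = active[0];
            if (r.Value <= m.Low)
                alarms.Add((r.Name, r.Value, "Value below threshold", m.Low));
            else if (r.Value >= m.High)
                alarms.Add((r.Name, r.Value, "Value above threshold", m.High));
        }
        return alarms;
    }
}
```

    Fed the metadata and sensor arrays from the sample, this sketch flags two readings: sensor A at 90.0 (above its high threshold of 80.0) and sensor B at 40.0 (below its low threshold of 50.0).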
    

    December 28, 2011, 20:28
  • Hi Mark,

    I am getting an error when I run this sample in LINQPad:

    System.Array does not contain a definition for ToPointStream. Am I missing something?

    Also, I am trying to convert your sample to the input adapter format.

    So I will have two input adapters, one for the sensor metadata and the other for the sensor data? Can you explain this in terms of input and output adapters?

    As far as I know:

    • Create a StreamInsight application
    • Create a CepStream by passing an input adapter factory
    • Prepare a query on the stream
    • Call ToQuery on the query, passing an output adapter

    Can you rearrange the sample you have given into this format? I am new to StreamInsight and am going through the samples from MS; all of them follow this pattern, so I would appreciate it if you could rearrange your sample code to match it.

     


    Venkat
    December 29, 2011, 7:17