Detecting the absence of Events

    Question

  • Hello,

    I tried to implement this feature in two different ways, but neither worked. The first used the left anti semi-join method from the Hitchhiker's Guide; the second used the join method explained in the Microsoft CEP Overview.

    I want to detect the absence of an event. So I used the data generator adapter from the StreamInsight samples to generate a reference event periodically. Then I wrote the following LINQ query (left anti semi-join method):

    // referenceStream: the CepStream<M3BusinessDataInput> produced by the data generator adapter

    // Import CTIs from the reference stream into the observed stream.
    var ats = new AdvanceTimeSettings(
        null,
        new AdvanceTimeImportSettings("ReferenceStream"),
        AdvanceTimePolicy.Adjust);

    var observedStream = CepStream<M3BusinessDataInput>.Create(
        "ObservedStream",
        typeof(PublishedStreamAdapterFactory),
        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps") },
        EventShape.Interval,
        ats);

    // Left anti semi-join: keep reference events while the observed stream is empty.
    var filteredStream = from left in referenceStream
                         where (from right in observedStream select right).IsEmpty()
                         select left;

    I have the feeling that something is wrong because I have a fast stream (the reference stream) and a slow stream (the observed stream). In fact, when I receive an event from the observed stream, all the previous events from the reference stream pass through the filter query. But I want to get the events from the reference stream while there is no event in the observed stream, not once an event arrives in the observed stream...

    Do you have an example corresponding to this feature?

    Thank you for your help,







    March 16, 2012 9:38

Answers

  • Here you go (again).

    If this helps, mark this one as "answer" to help others looking for the same answer later.

    // LINQPad "C# Program" query: Dump() is a LINQPad extension method, and
    // Application is supplied by LINQPad's StreamInsight driver.
    void Main()
    {
    	// Converts a minute offset into a timestamp (minutes after 2011-01-11 08:00 UTC).
    	Func<int, DateTimeOffset> t = 
    	(s) => new DateTimeOffset(2011, 1, 11, 8, 0, 0, TimeSpan.Zero).AddMinutes(s);
    	var values = new []
    	{
    		//Good data
    		new {Item="Variable1", Value=92, Timestamp=0},
    		new {Item="Variable2", Value=60, Timestamp=0},
    		new {Item="Variable1", Value=93, Timestamp=2},
    		//Variable1 goes offline
    		new {Item="Variable2", Value=75, Timestamp=2},
    		new {Item="Variable2", Value=81, Timestamp=3},
    		new {Item="Variable2", Value=82, Timestamp=5},
    		new {Item="Variable2", Value=82, Timestamp=8},
    		new {Item="Variable2", Value=82, Timestamp=12},
    		//Variable1 is back online
    		new {Item="Variable1", Value=92, Timestamp=15},
    		new {Item="Variable2", Value=60, Timestamp=15},
    		new {Item="Variable1", Value=92, Timestamp=18},
    		new {Item="Variable2", Value=60, Timestamp=18},
    		new {Item="Variable1", Value=92, Timestamp=20},
    		new {Item="Variable2", Value=60, Timestamp=20}
    	};
    	
    	var reference = new[]
    	{
    		new {Item="Variable1"}, 
    		new {Item="Variable2"}
    	};
    	
    	var dataStream = values.ToPointStream(Application, 
    		e=> PointEvent.CreateInsert(t(e.Timestamp), e), AdvanceTimeSettings.IncreasingStartTime);
    	//Create an interval stream that overlaps all values.
    	//You would typically use ToSignal(), but this is simpler.
    	//In the real world, this would be a slow-moving reference stream
    	//converted with ToSignal().
    	var referenceStream = reference.ToIntervalStream(Application, 
    		e=> IntervalEvent.CreateInsert(t(0), t(20), e), AdvanceTimeSettings.IncreasingStartTime);
    	//The left anti semi-join returns reference events that have no matching event in the data stream.
    	//Altering the event duration on the data stream provides a "timeout":
    	//how long between values before we say an item is "offline".
    
    	var offline = from r in referenceStream
    					where (from d in dataStream
    						.AlterEventDuration(e=>TimeSpan.FromMinutes(5))
    						where d.Item == r.Item
    						select d).IsEmpty()
    					select r;
    	
    	offline.ToEdgeEnumerable().Dump("Offline");
    }
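    For readers without LINQPad, the interval arithmetic this query relies on can be checked with plain C#. The sketch below is not StreamInsight code and does not model CTIs: it assumes all data has already arrived, extends each data point by the 5-minute timeout (the effect of AlterEventDuration), and reports the parts of the reference interval [0, 20) that no extended point overlaps, which should correspond to what the left anti semi-join keeps. `OfflineModel` and `Gaps` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory model of the anti semi-join above (hypothetical, not StreamInsight).
// Times are minute offsets, as produced by t(...) in the LINQPad sample.
public static class OfflineModel
{
    // Sub-intervals of [refStart, refEnd) not covered by any [p, p + timeout).
    public static List<(int Start, int End)> Gaps(
        int refStart, int refEnd, IEnumerable<int> points, int timeout)
    {
        var gaps = new List<(int Start, int End)>();
        int cursor = refStart; // everything before the cursor is known to be covered
        foreach (int p in points.OrderBy(x => x))
        {
            int start = p, end = p + timeout;
            if (end <= cursor) continue;   // expires before the cursor: adds nothing
            if (start >= refEnd) break;    // starts after the reference interval ends
            if (start > cursor) gaps.Add((cursor, start)); // uncovered stretch
            cursor = Math.Max(cursor, end);
        }
        if (cursor < refEnd) gaps.Add((cursor, refEnd)); // trailing uncovered stretch
        return gaps;
    }

    public static void Main()
    {
        // Item and Timestamp of each value in the LINQPad sample.
        var data = new[]
        {
            ("Variable1", 0), ("Variable2", 0), ("Variable1", 2), ("Variable2", 2),
            ("Variable2", 3), ("Variable2", 5), ("Variable2", 8), ("Variable2", 12),
            ("Variable1", 15), ("Variable2", 15), ("Variable1", 18), ("Variable2", 18),
            ("Variable1", 20), ("Variable2", 20)
        };
        foreach (var item in new[] { "Variable1", "Variable2" })
        {
            var points = data.Where(d => d.Item1 == item).Select(d => d.Item2);
            foreach (var (start, end) in Gaps(0, 20, points, 5))
                Console.WriteLine($"{item} offline from minute {start} to minute {end}");
        }
    }
}
```

    With the sample data this prints `Variable1 offline from minute 7 to minute 15`: Variable1's point at minute 2 times out at minute 7 and its next point arrives at minute 15. Variable2 is never reported, because its points are never more than 5 minutes apart.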
    
    


    DevBiker (aka J Sawyer)
    My Blog
    My Bike - Concours 14


    If I answered your question, please mark as answer.
    If my post was helpful, please mark as helpful.

    March 16, 2012 15:11
  • I am not importing CTIs, you are right. But I guess you mean that I need to import CTIs from the reference stream into the data stream, is that right?

    No. Import the CTIs from the data stream into the reference stream.

    Your reference stream is going to be slow-moving and updated (relatively) infrequently. StreamInsight synchronizes the two streams to the slower one ... so your data stream will only move as fast as your reference stream. You don't want that. That would be Bad. So you import the CTIs from the data stream into the reference stream. Then both streams move at the same speed and, when you Alter/Clip, the reference data is always available.

    See http://blogs.msdn.com/b/appfabriccat/archive/2010/10/05/streaminsight-synchronizing-slow-moving-reference-streams-with-fast-moving-data-streams-time-import.aspx for some more details.
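    The synchronization argument can be made concrete with a toy model. It is not StreamInsight code; it only encodes the rule described above, that a two-input operator such as the join can advance no further than its slower input. It assumes a data stream that issues a CTI every minute and a reference stream that issues one only every 20 minutes (when a reference event arrives) unless it imports the data stream's CTIs. `CtiModel` and `OutputCti` are hypothetical names.

```csharp
using System;

// Toy model of CTI synchronization (hypothetical, NOT StreamInsight code).
public static class CtiModel
{
    // Application time up to which a two-input operator can commit output
    // at wall-clock minute `now`. Assumed inputs: the data stream has a CTI
    // every minute; the reference stream has one every `referencePeriod`
    // minutes, unless it imports the data stream's CTIs.
    public static int OutputCti(int now, int referencePeriod, bool importDataCtis)
    {
        int dataCti = now;
        int referenceCti = now / referencePeriod * referencePeriod;
        if (importDataCtis)
            referenceCti = Math.Max(referenceCti, dataCti);
        // The operator never gets ahead of its slower input.
        return Math.Min(dataCti, referenceCti);
    }

    public static void Main()
    {
        foreach (int now in new[] { 5, 19, 20, 35 })
            Console.WriteLine(
                $"minute {now}: without import {OutputCti(now, 20, false)}, " +
                $"with import {OutputCti(now, 20, true)}");
    }
}
```

    Without the import, output stalls at the last reference CTI (minute 0 until minute 20, then minute 20 until minute 40); with it, output keeps pace with the data stream. The same model also shows the flip side: with imported CTIs, output advances only as fast as the data stream's CTIs do.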


    DevBiker (aka J Sawyer)

    March 19, 2012 18:30

All Replies

  • It doesn't work, and I can't see why :/

    I understand how it should work but my code returns something different:

    // My data generator config: it generates interval events every 20 s (20000 ms)
    GeneratorConfig iConfigDataGen = new GeneratorConfig
    {
        CtiFrequency = 1,
        EventInterval = 20000,
        EventIntervalVariance = 0,
    };

    // referenceStream: the CepStream<M3BusinessDataInput> produced by the data generator adapter

    var observedStream = CepStream<M3BusinessDataInput>.Create(
        "ObservedStream",
        typeof(PublishedStreamAdapterFactory),
        new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/SqlApplication/PublishedStream/ps") },
        EventShape.Point,
        AdvanceTimeSettings.IncreasingStartTime);

    // Extend each observed point to 5 s, then keep reference events while no observed event overlaps them.
    var filteredStream = from left in referenceStream
                         where (from right in observedStream.AlterEventDuration(e => TimeSpan.FromSeconds(5))
                                select right).IsEmpty()
                         select left;

    So I don't see much difference from what I did before (apart from the AlterEventDuration call).

    In the StreamInsight debugger:

    [Debugger screenshot]

    The result stream only gets values once an event arrives in the data stream, and those values are the ones from the period with no data:

    [Detection graph]

    But why only once an event arrives in the data stream? Shouldn't the result stream contain data from the reference stream throughout execution, filtered out only while the data stream is not empty?

    I am really confused :(




    March 19, 2012 10:45
  • Are you importing the CTIs from the data stream into the reference stream? This doesn't work very well in LINQPad, but it's what you'll need to do in a real-world scenario ... otherwise StreamInsight will sync to the slowest (reference) stream.

    Like the following:

    var referenceATS = new AdvanceTimeSettings(
    	null, 
    	new AdvanceTimeImportSettings("DataStream"), 
    	AdvanceTimePolicy.Adjust);


    DevBiker (aka J Sawyer)

    March 19, 2012 11:43
  • I am not importing CTIs, you are right. But I guess you mean that I need to import CTIs from the reference stream into the data stream, is that right?
    March 19, 2012 17:25
  • But if I import the CTIs from the data stream into the reference stream, the result is driven by the data stream's CTIs, so I have to wait for new data in the data stream to get results (that is actually the behaviour I had before, and it's what StreamInsight shows in the flow debugger when I do that). What I want is to generate an event in real time, as soon as there is a gap in the data stream. That is why I have a data generator: if the data side of the left anti semi-join is empty, then, based on the reference stream's CTIs, the generated event passes through the filter and I get an event in the result stream. Or maybe I am wrong and this is not the right use case, or something is still wrong with my code...

    I also tried to run your sample code to test it, and the following line gives me an error (unknown method 'Dump'):

    offline.ToEdgeEnumerable().Dump("Offline");

    Is there a specific assembly to load?






    March 20, 2012 8:49
  • That's from running the queries in LINQPad ... it's a LINQPad extension method. It only works in LINQPad.

    If you don't have LINQPad installed, get it and install the StreamInsight extensions. It is absolutely essential for modeling StreamInsight queries.
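    Outside LINQPad, the simplest replacement for `Dump` is to enumerate the results yourself. The extension method below is a hypothetical console stand-in with the same call shape, `Dump(title)`; it is not LINQPad's implementation (which renders rich output) and only prints each item's `ToString()`. Note that the sample also relies on the `Application` object that LINQPad provides, so a plain C# project would presumably still need to create its own StreamInsight server and application before the queries can run.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical console stand-in for LINQPad's Dump(); NOT LINQPad's
// implementation. Prints a title line, then each item's ToString().
public static class DumpExtensions
{
    public static void Dump<T>(this IEnumerable<T> source, string title)
    {
        Console.WriteLine($"--- {title} ---");
        foreach (T item in source)
            Console.WriteLine(item);
    }
}

public static class Program
{
    public static void Main()
    {
        // Works on any sequence, such as the one returned by ToEdgeEnumerable().
        new[] { "first", "second" }.Dump("Example");
    }
}
```

    With this class in scope, `offline.ToEdgeEnumerable().Dump("Offline");` compiles unchanged; enumerating a live query still blocks until the query produces its events.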


    DevBiker (aka J Sawyer)

    March 20, 2012 12:41
  • I tried to run it in a plain C# project. I haven't used LINQPad much, but it seems helpful; I will give it a try.

    Thank you

    March 20, 2012 16:20