Fragensteller
Output adapter stops before reading any events from input adapter

Frage
-
I have a simple csv-file input adapter and a csv-file output adapter. The input adapter implements the edge shape and the interval shape. These were working well up until a few days ago. Now what is happening is that my output adapter is entering the stopping state before it is able to consume any of the events that the input adapter is successfully enqueueing. Some sanity checks: I am enqueueing CTI events. The problem only occurs for the Edge event shape. If I use the Interval shape, the output adapter is fine. If I use another input adapter (Edge shape) that reads the stream from another source (not a csv file), it all works well. The essential pattern of this other input adapter is the same as that used in the csv adapter. For these reasons, I am assuming I have a problem in my input adapter, but I don't know what it is.
Here is the code for the ProduceEvents method of the csv input adapter:
/// <summary>
/// Reads lines from <c>streamReader</c> and enqueues them as edge events.
/// For every line, an End edge is first enqueued for the previously stored
/// state of the same key (if there is one), then a Start edge for the new
/// line, and finally a CTI one tick before the line's timestamp.
/// </summary>
/// <remarks>
/// The method returns when the adapter state is Stopping or the end of the
/// file is reached (calling <c>Stopped()</c> in both cases), when
/// <c>CreateInsertEvent</c> returns null, or when <c>Enqueue</c> reports
/// <c>Full</c> (calling <c>Ready()</c> first). In the last two cases
/// <c>currentLine</c> is kept so the same line is retried on the next call.
/// NOTE(review): assumes <c>dataSourceState</c> maps keys to
/// <c>DeviceState</c> values, as suggested by the Add call below - confirm.
/// </remarks>
private void ProduceEvents()
{
    try
    {
        string key = null;

        while (true)
        {
            if (AdapterState.Stopping == AdapterState)
            {
                Stopped();
                return;
            }

            EdgeEvent currentEvent = default(EdgeEvent);

            // Did we enqueue the previous line successfully?
            if (this.currentLine == null)
            {
                this.currentLine = this.streamReader.ReadLine();

                if (this.currentLine == null)
                {
                    // Stop adapter (and hence the query) at the end of the file.
                    Stopped();
                    return;
                }
            }

            try
            {
                DateTime timestamp = GetTimeStampAndKeyFromLine(currentLine, ref key);

                // Did we have a previous value for the data source with this ID?
                // If so, then we need to send the end edge for a previous start edge.
                // A single TryGetValue replaces the ContainsKey check plus repeated
                // indexer reads of the same key.
                DeviceState previousState;
                if (this.dataSourceState.TryGetValue(key, out previousState))
                {
                    currentEvent = CreateInsertEvent(EdgeType.End);

                    if (null == currentEvent)
                    {
                        // We just went out of the running state - throw away the
                        // current event. We will try to send the end edge with
                        // the next incoming data.
                        return;
                    }

                    // Create the end event for the previous start event of this
                    // data source. Start time and data are stored, end time is the
                    // new event's time.
                    currentEvent.StartTime = previousState.Timestamp;
                    currentEvent = PopulatePayload(currentEvent, previousState.Data);
                    currentEvent.EndTime = timestamp;

                    // Try to enqueue. If it fails, we will just ignore the current
                    // event and try to send the proper end edge the next time.
                    if (EnqueueOperationResult.Full == Enqueue(ref currentEvent))
                    {
                        ReleaseEvent(ref currentEvent);
                        Ready();
                        return;
                    }

                    // We don't need this state anymore.
                    this.dataSourceState.Remove(key);
                }

                // Now enqueue the new data as a start edge.
                currentEvent = CreateInsertEvent(EdgeType.Start);

                if (null == currentEvent)
                {
                    // Throw away the current event. At this point, there is no
                    // previous event stored for this data source, so we can try
                    // a new start edge next time.
                    return;
                }

                currentEvent = PopulatePayload(currentEvent, currentLine);
                currentEvent.StartTime = timestamp;

                if (EnqueueOperationResult.Full == Enqueue(ref currentEvent))
                {
                    ReleaseEvent(ref currentEvent);
                    Ready();

                    // Throw away the current event. At this point, there is no
                    // previous event stored for this data source, so we can try
                    // a new start edge next time.
                    return;
                }

                // If we arrived here, we did enqueue a start edge. Now we need
                // to remember it for the next round, when the corresponding end
                // edge will be enqueued.
                this.dataSourceState.Add(key, new DeviceState(currentLine, timestamp));

                // set up to read a new line again.
                this.currentLine = null;

                // Advance application time to one tick before this line's timestamp.
                EnqueueCtiEvent(timestamp + TimeSpan.FromTicks(-1));
            }
            catch (Exception e)
            {
                // The line couldn't be transformed into an event.
                // Just ignore it, and release the event's memory.
                if (currentEvent != null)
                {
                    ReleaseEvent(ref currentEvent);
                }

                this.consoleTracer.WriteLine(this.currentLine + " could not be read into a CEP event: " + e.Message);

                // Make sure we read a new line next time.
                this.currentLine = null;
                continue;
            }
        }
    }
    catch (AdapterException e)
    {
        // Adapter-level failures end the loop; they are only reported on the console.
        Console.WriteLine("ProduceEvents - " + e.Message + e.StackTrace);
    }
}
/// <summary>
/// Sets the payload fields of <paramref name="evt"/> from one delimited text line.
/// </summary>
/// <param name="evt">Event whose fields are assigned through <c>SetField</c>.</param>
/// <param name="line">Text line that is split on <c>delimiter</c>.</param>
/// <returns>The event passed in as <paramref name="evt"/>, after its fields were set.</returns>
private EdgeEvent PopulatePayload(EdgeEvent evt, string line)
{
    // Empty entries are deliberately kept when splitting, so that the payload
    // column indexing remains the same as for interval events.
    char[] separators = new char[] { delimiter };
    string[] columns = line.Split(separators, StringSplitOptions.None);

    // Walk the input ordinals and copy each column into its mapped event field.
    for (int inputOrdinal = 0; inputOrdinal < bindtimeEventType.FieldsByOrdinal.Count; inputOrdinal++)
    {
        try
        {
            int targetOrdinal = inputOrdinalToCepOrdinal[inputOrdinal];
            CepEventTypeField targetField = bindtimeEventType.FieldsByOrdinal[targetOrdinal];

            // Payload columns start after the leading non-payload columns.
            string rawValue = columns[inputOrdinal + NumNonPayloadFields];
            if (rawValue == "")
            {
                // An empty column is converted as if it contained "0".
                rawValue = "0";
            }

            object converted = Convert.ChangeType(rawValue, targetField.Type.ClrType, cultureInfo);
            evt.SetField(targetOrdinal, converted);
        }
        catch (AdapterException e)
        {
            // Only adapter exceptions are handled here; anything else
            // (for example a conversion failure) propagates to the caller.
            Console.WriteLine(e.Message + e.StackTrace);
        }
    }

    return evt;
}