Custom Disassembler Pipeline Component (Split/Debatch)

    Question

  • My Code: http://pastebin.com/xwgVUpbR

    I have searched and read the other posts, but I'm having the same problem that many others seem to have (and many of them said it took 2 days or more to debug). My code is a "Frankenstein" combination of several other examples I have found. 

    I'm splitting a document and putting the pieces in a queue collection.  But when I get to GetNext and dequeue, the document doesn't seem to have a body in it.  Also, I deliberately did not create a SendPort: I want the message to bomb, then I go to the BizTalk Admin console and see that it split correctly, because I get five suspends, one for the original and one for each of the split docs.  But when I open the message in BTAdmin, it has no body. I have custom trace code that writes to a SQL trace table; it verifies that I split into four messages, and I see the GetNext loop being performed.  For purposes of this question, I don't think the message in or how/why I'm splitting is relevant; that code is working and verified by my trace.

    Specific questions:

    1) Do I need to include the XMLDisassembler after my Disassembler in the pipeline? I've tried with and without it.
    2) I should only need to reposition a queue to zero when I read it or use it, correct?
    3) Do I need to set the MessageType and SchemaStrongName, or can I just set the output context to the input context? My output will be in the same schema as my input schema; I'm just changing the child node and spinning off x messages out for 1 in.

    Thanks in advance for taking the time to look at the code I posted in the link above.

    Neal Walters
    http://NealWalters.com




    Thursday, June 14, 2012 1:20 PM

Answers

  • My point is you cannot let it bomb.  You must have a subscriber - if you want it to stop in BizTalk Admin then just ensure the subscriber is stopped but enlisted.

    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    • Marked as answer by Neal Walters Thursday, June 14, 2012 9:54 PM
    Thursday, June 14, 2012 3:58 PM

All replies

  • Try rewinding your new streams.  You rewind the original, but you'll also need to rewind anything you create.

    outStream.Seek(0, SeekOrigin.Begin);
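
    To show why the rewind matters, here is a minimal sketch that uses only System.IO and System.Text. The names RewindDemo, Build and ReadAll are made up for illustration and are not from the posted component. After a write, the position of a MemoryStream sits at the end, so a reader that starts from the current position sees nothing until the stream is rewound.

```csharp
using System.IO;
using System.Text;

public static class RewindDemo
{
    // Reads from the stream's CURRENT position to the end,
    // the way a downstream consumer of the stream would.
    public static string ReadAll(Stream s)
    {
        var reader = new StreamReader(s, Encoding.UTF8);
        return reader.ReadToEnd();
    }

    // Builds a stream roughly the way a splitter might, optionally rewinding it.
    public static MemoryStream Build(string xml, bool rewind)
    {
        var ms = new MemoryStream();
        byte[] bytes = Encoding.UTF8.GetBytes(xml);
        ms.Write(bytes, 0, bytes.Length);   // Position is now bytes.Length
        if (rewind)
        {
            ms.Seek(0, SeekOrigin.Begin);   // same effect as ms.Position = 0
        }
        return ms;
    }
}
```

    Calling RewindDemo.ReadAll(RewindDemo.Build("<a/>", false)) gives an empty string, while the same call with rewind set to true gives back the XML.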


    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    Thursday, June 14, 2012 1:25 PM
  • Alastair,

      Isn't that what this code is already doing?

                            outMsg.BodyPart.GetOriginalDataStream().Position = 0;
                            outMsg.BodyPart.GetOriginalDataStream().Seek(0, SeekOrigin.Begin);

    Thanks,

    Neal


    Thursday, June 14, 2012 1:33 PM
  • I think the issue is in GetNext.

    I got the 1102 trace working this morning (was too burned-out last night to do it): 

                    trcCode = "1102";
                    string xmlDebug = "<Wrapper><![CDATA[" + textFromBodyPart + "]]></Wrapper>";
                    Shared.Utility.Log.WriteEntryConditional(
                       System.Diagnostics.TraceLevel.Info,
                       trcSource, trcCorrId, trcCode, strTraceMessage,
                       xmlDebug, trcCorrId);

    The trace shows this in the XML (The wrapper is just because it's an XML column in my SQL db): 

    <Wrapper>&lt;CoreFlightData&gt;
      &lt;Event code="1111" gmtTs="2012-06-13T09:39:14Z" evntSeqNbr=""&gt;
        &lt;FlightLeg crrCd="DL" fltNbr="239" fltOrigDt="2012-06-15" dprtStnCd="AMS" arrStnCd="ATL" tkoffNbr="1" bestArrGmtTs="2012-06-15T18:45:00Z" bestDprtGmtTs="2012-06-15T09:05:00Z" schdLofSeqNbr="1" actlLofSeqNbr="1" fltTypCd="N" fltLegTypCd="O" dprtStCd="1" arrStCd="1" intlInd="true" dprtGmtOffst="PT120M" arrGmtOffst="-PT240M" schdDprtGmtTs="2012-06-15T09:05:00Z" schdDprtDt="2012-06-15" schdArrGmtTs="2012-06-15T18:45:00Z" fltLegRtrndInd="false" fltLegAddInd="false" fltLegCncldInd="false" fltLegReinstdInd="false" fltLegStbdInd="false" fltLegNoopInd="false" fltLegDvrtdInd="false" fltLegStrbrstInd="false" lstRtrndLegRmvdInd="false" famSttInd="false" intlBagSecFltInd="false" dlAcTypCd="333" schdLofTxt="AMS  01ATL  " eqpOwnrCrrCd="DL" pubDprtGate="E7" pubArrGate="F02" lstUpdGmtTs="2012-06-12T07:31:19Z"&gt;
          &lt;FlightLegAsscn asscdCrrCd="KL" asscdFltNbr="6039" asscdFltTypCd="P" asscdEqpCrrCd="DL" asscdEqpFltNbr="239" asscdSchdLofSeqNbr="1" lstUpdGmtTs="2012-05-16T06:58:26Z" /&gt;
        &lt;/FlightLeg&gt;
      &lt;/Event&gt;
    &lt;/CoreFlightData&gt;</Wrapper>

    In another post, someone warned that not all streams are seekable, so I did a check:

                    strTraceMessage = "GetNext qOutputMsgs.Count = " + qOutputMsgs.Count +
                                      " CanSeek=" + msg.BodyPart.GetOriginalDataStream().CanSeek;
    which showed it is Seekable:
    GetNext qOutputMsgs.Count = 3 CanSeek=True

    Thursday, June 14, 2012 1:36 PM
  • GetOriginalDataStream() gets an unedited stream - I'm not entirely sure how this works on a newly created message but I would be wary of it.

    When I get elusive problems I tend to break down the code a bit more.

    Use IBaseMessagePart and allocate your stream to the .Data property.  Make sure you rewind this property.  Then add this part using the AddPart method, instead of writing it inline.

    Your GetNext() code looks fine, it's not doing anything complicated.


    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    Thursday, June 14, 2012 1:50 PM
  • Alastair,

    I had that code at one time, you can see it commented out where I did the Seek0 on the outstream.  But I'll put that back in, take out the other, and give it a whirl.

    Thanks,
    Neal


    Thursday, June 14, 2012 1:58 PM
  • Alastair,

    Here's the variation I just tried: http://pastebin.com/mRHFzpb0

    I did the seek on the original outStream before putting in the outMsg.BodyPart.Data.
    I removed all debug "access" to the stream from GetNext to make sure the seek position didn't change.
    Same result.

    Does it matter if "Body", first letter is cap or not in this code:

                            outMsg.AddPart("Body",
                                           pContext.GetMessageFactory().CreateMessagePart(),
                                           true);

    Some examples use pInMsg.BodyPartName which has a lower case "body", instead of "Body".

    Thanks,
    Neal

    Thursday, June 14, 2012 2:05 PM
  • I just thought I would check the position in my GetNext/debug:

                    trcCode = "1101";
                    strTraceMessage = "GetNext qOutputMsgs.Count = " + qOutputMsgs.Count +
                                      " Position=" + msg.BodyPart.GetOriginalDataStream().Position;

    shows in trace:

    GetNext qOutputMsgs.Count = 3 Position=0

    So is this really a seek/position problem?  Or something else?

    Did my body/part really get created in the right place?

    Thanks,
    Neal

    P.S.  This is what I'm seeing in BTAdmin.  What I want to see is the "body" there: 

    http://i50.tinypic.com/2115tab.png

    MessageContent seems pretty good, but doesn't have MessageType nor SchemaStrongName.

    http://tinypic.com/r/2qjvas6/6





    Thursday, June 14, 2012 2:09 PM
  • Also, regarding the GUID at the top:

    [System.Runtime.InteropServices.Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7")]

    is this a generated GUID, or a specific GUID for a xml pipeline disassembler?

    Neal

    Thursday, June 14, 2012 2:29 PM
  • Ah, I think you also need to Flush() your stream.  Try this code, I've split it up a bit to avoid any confusion about references and such:

    outStream.Flush();
    outStream.Position = 0;
    outStream.Seek(0, SeekOrigin.Begin);
    
    IBaseMessage outMsg = pContext.GetMessageFactory().CreateMessage();
    IBaseMessagePart partNew = pContext.GetMessageFactory().CreateMessagePart();
    partNew.Data = outStream;
    outMsg.AddPart("Body", partNew, true);

    You'll have to fix any typos!

    In my experience the part name seems utterly irrelevant.


    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    Thursday, June 14, 2012 2:31 PM
  • Same result.  I also tried a full redeploy (not just GAC and copy pipeline component), and that didn't help either.

    I think your code is just another way of doing the same thing.  Code here: http://pastebin.com/dEh4vBfh
    I still say that if I can see the message in the GetNext, then the problem is between GetNext and BizTalk somehow.

    Let's think outside the box:

    1) What about the GUID?
    2) Should I have the XML Disassembler after my custom disassembler?
    3) Anything else to change on the pipeline itself
    4) I tried Recoverable Interchange Processing on the XML Disassembler, no change.

    Thanks,
    Neal


    Thursday, June 14, 2012 2:47 PM
  • I've just taken a look at your screen shots - these are routing failures.  You must have a subscriber; you can't have messages just go into the message box with nothing subscribed, as that is an error.

    For the purposes of debugging, set yourself a Send Port with a Filter that matches BTS.ReceivePortName to the name of your Receive Port.  This will ensure there is a subscription whether your message has been typed or not.  You do not have to start the port if you don't want messages going out, but it must be enlisted.

    It's quite likely that your code is working fine but you aren't seeing the output due to no subscribers.


    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    Thursday, June 14, 2012 3:20 PM
  • Alastair,

      See my original post, I'm letting it bomb with no subscribers so I can view the message in BTAdmin.  But there is no message there.  My issue or complaint is that I should see the Message Body.

      When I click "Message Parts" I should see name=Body, size=some bytes, and Part ID, and that's not there.
    Then I should see "Body" under Message Parts, and be able to see the xml message that was created.

    Neal

    Thursday, June 14, 2012 3:49 PM
  • My point is you cannot let it bomb.  You must have a subscriber - if you want it to stop in BizTalk Admin then just ensure the subscriber is stopped but enlisted.

    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    • Marked as answer by Neal Walters Thursday, June 14, 2012 9:54 PM
    Thursday, June 14, 2012 3:58 PM
  • Alastair,

         I see what you mean.  I had a SendPort, but rather than having it enabled, I assumed that I could view the message-context and message-body in the BTAdmin console.

        When I have a correct subscription, and have that SendPort in the stopped state, I can see the Body.  But if I put the SendPort in the unenlisted state, the Body doesn't show.  I don't understand why that would be, but it seems to be true. Any ideas why?

        My theory was that until I could see the Body in the BTAdmin, there was no point in doing a SendPort subscription.  Once I saw a Body, I would add the SendPort and continue with development. 

    Thanks,
    Neal

    Thursday, June 14, 2012 4:10 PM
  • Attach your debugger to the BizTalk Host Instance and set a breakpoint in your GetNext() method on the following line:

    IBaseMessage msg = (IBaseMessage)qOutputMsgs.Dequeue();

    Step to the next instruction.

    From the immediate window enter the following:

    string test = System.Text.Encoding.ASCII.GetString(((MemoryStream)msg.BodyPart.Data).GetBuffer());

    Verify that the contents of test is what you expect.
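
    One caveat on the dump above, sketched with only System.IO and System.Text: GetBuffer() returns the whole backing array of the MemoryStream, which is usually larger than the data written, so the string can carry trailing '\0' padding. ToArray() copies only the bytes actually written and does not depend on or move Position. The names BufferDemo, Sample, Contents and RawBuffer are illustrative, and UTF-8 is an assumption about the message encoding.

```csharp
using System.IO;
using System.Text;

public static class BufferDemo
{
    // Creates an expandable MemoryStream holding a tiny XML fragment.
    public static MemoryStream Sample()
    {
        var ms = new MemoryStream();
        byte[] bytes = Encoding.UTF8.GetBytes("<a/>");
        ms.Write(bytes, 0, bytes.Length);
        return ms;
    }

    // Logical contents only: ToArray() ignores Position and unused capacity.
    public static string Contents(MemoryStream ms)
    {
        return Encoding.UTF8.GetString(ms.ToArray());
    }

    // Raw backing buffer: may include unused capacity decoded as '\0' characters.
    public static string RawBuffer(MemoryStream ms)
    {
        return Encoding.UTF8.GetString(ms.GetBuffer());
    }
}
```

    For eyeballing in the immediate window either form works, but Contents(...) is the one to compare against an expected string.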


    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.

    Thursday, June 14, 2012 4:11 PM
  • David,

       I already did essentially the same with my trace statements. My whole question is now why I don't see a Body in the BTAdmin Console.
    I was thinking that there was still a communication issue between the IBaseMessage and the way BizTalk delivers or receives the message. (Not sure if you saw last post above, might have been concurrent with your post).

    Neal

    Thursday, June 14, 2012 4:24 PM
  • Can you attach the code for the following method?

    Shared.Utility.Log.WriteEntryConditional(
                       System.Diagnostics.TraceLevel.Info,
                       trcSource, trcCorrId, trcCode, strTraceMessage,
                       null, trcCorrId);

    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.

    Thursday, June 14, 2012 4:58 PM
  • Can you attach the implementation for: Shared.Utility.Log.WriteEntryConditional(...)?


    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.

    Thursday, June 14, 2012 5:42 PM
  • David,

       No, cannot attach it. It's just a simple trace routine that writes to SQL table, similar to Log4Net, but custom.

    Neal

    Thursday, June 14, 2012 8:09 PM
  • Neal,

    I took your code and stripped everything and brute forced the rest and was able to get things working with my XML structure.  The primary thing I changed was to queue the stream rather than the constructed IBaseMessage.  I don't know if this is related or not, but the IBaseMessage involves COM interop.  Another thing I noticed is that if you don't have a valid subscription for the messages, the body will not be included in the suspended disassembled messages and they are not resumable.  The original message will be suspended and is resumable.

    NOTE: the construction of the IBaseMessage in the GetNext() may or may not be related to your issue, so I would first try creating a valid subscription  (BTS.ReceivePortName == "your receive port") and see if that works.

    My prototype code is included below just in case the valid subscription doesn't work:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.IO;
    using System.Xml;
    using Microsoft.BizTalk.Message.Interop;
    using Microsoft.BizTalk.Component.Interop;
    
    namespace Delta.Import.PipelineComponents
    {
    
    
        [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
        [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
        [System.Runtime.InteropServices.Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7")]
        public class FPES2Custom3DayDisassembler :
                        IBaseComponent,
                        IComponentUI,
                        IDisassemblerComponent,
                        IProbeMessage,
                        IPersistPropertyBag
        {
    
            #region Private Members
            System.Collections.Queue qOutputMsgs = new System.Collections.Queue();
            IBaseMessageContext inMsgContext;
            #endregion
    
            #region PropertyBagVariables
            public string _rootNodeName;
            public string _debugLevel;
            #endregion
    
            #region PropertyBagProperties
            public string DefaultTargetNamespace
            {
                get { return _rootNodeName; }
                set { _rootNodeName = value; }
            }
    
            /// <summary>
            /// Debug Parm to allow BizTalk Admin to turn it on/off from BizTalk Admin Console 
            /// </summary>	
            public string DebugLevel
            {
                get { return _debugLevel; }
                set
                {
                    // Setup to allow for None, Informational, Warning, Error, Verbose in the future. 
                    if (value == "OFF" || value == "NONE")
                    {
                        // Assign the backing field; assigning the property here would recurse forever.
                        _debugLevel = "";
                    }
                    else
                    {
                        _debugLevel = value.ToUpper();
                    }
                }
            }
            #endregion
    
            #region IBaseComponent Members
    
            public string Description
            {
                get { return "FPES2 - Custom 3Day Disassembler"; }
            }
    
            public string Name
            {
                get { return "FPES2Custom3DayDisassembler"; }
            }
    
            public string Version
            {
                get { return "1.0.0.0"; }
            }
    
            #endregion
    
    
            #region IPersistPropertyBag Members
    
            public void GetClassID(out Guid classID)
            {
                classID = new Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7");
            }
    
            public void InitNew()
            {
            }
    
            public void Load(IPropertyBag propertyBag, int errorLog)
            {
            }
    
            public void Save(IPropertyBag propertyBag, bool clearDirty,
                             bool saveAllProperties)
            {
            }
    
            #endregion
    
            #region IComponentUI Members
    
            public IntPtr Icon
            {
                get { return System.IntPtr.Zero; }
            }
    
            public System.Collections.IEnumerator Validate(object projectSystem)
            {
                return null;
            }
    
            #endregion
    
            #region IDisassemblerComponent Members
            public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
            {
                IBaseMessagePart msgPart = pInMsg.BodyPart;
                inMsgContext = pInMsg.Context;
    
                // Guard against a missing body part before asking it for its stream.
                Stream originalStream = (msgPart != null) ? msgPart.GetOriginalDataStream() : null;
    
                try
                {
                    if (msgPart != null)
                    {
                        if (originalStream != null)
                        {
                            string stream = string.Empty;
    
                            XmlDocument xmlDoc = new XmlDocument();
                            xmlDoc.Load(pInMsg.BodyPart.Data);
    
    
                            string strXPath = "/CoreFlightData/FlightLeg";
                            XmlNodeList xmlNodeList = xmlDoc.SelectNodes(strXPath);
                            if (xmlNodeList.Count == 0)
                            {
                                // Removed
                            }
                            int loopCounter = 0;
                            foreach (XmlNode xmlNode in xmlNodeList)
                            {
                                loopCounter++;
                                string newMessageText =
                                         "<" + xmlDoc.DocumentElement.Name + ">" +
                                         "<Event code='1111' gmtTs='2012-06-13T09:39:14Z' evntSeqNbr=''>" +
                                         xmlNode.OuterXml +
                                         "</Event>" +
                                         "</" + xmlDoc.DocumentElement.Name + ">";
    
    
                                XmlDocument xmlDocOut = new XmlDocument();
                                xmlDocOut.LoadXml(newMessageText);
    
                                MemoryStream outStream = new MemoryStream();
                                xmlDocOut.Save(outStream);
                                outStream.Seek(0, SeekOrigin.Begin);
    
                                qOutputMsgs.Enqueue(outStream);
    
                                // This will ensure the stream will not be disposed of prematurely
                                pContext.ResourceTracker.AddResource(outStream);
                            }
    
    
                        }
                    }
                }
                catch
                {
                    throw;
                }
            }
    
            public IBaseMessage GetNext(IPipelineContext pContext)
            {
                try
                {
                    if (qOutputMsgs.Count > 0)
                    {
                        IBaseMessage outMsg = pContext.GetMessageFactory().CreateMessage();
                        IBaseMessagePart partNew = pContext.GetMessageFactory().CreateMessagePart();
                        outMsg.AddPart("Body", partNew, true);
                        outMsg.BodyPart.Data = (Stream)qOutputMsgs.Dequeue();
    
                        // copy over all context from incoming message, then change only what is needed 
                        outMsg.Context = inMsgContext;
                        outMsg.Context.Promote("MessageType",
                          "http://schemas.microsoft.com/BizTalk/2003/system-properties",
                          "http://qt.bts.external.delta.schemas/FPES2#CoreFlightData");
    
                        return outMsg;
                    }
                    else
                    {
                        return null;
                    }
                }
                catch
                {
                    throw;
                }
            }
            #endregion
    
            public bool Probe(IPipelineContext pContext, IBaseMessage pInMsg)
            {
                return true;
            }
        } // end class 
    } // end namespace


    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.

    Thursday, June 14, 2012 8:48 PM
  • Just for my own curiosity, I recoded it to queue the IBaseMessage and it also works when a valid subscription is configured.


    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.

    Thursday, June 14, 2012 9:08 PM
  • David,

        Thanks for the info, but we seem to have resolved the exact same thing a few hours ago. My confusion was that I thought the body was always in the message, whether it was resumable or not. I was trying to use the "body" in the BTSAdmin as my verification that it was working, when all along, all I needed to do was to enable my SendPort.  My theory was to get it working, inspect the data, then later turn on the SendPort and write it to disk.

    Neal

    Thursday, June 14, 2012 9:54 PM
  • I marked a previous post as an answer, but no one in fact answered any of my original three questions:

    Specific questions:

    1) Do I need to include the XMLDisassembler after my Disassembler in the pipeline? I've tried with and without it.
    2) I should only need to reposition a queue to zero when I read it or use it, correct?
    3) Do I need to set the MessageType and SchemaStrongName, or can I just set the output context to the input context? My output will be in the same schema as my input schema; I'm just changing the child node and spinning off x messages out for 1 in.

    Neal

    Thursday, June 14, 2012 9:59 PM
  • Answers:

    1. The Disassemble stage is a first match stage (unless you brute force it to execute all).  If you want the XML Disassembler to do property promotion, you can extend it by using it as your base class.  I don't remember the pipeline behaving this way (not including the body of suspended disassembled messages), but I developed the flat file disassembler almost 10 years ago, so the newer versions may have changed.  Perhaps some additional properties were promoted in the XMLDisassembler, again it was too long ago for me to remember.
    2. Any stream pointer you present to the downstream components needs to be positioned at the exact place in the stream where the message begins.  Downstream components have no idea where upstream components' messages begin.  If the original input message stream is completely consumed and is never presented as an output message in the GetNext(...) method, the position doesn't matter.  It may come into play for tracking... I can't remember.
    3. MessageType is extremely important in BizTalk because it is how all messages are identified and matched to schemas, orchestration messages, ... that is with the exception of when the SchemaStrongName is specified (I believe this is only used in Disassembler and Assembler components).

    One comment on your implementation: if you extend the XMLDisassembler, it can disassemble your message for you, and it will also keep the memory footprint at a minimum because the streaming is not done in memory.  The key part is to call base.Disassemble(...) from your disassembler's Disassemble() method, call base.GetNext(...) from your GetNext(...) method, use the stream from the base call to construct your reformatted message, write the MessageType, set msg.BodyPart.Data to your stream, and return this as the output of your GetNext(...).  You should probably consider this if the size of your messages can be quite large.

    Hope that clarifies your questions.


    David Downing... If this answers your question, please Mark as the Answer. If this post is helpful, please vote as helpful.



    Friday, June 15, 2012 3:51 AM
  • The GUID at the top of the Pipeline declaration is just a unique one, it doesn't relate to the Pipeline type etc.  This is done with the ComponentCategoryAttribute.

    As BizTalk does everything in transactions, if something fails then it gets rolled back.  This is why when there is no subscriber the step fails.  You get presented with the last valid data, which is pre-pipeline processing.  The reason that you see multiple suspensions though is that these are Routing Failure Reports.  They're not actually messages just a snapshot of the context related to the failed message - a little confusing, but their purpose is to allow you to see what was promoted so you can match that up to your subscriptions.

    In relation to your questions, it generally depends (as it always does in BizTalk).  You don't have to type a message but it does make things easier and cleaner.  There are two main ways of doing this - running an XMLDisassembler in your Pipeline, or popping the code into your pipeline.  As the Disassemble stage works on a First Match principle if you put a XMLDisassembler after your custom component then it won't fire as your custom component was the first successful match.  So the simplest approach here is to do the work yourself and write the MessageType in.  Normally you don't need to set the SchemaStrongName.  You can do this with this sort of code:

    outMessage.Context.Promote("MessageType", "http://schemas.microsoft.com/BizTalk/2003/system-properties", messageType);
    Where messageType is the namespace#rootElement.  I notice that when you build your new messages they do not have namespaces, so your message type would just be the #rootName bit.  It would be better if you added a namespace too as it can get messy down the line if everything is anonymous.
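
    As a rough sketch of how that messageType string could be derived from the XML itself rather than hard-coded, using only System.Xml: MessageTypeDemo and FromXml are hypothetical names, not part of any BizTalk API. For a root element with no namespace this sketch returns the bare root name, which is an assumption on my part, so compare it with what the stock XML Disassembler promotes before relying on it.

```csharp
using System.Xml;

public static class MessageTypeDemo
{
    // Builds "namespace#rootElement" from the document element of the given XML.
    public static string FromXml(string xml)
    {
        var doc = new XmlDocument();
        doc.LoadXml(xml);
        XmlElement root = doc.DocumentElement;

        if (string.IsNullOrEmpty(root.NamespaceURI))
        {
            // Assumption: with no namespace, only the root element name is used.
            return root.LocalName;
        }

        // LocalName drops any prefix, so "ns0:CoreFlightData" yields "CoreFlightData".
        return root.NamespaceURI + "#" + root.LocalName;
    }
}
```

    With a root such as <ns0:CoreFlightData xmlns:ns0="http://qt.bts.external.delta.schemas/FPES2"/>, FromXml gives the same namespace#root string that the prototype code earlier in this thread promotes.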

    A Queue handles its position in a FIFO fashion so you do not need to tinker with it.  When you Enqueue() an item it gets tagged onto the back; Dequeue() removes from the front.
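
    A tiny sketch of that FIFO behaviour, using the generic Queue<T> from System.Collections.Generic instead of the non-generic Queue in the posted code (the ordering is the same). QueueDemo and Drain are illustrative names only.

```csharp
using System.Collections.Generic;

public static class QueueDemo
{
    // Enqueues the items in order, then drains the queue and returns
    // the order in which Dequeue() handed them back.
    public static List<string> Drain(IEnumerable<string> items)
    {
        var queue = new Queue<string>();
        foreach (string item in items)
        {
            queue.Enqueue(item);            // added at the back
        }

        var result = new List<string>();
        while (queue.Count > 0)
        {
            result.Add(queue.Dequeue());    // removed from the front
        }
        return result;
    }
}
```

    Draining "msg1", "msg2", "msg3" returns them in that same order, with no position to reset in between.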

    I believe this answers everything - let me know if I missed anything.


    If this is helpful or answers your question - please mark accordingly.
    Because I get points for it which gives my life purpose (also, it helps other people find answers quickly)

    Friday, June 15, 2012 8:58 AM
  • I developed the same type of pipeline component last week and ran into the same issues, which I have since resolved.

    First, check in the debugger whether the split content is present in the GetNext method.

    Write the stream into a string and check whether the split happened. If the split content is there, let me know and I will share the code that resolves the issue.

    Alternatively, send me the code and I will take a look and fix it.

    My mail id is: kapil.it24@gmail.com

    Friday, June 15, 2012 11:36 AM