none
Sending Messages to EventHub using Task Script in SSIS

    Question

  • Hi,

    We have a requirement to send messages and loads those data in azure sql db via eventhubs and streaming analytics in azure.

    We are getting error message while run C# codes. I am not C# savvy.

    Here is a code:

    #region Help:  Introduction to the Script Component
    /* The Script Component allows you to perform virtually any operation that can be accomplished in
     * a .Net application within the context of an Integration Services data flow.
     *
     * Expand the other regions which have "Help" prefixes for examples of specific ways to use
     * Integration Services features within this script component. */
    #endregion
    
    #region Namespaces
    using System;
    using System.Data;
    using System.Text;
    using System.Data.SqlClient;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventHubs;
    using Microsoft.SqlServer.Dts.Pipeline;
    using Microsoft.SqlServer.Dts.Runtime.Wrapper;
    using System.Collections.Generic;
    using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
    #endregion
    
    /// <summary>
    /// This is the class to which to add your code.  Do not change the name, attributes, or parent
    /// of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
    public class ScriptMain : UserComponent
    {
        #region Help:  Using Integration Services variables and parameters
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script component, according to whether or not your
         * code needs to write into the variable.  To do so, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and ReadWriteVariables properties in the
         * Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable or parameter:
         *  DateTime startTime = Variables.MyStartTime;
         *
         * Example of writing to a variable:
         *  Variables.myStringVariable = "new value";
         */
        #endregion
    
        #region Help:  Using Integration Services Connnection Managers
        /* Some types of connection managers can be used in this script component.  See the help topic
         * "Working with Connection Managers Programatically" for details.
         *
         * To use a connection manager in this script, first ensure that the connection manager has
         * been added to either the list of connection managers on the Connection Managers page of the
         * script component editor.  To add the connection manager, save this script, close this instance of
         * Visual Studio, and add the Connection Manager to the list.
         *
         * If the component needs to hold a connection open while processing rows, override the
         * AcquireConnections and ReleaseConnections methods.
         * 
         * Example of using an ADO.Net connection manager to acquire a SqlConnection:
         *  object rawConnection = Connections.SalesDB.AcquireConnection(transaction);
         *  SqlConnection salesDBConn = (SqlConnection)rawConnection;
         *
         * Example of using a File connection manager to acquire a file path:
         *  object rawConnection = Connections.Prices_zip.AcquireConnection(transaction);
         *  string filePath = (string)rawConnection;
         *
         * Example of releasing a connection manager:
         *  Connections.SalesDB.ReleaseConnection(rawConnection);
         */
        #endregion
    
        #region Help:  Firing Integration Services Events
        /* This script component can fire events.
         *
         * Example of firing an error event:
         *  ComponentMetaData.FireError(10, "Process Values", "Bad value", "", 0, out cancel);
         *
         * Example of firing an information event:
         *  ComponentMetaData.FireInformation(10, "Process Values", "Processing has started", "", 0, fireAgain);
         *
         * Example of firing a warning event:
         *  ComponentMetaData.FireWarning(10, "Process Values", "No rows were received", "", 0);
         */
        #endregion
    
        /// <summary>
        /// This method is called once, before rows begin to be processed in the data flow.
        ///
        /// You can remove this method if you don't need to do anything here.
        /// </summary>
        public override void PreExecute()
        {
            base.PreExecute();
            
        }
    
        /// <summary>
        /// This method is called after all the rows have passed through this component.
        ///
        /// You can delete this method if you don't need to do anything here.
        /// </summary>
        public override void PostExecute()
        {
            base.PostExecute();
            string ConnectionString = Variables.ConnectionString;
            string EventHubName = Variables.EventHubName;
            string sqlConnectionString = Variables.sqlConnectionString;
            string sqlQuery = Variables.sqlQuery;
    
            SendingRandomMessages(ConnectionString, EventHubName, sqlConnectionString, sqlQuery).GetAwaiter().GetResult();
        }
    
        /// <summary>
        /// This method is called once for every row that passes through the component from Input0.
        ///
        /// Example of reading a value from a column in the the row:
        ///  string zipCode = Row.ZipCode
        ///
        /// Example of writing a value to a column in the row:
        ///  Row.ZipCode = zipCode
        /// </summary>
        /// <param name="Row">The row that is currently passing through the component</param>
        public override void Input0_ProcessInputRow(Input0Buffer Row)
        {
            
        }
        //public override void ProcessInput(int InputID, PipelineBuffer Buffer)
        //{
            
        //    string ConnectionString = Variables.ConnectionString;
        //    string EventHubName = Variables.EventHubName;      
    
        //    bool fireAgain = true;
        //    ComponentMetaData.FireInformation(0, "",
        //      Buffer.ColumnCount.ToString() + " columns",
        //      "", 0, ref fireAgain);
    
        //    while (Buffer.NextRow())
        //    {
        //        for (int columnIndex = 0;
        //          columnIndex < Buffer.ColumnCount;
        //          columnIndex++)
        //        {
        //            string columnData = null;
        //            if (Buffer.IsNull(columnIndex))
        //            {
        //                columnData = "is NULL";
        //            }
        //            else
        //            {
        //                BufferColumn columnInfo = Buffer.GetColumnInfo(columnIndex);
        //                switch (columnInfo.DataType)
        //                {
        //                    case DataType.DT_BOOL:
        //                        columnData = Buffer.GetBoolean(columnIndex).ToString();
        //                        break;
    
        //                    case DataType.DT_WSTR:
        //                        columnData += Buffer.GetString(columnIndex);
        //                        break;
    
        //                    // add code to support more data types here
    
        //                    default:
        //                        columnData = "";
        //                        break;
        //                }
        //            }
    
        //            ComponentMetaData.FireInformation(0, "",
        //              "Column " + columnIndex.ToString() + ": " + columnData,
        //              "", 0, ref fireAgain);
        //        }
        //    }
        //    base.ProcessInput(InputID, Buffer);
        //    string Json = DataTableToJson(Buffer);
    
        //    SendingRandomMessages(ConnectionString, EventHubName, Json).GetAwaiter().GetResult();
    
        //}
    
        static async Task SendingRandomMessages(string ConnectionString, string EventHubName, string sqlConnectionString, string sqlQuery)
        {
            var connectionStringBuilder = new EventHubsConnectionStringBuilder(ConnectionString)
            {
                EntityPath = EventHubName
            };
            EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
            try
            {
                var con = new SqlConnection(sqlConnectionString);
                var cmd = new SqlCommand(sqlQuery, con);
                DataTable dataTable = new DataTable();
                con.Open();
                
                SqlDataAdapter da = new SqlDataAdapter(cmd);
                // this will query your database and return the result to your datatable
                da.Fill(dataTable);
                con.Close();
                
                string rawData = DataTableToJson(dataTable);
    
                EventData eventData = new EventData(Encoding.UTF8.GetBytes(rawData));
                await eventHubClient.SendAsync(eventData);
            }
            catch (Exception exception)
            {
                throw exception;
            }
    
        }
    
        public static string DataTableToJson(DataTable table)
        {
            var jsonString = new StringBuilder();
            if (table.Rows.Count > 0)
            {
                jsonString.Append("[");
                for (int i = 0; i < table.Rows.Count; i++)
                {
                    jsonString.Append("{");
                    for (int j = 0; j < table.Columns.Count; j++)
                    {
                        if (j < table.Columns.Count - 1)
                        {
                            jsonString.Append("\"" + table.Columns[j].ColumnName.ToString()
                                              + "\":" + "\""
                                              + table.Rows[i][j].ToString() + "\",");
                        }
                        else if (j == table.Columns.Count - 1)
                        {
                            jsonString.Append("\"" + table.Columns[j].ColumnName.ToString()
                                              + "\":" + "\""
                                              + table.Rows[i][j].ToString() + "\"");
                        }
                    }
                    if (i == table.Rows.Count - 1)
                    {
                        jsonString.Append("}");
                    }
                    else
                    {
                        jsonString.Append("},");
                    }
                }
                jsonString.Append("]");
            }
            return jsonString.ToString();
        }
    }
    

    Monday, March 20, 2017 3:10 PM

All replies

  • I think you need to install Azure integration tools, typically from within Visual Studio

    Arthur

    MyBlog


    Twitter

    Monday, March 20, 2017 3:49 PM
    Moderator
  • I do that Microsoft Azure DDK for .net (VS 2015). I am running VS 2015 community version. I did test with trial version of VS 2015 Professional also. I am getting same error message. Do I need VS 2015 Professional License to do that?
    Monday, March 20, 2017 5:35 PM
  • AFAIK VS 2015 community does not qualify for development against Azure

    Arthur

    MyBlog


    Twitter

    Monday, March 20, 2017 6:22 PM
    Moderator
  • Hi, I tried that with VS 2015 professional trial version as well. I do have Microsoft Azure SDK. net for VS 2015. Do I need to get License ?
    Monday, March 20, 2017 6:55 PM
  • It requires a license 

    Arthur

    MyBlog


    Twitter

    Monday, March 20, 2017 7:17 PM
    Moderator
  • Hi 1234alex,

    You could first have a look at following blog to learn how to use 3<sup>rd</sup> dll reference in SSIS Script Task or Script Component. Personally, your issue may be related to it. So, you could have a try to use the method in following blog to load the dll.

    https://blogs.msdn.microsoft.com/dbrowne/2014/06/25/how-to-load-an-assembly-in-a-ssis-script-task-that-isnt-in-the-gac/

    Or, you could create an external web service to send message to EventHub and then use the SSIS Web Service Task to implement your needs.

    Best Regards,

    Albert Zhang


    MSDN Community Support
    Please remember to click "Mark as Answer" the responses that resolved your issue, and to click "Unmark as Answer" if not. This can be beneficial to other community members reading this thread. If you have any compliments or complaints to MSDN Support, feel free to contact MSDNFSF@microsoft.com.

    • Proposed as answer by Albert_ Zhang Sunday, April 02, 2017 6:49 AM
    Tuesday, March 21, 2017 4:42 AM
  • Thank you for the suggestion. I tried that and it was working fro few months but suddenly started getting error message says that it is not able to "Microsoft.Azure.Amqp.Resource". This is what error message says.

    + exception {"Could not find file 'Microsoft.Azure.Amqp.resources'.":null} System.Exception {System.IO.FileNotFoundException}

    Is this a bug with that new Microsoft.Azure.Amqp.dll?

    Monday, June 12, 2017 5:11 PM