none
C# TCP异步通信中,接收数据顺序问题 RRS feed

  • 问题

  • 就是在TCP异步通信中,服务端采用BeginRead函数接收数据。采用的是递归循环接收,并且我用的是自定义数据包的形式,就是数据包=包头(包头中包含包体长度字段)+包体,但是为何每次解析玩一个正确的数据包后,紧接着后面的数据就是错误的数据为何呢,是什么原因呢?比如正确的数据包总长度是325个字节,头长为13个固定字节,其中包含一个4字节长度定义的是数据体的长度。但是实际现象是,在处理完一个接受到的完整数据包后,紧接着后面新收到的20个字节,中进行解析发现数据体长度为-425545998,又看了下数据确实不对,那这是因为什么原因引起的呢?

    贴出部分代码如下:

     public void HandleDataReceived(IAsyncResult ar)
            {
                StateObject state = null;
                try
                {
                    //结束读取
                    state = (StateObject)ar.AsyncState;
                    int ret = state.CurrtStream.EndRead(ar);

                    MinPacketLength = new TcpHeaderData().GetByteData().Length;

                    if (ret == 0)
                    {
                        if (ClientDisConnect != null)
                        {
                            //IsAlive = false;
                            IsConnected = false;
                            ClientDisConnect(ClientIp);
                        }
                        return;//表示当前连接断开,则退出,直接返回
                    }

                    byte[] currentData = new byte[0];
                    var temop = new byte[0];
                    lock (lokObjHisData)
                    {
                        temop = HistoryData;
                        currentData = new byte[HistoryData.Length + state.buffer.Length];
                        Buffer.BlockCopy(HistoryData, 0, currentData, 0, HistoryData.Length);
                        Buffer.BlockCopy(state.buffer, 0, currentData, HistoryData.Length, state.buffer.Length);
                        HistoryData = currentData;//由于每次判断粘包半包,都是不断累加判断的过程,因此这里当条件不满足完整包时,每次都会累加
                    }

                    if (currentData.Length < MinPacketLength)
                    {
                        Console.WriteLine("当前线程ID为:{0},当前数据包长度小于最小包大小,继续叠加,总次数为:{1}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, ++globalLessMinPacketNum);
                        return;
                    }
                    else
                    {
                        TcpHeaderData headData = new TcpHeaderData();
                        var isSuc = headData.Filter(currentData);
                        var contentLen = headData.ContentLength;//解析到内容长度

                        var sumSize = MinPacketLength + contentLen;//得到数据包总大小
                        if (contentLen < 0 || sumSize > 65536)
                        {
                            lock (lokObjHisData) { HistoryData = new byte[0]; }
                            Console.WriteLine("当前线程ID为:{0},当前数据包长度异常,总大小为{1}字节,将进行丢弃,总丢弃次数为:{2}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, sumSize, ++globalCurrentExceptNum);
                            return;
                        }

                        if (currentData.Length == sumSize)
                        {
                            //完整包则直接处理
                            TcpPacketData sumData = new TcpPacketData();
                            var isSucs = sumData.Filter(currentData);
                            state.TcpData = sumData;
                            lock (lokObjHisData)
                            {
                                HistoryData = new byte[0];//每次处理完整包后重置
                            }

                            Console.WriteLine("当前线程ID为:{0},当前数据包大小相等,总相等次数为:{1},-----------------------------------", Thread.CurrentThread.ManagedThreadId, ++globalStatSameNum);
                        }
                        else if (currentData.Length > sumSize)
                        {
                            //大于完整包则截取后,处理,因为数据包字节流长度与实际不符,肯定不能正确解析到数据,
                            var realData = new byte[sumSize];
                            Buffer.BlockCopy(currentData, 0, realData, 0, sumSize);

                            TcpPacketData sumData = new TcpPacketData();
                            var isSucs = sumData.Filter(realData);
                            state.TcpData = sumData;

                            var tempData = new byte[currentData.Length - sumSize];
                            Buffer.BlockCopy(currentData, sumSize, tempData, 0, currentData.Length - sumSize);
                            //当前条件下,要把处理完毕后,结余的字节重新赋值,便于衔接当前字节流处理机制,HistoryData.Length != 0 && HistoryData[0] == (byte)44

                            TcpHeaderData headData1 = new TcpHeaderData();
                            var minLen = new TcpHeaderData().GetByteData().Length;
                            if (tempData.Length < minLen)
                            {
                                lock (lokObjHisData) { HistoryData = tempData; }
                                Console.WriteLine("当前线程ID为:{0},当前结余字节数不满足正常大小,总次数为:{1},继续叠加!!!!!!!!!!!!!!!!!", Thread.CurrentThread.ManagedThreadId, ++globalStatByLen);
                            }
                            else
                            {
                                var isSuc1 = headData1.Filter(tempData);
                                var contentLen1 = headData1.ContentLength;//解析到内容长度
                                var sumSize1 = MinPacketLength + contentLen1;//得到数据包总大小
                                if (contentLen1 < 0 || contentLen1 > 65536)
                                {
                                    lock (lokObjHisData) { HistoryData = new byte[0]; }
                                    Console.WriteLine("当前线程ID为:{0},当前结余数据包长度异常,总大小为{1}字节,将进行丢弃,总丢弃次数为:{2}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, sumSize1, ++globalStatExceptNum);
                                }
                                else
                                {
                                    lock (lokObjHisData) { HistoryData = tempData; }
                                    Console.WriteLine("当前线程ID为:{0},结余字节数※※※※※※※※※※※※※※※※※※※※※※※{1}", Thread.CurrentThread.ManagedThreadId, tempData.Length);
                                }
                            }
                        }
                        else
                        {
                            Console.WriteLine("当前线程ID为:{0},当前接收数据大小※※※※※※※※※※※※※※※※※※※※※※※{1}", Thread.CurrentThread.ManagedThreadId, ret);
                            return;
                        }

                    }
                    Console.WriteLine("当前线程ID为:{0},tcpClient.ReceiveBufferSize=================================,{1}", Thread.CurrentThread.ManagedThreadId, state.currentTCPClient.ReceiveBufferSize);
                    Console.WriteLine("当前线程ID为:{0},tcpClient.SendBufferSize=================================,{1}", Thread.CurrentThread.ManagedThreadId, state.currentTCPClient.SendBufferSize);

                    //接收数据处理线程
                    ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveCallBack), state);
                }
                catch (Exception ex)
                {
                    lock (lokObjHisData)
                    {
                        HistoryData = new byte[0];//出现错误也重置
                    }

                    IsConnected = false;
                }
                finally
                {
                    if (tcpClient != null && tcpClient.Connected)
                    {
                        state.CurrtStream.BeginRead(state.buffer, 0, state.buffer.Length, new AsyncCallback(HandleDataReceived), state);//递归循环接收
                    }
                }

            }

    2021年11月1日 8:57

全部回复

  • 就是在TCP异步通信中,服务端采用BeginRead函数接收数据。采用的是递归循环接收,并且我用的是自定义数据包的形式,就是数据包=包头(包头中包含包体长度字段)+包体,但是为何每次解析玩一个正确的数据包后,紧接着后面的数据就是错误的数据为何呢,是什么原因呢?比如正确的数据包总长度是325个字节,头长为13个固定字节,其中包含一个4字节长度定义的是数据体的长度。但是实际现象是,在处理完一个接受到的完整数据包后,紧接着后面新收到的20个字节,中进行解析发现数据体长度为-425545998,又看了下数据确实不对,那这是因为什么原因引起的呢?

    贴出部分代码如下:

     public void HandleDataReceived(IAsyncResult ar)
            {
                StateObject state = null;
                try
                {
                    //结束读取
                    state = (StateObject)ar.AsyncState;
                    int ret = state.CurrtStream.EndRead(ar);

                    MinPacketLength = new TcpHeaderData().GetByteData().Length;

                    if (ret == 0)
                    {
                        if (ClientDisConnect != null)
                        {
                            //IsAlive = false;
                            IsConnected = false;
                            ClientDisConnect(ClientIp);
                        }
                        return;//表示当前连接断开,则退出,直接返回
                    }

                    byte[] currentData = new byte[0];
                    var temop = new byte[0];
                    lock (lokObjHisData)
                    {
                        temop = HistoryData;
                        currentData = new byte[HistoryData.Length + state.buffer.Length];
                        Buffer.BlockCopy(HistoryData, 0, currentData, 0, HistoryData.Length);
                        Buffer.BlockCopy(state.buffer, 0, currentData, HistoryData.Length, state.buffer.Length);
                        HistoryData = currentData;//由于每次判断粘包半包,都是不断累加判断的过程,因此这里当条件不满足完整包时,每次都会累加
                    }

                    if (currentData.Length < MinPacketLength)
                    {
                        Console.WriteLine("当前线程ID为:{0},当前数据包长度小于最小包大小,继续叠加,总次数为:{1}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, ++globalLessMinPacketNum);
                        return;
                    }
                    else
                    {
                        TcpHeaderData headData = new TcpHeaderData();
                        var isSuc = headData.Filter(currentData);
                        var contentLen = headData.ContentLength;//解析到内容长度

                        var sumSize = MinPacketLength + contentLen;//得到数据包总大小
                        if (contentLen < 0 || sumSize > 65536)
                        {
                            lock (lokObjHisData) { HistoryData = new byte[0]; }
                            Console.WriteLine("当前线程ID为:{0},当前数据包长度异常,总大小为{1}字节,将进行丢弃,总丢弃次数为:{2}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, sumSize, ++globalCurrentExceptNum);
                            return;
                        }

                        if (currentData.Length == sumSize)
                        {
                            //完整包则直接处理
                            TcpPacketData sumData = new TcpPacketData();
                            var isSucs = sumData.Filter(currentData);
                            state.TcpData = sumData;
                            lock (lokObjHisData)
                            {
                                HistoryData = new byte[0];//每次处理完整包后重置
                            }

                            Console.WriteLine("当前线程ID为:{0},当前数据包大小相等,总相等次数为:{1},-----------------------------------", Thread.CurrentThread.ManagedThreadId, ++globalStatSameNum);
                        }
                        else if (currentData.Length > sumSize)
                        {
                            //大于完整包则截取后,处理,因为数据包字节流长度与实际不符,肯定不能正确解析到数据,
                            var realData = new byte[sumSize];
                            Buffer.BlockCopy(currentData, 0, realData, 0, sumSize);

                            TcpPacketData sumData = new TcpPacketData();
                            var isSucs = sumData.Filter(realData);
                            state.TcpData = sumData;

                            var tempData = new byte[currentData.Length - sumSize];
                            Buffer.BlockCopy(currentData, sumSize, tempData, 0, currentData.Length - sumSize);
                            //当前条件下,要把处理完毕后,结余的字节重新赋值,便于衔接当前字节流处理机制,HistoryData.Length != 0 && HistoryData[0] == (byte)44

                            TcpHeaderData headData1 = new TcpHeaderData();
                            var minLen = new TcpHeaderData().GetByteData().Length;
                            if (tempData.Length < minLen)
                            {
                                lock (lokObjHisData) { HistoryData = tempData; }
                                Console.WriteLine("当前线程ID为:{0},当前结余字节数不满足正常大小,总次数为:{1},继续叠加!!!!!!!!!!!!!!!!!", Thread.CurrentThread.ManagedThreadId, ++globalStatByLen);
                            }
                            else
                            {
                                var isSuc1 = headData1.Filter(tempData);
                                var contentLen1 = headData1.ContentLength;//解析到内容长度
                                var sumSize1 = MinPacketLength + contentLen1;//得到数据包总大小
                                if (contentLen1 < 0 || contentLen1 > 65536)
                                {
                                    lock (lokObjHisData) { HistoryData = new byte[0]; }
                                    Console.WriteLine("当前线程ID为:{0},当前结余数据包长度异常,总大小为{1}字节,将进行丢弃,总丢弃次数为:{2}次,+++++++++++++++++++++++++++++", Thread.CurrentThread.ManagedThreadId, sumSize1, ++globalStatExceptNum);
                                }
                                else
                                {
                                    lock (lokObjHisData) { HistoryData = tempData; }
                                    Console.WriteLine("当前线程ID为:{0},结余字节数※※※※※※※※※※※※※※※※※※※※※※※{1}", Thread.CurrentThread.ManagedThreadId, tempData.Length);
                                }
                            }
                        }
                        else
                        {
                            Console.WriteLine("当前线程ID为:{0},当前接收数据大小※※※※※※※※※※※※※※※※※※※※※※※{1}", Thread.CurrentThread.ManagedThreadId, ret);
                            return;
                        }

                    }
                    Console.WriteLine("当前线程ID为:{0},tcpClient.ReceiveBufferSize=================================,{1}", Thread.CurrentThread.ManagedThreadId, state.currentTCPClient.ReceiveBufferSize);
                    Console.WriteLine("当前线程ID为:{0},tcpClient.SendBufferSize=================================,{1}", Thread.CurrentThread.ManagedThreadId, state.currentTCPClient.SendBufferSize);

                    //接收数据处理线程
                    ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveCallBack), state);
                }
                catch (Exception ex)
                {
                    lock (lokObjHisData)
                    {
                        HistoryData = new byte[0];//出现错误也重置
                    }

                    IsConnected = false;
                }
                finally
                {
                    if (tcpClient != null && tcpClient.Connected)
                    {
                        state.CurrtStream.BeginRead(state.buffer, 0, state.buffer.Length, new AsyncCallback(HandleDataReceived), state);//递归循环接收
                    }
                }

            }

    2021年11月1日 9:00
  • 补充一下,就是为了测试稳定性,及暴露出各种可能出现的问题,我特意把自定义缓存大小设置为了20字节长度,也就是如下:

     public class StateObject
        {
            private NetworkStream currtStream = null;

            public NetworkStream CurrtStream
            {
                get { return currtStream; }
                set { currtStream = value; }
            }

            public byte[] buffer = new byte[bufferSize];
            private static int bufferSize = 20;

            public TcpPacketData TcpData { get; set; }

            public TcpClient currentTCPClient { get; set; }
           
        }

    2021年11月1日 9:30
  • 补充一下,就是为了测试稳定性,及暴露出各种可能出现的问题,我特意把自定义缓存大小设置为了20字节长度,也就是如下:

     public class StateObject
        {
            private NetworkStream currtStream = null;

            public NetworkStream CurrtStream
            {
                get { return currtStream; }
                set { currtStream = value; }
            }

            public byte[] buffer = new byte[bufferSize];
            private static int bufferSize = 20;

            public TcpPacketData TcpData { get; set; }

            public TcpClient currentTCPClient { get; set; }
           
        }

    2021年11月1日 9:30