.net core使用Pipelines进行消息IO合并

栏目: ASP.NET · 发布时间: 5年前

内容简介:之前的文章讲述过通过IO合并实现百万级RPS和千万级消息推送,但这两篇文章只是简单地讲了一下原理和测试结果并没有在代码实现上的讲解,这一编文章主要通过代码的实现来讲述消息IO合并的原理。其实在早期的版本实现IO合并还是比较因难的,需要大量的代码和测试其实消息IO合并的原理在这里再多说一遍,就是多个消息使用同一个网络IO写入,其实就是把原来一个消息对应一个Buffer,设计成多个消息写入同一个Buffer.从原理上实现可以看以下图解。

之前的文章讲述过通过IO合并实现百万级RPS和千万级消息推送,但这两篇文章只是简单地讲了一下原理和测试结果并没有在代码实现上的讲解,这一编文章主要通过代码的实现来讲述消息IO合并的原理。其实在早期的版本实现IO合并还是比较因难的,需要大量的代码和测试 Beetlex 是完全自己实现这套机制。不过这一章就不是从Beetlex的实现来讲解,因为MS已经提供了一个新东西给以支持,那就是System.IO.Pipelines.在Pipelines的支持下实现消息Buffer的合并变得非常简单的事情。

消息IO合并原理

其实消息IO合并的原理在这里再多说一遍,就是多个消息使用同一个网络IO写入,其实就是把原来一个消息对应一个Buffer,设计成多个消息写入同一个Buffer.从原理上实现可以看以下图解。

.net core使用Pipelines进行消息IO合并

System.IO.Pipelines介绍

System.IO.Pipelines: High performance IO in .NET, 微软是这样说的 了解详情 但我了解System.IO.Pipelines后发现其实是一个安全可靠的内存池读写+状态态通知机制;不过这套机制对普通开发者来说是件非常复杂的工作,主要原因是一但处理不好的情况那就导致内存泄露的可能!基于System.IO.Pipelines这套机制,可以非常方便地把消息和网络buffer分离出来。接下来就讲一下使用System.IO.Pipelines实现自动批量把消息合并到Buffer中。

Pipe类

针对System.IO.Pipelines的介绍说得还是挺神的,其实打开System.IO.Pipelines一看你就发现就几抽像类,真正使用的就只有 Pipe 一个类. Pipe 看上去更像一个 Stream 提供一个Read和write属性。Writer属性是写入数据,而Reader则是读取消息,不过这两个属性对象基于状态交互所以两者可以分别在不同的线程进行处理。

消息队列和写入

前面的原理已经讲了,如果想消息能合并那就需要一个队列,然后确保同一时间只有一个线程来处理队列中的消息。如果当前线程检测到队列中有多个消息那就可以获取所有消息进行一个批序列化,接下来看一下这代码代码是怎样实现的.

        private async void OnMergeWrite(object state)
        {
            while (true)
            {
                var memory = mWrite.GetMemory(2048);
                var length = memory.Length;
                int offset = 0;
                int count = 0;
                while (_msgQueues.TryDequeue(out string msg))
                {
                    if (length < msg.Length)
                    {
                        mWrite.Advance(count);
                        memory = mWrite.GetMemory(2048);
                        length = memory.Length;
                        offset = 0;
                        count = 0;
                    }
                    var elen = System.Text.Encoding.ASCII.GetBytes(msg, memory.Slice(offset, msg.Length).Span);
                    count += elen;
                    offset += elen;
                    length -= elen;
                }
                if (count > 0)
                    mWrite.Advance(count);
                await mWrite.FlushAsync();
                lock (_workSync)
                {
                    if (_msgQueues.IsEmpty)
                    {
                        _doingWork = false;
                        return;
                    }
                }
            }
        }

代码并不复杂,进入线程不断地获取消息并序列化到Buffer中,当Buffer满了后提交给Writer后重新获取Buffer继续序列化。当没有消息的时候再一次检测队列如果又存在消息则继续,为什么需要两层While来检测呢,主要是和队列写入状态检测的一致性判断。

      public void Enqueue(string message)
        {
            _msgQueues.Enqueue(message);
            lock (_workSync)
            {
                if (!_doingWork)
                {
                    System.Threading.ThreadPool.UnsafeQueueUserWorkItem(OnMergeWrite, this);
                    _doingWork = true;
                }
            }
        }

以上是消息写入队列方法。

Pipe数据读取

由于Pipe的Write和Read是基于状态同步,所以Reader可以在任何意时间和任意线程中进行读取,以下是Read的代码:

        private async static void Read(object state)
        {
            int count = 0;
            while (true)
            {
                var result = await pipe.Reader.ReadAsync();
                var buffer = result.Buffer;
                var end = buffer.End;
                if (buffer.IsSingleSegment)
                {
                    Console.WriteLine(System.Text.Encoding.ASCII.GetString(buffer.First.Span));
                    // SAEA.Memory=buffer;
                }
                else
                {
                    foreach (var b in buffer)
                    {
                        Console.WriteLine(System.Text.Encoding.ASCII.GetString(b.Span));
                    }
                    //SAEA.BufferList=buffer;
                }
                pipe.Reader.AdvanceTo(end);
                count++;
                Console.WriteLine(count);
            }
        }

测试

代码写完了,接下来的工作就是通过测试看一下是不是达到合并的效果,以下开启两个线程分别连续写入1000个消息。

        static void Main(string[] args)
        {
            pipe = new Pipe();
            messageQueue = new MessageQueue(pipe.Writer);
            System.Threading.ThreadPool.QueueUserWorkItem(Read);
            System.Threading.ThreadPool.QueueUserWorkItem(Write, "AAAA");
            System.Threading.ThreadPool.QueueUserWorkItem(Write, "BBBB");
            Console.Read();
        }
        private static void Write(object state)
        {
            string name = (string)state;
            for (int i = 0; i < 1000; i++)
            {
                messageQueue.Enqueue($"[{name + i}]");
            }
        }

实际运行效果:

.net core使用Pipelines进行消息IO合并

总结

通过以上示例相信大家对System.IO.Pipelines来对消息进行Buffer合并有一个很好的理解,不过实际情况处理的是对象消息则相对复杂一些,毕竟消息的大小是不可知的,不过可以针对最大消息长度来分析Buffer,确保一个Buffer能够序列化一个或多个消息即可。如果你想抛开System.IO.Pipelines更深入地了解实现原因可以查看 Beetlex 的源码,具体位置在: PipeStream

最后奉上以上示例的代码 http://www.ikende.com/Files/SocketIOMerge.zip?tag=manager


以上所述就是小编给大家介绍的《.net core使用Pipelines进行消息IO合并》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Big Java Late Objects

Big Java Late Objects

Horstmann, Cay S. / 2012-2 / 896.00元

The introductory programming course is difficult. Many students fail to succeed or have trouble in the course because they don't understand the material and do not practice programming sufficiently. ......一起来看看 《Big Java Late Objects》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具