corefx 源码学习:NetworkStream.ReadAsync 是如何从 Socket 异步读取数据的

栏目: 服务器 · 发布时间: 6年前

内容简介:最近遇到 NetworkStream.ReadAsync 在 Linux 上高并发读取数据的问题,由此激发了阅读 corefx 中 System.Net.Sockets 实现源码(基于这篇随笔是阅读 NetworkStream.ReadAsync 相关源码的简单笔记,基于在 Linux 上运行的场景。NetworkStream 继承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法签名是

最近遇到 NetworkStream.ReadAsync 在 Linux 上高并发读取数据的问题,由此激发了阅读 corefx 中 System.Net.Sockets 实现源码(基于 corefx 2.2 )的兴趣。

这篇随笔是阅读 NetworkStream.ReadAsync 相关源码的简单笔记,基于在 Linux 上运行的场景。

NetworkStream 继承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法签名是

public Task<int> ReadAsync(byte[] buffer, int offset, int count);

实际调用的是

public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)

上面的的方法被 NetworkStream 重写(override),调用的是 Socket 的 ReceiveAsync 方法

return _streamSocket.ReceiveAsync(
    new Memory<byte>(buffer, offset, size),
    SocketFlags.None,
    fromNetworkStream: true,
    cancellationToken).AsTask();

Socket.ReceiveAsync 的方法签名

internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)

主要实现代码

AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive);
if (saea.Reserve())
{
    saea.SetBuffer(buffer);
    saea.SocketFlags = socketFlags;
    saea.WrapExceptionsInIOExceptions = fromNetworkStream;
    var result = saea.ReceiveAsync(this);
    return result;
}
else
{
    // We couldn't get a cached instance, due to a concurrent receive operation on the socket.
    // Fall back to wrapping APM.
    return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags));
}

通常情况下都会使用 AwaitableSocketAsyncEventArgs 异步读取数据,所以我们这里只从 saea.ReceiveAsync 往下看。

saea.ReceiveAsync 调用的是 Socket.ReceiveAsync(SocketAsyncEventArgs e) 方法,而后者调用的是  SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle)

在 Linux 上 DoOperationReceive 的实现在 SocketAsyncEventArgs.Unix.cs 中,主要代码如下

internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle)
{
    //...
    if (_bufferList == null)
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }
    else
    {
        errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
    }

    if (errorCode != SocketError.IOPending)
    {
        CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode);
        FinishOperationSync(errorCode, bytesReceived, flags);
    }

    return errorCode;
}

handle.AsyncContext.ReceiveAsync 对应的 Linux 实现在 SocketAsyncContext.Unix.cs 中,调用的是 SocketAsyncContext 的 ReceiveFrom 方法,ReceiveFrom 的主要实现代码如下

public SocketError ReceiveFromAsync(Memory<byte> buffer,  SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback)
{
    SetNonBlocking();

    SocketError errorCode;
    int observedSequenceNumber;
    if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
        SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode))
    {
        return errorCode;
    }

    BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation();
    operation.Callback = callback;
    operation.Buffer = buffer;
    operation.Flags = flags;
    operation.SocketAddress = socketAddress;
    operation.SocketAddressLen = socketAddressLen;

    if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber))
    {
        receivedFlags = operation.ReceivedFlags;
        bytesReceived = operation.BytesTransferred;
        errorCode = operation.ErrorCode;

        ReturnOperation(operation);
        return errorCode;
    }

    bytesReceived = 0;
    receivedFlags = SocketFlags.None;
    return SocketError.IOPending;
}

SocketPal.TryCompleteReceiveFrom 的实现代码在 SocketPal.Unix.cs 中,所调用的另一个 TryCompleteReceiveFrom 方法的签名是

public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)

该方法调用的是 Receive 方法

private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)

在 Receive 方法中调用了

errno = Interop.Sys.ReceiveMessage(
    socket.DangerousGetHandle(), 
    &messageHeader,
    flags,
    &received);

Interop.Sys.ReceiveMessage 对应的是 Linux 本地库中的方法

internal static partial class Sys
{
    [DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")]
    internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received);
}

Libraries.SystemNative 对应的是哪个库呢?

它就是 System.Native.so

$ find /usr/share/dotnet/ -name System.Native.so
/usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so

接下来根据 SocketError.IOPending 的情况阅读源码。

SocketAsyncEventArgs 在 DoOperationReceive 方法中调用 SocketAsyncContext.ReceiveFrom 方法时(handle.AsyncContext.ReceiveAsync)传递了 TransferCompletionCallback 参数值,在异步操作时是通过这个 callback 读取 socket 数据的,对应的方法是 TransferCompletionCallbackCore 。

private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError)
{
    CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError);

    CompletionCallback(bytesTransferred, receivedFlags, socketError);
}

TransferCompletionCallbackCore 中进一步调用 CompletionCallback

private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError)
{
    if (socketError == SocketError.Success)
    {
        FinishOperationAsyncSuccess(bytesTransferred, flags);
    }
    else
    {
        if (_currentSocket.CleanedUp)
        {
            socketError = SocketError.OperationAborted;
        }

        FinishOperationAsyncFailure(socketError, bytesTransferred, flags);
    }
}

在 CompletionCallback 中当 SocketError.Success 时进一步调用 FinishOperationAsyncSuccess

internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags)
{
    FinishOperationSyncSuccess(bytesTransferred, flags);

    // Raise completion event.
    if (_context == null)
    {
        OnCompleted(this);
    }
    else
    {
        ExecutionContext.Run(_context, s_executionCallback, this);
    }
}

从上面的代码可以看出实际调用的也是 FinishOperationSyncSuccess ,异步与同步读取数据最终调用的是同一个方法。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Zen of CSS Design

The Zen of CSS Design

Dave Shea、Molly E. Holzschlag / Peachpit Press / 2005-2-27 / USD 44.99

Proving once and for all that standards-compliant design does not equal dull design, this inspiring tome uses examples from the landmark CSS Zen Garden site as the foundation for discussions on how to......一起来看看 《The Zen of CSS Design》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器