内容简介:最近遇到 NetworkStream.ReadAsync 在 Linux 上高并发读取数据的问题,由此激发了阅读 corefx 中 System.Net.Sockets 实现源码(基于这篇随笔是阅读 NetworkStream.ReadAsync 相关源码的简单笔记,基于在 Linux 上运行的场景。NetworkStream 继承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法签名是
最近遇到 NetworkStream.ReadAsync 在 Linux 上高并发读取数据的问题,由此激发了阅读 corefx 中 System.Net.Sockets 实现源码(基于 corefx 2.2 )的兴趣。
这篇随笔是阅读 NetworkStream.ReadAsync 相关源码的简单笔记,基于在 Linux 上运行的场景。
NetworkStream 继承自 System.IO.Stream ,System.IO.Stream.ReadAsync 方法签名是
public Task<int> ReadAsync(byte[] buffer, int offset, int count);
实际调用的是
public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
上面的的方法被 NetworkStream 重写(override),调用的是 Socket 的 ReceiveAsync 方法
return _streamSocket.ReceiveAsync(
new Memory<byte>(buffer, offset, size),
SocketFlags.None,
fromNetworkStream: true,
cancellationToken).AsTask();
Socket.ReceiveAsync 的方法签名
internal ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, bool fromNetworkStream, CancellationToken cancellationToken)
主要实现代码
AwaitableSocketAsyncEventArgs saea = LazyInitializer.EnsureInitialized(ref LazyInitializer.EnsureInitialized(ref _cachedTaskEventArgs).ValueTaskReceive);
if (saea.Reserve())
{
saea.SetBuffer(buffer);
saea.SocketFlags = socketFlags;
saea.WrapExceptionsInIOExceptions = fromNetworkStream;
var result = saea.ReceiveAsync(this);
return result;
}
else
{
// We couldn't get a cached instance, due to a concurrent receive operation on the socket.
// Fall back to wrapping APM.
return new ValueTask<int>(ReceiveAsyncApm(buffer, socketFlags));
}
通常情况下都会使用 AwaitableSocketAsyncEventArgs 异步读取数据,所以我们这里只从 saea.ReceiveAsync 往下看。
saea.ReceiveAsync 调用的是 Socket.ReceiveAsync(SocketAsyncEventArgs e) 方法,而后者调用的是 SocketAsyncEventArgs.DoOperationReceive(SafeCloseSocket handle) 。
在 Linux 上 DoOperationReceive 的实现在 SocketAsyncEventArgs.Unix.cs 中,主要代码如下
internal unsafe SocketError DoOperationReceive(SafeCloseSocket handle)
{
//...
if (_bufferList == null)
{
errorCode = handle.AsyncContext.ReceiveAsync(_buffer.Slice(_offset, _count), _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
}
else
{
errorCode = handle.AsyncContext.ReceiveAsync(_bufferListInternal, _socketFlags, out bytesReceived, out flags, TransferCompletionCallback);
}
if (errorCode != SocketError.IOPending)
{
CompleteTransferOperation(bytesReceived, null, 0, flags, errorCode);
FinishOperationSync(errorCode, bytesReceived, flags);
}
return errorCode;
}
handle.AsyncContext.ReceiveAsync 对应的 Linux 实现在 SocketAsyncContext.Unix.cs 中,调用的是 SocketAsyncContext 的 ReceiveFrom 方法,ReceiveFrom 的主要实现代码如下
public SocketError ReceiveFromAsync(Memory<byte> buffer, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, Action<int, byte[], int, SocketFlags, SocketError> callback)
{
SetNonBlocking();
SocketError errorCode;
int observedSequenceNumber;
if (_receiveQueue.IsReady(this, out observedSequenceNumber) &&
SocketPal.TryCompleteReceiveFrom(_socket, buffer.Span, flags, socketAddress, ref socketAddressLen, out bytesReceived, out receivedFlags, out errorCode))
{
return errorCode;
}
BufferMemoryReceiveOperation operation = RentBufferMemoryReceiveOperation();
operation.Callback = callback;
operation.Buffer = buffer;
operation.Flags = flags;
operation.SocketAddress = socketAddress;
operation.SocketAddressLen = socketAddressLen;
if (!_receiveQueue.StartAsyncOperation(this, operation, observedSequenceNumber))
{
receivedFlags = operation.ReceivedFlags;
bytesReceived = operation.BytesTransferred;
errorCode = operation.ErrorCode;
ReturnOperation(operation);
return errorCode;
}
bytesReceived = 0;
receivedFlags = SocketFlags.None;
return SocketError.IOPending;
}
SocketPal.TryCompleteReceiveFrom 的实现代码在 SocketPal.Unix.cs 中,所调用的另一个 TryCompleteReceiveFrom 方法的签名是
public static unsafe bool TryCompleteReceiveFrom(SafeCloseSocket socket, Span<byte> buffer, IList<ArraySegment<byte>> buffers, SocketFlags flags, byte[] socketAddress, ref int socketAddressLen, out int bytesReceived, out SocketFlags receivedFlags, out SocketError errorCode)
该方法调用的是 Receive 方法
private static unsafe int Receive(SafeCloseSocket socket, SocketFlags flags, IList<ArraySegment<byte>> buffers, byte[] socketAddress, ref int socketAddressLen, out SocketFlags receivedFlags, out Interop.Error errno)
在 Receive 方法中调用了
errno = Interop.Sys.ReceiveMessage(
socket.DangerousGetHandle(),
&messageHeader,
flags,
&received);
Interop.Sys.ReceiveMessage 对应的是 Linux 本地库中的方法
internal static partial class Sys
{
[DllImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReceiveMessage")]
internal static extern unsafe Error ReceiveMessage(IntPtr socket, MessageHeader* messageHeader, SocketFlags flags, long* received);
}
Libraries.SystemNative 对应的是哪个库呢?
它就是 System.Native.so
$ find /usr/share/dotnet/ -name System.Native.so /usr/share/dotnet/shared/Microsoft.NETCore.App/2.2.0/System.Native.so
接下来根据 SocketError.IOPending 的情况阅读源码。
SocketAsyncEventArgs 在 DoOperationReceive 方法中调用 SocketAsyncContext.ReceiveFrom 方法时(handle.AsyncContext.ReceiveAsync)传递了 TransferCompletionCallback 参数值,在异步操作时是通过这个 callback 读取 socket 数据的,对应的方法是 TransferCompletionCallbackCore 。
private void TransferCompletionCallbackCore(int bytesTransferred, byte[] socketAddress, int socketAddressSize, SocketFlags receivedFlags, SocketError socketError)
{
CompleteTransferOperation(bytesTransferred, socketAddress, socketAddressSize, receivedFlags, socketError);
CompletionCallback(bytesTransferred, receivedFlags, socketError);
}
TransferCompletionCallbackCore 中进一步调用 CompletionCallback
private void CompletionCallback(int bytesTransferred, SocketFlags flags, SocketError socketError)
{
if (socketError == SocketError.Success)
{
FinishOperationAsyncSuccess(bytesTransferred, flags);
}
else
{
if (_currentSocket.CleanedUp)
{
socketError = SocketError.OperationAborted;
}
FinishOperationAsyncFailure(socketError, bytesTransferred, flags);
}
}
在 CompletionCallback 中当 SocketError.Success 时进一步调用 FinishOperationAsyncSuccess
internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flags)
{
FinishOperationSyncSuccess(bytesTransferred, flags);
// Raise completion event.
if (_context == null)
{
OnCompleted(this);
}
else
{
ExecutionContext.Run(_context, s_executionCallback, this);
}
}
从上面的代码可以看出实际调用的也是 FinishOperationSyncSuccess ,异步与同步读取数据最终调用的是同一个方法。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- jQuery源码学习:异步操作--Callbacks
- JStorm 源码分析 - 异步循环线程 AsyncLoopThread
- YYText 源码剖析:CoreText 与异步绘制
- React Fiber源码分析 第三篇(异步状态)
- 原 荐 Java异步编程——深入源码分析FutureTask
- SpringBoot | :异步开发之异步调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
C语言名题精选百则技巧篇
冼镜光 / 机械工业出版社 / 2005-7 / 44.00元
《C语言名题精选百则》(技巧篇)收集了100则C语言程序设计题,共分9类。第一类比较简单,主要希望读者了解到《C语言名题精选百则》(技巧篇)的题目、解法与其他书籍之间的差异;第二至六类分别是关于数字、组合数学或离散数学、查找、排序、字符串等方面的题目;第七类列出了一些不太容易归类的题目,如Buffon丢针问题、Dijkstra的三色旗问题等;第八类则收录了一些有趣的、娱乐性的题目,如魔方阵等;第九......一起来看看 《C语言名题精选百则技巧篇》 这本书的介绍吧!