标签:ref amp 应用程序 利用 sequence 领域 个人 sockets 默认
Pipelines - .NET中的新IO API指引 作者 marcgravell 原文
此系列前两篇网上已有的译文
Pipelines - .NET中的新IO API指引(一)
Pipelines - .NET中的新IO API指引(二)
关于System.IO.Pipelines的一篇说明
System.IO.Pipelines: .NET高性能IO
本篇不是翻译,边看边译边记而已。
System.IO.Pipelines 是对 IO 的统一抽象,适用于文件、COM 口(串口)、网络等场景,重点在于让调用者把注意力集中在读、写缓冲区上,典型的例子就是 IDuplexPipe 中的 Input 和 Output。
可以理解为将IO类抽象为读、写两个缓冲区。
目前官方实现还处于preview状态,作者使用Socket和NetworkStream 实现了一个 Pipelines.Sockets.Unofficial
作者在前两篇中提到使用 System.IO.Pipelines 改造 StackExchange.Redis;在本篇中,作者通过改造现有的 SimplSockets 库来说明 System.IO.Pipelines 的用法。
文章中的代码(SimplPipelines,KestrelServer )
/// <summary>
/// A disposable handle that exposes a block of memory through its <see cref="Memory"/> property.
/// </summary>
/// <remarks>
/// NOTE(review): presumably disposing the owner releases the underlying memory, so
/// <see cref="Memory"/> should not be used afterwards - confirm against the implementation.
/// </remarks>
public interface IMemoryOwner<T> : IDisposable { Memory<T> Memory { get; } }
/// <summary>
/// Owns an array taken from <see cref="ArrayPool{T}.Shared"/> and exposes its first
/// <c>length</c> elements as a <see cref="Memory{T}"/>. Disposing hands the array
/// back to the pool; any later access to <see cref="Memory"/> throws.
/// </summary>
private sealed class ArrayPoolOwner<T> : IMemoryOwner<T>
{
    private readonly int _length;
    private T[] _oversized;

    /// <param name="oversized">The pooled array; it may be longer than <paramref name="length"/>.</param>
    /// <param name="length">Number of leading elements that are exposed.</param>
    internal ArrayPoolOwner(T[] oversized, int length)
    {
        _oversized = oversized;
        _length = length;
    }

    /// <summary>The usable window over the pooled array.</summary>
    /// <exception cref="ObjectDisposedException">The owner has already been disposed.</exception>
    public Memory<T> Memory
    {
        get { return new Memory<T>(GetArray(), 0, _length); }
    }

    private T[] GetArray()
    {
        // CompareExchange(null, null) never changes the field; it is used purely as an atomic read.
        var current = Interlocked.CompareExchange(ref _oversized, null, null);
        if (current == null)
        {
            throw new ObjectDisposedException(ToString());
        }

        return current;
    }

    /// <summary>Returns the array to the shared pool; only the first call has any effect.</summary>
    public void Dispose()
    {
        // Swap in null so that exactly one caller observes the array and returns it.
        var rented = Interlocked.Exchange(ref _oversized, null);
        if (rented == null)
        {
            return;
        }

        ArrayPool<T>.Shared.Return(rented);
    }
}
/// <summary>
/// Illustrates consuming a leased buffer: <paramref name="data"/> is disposed as soon as the
/// <c>using</c> block exits, so its memory is only touched inside that block.
/// </summary>
/// <param name="data">The leased buffer; this method takes over disposing it.</param>
void DoSomething(IMemoryOwner<byte> data)
{
    using (data)
    {
        // ... other things here ...
        DoTheThing(data.Memory);
    }
    // ... more things here ...
}
通过ArrayPool的借、还机制避免频繁分配。
/// <summary>
/// Copies <paramref name="source"/> into an array rented from <see cref="ArrayPool{T}.Shared"/>
/// and wraps it in an owner object.
/// </summary>
/// <param name="source">The sequence to copy; it is not modified.</param>
/// <returns>
/// The result of <c>Empty&lt;T&gt;()</c> when the sequence is empty; otherwise an
/// <c>ArrayPoolOwner&lt;T&gt;</c> over the rented array, limited to the sequence length.
/// </returns>
/// <exception cref="OverflowException">The sequence is longer than <see cref="int.MaxValue"/>.</exception>
public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source)
{
    if (source.IsEmpty)
    {
        return Empty<T>();
    }

    // checked: fail loudly instead of silently truncating a length that does not fit in an int
    int len = checked((int)source.Length);

    // borrow from the pool; the rented array may be longer than len
    var arr = ArrayPool<T>.Shared.Rent(len);
    source.CopyTo(arr);

    // the owner is constructed with len so it can expose only the copied elements
    return new ArrayPoolOwner<T>(arr, len);
}
/// <summary>
/// Base type for the pipeline-backed client/server classes. It stores the
/// <see cref="IDuplexPipe"/> it is given and routes <see cref="Dispose"/> to <see cref="Close"/>.
/// </summary>
public abstract class SimplPipeline : IDisposable
{
    private IDuplexPipe _pipe;

    /// <param name="pipe">The duplex pipe this instance works over.</param>
    protected SimplPipeline(IDuplexPipe pipe)
    {
        _pipe = pipe;
    }

    /// <summary>Shuts the pipeline down (body elided in this excerpt).</summary>
    public void Close()
    {
        /* burn the pipe*/
    }

    /// <summary>Disposing is the same as closing.</summary>
    public void Dispose()
    {
        Close();
    }
}
/// <summary>
/// Writes a payload whose ownership has been handed to us: the owner is disposed once the
/// write has completed (or failed).
/// </summary>
/// <param name="payload">Leased buffer to send; disposed by this method.</param>
/// <param name="messageId">Identifier passed through to the memory-based overload.</param>
protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)
{
    // the caller no longer uses payload, so cleaning it up is our job
    using (payload)
    {
        await WriteAsync(payload.Memory, messageId);
    }
}

// the caller keeps ownership of payload and cleans it up itself (signature only in this excerpt)
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);
messageId 标识一条消息,写入消息头部,用于之后把收到的回复与对应的请求匹配起来。
// Only one caller at a time may write a frame; everyone else waits here.
private readonly SemaphoreSlim _singleWriter = new SemaphoreSlim(1);

/// <summary>
/// Writes one frame (header followed by payload) while holding the single-writer semaphore.
/// </summary>
/// <param name="payload">The bytes to send; ownership stays with the caller.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    await _singleWriter.WaitAsync();
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);

        var pendingWrite = writer.WriteAsync(payload);
        await pendingWrite;
    }
    finally
    {
        // always give the semaphore back, even when the write throws
        _singleWriter.Release();
    }
}
/// <summary>
/// Writes one frame, staying synchronous (no async state machine) when the semaphore is
/// free and the write completes immediately.
/// </summary>
/// <param name="payload">The bytes to send; ownership stays with the caller.</param>
/// <param name="messageId">Identifier written into the frame header.</param>
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{
    // try to get the conch; if not, switch to async
    // (the writer is already in use, so take the asynchronous slow path)
    if (!_singleWriter.Wait(0))
    {
        return WriteAsyncSlowPath(payload, messageId);
    }

    bool release = true;
    try
    {
        WriteFrameHeader(writer, payload.Length, messageId);
        var write = writer.WriteAsync(payload);
        if (write.IsCompletedSuccessfully)
        {
            return default;
        }

        // the write is still pending: AwaitFlushAndRelease now owns releasing the semaphore
        release = false;
        return AwaitFlushAndRelease(write);
    }
    finally
    {
        if (release)
        {
            _singleWriter.Release();
        }
    }
}

/// <summary>Awaits a pending write and then releases the single-writer semaphore.</summary>
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{
    try
    {
        await flush;
    }
    finally
    {
        _singleWriter.Release();
    }
}
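上面这段代码有三个地方值得注意:(1)`_singleWriter.Wait(0)` 尝试在不等待的情况下获取写入权,拿不到就转入异步的 WriteAsyncSlowPath;(2)如果 `write.IsCompletedSuccessfully` 为真,说明写入已同步完成,直接返回 `default`,并在 finally 中释放信号量;(3)否则把 release 置为 false,交给 AwaitFlushAndRelease,等写入完成后再释放信号量。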
/// <summary>
/// Writes the 8-byte frame header into <paramref name="writer"/>: the payload length followed
/// by the message id, both as little-endian 32-bit integers.
/// </summary>
/// <param name="writer">Destination pipe writer; advanced by 8 bytes.</param>
/// <param name="length">Payload length in bytes.</param>
/// <param name="messageId">Identifier of the message.</param>
void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{
    const int HeaderSize = 8;

    Span<byte> header = writer.GetSpan(HeaderSize);
    BinaryPrimitives.WriteInt32LittleEndian(header, length);
    BinaryPrimitives.WriteInt32LittleEndian(header.Slice(sizeof(int)), messageId);

    // commit the header bytes to the writer
    writer.Advance(HeaderSize);
}
/// <summary>
/// Client side of the protocol: sends a request frame and completes the returned task when a
/// reply carrying the same message id is delivered.
/// </summary>
public class SimplPipelineClient : SimplPipeline
{
    /// <summary>
    /// Sends <paramref name="message"/> and waits for the matching reply.
    /// </summary>
    /// <param name="message">The request bytes; ownership stays with the caller.</param>
    /// <returns>The leased reply payload; the caller must dispose it.</returns>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message)
    {
        // RunContinuationsAsynchronously: whoever completes this source must not end up
        // running the caller's continuation inline on its own thread
        var tcs = new TaskCompletionSource<IMemoryOwner<byte>>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        int messageId;
        lock (_awaitingResponses)
        {
            messageId = ++_nextMessageId;
            if (messageId == 0)
            {
                // zero is skipped; move the counter too, otherwise the next call
                // would hand out id 1 a second time
                messageId = _nextMessageId = 1;
            }
            _awaitingResponses.Add(messageId, tcs);
        }

        try
        {
            await WriteAsync(message, messageId);
        }
        catch
        {
            // the request never went out, so no reply can arrive: drop the pending entry
            lock (_awaitingResponses)
            {
                _awaitingResponses.Remove(messageId);
            }
            throw;
        }

        return await tcs.Task;
    }

    /// <summary>
    /// Sends a leased request and waits for the reply; <paramref name="message"/> is disposed
    /// once the exchange has finished.
    /// </summary>
    /// <param name="message">The leased request buffer; disposed by this method.</param>
    /// <returns>The leased reply payload; the caller must dispose it.</returns>
    public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message)
    {
        using (message)
        {
            return await SendReceiveAsync(message.Memory);
        }
    }
}
- _awaitingResponses 是个字典,以 messageId 为键,保存已发送但尚未收到回复的请求所对应的 TaskCompletionSource,用于将来处理对某条(messageId)消息的回复。
/// <summary>
/// Reads from the pipe until cancelled or completed, parsing every whole frame in each read
/// and passing it to <see cref="OnReceiveAsync"/>.
/// </summary>
/// <param name="cancellationToken">Stops the loop when signalled.</param>
/// <remarks>
/// Exceptions do not escape: they are passed to <c>reader.Complete(ex)</c> instead.
/// </remarks>
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var result = await reader.ReadAsync(cancellationToken);
            if (result.IsCanceled)
            {
                break;
            }

            var remaining = result.Buffer;
            var parsedAny = false;

            // TryParseFrame trims each parsed frame off the front of 'remaining'
            while (TryParseFrame(ref remaining, out var framePayload, out var frameId))
            {
                parsedAny = true;
                await OnReceiveAsync(framePayload, frameId);
            }

            // consumed: everything before remaining.Start; examined: the whole buffer
            reader.AdvanceTo(remaining.Start, remaining.End);

            // the pipe is finished and what is left does not form a frame: stop reading
            if (result.IsCompleted && !parsedAny)
            {
                break;
            }
        }

        try
        {
            reader.Complete();
        }
        catch
        {
        }
    }
    catch (Exception ex)
    {
        try
        {
            reader.Complete(ex);
        }
        catch
        {
        }
    }
}

/// <summary>Invoked once for every complete frame taken off the pipe.</summary>
/// <param name="payload">The frame body, without its header.</param>
/// <param name="messageId">The id read from the frame header.</param>
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);
/// <summary>
/// Attempts to take one complete frame (8-byte header plus payload) off the front of
/// <paramref name="input"/>.
/// </summary>
/// <param name="input">Buffered data; on success it is advanced past the parsed frame.</param>
/// <param name="payload">The frame body on success; <c>default</c> otherwise.</param>
/// <param name="messageId">
/// The id from the header. It is <c>default</c> when the header is incomplete, and already
/// holds the parsed id when only the payload is still incomplete.
/// </param>
/// <returns><c>true</c> when a whole frame was available; <c>false</c> when more data is needed.</returns>
/// <exception cref="System.IO.InvalidDataException">The header declares a negative payload length.</exception>
private bool TryParseFrame(
    ref ReadOnlySequence<byte> input,
    out ReadOnlySequence<byte> payload,
    out int messageId)
{
    const int HeaderLength = 8;

    if (input.Length < HeaderLength)
    {
        // not enough data for the header
        payload = default;
        messageId = default;
        return false;
    }

    int length;
    if (input.First.Length >= HeaderLength)
    {
        // already 8 bytes in the first segment
        length = ParseFrameHeader(input.First.Span, out messageId);
    }
    else
    {
        // the header straddles segments: copy 8 bytes into a local span
        Span<byte> local = stackalloc byte[HeaderLength];
        input.Slice(0, HeaderLength).CopyTo(local);
        length = ParseFrameHeader(local, out messageId);
    }

    if (length < 0)
    {
        // the length comes off the wire; reject nonsense here with a clear message
        // rather than letting Slice fail later with an opaque range error
        throw new System.IO.InvalidDataException(
            "Frame header declares a negative payload length: " + length);
    }

    // do we have the "length" bytes? (subtract in long arithmetic: length + 8 can overflow int)
    if (input.Length - HeaderLength < length)
    {
        payload = default;
        return false;
    }

    // success!
    payload = input.Slice(HeaderLength, length);
    input = input.Slice(payload.End);
    return true;
}
/// <summary>
/// Decodes a frame header: two little-endian 32-bit integers, payload length first and
/// message id second.
/// </summary>
/// <param name="input">Span holding at least the 8 header bytes.</param>
/// <param name="messageId">Receives the id stored in bytes 4..7.</param>
/// <returns>The payload length stored in bytes 0..3.</returns>
static int ParseFrameHeader(ReadOnlySpan<byte> input, out int messageId)
{
    int payloadLength = BinaryPrimitives.ReadInt32LittleEndian(input);

    ReadOnlySpan<byte> idBytes = input.Slice(sizeof(int));
    messageId = BinaryPrimitives.ReadInt32LittleEndian(idBytes);

    return payloadLength;
}
/// <summary>
/// Routes an incoming frame: a non-zero <paramref name="messageId"/> completes the matching
/// pending request, and zero raises <c>MessageReceived</c> as an unsolicited message.
/// </summary>
/// <param name="payload">The frame body; it is copied into a leased buffer before being handed on.</param>
/// <param name="messageId">The id from the frame header.</param>
/// <returns>An already-completed <see cref="ValueTask"/>.</returns>
protected override ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId)
{
    if (messageId != 0)
    {
        // request/response
        TaskCompletionSource<IMemoryOwner<byte>> tcs;
        lock (_awaitingResponses)
        {
            if (_awaitingResponses.TryGetValue(messageId, out tcs))
            {
                _awaitingResponses.Remove(messageId);
            }
        }

        // no pending request for this id: drop the frame without leasing a buffer
        if (tcs != null)
        {
            var lease = payload.Lease();
            if (!tcs.TrySetResult(lease))
            {
                // the task was already completed, so nobody will receive this lease:
                // dispose it here instead of leaking it
                lease.Dispose();
            }
        }
    }
    else
    {
        // unsolicited
        MessageReceived?.Invoke(payload.Lease());
    }

    return default;
}
Pipelines - .NET中的新IO API指引(三) 边看边记
标签:ref amp 应用程序 利用 sequence 领域 个人 sockets 默认
原文地址:https://www.cnblogs.com/cerl/p/9925879.html