Skip to content

Commit 6bb5d5f

Browse files
Bicep.RpcClient - use PipeReader & PipeWriter APIs (#18212)
* Use PipeReader & PipeWriter APIs * Ensure the child process is explicitly killed
1 parent 180d3d5 commit 6bb5d5f

File tree

3 files changed

+158
-76
lines changed

3 files changed

+158
-76
lines changed

src/Bicep.RpcClient.Tests/PublicApiTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void Dependencies_should_be_minimal()
4141
// Be careful when adding new dependencies to the ClientTools assembly - this assembly is intentionally slim.
4242
// The assembly is used in Microsoft internal tools, where dependency management is complex, so we want to avoid transitively depending on ResourceStack
4343
"System.Collections.Immutable",
44+
"System.IO.Pipelines",
4445
"System.Memory",
4546
"System.Text.Encodings.Web",
4647
"System.Text.Json",

src/Bicep.RpcClient/BicepClient.cs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
using System;
55
using System.Diagnostics;
66
using System.IO;
7+
using System.IO.Pipelines;
78
using System.IO.Pipes;
89
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Bicep.RpcClient;
912
using Bicep.RpcClient.JsonRpc;
1013
using Bicep.RpcClient.Models;
1114

@@ -17,12 +20,13 @@ internal class BicepClient : IBicepClient
1720
private readonly JsonRpcClient jsonRpcClient;
1821
private readonly Task backgroundTask;
1922
private readonly CancellationTokenSource cts;
23+
private string? cachedVersion;
2024

2125
private BicepClient(NamedPipeServerStream pipeStream, Process cliProcess, JsonRpcClient jsonRpcClient, CancellationTokenSource cts)
2226
{
2327
this.cliProcess = cliProcess;
2428
this.jsonRpcClient = jsonRpcClient;
25-
this.backgroundTask = CreateBackgroundTask(pipeStream, cliProcess, jsonRpcClient, cts.Token);
29+
this.backgroundTask = jsonRpcClient.Listen(onComplete: pipeStream.Dispose, cts.Token);
2630
this.cts = cts;
2731
}
2832

@@ -37,7 +41,7 @@ public static async Task<IBicepClient> Initialize(string bicepCliPath, Cancellat
3741
}
3842

3943
var pipeName = Guid.NewGuid().ToString();
40-
var pipeStream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
44+
var pipeStream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, System.IO.Pipes.PipeOptions.Asynchronous);
4145

4246
var psi = new ProcessStartInfo
4347
{
@@ -54,29 +58,11 @@ public static async Task<IBicepClient> Initialize(string bicepCliPath, Cancellat
5458

5559
await pipeStream.WaitForConnectionAsync(cancellationToken).ConfigureAwait(false);
5660
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
57-
var client = new JsonRpcClient(pipeStream, pipeStream);
61+
var client = new JsonRpcClient(PipeReader.Create(pipeStream), PipeWriter.Create(pipeStream));
5862

5963
return new BicepClient(pipeStream, cliProcess, client, cts);
6064
}
6165

62-
private static Task CreateBackgroundTask(NamedPipeServerStream pipeStream, Process cliProcess, JsonRpcClient jsonRpcClient, CancellationToken cancellationToken)
63-
=> Task.Run(async () =>
64-
{
65-
try
66-
{
67-
await jsonRpcClient.ReadLoop(cancellationToken).ConfigureAwait(false);
68-
}
69-
catch (OperationCanceledException)
70-
{
71-
// Expected when disposing
72-
}
73-
finally
74-
{
75-
pipeStream.Dispose();
76-
cliProcess.Dispose();
77-
}
78-
}, cancellationToken);
79-
8066
/// <inheritdoc/>
8167
public Task<CompileResponse> Compile(CompileRequest request, CancellationToken cancellationToken)
8268
=> jsonRpcClient.SendRequest<CompileRequest, CompileResponse>("bicep/compile", request, cancellationToken);
@@ -114,8 +100,15 @@ public async Task<GetSnapshotResponse> GetSnapshot(GetSnapshotRequest request, C
114100
/// <inheritdoc/>
115101
public async Task<string> GetVersion(CancellationToken cancellationToken)
116102
{
117-
var response = await jsonRpcClient.SendRequest<VersionRequest, VersionResponse>("bicep/version", new(), cancellationToken).ConfigureAwait(false);
118-
return response.Version;
103+
// This method is called frequently for version checks - cache the result
104+
if (cachedVersion is null)
105+
{
106+
var response = await jsonRpcClient.SendRequest<VersionRequest, VersionResponse>("bicep/version", new(), cancellationToken).ConfigureAwait(false);
107+
// No locks needed here, since replacing a reference is atomic
108+
cachedVersion = response.Version;
109+
}
110+
111+
return cachedVersion;
119112
}
120113

121114
private async Task EnsureMinimumVersion(string requiredVersion, string operationName, CancellationToken cancellationToken)
@@ -130,9 +123,15 @@ private async Task EnsureMinimumVersion(string requiredVersion, string operation
130123
public void Dispose()
131124
{
132125
cts.Cancel();
133-
if (!cliProcess.HasExited)
126+
jsonRpcClient.Dispose();
127+
try
128+
{
129+
cliProcess.Kill();
130+
// wait for 10 seconds for the process to exit
131+
cliProcess.WaitForExit(10000);
132+
}
133+
catch (InvalidOperationException)
134134
{
135-
cliProcess.WaitForExit();
136135
}
137136
}
138137
}
Lines changed: 133 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4+
using System;
5+
using System.Buffers;
46
using System.Collections.Concurrent;
5-
using System.Net.Security;
7+
using System.IO;
8+
using System.IO.Pipelines;
9+
using System.Linq;
610
using System.Text;
711
using System.Text.Encodings.Web;
812
using System.Text.Json;
913
using System.Text.Json.Nodes;
1014
using System.Text.Json.Serialization;
15+
using System.Threading;
16+
using System.Threading.Tasks;
1117

1218
namespace Bicep.RpcClient.JsonRpc;
1319

14-
internal class JsonRpcClient(Stream reader, Stream writer) : IDisposable
20+
internal class JsonRpcClient(PipeReader reader, PipeWriter writer) : IDisposable
1521
{
1622
private record JsonRpcRequest<T>(
1723
string Jsonrpc,
@@ -20,7 +26,6 @@ private record JsonRpcRequest<T>(
2026
int Id);
2127

2228
private record MinimalJsonRpcResponse(
23-
string Jsonrpc,
2429
int Id);
2530

2631
private record JsonRpcResponse<T>(
@@ -34,10 +39,9 @@ private record JsonRpcError(
3439
string Message,
3540
JsonNode? Data);
3641

37-
private readonly byte[] terminator = "\r\n\r\n"u8.ToArray();
3842
private int nextId = 0;
3943
private readonly SemaphoreSlim writeSemaphore = new(1, 1);
40-
private readonly ConcurrentDictionary<int, TaskCompletionSource<string>> pendingResponses = new();
44+
private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> pendingResponses = new();
4145

4246
private readonly JsonSerializerOptions jsonSerializerOptions = new()
4347
{
@@ -55,20 +59,20 @@ public async Task<TResponse> SendRequest<TRequest, TResponse>(string method, TRe
5559
var requestContent = JsonSerializer.Serialize(jsonRpcRequest, jsonSerializerOptions);
5660
var requestLength = Encoding.UTF8.GetByteCount(requestContent);
5761
var rawRequest = $"Content-Length: {requestLength}\r\n\r\n{requestContent}";
58-
var requestBytes = Encoding.UTF8.GetBytes(rawRequest);
5962

6063
await writeSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
6164
try
6265
{
63-
await writer.WriteAsync(requestBytes, 0, requestBytes.Length, cancellationToken).ConfigureAwait(false);
66+
var requestBytes = Encoding.UTF8.GetBytes(rawRequest).AsMemory();
67+
await writer.WriteAsync(requestBytes, cancellationToken).ConfigureAwait(false);
6468
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
6569
}
6670
finally
6771
{
6872
writeSemaphore.Release();
6973
}
7074

71-
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
75+
var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
7276
if (!pendingResponses.TryAdd(currentId, tcs))
7377
{
7478
throw new InvalidOperationException($"A request with ID {currentId} is already pending.");
@@ -87,13 +91,34 @@ public async Task<TResponse> SendRequest<TRequest, TResponse>(string method, TRe
8791
return jsonRpcResponse.Result;
8892
}
8993

90-
public async Task ReadLoop(CancellationToken cancellationToken)
94+
public Task Listen(Action onComplete, CancellationToken cancellationToken)
95+
=> Task.Run(async () =>
96+
{
97+
try
98+
{
99+
await ListenInternal(cancellationToken).ConfigureAwait(false);
100+
}
101+
catch (OperationCanceledException)
102+
{
103+
// Expected when disposing
104+
}
105+
finally
106+
{
107+
onComplete();
108+
}
109+
}, cancellationToken);
110+
111+
private async Task ListenInternal(CancellationToken cancellationToken)
91112
{
92113
while (true)
93114
{
94115
try
95116
{
96117
var message = await ReadMessage(cancellationToken).ConfigureAwait(false);
118+
if (message is null)
119+
{
120+
return;
121+
}
97122

98123
var response = JsonSerializer.Deserialize<MinimalJsonRpcResponse>(message, jsonSerializerOptions)
99124
?? throw new InvalidOperationException("Failed to deserialize JSON-RPC response");
@@ -105,77 +130,134 @@ public async Task ReadLoop(CancellationToken cancellationToken)
105130
}
106131
catch (Exception) when (cancellationToken.IsCancellationRequested)
107132
{
108-
reader.Dispose();
109-
writer.Dispose();
133+
await reader.CompleteAsync().ConfigureAwait(false);
134+
await writer.CompleteAsync().ConfigureAwait(false);
110135
break;
111136
}
112137
}
113138
}
114139

115-
private async Task<string> ReadUntilTerminator(CancellationToken cancellationToken)
116-
{
117-
using var outputStream = new MemoryStream();
118-
var patternIndex = 0;
119-
var byteBuffer = new byte[1];
140+
private record Headers(
141+
int ContentLength);
120142

143+
private async Task<Headers?> ReadHeaders(CancellationToken cancellationToken)
144+
{
145+
int? contentLength = null;
121146
while (true)
122147
{
123-
await ReadExactly(byteBuffer, byteBuffer.Length, cancellationToken).ConfigureAwait(false);
148+
var readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
124149

125-
await outputStream.WriteAsync(byteBuffer, 0, byteBuffer.Length, cancellationToken).ConfigureAwait(false);
126-
patternIndex = terminator[patternIndex] == byteBuffer[0] ? patternIndex + 1 : 0;
127-
if (patternIndex == terminator.Length)
150+
if (readResult.Buffer.IsEmpty && readResult.IsCompleted)
128151
{
129-
outputStream.Position = 0;
130-
outputStream.SetLength(outputStream.Length - terminator.Length);
131-
// return stream as string
132-
return Encoding.UTF8.GetString(outputStream.ToArray());
152+
return null; // remote end disconnected at a reasonable place.
153+
}
154+
155+
var lf = readResult.Buffer.PositionOf((byte)'\n');
156+
if (!lf.HasValue)
157+
{
158+
if (readResult.IsCompleted)
159+
{
160+
throw new EndOfStreamException();
161+
}
162+
163+
// Indicate that we can't find what we're looking for and read again.
164+
reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
165+
continue;
166+
}
167+
168+
var line = readResult.Buffer.Slice(0, lf.Value);
169+
170+
// Verify the line ends with an \r (that precedes the \n we already found)
171+
var cr = line.PositionOf((byte)'\r');
172+
if (!cr.HasValue || !line.GetPosition(1, cr.Value).Equals(lf))
173+
{
174+
throw new InvalidOperationException("Header does not end with expected \r\n character sequence");
175+
}
176+
177+
// Trim off the \r now that we confirmed it was there.
178+
line = line.Slice(0, line.Length - 1);
179+
180+
if (line.Length > 0)
181+
{
182+
var lineText = Encoding.UTF8.GetString(line.ToArray());
183+
var split = lineText.Split([':'], 2);
184+
if (split.Length != 2)
185+
{
186+
throw new InvalidOperationException("Colon not found in header.");
187+
}
188+
189+
var headerName = split[0].Trim();
190+
var headerValue = split[1].Trim();
191+
192+
if (headerName == "Content-Length")
193+
{
194+
contentLength = int.Parse(headerValue);
195+
}
196+
}
197+
198+
// Advance to the next line.
199+
reader.AdvanceTo(readResult.Buffer.GetPosition(1, lf.Value));
200+
201+
if (line.Length == 0)
202+
{
203+
// We found the empty line that constitutes the end of the HTTP headers.
204+
break;
133205
}
134206
}
135-
}
136207

137-
private async Task<string> ReadContent(int length, CancellationToken cancellationToken)
138-
{
139-
var byteBuffer = new byte[length];
140-
await ReadExactly(byteBuffer, length, cancellationToken).ConfigureAwait(false);
208+
if (!contentLength.HasValue)
209+
{
210+
throw new InvalidOperationException("Failed to obtain Content-Length header");
211+
}
141212

142-
return Encoding.UTF8.GetString(byteBuffer);
213+
return new(contentLength.Value);
143214
}
144215

145-
private async Task<string> ReadMessage(CancellationToken cancellationToken)
216+
protected async ValueTask<ReadResult> ReadAtLeastAsync(int requiredBytes, bool allowEmpty, CancellationToken cancellationToken)
146217
{
147-
var header = await ReadUntilTerminator(cancellationToken).ConfigureAwait(false);
148-
var parsed = header.Split(':').Select(x => x.Trim()).ToArray();
218+
var readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
219+
while (readResult.Buffer.Length < requiredBytes && !readResult.IsCompleted && !readResult.IsCanceled)
220+
{
221+
reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.End);
222+
readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
223+
}
149224

150-
if (parsed.Length != 2 ||
151-
!parsed[0].Equals("Content-Length", StringComparison.OrdinalIgnoreCase) ||
152-
!int.TryParse(parsed[1], out var contentLength) ||
153-
contentLength <= 0)
225+
if (allowEmpty && readResult.Buffer.Length == 0)
154226
{
155-
throw new InvalidOperationException($"Invalid header: {header}");
227+
return readResult;
156228
}
157229

158-
return await ReadContent(contentLength, cancellationToken).ConfigureAwait(false);
230+
if (readResult.Buffer.Length < requiredBytes)
231+
{
232+
throw readResult.IsCompleted ? new EndOfStreamException() :
233+
readResult.IsCanceled ? new OperationCanceledException() :
234+
throw new InvalidOperationException(); // should be unreachable
235+
}
236+
237+
return readResult;
159238
}
160239

161-
private async ValueTask ReadExactly(byte[] buffer, int minimumBytes, CancellationToken cancellationToken)
240+
private async Task<byte[]?> ReadMessage(CancellationToken cancellationToken)
162241
{
163-
var totalRead = 0;
164-
while (totalRead < minimumBytes)
242+
var headers = await ReadHeaders(cancellationToken).ConfigureAwait(false);
243+
if (headers is null)
165244
{
166-
var read = await reader.ReadAsync(buffer, totalRead, minimumBytes - totalRead, cancellationToken).ConfigureAwait(false);
167-
if (read == 0)
168-
{
169-
throw new EndOfStreamException("Stream closed before reading expected number of bytes");
170-
}
171-
172-
totalRead += read;
245+
return null;
173246
}
247+
248+
var readResult = await ReadAtLeastAsync(headers.ContentLength, allowEmpty: false, cancellationToken).ConfigureAwait(false);
249+
250+
var contentBuffer = readResult.Buffer.Slice(0, headers.ContentLength);
251+
var output = contentBuffer.ToArray();
252+
253+
reader.AdvanceTo(contentBuffer.End);
254+
255+
return output;
174256
}
175257

176258
public void Dispose()
177259
{
178-
writer.Dispose();
179-
reader.Dispose();
260+
writer.Complete();
261+
reader.Complete();
180262
}
181263
}

0 commit comments

Comments
 (0)