Skip to content

Commit 77fd7be

Browse files
carlodekcarloMobilesoftdluc
authored
Implement Response Streaming (#726)
## Motivation and Context (Why the change? What's the scenario?) Add option to stream Ask result tokens without waiting for the full answer to be ready. ## High level description (Approach, Design) - New `stream` boolean option for the `Ask` API, false by default. When true, answer tokens are streamed as soon as they are generated by LLMs. - New `MemoryAnswer.StreamState` enum property: `Error`, `Reset`, `Append`, `Last`. - If moderation is enabled, the content is validated at the end. In case of moderation failure, the service returns an answer with `StreamState` = `Reset` and the new content to show to the end user. - Streaming uses SSE message format. - By default, SSE streams end with a `[DONE]` token. This can be disabled via KM settings. - SSE payload is optimized, returning `RelevantSources` only in the first SSE message. --------- Co-authored-by: Carlo <[email protected]> Co-authored-by: Devis Lucato <[email protected]> Co-authored-by: Devis Lucato <[email protected]>
1 parent 53db61a commit 77fd7be

File tree

27 files changed

+841
-153
lines changed

27 files changed

+841
-153
lines changed

.github/_typos.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ extend-exclude = [
1515
"encoder.json",
1616
"appsettings.development.json",
1717
"appsettings.Development.json",
18+
"appsettings.*.json.*",
1819
"AzureAISearchFilteringTest.cs",
1920
"KernelMemory.sln.DotSettings"
2021
]

KernelMemory.sln.DotSettings

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SHA/@EntryIndexedValue">SHA</s:String>
121121
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SK/@EntryIndexedValue">SK</s:String>
122122
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SKHTTP/@EntryIndexedValue">SKHTTP</s:String>
123+
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SSE/@EntryIndexedValue">SSE</s:String>
123124
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SSL/@EntryIndexedValue">SSL</s:String>
124125
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=TTL/@EntryIndexedValue">TTL</s:String>
125126
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=UI/@EntryIndexedValue">UI</s:String>

clients/dotnet/SemanticKernelPlugin/MemoryPlugin.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ public async Task<string> AskAsync(
404404
MemoryAnswer answer = await this._memory.AskAsync(
405405
question: question,
406406
index: index ?? this._defaultIndex,
407+
options: new SearchOptions { Stream = false },
407408
filter: TagsToMemoryFilter(tags ?? this._defaultRetrievalTags),
408409
minRelevance: minRelevance,
409410
cancellationToken: cancellationToken).ConfigureAwait(false);

clients/dotnet/WebClient/MemoryWebClient.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
using System.Linq;
88
using System.Net;
99
using System.Net.Http;
10+
using System.Runtime.CompilerServices;
1011
using System.Text;
1112
using System.Text.Json;
1213
using System.Threading;
1314
using System.Threading.Tasks;
1415
using Microsoft.KernelMemory.Context;
16+
using Microsoft.KernelMemory.HTTP;
1517
using Microsoft.KernelMemory.Internals;
1618

1719
namespace Microsoft.KernelMemory;
@@ -337,28 +339,30 @@ public async Task<SearchResult> SearchAsync(
337339
}
338340

339341
/// <inheritdoc />
340-
public async Task<MemoryAnswer> AskAsync(
342+
public async IAsyncEnumerable<MemoryAnswer> AskStreamingAsync(
341343
string question,
342344
string? index = null,
343345
MemoryFilter? filter = null,
344346
ICollection<MemoryFilter>? filters = null,
345347
double minRelevance = 0,
348+
SearchOptions? options = null,
346349
IContext? context = null,
347-
CancellationToken cancellationToken = default)
350+
[EnumeratorCancellation] CancellationToken cancellationToken = default)
348351
{
349352
if (filter != null)
350353
{
351-
if (filters == null) { filters = []; }
352-
354+
filters ??= [];
353355
filters.Add(filter);
354356
}
355357

358+
var useStreaming = options?.Stream ?? false;
356359
MemoryQuery request = new()
357360
{
358361
Index = index,
359362
Question = question,
360363
Filters = (filters is { Count: > 0 }) ? filters.ToList() : [],
361364
MinRelevance = minRelevance,
365+
Stream = useStreaming,
362366
ContextArguments = (context?.Arguments ?? new Dictionary<string, object?>()).ToDictionary(),
363367
};
364368
using StringContent content = new(JsonSerializer.Serialize(request), Encoding.UTF8, "application/json");
@@ -367,8 +371,20 @@ public async Task<MemoryAnswer> AskAsync(
367371
HttpResponseMessage response = await this._client.PostAsync(url, content, cancellationToken).ConfigureAwait(false);
368372
response.EnsureSuccessStatusCode();
369373

370-
var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
371-
return JsonSerializer.Deserialize<MemoryAnswer>(json, s_caseInsensitiveJsonOptions) ?? new MemoryAnswer();
374+
if (useStreaming)
375+
{
376+
Stream stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
377+
IAsyncEnumerable<MemoryAnswer> answers = SSE.ParseStreamAsync<MemoryAnswer>(stream, cancellationToken);
378+
await foreach (MemoryAnswer answer in answers.ConfigureAwait(false))
379+
{
380+
yield return answer;
381+
}
382+
}
383+
else
384+
{
385+
var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
386+
yield return JsonSerializer.Deserialize<MemoryAnswer>(json, s_caseInsensitiveJsonOptions) ?? new MemoryAnswer();
387+
}
372388
}
373389

374390
#region private

examples/001-dotnet-WebClient/Program.cs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* without extracting memories. */
1616
internal static class Program
1717
{
18-
private static MemoryWebClient? s_memory;
18+
private static MemoryWebClient s_memory = null!;
1919
private static readonly List<string> s_toDelete = [];
2020

2121
// Change this to True and configure Azure Document Intelligence to test OCR and support for images
@@ -55,8 +55,8 @@ public static async Task Main()
5555
// === RETRIEVAL =========
5656
// =======================
5757

58-
await AskSimpleQuestion();
59-
await AskSimpleQuestionAndShowSources();
58+
await AskSimpleQuestionStreamingTheAnswer();
59+
await AskSimpleQuestionStreamingAndShowSources();
6060
await AskQuestionAboutImageContent();
6161
await AskQuestionUsingFilter();
6262
await AskQuestionsFilteringByUser();
@@ -249,16 +249,25 @@ private static async Task StoreJson()
249249
// =======================
250250

251251
// Question without filters
252-
private static async Task AskSimpleQuestion()
252+
private static async Task AskSimpleQuestionStreamingTheAnswer()
253253
{
254254
var question = "What's E = m*c^2?";
255255
Console.WriteLine($"Question: {question}");
256256
Console.WriteLine($"Expected result: formula explanation using the information loaded");
257257

258-
var answer = await s_memory.AskAsync(question, minRelevance: 0.6);
259-
Console.WriteLine($"\nAnswer: {answer.Result}");
258+
Console.Write("\nAnswer: ");
259+
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
260+
options: new SearchOptions { Stream = true });
260261

261-
Console.WriteLine("\n====================================\n");
262+
await foreach (var answer in answerStream)
263+
{
264+
// Print token received by LLM
265+
Console.Write(answer.Result);
266+
// Slow down the stream for demo purpose
267+
await Task.Delay(25);
268+
}
269+
270+
Console.WriteLine("\n\n====================================\n");
262271

263272
/* OUTPUT
264273
@@ -275,17 +284,32 @@ due to the speed of light being a very large number when squared. This concept i
275284
}
276285

277286
// Another question without filters and show sources
278-
private static async Task AskSimpleQuestionAndShowSources()
287+
private static async Task AskSimpleQuestionStreamingAndShowSources()
279288
{
280289
var question = "What's Kernel Memory?";
281290
Console.WriteLine($"Question: {question}");
282291
Console.WriteLine($"Expected result: it should explain what KM project is (not generic kernel memory)");
283292

284-
var answer = await s_memory.AskAsync(question, minRelevance: 0.5);
285-
Console.WriteLine($"\nAnswer: {answer.Result}\n\n Sources:\n");
293+
Console.Write("\nAnswer: ");
294+
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.5,
295+
options: new SearchOptions { Stream = true });
296+
297+
List<Citation> sources = [];
298+
await foreach (var answer in answerStream)
299+
{
300+
// Print token received by LLM
301+
Console.Write(answer.Result);
302+
303+
// Collect sources
304+
sources.AddRange(answer.RelevantSources);
305+
306+
// Slow down the stream for demo purpose
307+
await Task.Delay(5);
308+
}
286309

287310
// Show sources / citations
288-
foreach (var x in answer.RelevantSources)
311+
Console.WriteLine("\n\nSources:\n");
312+
foreach (var x in sources)
289313
{
290314
Console.WriteLine(x.SourceUrl != null
291315
? $" - {x.SourceUrl} [{x.Partitions.First().LastUpdate:D}]"

examples/002-dotnet-Serverless/Program.cs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#pragma warning disable CS8602 // by design
1414
public static class Program
1515
{
16-
private static MemoryServerless? s_memory;
16+
private static MemoryServerless s_memory = null!;
1717
private static readonly List<string> s_toDelete = [];
1818

1919
// Remember to configure Azure Document Intelligence to test OCR and support for images
@@ -107,8 +107,8 @@ public static async Task Main()
107107
// === RETRIEVAL =========
108108
// =======================
109109

110-
await AskSimpleQuestion();
111-
await AskSimpleQuestionAndShowSources();
110+
await AskSimpleQuestionStreamingTheAnswer();
111+
await AskSimpleQuestionStreamingAndShowSources();
112112
await AskQuestionAboutImageContent();
113113
await AskQuestionUsingFilter();
114114
await AskQuestionsFilteringByUser();
@@ -303,16 +303,25 @@ private static async Task StoreJson()
303303
// =======================
304304

305305
// Question without filters
306-
private static async Task AskSimpleQuestion()
306+
private static async Task AskSimpleQuestionStreamingTheAnswer()
307307
{
308308
var question = "What's E = m*c^2?";
309309
Console.WriteLine($"Question: {question}");
310310
Console.WriteLine($"Expected result: formula explanation using the information loaded");
311311

312-
var answer = await s_memory.AskAsync(question, minRelevance: 0.6);
313-
Console.WriteLine($"\nAnswer: {answer.Result}");
312+
Console.Write("\nAnswer: ");
313+
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.6,
314+
options: new SearchOptions { Stream = true });
314315

315-
Console.WriteLine("\n====================================\n");
316+
await foreach (var answer in answerStream)
317+
{
318+
// Print token received by LLM
319+
Console.Write(answer.Result);
320+
// Slow down the stream for demo purpose
321+
await Task.Delay(25);
322+
}
323+
324+
Console.WriteLine("\n\n====================================\n");
316325

317326
/* OUTPUT
318327
@@ -329,17 +338,32 @@ due to the speed of light being a very large number when squared. This concept i
329338
}
330339

331340
// Another question without filters and show sources
332-
private static async Task AskSimpleQuestionAndShowSources()
341+
private static async Task AskSimpleQuestionStreamingAndShowSources()
333342
{
334343
var question = "What's Kernel Memory?";
335344
Console.WriteLine($"Question: {question}");
336345
Console.WriteLine($"Expected result: it should explain what KM project is (not generic kernel memory)");
337346

338-
var answer = await s_memory.AskAsync(question, minRelevance: 0.5);
339-
Console.WriteLine($"\nAnswer: {answer.Result}\n\n Sources:\n");
347+
Console.Write("\nAnswer: ");
348+
var answerStream = s_memory.AskStreamingAsync(question, minRelevance: 0.5,
349+
options: new SearchOptions { Stream = true });
350+
351+
List<Citation> sources = [];
352+
await foreach (var answer in answerStream)
353+
{
354+
// Print token received by LLM
355+
Console.Write(answer.Result);
356+
357+
// Collect sources
358+
sources.AddRange(answer.RelevantSources);
359+
360+
// Slow down the stream for demo purpose
361+
await Task.Delay(5);
362+
}
340363

341364
// Show sources / citations
342-
foreach (var x in answer.RelevantSources)
365+
Console.WriteLine("\n\nSources:\n");
366+
foreach (var x in sources)
343367
{
344368
Console.WriteLine(x.SourceUrl != null
345369
? $" - {x.SourceUrl} [{x.Partitions.First().LastUpdate:D}]"

extensions/AzureAISearch/AzureAISearch.TestApplication/Program.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Microsoft.KernelMemory;
99
using Microsoft.KernelMemory.MemoryDb.AzureAISearch;
1010
using Microsoft.KernelMemory.MemoryStorage;
11+
using AISearchOptions = Azure.Search.Documents.SearchOptions;
1112

1213
namespace Microsoft.AzureAISearch.TestApplication;
1314

@@ -246,7 +247,7 @@ private static async Task<IList<MemoryRecord>> SearchByFieldValueAsync(
246247

247248
fieldValue1 = fieldValue1.Replace("'", "''", StringComparison.Ordinal);
248249
fieldValue2 = fieldValue2.Replace("'", "''", StringComparison.Ordinal);
249-
SearchOptions options = new()
250+
AISearchOptions options = new()
250251
{
251252
Filter = fieldIsCollection
252253
? $"{fieldName}/any(s: s eq '{fieldValue1}') and {fieldName}/any(s: s eq '{fieldValue2}')"

extensions/AzureAISearch/AzureAISearch/AzureAISearchMemory.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using Microsoft.KernelMemory.Diagnostics;
2020
using Microsoft.KernelMemory.DocumentStorage;
2121
using Microsoft.KernelMemory.MemoryStorage;
22+
using AISearchOptions = Azure.Search.Documents.SearchOptions;
2223

2324
namespace Microsoft.KernelMemory.MemoryDb.AzureAISearch;
2425

@@ -184,7 +185,7 @@ await client.IndexDocumentsAsync(
184185
Exhaustive = false
185186
};
186187

187-
SearchOptions options = new()
188+
AISearchOptions options = new()
188189
{
189190
VectorSearch = new()
190191
{
@@ -246,7 +247,7 @@ public async IAsyncEnumerable<MemoryRecord> GetListAsync(
246247
{
247248
var client = this.GetSearchClient(index);
248249

249-
SearchOptions options = this.PrepareSearchOptions(null, withEmbeddings, filters, limit);
250+
AISearchOptions options = this.PrepareSearchOptions(null, withEmbeddings, filters, limit);
250251

251252
Response<SearchResults<AzureAISearchMemoryRecord>>? searchResult = null;
252253
try
@@ -596,13 +597,13 @@ at Azure.Search.Documents.SearchClient.SearchInternal[T](SearchOptions options,
596597
return indexSchema;
597598
}
598599

599-
private SearchOptions PrepareSearchOptions(
600-
SearchOptions? options,
600+
private AISearchOptions PrepareSearchOptions(
601+
AISearchOptions? options,
601602
bool withEmbeddings,
602603
ICollection<MemoryFilter>? filters = null,
603604
int limit = 1)
604605
{
605-
options ??= new SearchOptions();
606+
options ??= new AISearchOptions();
606607

607608
// Define which fields to fetch
608609
options.Select.Add(AzureAISearchMemoryRecord.IdField);

service/Abstractions/Abstractions.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>net8.0</TargetFramework>
55
<AssemblyName>Microsoft.KernelMemory.Abstractions</AssemblyName>
66
<RootNamespace>Microsoft.KernelMemory</RootNamespace>
7-
<NoWarn>$(NoWarn);KMEXP00;CA1711;CA1724;CS1574;SKEXP0001;</NoWarn>
7+
<NoWarn>$(NoWarn);KMEXP00;SKEXP0001;CA1711;CA1724;CS1574;CA1812;</NoWarn>
88
</PropertyGroup>
99

1010
<ItemGroup>
@@ -13,6 +13,7 @@
1313
<PackageReference Include="Microsoft.Extensions.Hosting" />
1414
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
1515
<PackageReference Include="Microsoft.SemanticKernel.Abstractions" />
16+
<PackageReference Include="System.Linq.Async" />
1617
<PackageReference Include="System.Memory.Data" />
1718
<PackageReference Include="System.Numerics.Tensors" />
1819
</ItemGroup>

0 commit comments

Comments
 (0)