Skip to content

Commit b5750e6

Browse files
Add SseFormatter (#109832)
* Add SseFormatter. * Update src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx Co-authored-by: Stephen Toub <stoub@microsoft.com> * Document SseItem exceptions. * Misc improvements and fixes. * Reinstate ordering of parameters in serialization callback. * Add SseItem<T>.ReconnectionInterval. * Address feedback. * Add parser validation for too small or too large retry intervals. * Update src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs Co-authored-by: Stephen Toub <stoub@microsoft.com> * Handle CR line breaks. * Simplify PooledByteBufferWriter. --------- Co-authored-by: Stephen Toub <stoub@microsoft.com>
1 parent 6be24fd commit b5750e6

File tree

15 files changed

+704
-47
lines changed

15 files changed

+704
-47
lines changed

src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,22 @@
66

77
namespace System.Net.ServerSentEvents
88
{
9+
public static partial class SseFormatter
10+
{
11+
public static System.Threading.Tasks.Task WriteAsync(System.Collections.Generic.IAsyncEnumerable<System.Net.ServerSentEvents.SseItem<string>> source, System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
12+
public static System.Threading.Tasks.Task WriteAsync<T>(System.Collections.Generic.IAsyncEnumerable<System.Net.ServerSentEvents.SseItem<T>> source, System.IO.Stream destination, System.Action<System.Net.ServerSentEvents.SseItem<T>, System.Buffers.IBufferWriter<byte>> itemFormatter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
13+
}
914
public delegate T SseItemParser<out T>(string eventType, System.ReadOnlySpan<byte> data);
1015
public readonly partial struct SseItem<T>
1116
{
1217
private readonly T _Data_k__BackingField;
1318
private readonly object _dummy;
1419
private readonly int _dummyPrimitive;
15-
public SseItem(T data, string? eventType) { throw null; }
20+
public SseItem(T data, string? eventType = null) { throw null; }
1621
public T Data { get { throw null; } }
22+
public string? EventId { get { throw null; } init { } }
1723
public string EventType { get { throw null; } }
24+
public System.TimeSpan? ReconnectionInterval { get { throw null; } init { } }
1825
}
1926
public static partial class SseParser
2027
{

src/libraries/System.Net.ServerSentEvents/ref/System.Net.ServerSentEvents.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
<Compile Include="System.Net.ServerSentEvents.cs" />
99
</ItemGroup>
1010

11+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
12+
<Compile Include="$(CoreLibSharedDir)System\Runtime\CompilerServices\IsExternalInit.cs" />
13+
</ItemGroup>
14+
1115
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
1216
<PackageReference Include="System.Memory" Version="$(SystemMemoryVersion)" />
1317
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj" />

src/libraries/System.Net.ServerSentEvents/src/Resources/Strings.resx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,10 @@
120120
<data name="InvalidOperation_EnumerateOnlyOnce" xml:space="preserve">
121121
<value>The enumerable may be enumerated only once.</value>
122122
</data>
123+
<data name="ArgumentException_CannotContainLineBreaks" xml:space="preserve">
124+
<value>The argument cannot contain line breaks.</value>
125+
</data>
126+
<data name="ArgumentException_CannotBeNegative" xml:space="preserve">
127+
<value>The argument cannot be a negative value.</value>
128+
</data>
123129
</root>

src/libraries/System.Net.ServerSentEvents/src/System.Net.ServerSentEvents.csproj

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<TargetFrameworks>$(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum)</TargetFrameworks>
@@ -11,10 +11,19 @@ System.Net.ServerSentEvents.SseParser</PackageDescription>
1111
</PropertyGroup>
1212

1313
<ItemGroup>
14+
<Compile Include="$(CommonPath)System\Net\ArrayBuffer.cs" Link="ProductionCode\Common\System\Net\ArrayBuffer.cs" />
15+
<Compile Include="System\Net\ServerSentEvents\Helpers.cs" />
16+
<Compile Include="System\Net\ServerSentEvents\PooledByteBufferWriter.cs" />
17+
<Compile Include="System\Net\ServerSentEvents\SseFormatter.cs" />
1418
<Compile Include="System\Net\ServerSentEvents\SseParser_1.cs" />
1519
<Compile Include="System\Net\ServerSentEvents\SseItem.cs" />
1620
<Compile Include="System\Net\ServerSentEvents\SseItemParser.cs" />
1721
<Compile Include="System\Net\ServerSentEvents\SseParser.cs" />
22+
<Compile Include="System\Net\ServerSentEvents\ThrowHelper.cs" />
23+
</ItemGroup>
24+
25+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
26+
<Compile Include="$(CoreLibSharedDir)System\Runtime\CompilerServices\IsExternalInit.cs" />
1827
</ItemGroup>
1928

2029
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using System.Diagnostics;
6+
using System.Globalization;
7+
using System.IO;
8+
using System.Runtime.InteropServices;
9+
using System.Text;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
13+
namespace System.Net.ServerSentEvents
14+
{
15+
internal static class Helpers
16+
{
17+
public static void WriteUtf8Number(this IBufferWriter<byte> writer, long value)
18+
{
19+
#if NET
20+
const int MaxDecimalDigits = 20;
21+
Span<byte> buffer = writer.GetSpan(MaxDecimalDigits);
22+
Debug.Assert(MaxDecimalDigits <= buffer.Length);
23+
24+
bool success = value.TryFormat(buffer, out int bytesWritten, provider: CultureInfo.InvariantCulture);
25+
Debug.Assert(success);
26+
writer.Advance(bytesWritten);
27+
#else
28+
writer.WriteUtf8String(value.ToString(CultureInfo.InvariantCulture));
29+
#endif
30+
}
31+
32+
public static void WriteUtf8String(this IBufferWriter<byte> writer, ReadOnlySpan<byte> value)
33+
{
34+
if (value.IsEmpty)
35+
{
36+
return;
37+
}
38+
39+
Span<byte> buffer = writer.GetSpan(value.Length);
40+
Debug.Assert(value.Length <= buffer.Length);
41+
value.CopyTo(buffer);
42+
writer.Advance(value.Length);
43+
}
44+
45+
public static unsafe void WriteUtf8String(this IBufferWriter<byte> writer, ReadOnlySpan<char> value)
46+
{
47+
if (value.IsEmpty)
48+
{
49+
return;
50+
}
51+
52+
int maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length);
53+
Span<byte> buffer = writer.GetSpan(maxByteCount);
54+
Debug.Assert(maxByteCount <= buffer.Length);
55+
int bytesWritten;
56+
#if NET
57+
bytesWritten = Encoding.UTF8.GetBytes(value, buffer);
58+
#else
59+
fixed (char* chars = value)
60+
fixed (byte* bytes = buffer)
61+
{
62+
bytesWritten = Encoding.UTF8.GetBytes(chars, value.Length, bytes, maxByteCount);
63+
}
64+
#endif
65+
writer.Advance(bytesWritten);
66+
}
67+
68+
public static bool ContainsLineBreaks(this ReadOnlySpan<char> text) =>
69+
text.IndexOfAny('\r', '\n') >= 0;
70+
71+
#if !NET
72+
73+
public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
74+
{
75+
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
76+
{
77+
return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken));
78+
}
79+
else
80+
{
81+
return WriteAsyncUsingPooledBuffer(stream, buffer, cancellationToken);
82+
83+
static async ValueTask WriteAsyncUsingPooledBuffer(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
84+
{
85+
byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
86+
buffer.Span.CopyTo(sharedBuffer);
87+
try
88+
{
89+
await stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
90+
}
91+
finally
92+
{
93+
ArrayPool<byte>.Shared.Return(sharedBuffer);
94+
}
95+
}
96+
}
97+
}
98+
#endif
99+
}
100+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using System.Diagnostics;
6+
7+
namespace System.Net.ServerSentEvents
8+
{
9+
internal sealed class PooledByteBufferWriter : IBufferWriter<byte>, IDisposable
10+
{
11+
private ArrayBuffer _buffer = new(initialSize: 256, usePool: true);
12+
13+
public void Advance(int count) => _buffer.Commit(count);
14+
15+
public Memory<byte> GetMemory(int sizeHint = 0)
16+
{
17+
_buffer.EnsureAvailableSpace(sizeHint);
18+
return _buffer.AvailableMemory;
19+
}
20+
21+
public Span<byte> GetSpan(int sizeHint = 0)
22+
{
23+
_buffer.EnsureAvailableSpace(sizeHint);
24+
return _buffer.AvailableSpan;
25+
}
26+
27+
public ReadOnlyMemory<byte> WrittenMemory => _buffer.ActiveMemory;
28+
public int Capacity => _buffer.Capacity;
29+
public int WrittenCount => _buffer.ActiveLength;
30+
public void Reset() => _buffer.Discard(_buffer.ActiveLength);
31+
public void Dispose() => _buffer.Dispose();
32+
}
33+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.IO;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace System.Net.ServerSentEvents
12+
{
13+
/// <summary>
14+
/// Provides methods for formatting server-sent events.
15+
/// </summary>
16+
public static class SseFormatter
17+
{
18+
private static readonly byte[] s_newLine = "\n"u8.ToArray();
19+
20+
/// <summary>
21+
/// Writes the <paramref name="source"/> of server-sent events to the <paramref name="destination"/> stream.
22+
/// </summary>
23+
/// <param name="source">The events to write to the stream.</param>
24+
/// <param name="destination">The destination stream to write the events.</param>
25+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
26+
/// <returns>A task that represents the asynchronous write operation.</returns>
27+
public static Task WriteAsync(IAsyncEnumerable<SseItem<string>> source, Stream destination, CancellationToken cancellationToken = default)
28+
{
29+
if (source is null)
30+
{
31+
ThrowHelper.ThrowArgumentNullException(nameof(source));
32+
}
33+
34+
if (destination is null)
35+
{
36+
ThrowHelper.ThrowArgumentNullException(nameof(destination));
37+
}
38+
39+
return WriteAsyncCore(source, destination, static (item, writer) => writer.WriteUtf8String(item.Data), cancellationToken);
40+
}
41+
42+
/// <summary>
43+
/// Writes the <paramref name="source"/> of server-sent events to the <paramref name="destination"/> stream.
44+
/// </summary>
45+
/// <typeparam name="T">The data type of the event.</typeparam>
46+
/// <param name="source">The events to write to the stream.</param>
47+
/// <param name="destination">The destination stream to write the events.</param>
48+
/// <param name="itemFormatter">The formatter for the data field of given event.</param>
49+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that can be used to cancel the write operation.</param>
50+
/// <returns>A task that represents the asynchronous write operation.</returns>
51+
public static Task WriteAsync<T>(IAsyncEnumerable<SseItem<T>> source, Stream destination, Action<SseItem<T>, IBufferWriter<byte>> itemFormatter, CancellationToken cancellationToken = default)
52+
{
53+
if (source is null)
54+
{
55+
ThrowHelper.ThrowArgumentNullException(nameof(source));
56+
}
57+
58+
if (destination is null)
59+
{
60+
ThrowHelper.ThrowArgumentNullException(nameof(destination));
61+
}
62+
63+
if (itemFormatter is null)
64+
{
65+
ThrowHelper.ThrowArgumentNullException(nameof(itemFormatter));
66+
}
67+
68+
return WriteAsyncCore(source, destination, itemFormatter, cancellationToken);
69+
}
70+
71+
private static async Task WriteAsyncCore<T>(IAsyncEnumerable<SseItem<T>> source, Stream destination, Action<SseItem<T>, IBufferWriter<byte>> itemFormatter, CancellationToken cancellationToken)
72+
{
73+
using PooledByteBufferWriter bufferWriter = new();
74+
using PooledByteBufferWriter userDataBufferWriter = new();
75+
76+
await foreach (SseItem<T> item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
77+
{
78+
itemFormatter(item, userDataBufferWriter);
79+
80+
FormatSseEvent(
81+
bufferWriter,
82+
eventType: item._eventType, // Do not use the public property since it normalizes to "message" if null
83+
data: userDataBufferWriter.WrittenMemory.Span,
84+
eventId: item.EventId,
85+
reconnectionInterval: item.ReconnectionInterval);
86+
87+
await destination.WriteAsync(bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false);
88+
89+
userDataBufferWriter.Reset();
90+
bufferWriter.Reset();
91+
}
92+
}
93+
94+
private static void FormatSseEvent(
95+
PooledByteBufferWriter bufferWriter,
96+
string? eventType,
97+
ReadOnlySpan<byte> data,
98+
string? eventId,
99+
TimeSpan? reconnectionInterval)
100+
{
101+
Debug.Assert(bufferWriter.WrittenCount is 0);
102+
103+
if (eventType is not null)
104+
{
105+
Debug.Assert(!eventType.ContainsLineBreaks());
106+
107+
bufferWriter.WriteUtf8String("event: "u8);
108+
bufferWriter.WriteUtf8String(eventType);
109+
bufferWriter.WriteUtf8String(s_newLine);
110+
}
111+
112+
WriteLinesWithPrefix(bufferWriter, prefix: "data: "u8, data);
113+
bufferWriter.Write(s_newLine);
114+
115+
if (eventId is not null)
116+
{
117+
Debug.Assert(!eventId.ContainsLineBreaks());
118+
119+
bufferWriter.WriteUtf8String("id: "u8);
120+
bufferWriter.WriteUtf8String(eventId);
121+
bufferWriter.WriteUtf8String(s_newLine);
122+
}
123+
124+
if (reconnectionInterval is { } retry)
125+
{
126+
Debug.Assert(retry >= TimeSpan.Zero);
127+
128+
bufferWriter.WriteUtf8String("retry: "u8);
129+
bufferWriter.WriteUtf8Number((long)retry.TotalMilliseconds);
130+
bufferWriter.WriteUtf8String(s_newLine);
131+
}
132+
133+
bufferWriter.WriteUtf8String(s_newLine);
134+
}
135+
136+
private static void WriteLinesWithPrefix(PooledByteBufferWriter writer, ReadOnlySpan<byte> prefix, ReadOnlySpan<byte> data)
137+
{
138+
// Writes a potentially multi-line string, prefixing each line with the given prefix.
139+
// Both \n and \r\n sequences are normalized to \n.
140+
141+
while (true)
142+
{
143+
writer.WriteUtf8String(prefix);
144+
145+
int i = data.IndexOfAny((byte)'\r', (byte)'\n');
146+
if (i < 0)
147+
{
148+
writer.WriteUtf8String(data);
149+
return;
150+
}
151+
152+
int lineLength = i;
153+
if (data[i++] == '\r' && i < data.Length && data[i] == '\n')
154+
{
155+
i++;
156+
}
157+
158+
ReadOnlySpan<byte> nextLine = data.Slice(0, lineLength);
159+
data = data.Slice(i);
160+
161+
writer.WriteUtf8String(nextLine);
162+
writer.WriteUtf8String(s_newLine);
163+
}
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)