Skip to content

Commit 0d1194b

Browse files
sergeimammtmk
andauthored
Added verification of the consumer sequence number for pull ordered consumers (#981)
* Added verification of the consumer sequence number for pull ordered consumer * Fix an ordered consumer retry logic for MaxBytes mode * Add tests minor fixes * Fix msg count, size check * Fix test --------- Co-authored-by: Ziya Suzen <ziya@suzen.net>
1 parent 2b763d4 commit 0d1194b

File tree

3 files changed

+308
-30
lines changed

3 files changed

+308
-30
lines changed

src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
7979
consumerName = consumer.Info.Name;
8080
_logger.LogInformation(NatsJSLogEvents.NewConsumer, "Created {ConsumerName} with sequence {Seq}", consumerName, seq);
8181

82+
ulong cseq = 0;
8283
NatsJSProtocolException? protocolException = default;
8384

8485
await using (var cc = await consumer.OrderedConsumeInternalAsync(serializer, opts, cancellationToken))
@@ -126,7 +127,15 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
126127
if (msg.Metadata is not { } metadata)
127128
continue;
128129

130+
var expected = cseq + 1;
131+
if (metadata.Sequence.Consumer != expected)
132+
{
133+
_logger.LogWarning(NatsJSLogEvents.Retry, "Consumer sequence mismatch. Expected {Expected}, was {SequenceConsumer}. Retrying...", expected, metadata.Sequence.Consumer);
134+
goto CONSUME_LOOP;
135+
}
136+
129137
seq = metadata.Sequence.Stream;
138+
cseq = metadata.Sequence.Consumer;
130139

131140
yield return msg;
132141
}
@@ -180,23 +189,65 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchAsync<T>(
180189
[EnumeratorCancellation] CancellationToken cancellationToken = default)
181190
{
182191
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token;
192+
var processed = 0;
193+
var bytesProcessed = 0;
183194

184-
var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
185-
_fetchConsumerName = consumer.Info.Name;
186-
187-
await foreach (var msg in consumer.FetchAsync(opts, serializer, cancellationToken))
195+
var retry = 0;
196+
while (!cancellationToken.IsCancellationRequested)
188197
{
189-
if (msg.Metadata is not { } metadata)
190-
continue;
198+
if ((opts.MaxMsgs.HasValue && processed >= opts.MaxMsgs) || (opts.MaxBytes.HasValue && bytesProcessed >= opts.MaxBytes))
199+
yield break;
191200

192-
_fetchSeq = metadata.Sequence.Stream;
193-
yield return msg;
194-
}
201+
var mismatch = false;
202+
ulong cseq = 0;
203+
204+
var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
205+
_fetchConsumerName = consumer.Info.Name;
206+
207+
try
208+
{
209+
var fetchOpts = opts with { MaxMsgs = opts.MaxMsgs - processed, MaxBytes = opts.MaxBytes - bytesProcessed };
210+
211+
await foreach (var msg in consumer.FetchAsync(fetchOpts, serializer, cancellationToken))
212+
{
213+
if (msg.Metadata is not { } metadata)
214+
continue;
215+
216+
var expected = cseq + 1;
217+
if (metadata.Sequence.Consumer != expected)
218+
{
219+
_logger.LogWarning(NatsJSLogEvents.Retry, "Consumer sequence mismatch. Expected {Expected}, was {SequenceConsumer}. Retrying...", expected, metadata.Sequence.Consumer);
220+
mismatch = true;
221+
break;
222+
}
223+
224+
_fetchSeq = metadata.Sequence.Stream;
225+
cseq = metadata.Sequence.Consumer;
226+
227+
processed++;
228+
bytesProcessed += msg.Size;
229+
230+
yield return msg;
231+
}
232+
}
233+
finally
234+
{
235+
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
236+
237+
if (deleted)
238+
_fetchConsumerName = string.Empty;
239+
}
195240

196-
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
241+
if (!mismatch)
242+
yield break;
243+
244+
if (retry == _opts.MaxResetAttempts)
245+
{
246+
throw new NatsJSException("Maximum number of retry attempts reached.");
247+
}
197248

198-
if (deleted)
199-
_fetchConsumerName = string.Empty;
249+
await _context.Connection.Opts.BackoffWithJitterAsync(retry++, cancellationToken);
250+
}
200251
}
201252

202253
/// <inheritdoc />
@@ -206,26 +257,63 @@ public async IAsyncEnumerable<NatsJSMsg<T>> FetchNoWaitAsync<T>(
206257
[EnumeratorCancellation] CancellationToken cancellationToken = default)
207258
{
208259
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token;
260+
var processed = 0;
261+
var bytesProcessed = 0;
209262

210-
var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
211-
_fetchConsumerName = consumer.Info.Name;
212-
try
263+
var retry = 0;
264+
while (!cancellationToken.IsCancellationRequested)
213265
{
214-
await foreach (var msg in consumer.FetchNoWaitAsync(opts, serializer, cancellationToken))
266+
if ((opts.MaxMsgs.HasValue && processed >= opts.MaxMsgs) || (opts.MaxBytes.HasValue && bytesProcessed >= opts.MaxBytes))
267+
yield break;
268+
269+
var mismatch = false;
270+
ulong cseq = 0;
271+
var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken);
272+
_fetchConsumerName = consumer.Info.Name;
273+
274+
try
275+
{
276+
var fetchOpts = opts with { MaxMsgs = opts.MaxMsgs - processed, MaxBytes = opts.MaxBytes - bytesProcessed };
277+
278+
await foreach (var msg in consumer.FetchNoWaitAsync(fetchOpts, serializer, cancellationToken))
279+
{
280+
if (msg.Metadata is not { } metadata)
281+
continue;
282+
283+
var expected = cseq + 1;
284+
if (metadata.Sequence.Consumer != expected)
285+
{
286+
_logger.LogWarning(NatsJSLogEvents.Retry, "Consumer sequence mismatch. Expected {Expected}, was {SequenceConsumer}. Retrying...", expected, metadata.Sequence.Consumer);
287+
mismatch = true;
288+
break;
289+
}
290+
291+
_fetchSeq = metadata.Sequence.Stream;
292+
cseq = metadata.Sequence.Consumer;
293+
294+
processed++;
295+
bytesProcessed += msg.Size;
296+
297+
yield return msg;
298+
}
299+
}
300+
finally
215301
{
216-
if (msg.Metadata is not { } metadata)
217-
continue;
302+
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
218303

219-
_fetchSeq = metadata.Sequence.Stream;
220-
yield return msg;
304+
if (deleted)
305+
_fetchConsumerName = string.Empty;
306+
}
307+
308+
if (!mismatch)
309+
yield break;
310+
311+
if (retry == _opts.MaxResetAttempts)
312+
{
313+
throw new NatsJSException("Maximum number of retry attempts reached.");
221314
}
222-
}
223-
finally
224-
{
225-
var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken);
226315

227-
if (deleted)
228-
_fetchConsumerName = string.Empty;
316+
await _context.Connection.Opts.BackoffWithJitterAsync(retry++, cancellationToken);
229317
}
230318
}
231319

tests/NATS.Client.CoreUnit.Tests/NuidTests.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public class NuidTests
1010
private static readonly Regex NuidRegex = new("[A-z0-9]{22}");
1111

1212
private readonly ITestOutputHelper _outputHelper;
13+
private int _result;
1314

1415
public NuidTests(ITestOutputHelper outputHelper)
1516
{
@@ -119,7 +120,6 @@ public void GetNextNuid_ContainsOnlyValidCharacters_Char()
119120
[Fact]
120121
public void GetNextNuid_PrefixRenewed_Char()
121122
{
122-
var result = false;
123123
var firstNuid = new char[22];
124124
var secondNuid = new char[22];
125125

@@ -129,15 +129,18 @@ public void GetNextNuid_PrefixRenewed_Char()
129129
var maxSequential = 839299365868340224ul - increment - 1;
130130
SetSequentialAndIncrement(maxSequential, increment);
131131

132-
result = Nuid.TryWriteNuid(firstNuid);
133-
result &= Nuid.TryWriteNuid(secondNuid);
132+
if (Nuid.TryWriteNuid(firstNuid))
133+
Interlocked.Increment(ref _result);
134+
135+
if (Nuid.TryWriteNuid(secondNuid))
136+
Interlocked.Increment(ref _result);
134137
});
135138

136139
executionThread.Start();
137140
executionThread.Join(1_000);
138141

139142
// Assert
140-
Assert.True(result);
143+
Assert.Equal(2, Interlocked.CompareExchange(ref _result, 0, 0));
141144
Assert.False(firstNuid.AsSpan(0, 12).SequenceEqual(secondNuid.AsSpan(0, 12)));
142145
}
143146

0 commit comments

Comments
 (0)