From 8f54e0b69a28cbb71322e49abd8ec8880aa35485 Mon Sep 17 00:00:00 2001 From: Samson Amaugo Date: Wed, 26 Jun 2024 16:50:33 +0100 Subject: [PATCH] done --- Client/Program.cs | 84 ++++++++++++++++++++- Provider/Provider.csproj | 1 + Provider/RedisStreamFactory.cs | 18 ++++- Provider/RedisStreamReceiver.cs | 45 ++++++++---- Provider/RedisStreamSequenceToken.cs | 4 +- Server/Program.cs | 106 ++++++++++++++++++++++++++- 6 files changed, 234 insertions(+), 24 deletions(-) diff --git a/Client/Program.cs b/Client/Program.cs index 3751555..e1121cd 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -1,2 +1,82 @@ -// See https://aka.ms/new-console-template for more information -Console.WriteLine("Hello, World!"); +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Runtime; +using Provider; +using StackExchange.Redis; + +using IHost host = new HostBuilder() + .UseOrleansClient(clientBuilder => + { + clientBuilder.Services.AddSingleton(sp => + { + IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase(); + return db; + }); + clientBuilder.UseLocalhostClustering(); + + clientBuilder.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null); + clientBuilder.ConfigureServices(services => + { + services.AddOptions("RedisStream") + .Configure(options => + { + options.TotalQueueCount = 8; + }); + + }); + }) + .Build(); + +await host.StartAsync(); + +IClusterClient client = host.Services.GetRequiredService(); +ILogger logger = host.Services.GetRequiredService>(); +var streamProvider = client.GetStreamProvider("RedisStream"); + +var streamId = StreamId.Create("numbergenerator", "consecutive"); +var stream = streamProvider.GetStream(streamId); + +var task = Task.Run(async () => +{ + var num = 0; + while (true) + { + logger.LogInformation("Sending number {Number}", num); + Console.WriteLine("Sending number {0}", num); + //await stream.OnNextAsync(num++); + await stream.OnCompletedAsync(); + if (num == 20) + { + break; + } + await Task.Delay(1000); + } + await stream.OnCompletedAsync(); +}); + +var streamId2 = StreamId.Create("stringgenerator", "random"); +var stream2 = streamProvider.GetStream(streamId2); +List strings = new() { "one", "two", "three", "four", "five" }; + +var task2 = Task.Run(async () => +{ + var num = 0; + while (true) + { + var str = strings[num % strings.Count]; + logger.LogInformation("Sending string {String}", str); + Console.WriteLine("Sending string {0}", str); + await stream2.OnNextAsync(str); + num++; + if (num == 10) + { + break; + } + + } + await stream2.OnCompletedAsync(); + Console.WriteLine("Stream completed"); +}); +Console.ReadLine(); \ No newline at end of file diff --git a/Provider/Provider.csproj b/Provider/Provider.csproj index 8eec680..bf92fe9 100644 --- a/Provider/Provider.csproj +++ b/Provider/Provider.csproj @@ -8,6 +8,7 @@ + diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index ee97c48..64ca81d 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -1,4 +1,6 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Orleans; using Orleans.Configuration; using Orleans.Providers.Streams.Common; using Orleans.Streams; @@ -30,9 +32,21 @@ namespace Provider _simpleQueueCacheOptions = simpleQueueCacheOptions; _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); } + + public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName) + { + var database = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + var simpleQueueCacheOptions = provider.GetOptionsByName(providerName); + var hashRingStreamQueueMapperOptions = provider.GetOptionsByName(providerName); + var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger()); + return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions); + + } + public Task CreateAdapter() { - return Task.FromResult(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory); + return Task.FromResult(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory)); } public Task GetDeliveryFailureHandler(QueueId queueId) diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs index af78981..f5019cd 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Provider/RedisStreamReceiver.cs @@ -1,11 +1,7 @@ using Microsoft.Extensions.Logging; using Orleans.Streams; using StackExchange.Redis; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; + namespace Provider { @@ -16,23 +12,23 @@ namespace Provider private readonly ILogger _logger; private bool _checkBacklog = true; private string _lastId = "0"; + private Task? pendingTasks; - public RedisStreamReceiver(QueueId queueId, IDatabase database, Microsoft.Extensions.Logging.ILogger logger) + public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger logger) { _queueId = queueId; _database = database; _logger = logger; } - public async Task> GetQueueMessagesAsync(int maxCount) + public async Task?> GetQueueMessagesAsync(int maxCount) { try { - if (_checkBacklog) maxCount = 0; - var events = await _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount); + var events = _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount); + pendingTasks = events; _lastId = ">"; - _checkBacklog = false; - var batches = events.Select(e => new RedisStreamBatchContainer(e)).ToList(); + var batches = (await events).Select(e => new RedisStreamBatchContainer(e)).ToList(); return batches; } catch (Exception ex) @@ -40,6 +36,11 @@ namespace Provider _logger.LogError(ex, "Error reading from stream {QueueId}", _queueId); return default; } + finally + { + pendingTasks = null; + } + } @@ -50,7 +51,7 @@ namespace Provider using (var cts = new CancellationTokenSource(timeout)) { var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true); - await task.WaitAsync(timeout, cts.Token); + await task.WaitAsync(timeout, cts.Token); } } catch (Exception ex) when (ex.Message.Contains("name already exists")) { } @@ -69,7 +70,9 @@ namespace Provider var container = message as RedisStreamBatchContainer; if (container != null) { - await _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); + var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); + pendingTasks = ack; + await ack; } } } @@ -77,11 +80,23 @@ namespace Provider { _logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId); } + finally + { + pendingTasks = null; + } } - public Task Shutdown(TimeSpan timeout) + public async Task Shutdown(TimeSpan timeout) { - // implement any shut + using (var cts = new CancellationTokenSource(timeout)) + { + + if (pendingTasks is not null) + { + await pendingTasks.WaitAsync(timeout, cts.Token); + } + } + _logger.LogInformation("Shutting down stream {QueueId}", _queueId); } } } diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs index 2271b97..46fca61 100644 --- a/Provider/RedisStreamSequenceToken.cs +++ b/Provider/RedisStreamSequenceToken.cs @@ -13,9 +13,9 @@ namespace Provider internal class RedisStreamSequenceToken : StreamSequenceToken { [Id(0)] - public override long SequenceNumber { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } + public override long SequenceNumber { get; protected set; } [Id(1)] - public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } + public override int EventIndex { get ; protected set ; } public RedisStreamSequenceToken(RedisValue id) { diff --git a/Server/Program.cs b/Server/Program.cs index 17e3f3b..f683484 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -3,6 +3,10 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Runtime; +using Orleans.Streams; +using Provider; using StackExchange.Redis; var builder = new HostBuilder() @@ -15,9 +19,105 @@ var builder = new HostBuilder() }); silo.ConfigureLogging(logging => logging.AddConsole()); silo.AddMemoryGrainStorage("PubSubStore"); - + silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null); silo.AddMemoryGrainStorageAsDefault(); }).UseConsoleLifetime(); - +builder.ConfigureServices(services => +{ + services.AddOptions("RedisStream") + .Configure(options => + { + options.TotalQueueCount = 8; + }); + services.AddOptions("RedisStream"); +}); using IHost host = builder.Build(); -await host.RunAsync(); \ No newline at end of file +await host.RunAsync(); + + +[ImplicitStreamSubscription("numbergenerator")] +public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver +{ + private ILogger _logger { get; } + + public NumberGeneratorGrain(ILogger logger) + { + _logger = logger; + } + + public override async Task OnActivateAsync(CancellationToken ct) + { + var streamProvider = this.GetStreamProvider("RedisStream"); + var streamId = StreamId.Create("numbergenerator", this.GetPrimaryKeyString()); + + _logger.LogInformation("Subscribing to stream {StreamId}", streamId); + _logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString()); + var stream = streamProvider.GetStream(streamId); + await stream.SubscribeAsync(this); + await base.OnActivateAsync(ct); + } + public Task OnCompletedAsync() + { + _logger.LogInformation("Stream completed"); + return Task.CompletedTask; + } + + public Task OnErrorAsync(Exception ex) + { + _logger.LogError("Error: {Error}", ex.Message); + return Task.CompletedTask; + } + + public async Task OnNextAsync(int item, StreamSequenceToken? token = null) + { + _logger.LogInformation("Received number {Number}", item); + await Task.Delay(2000); + + } +} + +[ImplicitStreamSubscription("stringgenerator")] +public class StringGeneratorGrain : Grain, IStringGeneratorGrain, IAsyncObserver +{ + private ILogger _logger { get; } + + public StringGeneratorGrain(ILogger logger) + { + _logger = logger; + } + + public override async Task OnActivateAsync(CancellationToken ct) + { + var streamProvider = this.GetStreamProvider("RedisStream"); + var streamId = StreamId.Create("stringgenerator", this.GetPrimaryKeyString()); + + _logger.LogInformation("Subscribing to stream {StreamId}", streamId); + _logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString()); + var stream = streamProvider.GetStream(streamId); + await stream.SubscribeAsync(this); + await base.OnActivateAsync(ct); + + } + public Task OnCompletedAsync() + { + _logger.LogInformation("Stream completed"); + return Task.CompletedTask; + } + + public Task OnErrorAsync(Exception ex) + { + _logger.LogError("Error: {Error}", ex.Message); + return Task.CompletedTask; + } + + public async Task OnNextAsync(string item, StreamSequenceToken? token = null) + { + _logger.LogInformation("Received string {Number}", item); + + await Task.Delay(2000); + await this.OnCompletedAsync(); + } +} + +public interface INumberGeneratorGrain : IGrainWithStringKey { } +public interface IStringGeneratorGrain : IGrainWithStringKey { } \ No newline at end of file