diff --git a/Client/Program.cs b/Client/Program.cs index e1121cd..d45c881 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.CodeAnalysis.Operations; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; @@ -27,8 +28,8 @@ using IHost host = new HostBuilder() }); }) + .ConfigureLogging(logging => logging.AddConsole()) .Build(); - await host.StartAsync(); IClusterClient client = host.Services.GetRequiredService(); @@ -44,39 +45,15 @@ var task = Task.Run(async () => while (true) { logger.LogInformation("Sending number {Number}", num); - Console.WriteLine("Sending number {0}", num); - //await stream.OnNextAsync(num++); - await stream.OnCompletedAsync(); + await stream.OnNextAsync(num++); + if (num == 20) { break; } await Task.Delay(1000); } - await stream.OnCompletedAsync(); }); -var streamId2 = StreamId.Create("stringgenerator", "random"); -var stream2 = streamProvider.GetStream(streamId2); -List strings = new() { "one", "two", "three", "four", "five" }; -var task2 = Task.Run(async () => -{ - var num = 0; - while (true) - { - var str = strings[num % strings.Count]; - logger.LogInformation("Sending string {String}", str); - Console.WriteLine("Sending string {0}", str); - await stream2.OnNextAsync(str); - num++; - if (num == 10) - { - break; - } - - } - await stream2.OnCompletedAsync(); - Console.WriteLine("Stream completed"); -}); Console.ReadLine(); \ No newline at end of file diff --git a/Server/Program.cs b/Server/Program.cs index f683484..04ba6aa 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -16,6 +16,11 @@ var builder = new HostBuilder() silo.Services.AddSingleton(sp => { return ConnectionMultiplexer.Connect("localhost").GetDatabase(); + }) + .Configure(options => + { + options.CollectionQuantum = TimeSpan.FromSeconds(5); + options.CollectionAge = TimeSpan.FromSeconds(15); }); silo.ConfigureLogging(logging => logging.AddConsole()); silo.AddMemoryGrainStorage("PubSubStore"); @@ -49,16 +54,12 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver { var streamProvider = this.GetStreamProvider("RedisStream"); var streamId = StreamId.Create("numbergenerator", this.GetPrimaryKeyString()); - - _logger.LogInformation("Subscribing to stream {StreamId}", streamId); - _logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString()); var stream = streamProvider.GetStream(streamId); await stream.SubscribeAsync(this); await base.OnActivateAsync(ct); } public Task OnCompletedAsync() { - _logger.LogInformation("Stream completed"); return Task.CompletedTask; } @@ -72,52 +73,10 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver { _logger.LogInformation("Received number {Number}", item); await Task.Delay(2000); - + } } -[ImplicitStreamSubscription("stringgenerator")] -public class StringGeneratorGrain : Grain, IStringGeneratorGrain, IAsyncObserver -{ - private ILogger _logger { get; } - public StringGeneratorGrain(ILogger logger) - { - _logger = logger; - } - - public override async Task OnActivateAsync(CancellationToken ct) - { - var streamProvider = this.GetStreamProvider("RedisStream"); - var streamId = StreamId.Create("stringgenerator", this.GetPrimaryKeyString()); - - _logger.LogInformation("Subscribing to stream {StreamId}", streamId); - _logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString()); - var stream = streamProvider.GetStream(streamId); - await stream.SubscribeAsync(this); - await base.OnActivateAsync(ct); - - } - public Task OnCompletedAsync() - { - _logger.LogInformation("Stream completed"); - return Task.CompletedTask; - } - - public Task OnErrorAsync(Exception ex) - { - _logger.LogError("Error: {Error}", ex.Message); - return Task.CompletedTask; - } - - public async Task OnNextAsync(string item, StreamSequenceToken? token = null) - { - _logger.LogInformation("Received string {Number}", item); - - await Task.Delay(2000); - await this.OnCompletedAsync(); - } -} public interface INumberGeneratorGrain : IGrainWithStringKey { } -public interface IStringGeneratorGrain : IGrainWithStringKey { } \ No newline at end of file