This commit is contained in:
Samson Amaugo 2024-06-26 18:01:21 +01:00
parent 8f54e0b69a
commit 9d74f05f79
2 changed files with 11 additions and 75 deletions

View File

@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.CodeAnalysis.Operations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
@ -27,8 +28,8 @@ using IHost host = new HostBuilder()
});
})
.ConfigureLogging(logging => logging.AddConsole())
.Build();
await host.StartAsync();
IClusterClient client = host.Services.GetRequiredService<IClusterClient>();
@ -44,39 +45,15 @@ var task = Task.Run(async () =>
while (true)
{
logger.LogInformation("Sending number {Number}", num);
Console.WriteLine("Sending number {0}", num);
//await stream.OnNextAsync(num++);
await stream.OnCompletedAsync();
await stream.OnNextAsync(num++);
if (num == 20)
{
break;
}
await Task.Delay(1000);
}
await stream.OnCompletedAsync();
});
var streamId2 = StreamId.Create("stringgenerator", "random");
var stream2 = streamProvider.GetStream<string>(streamId2);
List<string> strings = new() { "one", "two", "three", "four", "five" };
var task2 = Task.Run(async () =>
{
var num = 0;
while (true)
{
var str = strings[num % strings.Count];
logger.LogInformation("Sending string {String}", str);
Console.WriteLine("Sending string {0}", str);
await stream2.OnNextAsync(str);
num++;
if (num == 10)
{
break;
}
}
await stream2.OnCompletedAsync();
Console.WriteLine("Stream completed");
});
Console.ReadLine();

View File

@ -16,6 +16,11 @@ var builder = new HostBuilder()
silo.Services.AddSingleton<IDatabase>(sp =>
{
return ConnectionMultiplexer.Connect("localhost").GetDatabase();
})
.Configure<GrainCollectionOptions>(options =>
{
options.CollectionQuantum = TimeSpan.FromSeconds(5);
options.CollectionAge = TimeSpan.FromSeconds(15);
});
silo.ConfigureLogging(logging => logging.AddConsole());
silo.AddMemoryGrainStorage("PubSubStore");
@ -49,16 +54,12 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
{
var streamProvider = this.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("numbergenerator", this.GetPrimaryKeyString());
_logger.LogInformation("Subscribing to stream {StreamId}", streamId);
_logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<int>(streamId);
await stream.SubscribeAsync(this);
await base.OnActivateAsync(ct);
}
public Task OnCompletedAsync()
{
_logger.LogInformation("Stream completed");
return Task.CompletedTask;
}
@ -76,48 +77,6 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
}
}
[ImplicitStreamSubscription("stringgenerator")]
public class StringGeneratorGrain : Grain, IStringGeneratorGrain, IAsyncObserver<string>
{
private ILogger<StringGeneratorGrain> _logger { get; }
public StringGeneratorGrain(ILogger<StringGeneratorGrain> logger)
{
_logger = logger;
}
public override async Task OnActivateAsync(CancellationToken ct)
{
var streamProvider = this.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("stringgenerator", this.GetPrimaryKeyString());
_logger.LogInformation("Subscribing to stream {StreamId}", streamId);
_logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
await stream.SubscribeAsync(this);
await base.OnActivateAsync(ct);
}
public Task OnCompletedAsync()
{
_logger.LogInformation("Stream completed");
return Task.CompletedTask;
}
public Task OnErrorAsync(Exception ex)
{
_logger.LogError("Error: {Error}", ex.Message);
return Task.CompletedTask;
}
public async Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation("Received string {Number}", item);
await Task.Delay(2000);
await this.OnCompletedAsync();
}
}
public interface INumberGeneratorGrain : IGrainWithStringKey { }
public interface IStringGeneratorGrain : IGrainWithStringKey { }