This commit is contained in:
Samson Amaugo 2024-06-26 16:50:33 +01:00
parent 38b99d98f4
commit 8f54e0b69a
6 changed files with 234 additions and 24 deletions

View File

@ -1,2 +1,82 @@
// See https://aka.ms/new-console-template for more information using Microsoft.Extensions.DependencyInjection;
Console.WriteLine("Hello, World!"); using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Runtime;
using Provider;
using StackExchange.Redis;
using IHost host = new HostBuilder()
.UseOrleansClient(clientBuilder =>
{
clientBuilder.Services.AddSingleton<IDatabase>(sp =>
{
IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase();
return db;
});
clientBuilder.UseLocalhostClustering();
clientBuilder.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null);
clientBuilder.ConfigureServices(services =>
{
services.AddOptions<HashRingStreamQueueMapperOptions>("RedisStream")
.Configure(options =>
{
options.TotalQueueCount = 8;
});
});
})
.Build();
await host.StartAsync();
IClusterClient client = host.Services.GetRequiredService<IClusterClient>();
ILogger<Program> logger = host.Services.GetRequiredService<ILogger<Program>>();
var streamProvider = client.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("numbergenerator", "consecutive");
var stream = streamProvider.GetStream<int>(streamId);
var task = Task.Run(async () =>
{
var num = 0;
while (true)
{
logger.LogInformation("Sending number {Number}", num);
Console.WriteLine("Sending number {0}", num);
//await stream.OnNextAsync(num++);
await stream.OnCompletedAsync();
if (num == 20)
{
break;
}
await Task.Delay(1000);
}
await stream.OnCompletedAsync();
});
var streamId2 = StreamId.Create("stringgenerator", "random");
var stream2 = streamProvider.GetStream<string>(streamId2);
List<string> strings = new() { "one", "two", "three", "four", "five" };
var task2 = Task.Run(async () =>
{
var num = 0;
while (true)
{
var str = strings[num % strings.Count];
logger.LogInformation("Sending string {String}", str);
Console.WriteLine("Sending string {0}", str);
await stream2.OnNextAsync(str);
num++;
if (num == 10)
{
break;
}
}
await stream2.OnCompletedAsync();
Console.WriteLine("Stream completed");
});
Console.ReadLine();

View File

@ -8,6 +8,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" /> <PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
<PackageReference Include="Microsoft.Orleans.Sdk" Version="8.1.0" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" /> <PackageReference Include="StackExchange.Redis" Version="2.7.33" />
</ItemGroup> </ItemGroup>

View File

@ -1,4 +1,6 @@
using Microsoft.Extensions.Logging; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Configuration; using Orleans.Configuration;
using Orleans.Providers.Streams.Common; using Orleans.Providers.Streams.Common;
using Orleans.Streams; using Orleans.Streams;
@ -30,9 +32,21 @@ namespace Provider
_simpleQueueCacheOptions = simpleQueueCacheOptions; _simpleQueueCacheOptions = simpleQueueCacheOptions;
_hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName);
} }
public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName)
{
var database = provider.GetRequiredService<IDatabase>();
var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var simpleQueueCacheOptions = provider.GetOptionsByName<SimpleQueueCacheOptions>(providerName);
var hashRingStreamQueueMapperOptions = provider.GetOptionsByName<HashRingStreamQueueMapperOptions>(providerName);
var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger<RedisStreamFailureHandler>());
return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
}
public Task<IQueueAdapter> CreateAdapter() public Task<IQueueAdapter> CreateAdapter()
{ {
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory); return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
} }
public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId) public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)

View File

@ -1,11 +1,7 @@
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Orleans.Streams; using Orleans.Streams;
using StackExchange.Redis; using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Provider namespace Provider
{ {
@ -16,23 +12,23 @@ namespace Provider
private readonly ILogger<RedisStreamReceiver> _logger; private readonly ILogger<RedisStreamReceiver> _logger;
private bool _checkBacklog = true; private bool _checkBacklog = true;
private string _lastId = "0"; private string _lastId = "0";
private Task? pendingTasks;
public RedisStreamReceiver(QueueId queueId, IDatabase database, Microsoft.Extensions.Logging.ILogger<RedisStreamReceiver> logger) public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger<RedisStreamReceiver> logger)
{ {
_queueId = queueId; _queueId = queueId;
_database = database; _database = database;
_logger = logger; _logger = logger;
} }
public async Task<IList<IBatchContainer>> GetQueueMessagesAsync(int maxCount) public async Task<IList<IBatchContainer>?> GetQueueMessagesAsync(int maxCount)
{ {
try try
{ {
if (_checkBacklog) maxCount = 0; var events = _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount);
var events = await _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount); pendingTasks = events;
_lastId = ">"; _lastId = ">";
_checkBacklog = false; var batches = (await events).Select(e => new RedisStreamBatchContainer(e)).ToList<IBatchContainer>();
var batches = events.Select(e => new RedisStreamBatchContainer(e)).ToList<IBatchContainer>();
return batches; return batches;
} }
catch (Exception ex) catch (Exception ex)
@ -40,6 +36,11 @@ namespace Provider
_logger.LogError(ex, "Error reading from stream {QueueId}", _queueId); _logger.LogError(ex, "Error reading from stream {QueueId}", _queueId);
return default; return default;
} }
finally
{
pendingTasks = null;
}
} }
@ -50,7 +51,7 @@ namespace Provider
using (var cts = new CancellationTokenSource(timeout)) using (var cts = new CancellationTokenSource(timeout))
{ {
var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true); var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true);
await task.WaitAsync(timeout, cts.Token); await task.WaitAsync(timeout, cts.Token);
} }
} }
catch (Exception ex) when (ex.Message.Contains("name already exists")) { } catch (Exception ex) when (ex.Message.Contains("name already exists")) { }
@ -69,7 +70,9 @@ namespace Provider
var container = message as RedisStreamBatchContainer; var container = message as RedisStreamBatchContainer;
if (container != null) if (container != null)
{ {
await _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
pendingTasks = ack;
await ack;
} }
} }
} }
@ -77,11 +80,23 @@ namespace Provider
{ {
_logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId); _logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId);
} }
finally
{
pendingTasks = null;
}
} }
public Task Shutdown(TimeSpan timeout) public async Task Shutdown(TimeSpan timeout)
{ {
// implement any shut using (var cts = new CancellationTokenSource(timeout))
{
if (pendingTasks is not null)
{
await pendingTasks.WaitAsync(timeout, cts.Token);
}
}
_logger.LogInformation("Shutting down stream {QueueId}", _queueId);
} }
} }
} }

View File

@ -13,9 +13,9 @@ namespace Provider
internal class RedisStreamSequenceToken : StreamSequenceToken internal class RedisStreamSequenceToken : StreamSequenceToken
{ {
[Id(0)] [Id(0)]
public override long SequenceNumber { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } public override long SequenceNumber { get; protected set; }
[Id(1)] [Id(1)]
public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } public override int EventIndex { get ; protected set ; }
public RedisStreamSequenceToken(RedisValue id) public RedisStreamSequenceToken(RedisValue id)
{ {

View File

@ -3,6 +3,10 @@
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Streams;
using Provider;
using StackExchange.Redis; using StackExchange.Redis;
var builder = new HostBuilder() var builder = new HostBuilder()
@ -15,9 +19,105 @@ var builder = new HostBuilder()
}); });
silo.ConfigureLogging(logging => logging.AddConsole()); silo.ConfigureLogging(logging => logging.AddConsole());
silo.AddMemoryGrainStorage("PubSubStore"); silo.AddMemoryGrainStorage("PubSubStore");
silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null);
silo.AddMemoryGrainStorageAsDefault(); silo.AddMemoryGrainStorageAsDefault();
}).UseConsoleLifetime(); }).UseConsoleLifetime();
builder.ConfigureServices(services =>
{
services.AddOptions<HashRingStreamQueueMapperOptions>("RedisStream")
.Configure(options =>
{
options.TotalQueueCount = 8;
});
services.AddOptions<SimpleQueueCacheOptions>("RedisStream");
});
using IHost host = builder.Build(); using IHost host = builder.Build();
await host.RunAsync(); await host.RunAsync();
[ImplicitStreamSubscription("numbergenerator")]
public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver<int>
{
private ILogger<NumberGeneratorGrain> _logger { get; }
public NumberGeneratorGrain(ILogger<NumberGeneratorGrain> logger)
{
_logger = logger;
}
public override async Task OnActivateAsync(CancellationToken ct)
{
var streamProvider = this.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("numbergenerator", this.GetPrimaryKeyString());
_logger.LogInformation("Subscribing to stream {StreamId}", streamId);
_logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<int>(streamId);
await stream.SubscribeAsync(this);
await base.OnActivateAsync(ct);
}
public Task OnCompletedAsync()
{
_logger.LogInformation("Stream completed");
return Task.CompletedTask;
}
public Task OnErrorAsync(Exception ex)
{
_logger.LogError("Error: {Error}", ex.Message);
return Task.CompletedTask;
}
public async Task OnNextAsync(int item, StreamSequenceToken? token = null)
{
_logger.LogInformation("Received number {Number}", item);
await Task.Delay(2000);
}
}
[ImplicitStreamSubscription("stringgenerator")]
public class StringGeneratorGrain : Grain, IStringGeneratorGrain, IAsyncObserver<string>
{
private ILogger<StringGeneratorGrain> _logger { get; }
public StringGeneratorGrain(ILogger<StringGeneratorGrain> logger)
{
_logger = logger;
}
public override async Task OnActivateAsync(CancellationToken ct)
{
var streamProvider = this.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("stringgenerator", this.GetPrimaryKeyString());
_logger.LogInformation("Subscribing to stream {StreamId}", streamId);
_logger.LogInformation("Grain id is {Id}", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
await stream.SubscribeAsync(this);
await base.OnActivateAsync(ct);
}
public Task OnCompletedAsync()
{
_logger.LogInformation("Stream completed");
return Task.CompletedTask;
}
public Task OnErrorAsync(Exception ex)
{
_logger.LogError("Error: {Error}", ex.Message);
return Task.CompletedTask;
}
public async Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation("Received string {Number}", item);
await Task.Delay(2000);
await this.OnCompletedAsync();
}
}
public interface INumberGeneratorGrain : IGrainWithStringKey { }
public interface IStringGeneratorGrain : IGrainWithStringKey { }