Samson Amaugo 460a2d7b76 done
2024-06-26 21:34:43 +01:00

58 lines
2.4 KiB
C#

using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;
using StackExchange.Redis;
using System.Text.Json;
namespace Provider
{
/// <summary>
/// Orleans <see cref="IQueueAdapter"/> that publishes stream events to Redis streams
/// and creates <see cref="RedisStreamReceiver"/> instances for reading them.
/// </summary>
/// <remarks>
/// Each event is appended as one Redis stream entry with the fields
/// <c>streamNamespace</c>, <c>streamKey</c>, <c>eventType</c> and <c>data</c>.
/// The Redis stream key is the string form of the queue that the queue mapper
/// assigns to the Orleans stream.
/// </remarks>
public class RedisStreamAdapter : IQueueAdapter
{
    private readonly IDatabase _database;
    private readonly string _providerName;
    private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger<RedisStreamAdapter> _logger;

    /// <summary>
    /// Creates an adapter bound to a Redis database and a queue mapper.
    /// </summary>
    /// <param name="database">Redis database used for all stream writes and handed to receivers.</param>
    /// <param name="providerName">Name reported through <see cref="Name"/>.</param>
    /// <param name="hashRingBasedStreamQueueMapper">Mapper that selects the queue for a given stream id.</param>
    /// <param name="loggerFactory">Factory used to create the adapter's logger and the receivers' loggers.</param>
    public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory)
    {
        _database = database;
        _providerName = providerName;
        _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper;
        _loggerFactory = loggerFactory;
        _logger = loggerFactory.CreateLogger<RedisStreamAdapter>();
    }

    /// <summary>Gets the provider name supplied at construction.</summary>
    public string Name => _providerName;

    /// <summary>Always <c>false</c>: this adapter does not advertise rewind support.</summary>
    public bool IsRewindable => false;

    /// <summary>Always <see cref="StreamProviderDirection.ReadWrite"/>.</summary>
    public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;

    /// <summary>
    /// Creates a receiver for the given queue that shares this adapter's Redis database.
    /// </summary>
    /// <param name="queueId">Queue the receiver will be created for.</param>
    /// <returns>A new <see cref="RedisStreamReceiver"/>.</returns>
    public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
    {
        return new RedisStreamReceiver(queueId, _database, _loggerFactory.CreateLogger<RedisStreamReceiver>());
    }

    /// <summary>
    /// Appends every event in <paramref name="events"/> to the Redis stream that backs
    /// the queue mapped to <paramref name="streamId"/>, one entry per event, in order.
    /// </summary>
    /// <typeparam name="T">Declared type of the events.</typeparam>
    /// <param name="streamId">Orleans stream the events belong to.</param>
    /// <param name="events">Events to publish; must not be null or contain null items.</param>
    /// <param name="token">Not used by this implementation.</param>
    /// <param name="requestContext">Not used by this implementation.</param>
    /// <returns>A task that completes once all events have been added.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="events"/> contains a null item.</exception>
    /// <remarks>
    /// Any failure is logged and then rethrown so the producer can observe it.
    /// Events written before the failure are not rolled back.
    /// </remarks>
    public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
    {
        ArgumentNullException.ThrowIfNull(events);

        try
        {
            // These values depend only on the stream id, so compute them once per batch
            // instead of once per event.
            var redisStreamKey = _hashRingBasedStreamQueueMapper.GetQueueForStream(streamId).ToString();
            NameValueEntry streamNamespaceEntry = new("streamNamespace", streamId.Namespace);
            NameValueEntry streamKeyEntry = new("streamKey", streamId.Key);

            foreach (var @event in events)
            {
                if (@event is null)
                {
                    throw new ArgumentException("Stream events must not be null.", nameof(events));
                }

                // NOTE(review): eventType records the runtime type name, while Serialize<T>
                // uses the declared type T; confirm the receiver does not rely on
                // derived-type properties being present in "data".
                NameValueEntry eventTypeEntry = new("eventType", @event.GetType().Name);
                NameValueEntry dataEntry = new("data", JsonSerializer.Serialize(@event));

                // Awaited one at a time so entries are appended in the order the events were supplied.
                await _database.StreamAddAsync(redisStreamKey, [streamNamespaceEntry, streamKeyEntry, eventTypeEntry, dataEntry]);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error adding event to stream {StreamId}", streamId);

            // Rethrow (preserving the stack trace): swallowing here would make the caller
            // believe the batch was queued even though events were lost.
            throw;
        }
    }
}
}