using Microsoft.Extensions.Logging; using Orleans.Runtime; using Orleans.Streams; using StackExchange.Redis; using System.Text.Json; namespace Provider { internal class RedisStreamAdapter : IQueueAdapter { private readonly IDatabase _database; private readonly string _providerName; private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory) { _database = database; _providerName = providerName; _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); } public string Name => _providerName; public bool IsRewindable => false; public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite; public IQueueAdapterReceiver CreateReceiver(QueueId queueId) { return new RedisStreamReceiver(queueId, _database, _loggerFactory.CreateLogger()); } public async Task QueueMessageBatchAsync(StreamId streamId, IEnumerable events, StreamSequenceToken token, Dictionary requestContext) { try { foreach (var @event in events) { NameValueEntry streamNamespaceEntry = new("streamNamespace", streamId.Namespace); NameValueEntry streamKeyEntry = new("streamKey", streamId.Key); NameValueEntry eventTypeEntry = new("eventType", @event!.GetType().Name); NameValueEntry dataEntry = new("data", JsonSerializer.Serialize(@event)); var queueId = _hashRingBasedStreamQueueMapper.GetQueueForStream(streamId); await _database.StreamAddAsync(queueId.ToString(), [streamNamespaceEntry, streamKeyEntry, eventTypeEntry, dataEntry]); } } catch (Exception ex) { _logger.LogError(ex, "Error adding event to stream {StreamId}", streamId); } } } }