diff --git a/Provider/RedisStreamAdapter.cs b/Provider/RedisStreamAdapter.cs index e8d5def..29b72db 100644 --- a/Provider/RedisStreamAdapter.cs +++ b/Provider/RedisStreamAdapter.cs @@ -1,12 +1,62 @@ -using System; +using Microsoft.Extensions.Logging; +using Orleans.Runtime; +using Orleans.Streams; +using StackExchange.Redis; +using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Text.Json; using System.Threading.Tasks; namespace Provider { - internal class RedisStreamAdapter + internal class RedisStreamAdapter : IQueueAdapter { + private readonly IDatabase _database; + private readonly string _providerName; + private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + + public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory) + { + _database = database; + _providerName = providerName; + _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper; + _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + } + + public string Name => _providerName; + + public bool IsRewindable => false; + + public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite; + + public IQueueAdapterReceiver CreateReceiver(QueueId queueId) + { + return new RedisStreamReceiver(queueId, _database, _loggerFactory.CreateLogger()); + } + + public async Task QueueMessageBatchAsync(StreamId streamId, IEnumerable events, StreamSequenceToken token, Dictionary requestContext) + { + try + { + foreach (var @event in events) + { + NameValueEntry streamNamespaceEntry = new("streamNamespace", streamId.Namespace); + NameValueEntry streamKeyEntry = new("streamKey", streamId.Key); + NameValueEntry eventTypeEntry = new("eventType", @event!.GetType().Name); + NameValueEntry dataEntry = new("data", JsonSerializer.Serialize(@event)); + var queueId = _hashRingBasedStreamQueueMapper.GetQueueForStream(streamId); + await _database.StreamAddAsync(queueId.ToString(), [streamNamespaceEntry, streamKeyEntry, eventTypeEntry, dataEntry]); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error adding event to stream {StreamId}", streamId); + } + } } } diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs index 6da99c4..adf7cd6 100644 --- a/Provider/RedisStreamBatchContainer.cs +++ b/Provider/RedisStreamBatchContainer.cs @@ -1,27 +1,45 @@ using Orleans.Runtime; using Orleans.Streams; +using StackExchange.Redis; using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Text.Json; using System.Threading.Tasks; namespace Provider { public class RedisStreamBatchContainer : IBatchContainer { - public StreamId StreamId => throw new NotImplementedException(); - private rea - public StreamSequenceToken SequenceToken => throw new NotImplementedException(); + public StreamId StreamId { get; } + public StreamSequenceToken SequenceToken { get; } + public StreamEntry StreamEntry { get; } + public RedisStreamBatchContainer(StreamEntry streamEntry) + { + StreamEntry = streamEntry; + var streamNamespace = StreamEntry.Values[0].Value; + var steamKey = StreamEntry.Values[1].Value; + StreamId = StreamId.Create(streamNamespace!, steamKey!); + SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id); + } public IEnumerable> GetEvents() { - throw new NotImplementedException(); + List> events = new(); + var eventType = typeof(T).Name; + if (eventType == StreamEntry.Values[2].Value) + { + var data = StreamEntry.Values[3].Value; + var @event = JsonSerializer.Deserialize(data!); + events.Add(new(@event!, SequenceToken)); + } + return events; } public bool ImportRequestContext() { - throw new NotImplementedException(); + return false; } } } diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index 5d03a3f..ee97c48 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -1,7 +1,53 @@ -namespace Provider -{ - public class RedisStreamFactory - { +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Providers.Streams.Common; +using Orleans.Streams; +using StackExchange.Redis; +namespace Provider +{ + public class RedisStreamFactory : IQueueAdapterFactory + { + private readonly IDatabase _database; + private readonly ILoggerFactory _loggerFactory; + private readonly string _providerName; + private readonly IStreamFailureHandler _streamFailureHandler; + private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; + private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper; + + public RedisStreamFactory(IDatabase database, + ILoggerFactory loggerFactory, + string providerName, + IStreamFailureHandler streamFailureHandler, + SimpleQueueCacheOptions simpleQueueCacheOptions, + HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions + ) + { + _database = database; + _loggerFactory = loggerFactory; + _providerName = providerName; + _streamFailureHandler = streamFailureHandler; + _simpleQueueCacheOptions = simpleQueueCacheOptions; + _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); + } + public Task CreateAdapter() + { + return Task.FromResult(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory); + } + + public Task GetDeliveryFailureHandler(QueueId queueId) + { + return Task.FromResult(_streamFailureHandler); + } + + public IQueueAdapterCache GetQueueAdapterCache() + { + return new SimpleQueueAdapterCache(_simpleQueueCacheOptions,_providerName, _loggerFactory); + } + + public IStreamQueueMapper GetStreamQueueMapper() + { + return _hashRingBasedStreamQueueMapper; + } } } diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs new file mode 100644 index 0000000..af78981 --- /dev/null +++ b/Provider/RedisStreamReceiver.cs @@ -0,0 +1,87 @@ +using Microsoft.Extensions.Logging; +using Orleans.Streams; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Provider +{ + internal class RedisStreamReceiver : IQueueAdapterReceiver + { + private readonly QueueId _queueId; + private readonly IDatabase _database; + private readonly ILogger _logger; + private bool _checkBacklog = true; + private string _lastId = "0"; + + public RedisStreamReceiver(QueueId queueId, IDatabase database, Microsoft.Extensions.Logging.ILogger logger) + { + _queueId = queueId; + _database = database; + _logger = logger; + } + + public async Task> GetQueueMessagesAsync(int maxCount) + { + try + { + if (_checkBacklog) maxCount = 0; + var events = await _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount); + _lastId = ">"; + _checkBacklog = false; + var batches = events.Select(e => new RedisStreamBatchContainer(e)).ToList(); + return batches; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error reading from stream {QueueId}", _queueId); + return default; + } + + } + + public async Task Initialize(TimeSpan timeout) + { + try + { + using (var cts = new CancellationTokenSource(timeout)) + { + var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true); + await task.WaitAsync(timeout, cts.Token); + } + } + catch (Exception ex) when (ex.Message.Contains("name already exists")) { } + catch (Exception ex) + { + _logger.LogError(ex, "Error initializing stream {QueueId}", _queueId); + } + } + + public async Task MessagesDeliveredAsync(IList messages) + { + try + { + foreach (var message in messages) + { + var container = message as RedisStreamBatchContainer; + if (container != null) + { + await _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId); + } + } + + public Task Shutdown(TimeSpan timeout) + { + // implement any shut + } + } +} diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs index d180177..2271b97 100644 --- a/Provider/RedisStreamSequenceToken.cs +++ b/Provider/RedisStreamSequenceToken.cs @@ -1,5 +1,6 @@ using Orleans; using Orleans.Streams; +using StackExchange.Redis; using System; using System.Collections.Generic; using System.Linq; @@ -16,14 +17,36 @@ namespace Provider [Id(1)] public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } + public RedisStreamSequenceToken(RedisValue id) + { + + var split = id.ToString().Split("-"); + SequenceNumber = long.Parse(split[0]); + EventIndex = int.Parse(split[1]); + } + public RedisStreamSequenceToken(long sequenceNumber, int eventIndex) + { + SequenceNumber = sequenceNumber; + EventIndex = eventIndex; + } public override int CompareTo(StreamSequenceToken other) { - throw new NotImplementedException(); + if(other is null) throw new ArgumentNullException(nameof(other)); + if(other is RedisStreamSequenceToken token) + { + if(SequenceNumber == token.SequenceNumber) + { + return EventIndex.CompareTo(token.EventIndex); + } + return SequenceNumber.CompareTo(token.SequenceNumber); + } + throw new ArgumentException("Invalid token type", nameof(other)); } public override bool Equals(StreamSequenceToken other) { - throw new NotImplementedException(); + var token = other as RedisStreamSequenceToken; + return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex; } } }