This commit is contained in:
Samson Amaugo 2024-06-25 20:48:05 +01:00
parent 8441f1e874
commit 38b99d98f4
5 changed files with 237 additions and 13 deletions

View File

@ -1,12 +1,62 @@
using System; using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;
using StackExchange.Redis;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Provider namespace Provider
{ {
internal class RedisStreamAdapter internal class RedisStreamAdapter : IQueueAdapter
{ {
private readonly IDatabase _database;
private readonly string _providerName;
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<RedisStreamAdapter> _logger;
public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory)
{
_database = database;
_providerName = providerName;
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<RedisStreamAdapter>();
}
public string Name => _providerName;
public bool IsRewindable => false;
public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;
public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
return new RedisStreamReceiver(queueId, _database, _loggerFactory.CreateLogger<RedisStreamReceiver>());
}
public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
try
{
foreach (var @event in events)
{
NameValueEntry streamNamespaceEntry = new("streamNamespace", streamId.Namespace);
NameValueEntry streamKeyEntry = new("streamKey", streamId.Key);
NameValueEntry eventTypeEntry = new("eventType", @event!.GetType().Name);
NameValueEntry dataEntry = new("data", JsonSerializer.Serialize(@event));
var queueId = _hashRingBasedStreamQueueMapper.GetQueueForStream(streamId);
await _database.StreamAddAsync(queueId.ToString(), [streamNamespaceEntry, streamKeyEntry, eventTypeEntry, dataEntry]);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error adding event to stream {StreamId}", streamId);
}
}
} }
} }

View File

@ -1,27 +1,45 @@
using Orleans.Runtime; using Orleans.Runtime;
using Orleans.Streams; using Orleans.Streams;
using StackExchange.Redis;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Text.Json;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Provider namespace Provider
{ {
public class RedisStreamBatchContainer : IBatchContainer public class RedisStreamBatchContainer : IBatchContainer
{ {
public StreamId StreamId => throw new NotImplementedException(); public StreamId StreamId { get; }
private rea
public StreamSequenceToken SequenceToken => throw new NotImplementedException();
public StreamSequenceToken SequenceToken { get; }
public StreamEntry StreamEntry { get; }
public RedisStreamBatchContainer(StreamEntry streamEntry)
{
StreamEntry = streamEntry;
var streamNamespace = StreamEntry.Values[0].Value;
var steamKey = StreamEntry.Values[1].Value;
StreamId = StreamId.Create(streamNamespace!, steamKey!);
SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
}
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>() public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{ {
throw new NotImplementedException(); List<Tuple<T, StreamSequenceToken>> events = new();
var eventType = typeof(T).Name;
if (eventType == StreamEntry.Values[2].Value)
{
var data = StreamEntry.Values[3].Value;
var @event = JsonSerializer.Deserialize<T>(data!);
events.Add(new(@event!, SequenceToken));
}
return events;
} }
public bool ImportRequestContext() public bool ImportRequestContext()
{ {
throw new NotImplementedException(); return false;
} }
} }
} }

View File

@ -1,7 +1,53 @@
namespace Provider using Microsoft.Extensions.Logging;
{ using Orleans.Configuration;
public class RedisStreamFactory using Orleans.Providers.Streams.Common;
{ using Orleans.Streams;
using StackExchange.Redis;
namespace Provider
{
public class RedisStreamFactory : IQueueAdapterFactory
{
private readonly IDatabase _database;
private readonly ILoggerFactory _loggerFactory;
private readonly string _providerName;
private readonly IStreamFailureHandler _streamFailureHandler;
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
public RedisStreamFactory(IDatabase database,
ILoggerFactory loggerFactory,
string providerName,
IStreamFailureHandler streamFailureHandler,
SimpleQueueCacheOptions simpleQueueCacheOptions,
HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions
)
{
_database = database;
_loggerFactory = loggerFactory;
_providerName = providerName;
_streamFailureHandler = streamFailureHandler;
_simpleQueueCacheOptions = simpleQueueCacheOptions;
_hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName);
}
public Task<IQueueAdapter> CreateAdapter()
{
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory);
}
public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
{
return Task.FromResult(_streamFailureHandler);
}
public IQueueAdapterCache GetQueueAdapterCache()
{
return new SimpleQueueAdapterCache(_simpleQueueCacheOptions,_providerName, _loggerFactory);
}
public IStreamQueueMapper GetStreamQueueMapper()
{
return _hashRingBasedStreamQueueMapper;
}
} }
} }

View File

@ -0,0 +1,87 @@
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Provider
{
internal class RedisStreamReceiver : IQueueAdapterReceiver
{
private readonly QueueId _queueId;
private readonly IDatabase _database;
private readonly ILogger<RedisStreamReceiver> _logger;
private bool _checkBacklog = true;
private string _lastId = "0";
public RedisStreamReceiver(QueueId queueId, IDatabase database, Microsoft.Extensions.Logging.ILogger<RedisStreamReceiver> logger)
{
_queueId = queueId;
_database = database;
_logger = logger;
}
public async Task<IList<IBatchContainer>> GetQueueMessagesAsync(int maxCount)
{
try
{
if (_checkBacklog) maxCount = 0;
var events = await _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount);
_lastId = ">";
_checkBacklog = false;
var batches = events.Select(e => new RedisStreamBatchContainer(e)).ToList<IBatchContainer>();
return batches;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error reading from stream {QueueId}", _queueId);
return default;
}
}
public async Task Initialize(TimeSpan timeout)
{
try
{
using (var cts = new CancellationTokenSource(timeout))
{
var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true);
await task.WaitAsync(timeout, cts.Token);
}
}
catch (Exception ex) when (ex.Message.Contains("name already exists")) { }
catch (Exception ex)
{
_logger.LogError(ex, "Error initializing stream {QueueId}", _queueId);
}
}
public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
{
try
{
foreach (var message in messages)
{
var container = message as RedisStreamBatchContainer;
if (container != null)
{
await _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId);
}
}
public Task Shutdown(TimeSpan timeout)
{
// implement any shut
}
}
}

View File

@ -1,5 +1,6 @@
using Orleans; using Orleans;
using Orleans.Streams; using Orleans.Streams;
using StackExchange.Redis;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -16,14 +17,36 @@ namespace Provider
[Id(1)] [Id(1)]
public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); }
public RedisStreamSequenceToken(RedisValue id)
{
var split = id.ToString().Split("-");
SequenceNumber = long.Parse(split[0]);
EventIndex = int.Parse(split[1]);
}
public RedisStreamSequenceToken(long sequenceNumber, int eventIndex)
{
SequenceNumber = sequenceNumber;
EventIndex = eventIndex;
}
public override int CompareTo(StreamSequenceToken other) public override int CompareTo(StreamSequenceToken other)
{ {
throw new NotImplementedException(); if(other is null) throw new ArgumentNullException(nameof(other));
if(other is RedisStreamSequenceToken token)
{
if(SequenceNumber == token.SequenceNumber)
{
return EventIndex.CompareTo(token.EventIndex);
}
return SequenceNumber.CompareTo(token.SequenceNumber);
}
throw new ArgumentException("Invalid token type", nameof(other));
} }
public override bool Equals(StreamSequenceToken other) public override bool Equals(StreamSequenceToken other)
{ {
throw new NotImplementedException(); var token = other as RedisStreamSequenceToken;
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;
} }
} }
} }