Universley.OrleansContrib.S.../Provider/RedisStreamReceiver.cs

102 lines
3.3 KiB
C#

using Microsoft.Extensions.Logging;
using Orleans.Streams;
using StackExchange.Redis;
namespace Provider
{
public class RedisStreamReceiver : IQueueAdapterReceiver
{
private readonly QueueId _queueId;
private readonly IDatabase _database;
private readonly ILogger<RedisStreamReceiver> _logger;
private string _lastId = "0";
private Task? pendingTasks;
public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger<RedisStreamReceiver> logger)
{
_queueId = queueId;
_database = database ?? throw new ArgumentNullException(nameof(database));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<IList<IBatchContainer>?> GetQueueMessagesAsync(int maxCount)
{
try
{
var events = _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount);
pendingTasks = events;
_lastId = ">";
var batches = (await events).Select(e => new RedisStreamBatchContainer(e)).ToList<IBatchContainer>();
return batches;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error reading from stream {QueueId}", _queueId);
return default;
}
finally
{
pendingTasks = null;
}
}
public async Task Initialize(TimeSpan timeout)
{
try
{
using (var cts = new CancellationTokenSource(timeout))
{
var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true);
await task.WaitAsync(timeout, cts.Token);
}
}
catch (Exception ex) when (ex.Message.Contains("name already exists")) { }
catch (Exception ex)
{
_logger.LogError(ex, "Error initializing stream {QueueId}", _queueId);
}
}
public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
{
try
{
foreach (var message in messages)
{
var container = message as RedisStreamBatchContainer;
if (container != null)
{
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
pendingTasks = ack;
await ack;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId);
}
finally
{
pendingTasks = null;
}
}
public async Task Shutdown(TimeSpan timeout)
{
using (var cts = new CancellationTokenSource(timeout))
{
if (pendingTasks is not null)
{
await pendingTasks.WaitAsync(timeout, cts.Token);
}
}
_logger.LogInformation("Shutting down stream {QueueId}", _queueId);
}
}
}