diff --git a/Provider/RedisStreamAdapter.cs b/Provider/RedisStreamAdapter.cs index c46cf61..bdcd29f 100644 --- a/Provider/RedisStreamAdapter.cs +++ b/Provider/RedisStreamAdapter.cs @@ -16,10 +16,10 @@ namespace Provider public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory) { - _database = database; - _providerName = providerName; - _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper; - _loggerFactory = loggerFactory; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper ?? throw new ArgumentNullException(nameof(hashRingBasedStreamQueueMapper)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); _logger = loggerFactory.CreateLogger(); } diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index 9041332..1daae03 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -24,11 +24,16 @@ namespace Provider HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions ) { - _connectionMultiplexer = connectionMultiplexer; - _loggerFactory = loggerFactory; - _providerName = providerName; - _streamFailureHandler = streamFailureHandler; - _simpleQueueCacheOptions = simpleQueueCacheOptions; + if (hashRingStreamQueueMapperOptions is null) + { + throw new ArgumentNullException(nameof(hashRingStreamQueueMapperOptions)); + } + + _connectionMultiplexer = connectionMultiplexer ?? throw new ArgumentNullException(nameof(connectionMultiplexer)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _streamFailureHandler = streamFailureHandler ?? throw new ArgumentNullException(nameof(streamFailureHandler)); + _simpleQueueCacheOptions = simpleQueueCacheOptions ?? throw new ArgumentNullException(nameof(simpleQueueCacheOptions)); _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); } diff --git a/Provider/RedisStreamFailureHandler.cs b/Provider/RedisStreamFailureHandler.cs index 6734c12..0cf0790 100644 --- a/Provider/RedisStreamFailureHandler.cs +++ b/Provider/RedisStreamFailureHandler.cs @@ -10,7 +10,7 @@ namespace Provider public RedisStreamFailureHandler(ILogger logger) { - _logger = logger; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public bool ShouldFaultSubsriptionOnError => true; diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs index b1c61cb..fb2ddf2 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Provider/RedisStreamReceiver.cs @@ -16,8 +16,8 @@ namespace Provider public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger logger) { _queueId = queueId; - _database = database; - _logger = logger; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task?> GetQueueMessagesAsync(int maxCount) diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 3feeb5e..9318f4d 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csp EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -27,6 +29,10 @@ Global {70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs new file mode 100644 index 0000000..4cb399a --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs @@ -0,0 +1,48 @@ +using Moq; +using StackExchange.Redis; +using Microsoft.Extensions.Logging; +using Orleans.Streams; +using Xunit; +using Provider; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.Configuration; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamAdapterTests + { + private readonly Mock _mockDatabase; + private readonly Mock _mockQueueMapper; + private readonly Mock _mockLoggerFactory; + private readonly RedisStreamAdapter _adapter; + + public RedisStreamAdapterTests() + { + _mockDatabase = new Mock(); + var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 }; + _mockQueueMapper = new Mock(options, "queueNamePrefix"); + _mockLoggerFactory = new Mock(); + + _adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object); + } + + [Fact] + public void Constructor_ShouldInitializeProperties() + { + Assert.Equal("TestProvider", _adapter.Name); + Assert.False(_adapter.IsRewindable); + Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction); + } + + [Fact] + public void CreateReceiver_ShouldReturnRedisStreamReceiver() + { + var queueId = QueueId.GetQueueId("queueName", 0, 1); + var receiver = _adapter.CreateReceiver(queueId); + + Assert.NotNull(receiver); + Assert.IsType(receiver); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs new file mode 100644 index 0000000..9b6c5d1 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs @@ -0,0 +1,80 @@ +using Provider; +using StackExchange.Redis; +using System.Text.Json; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamBatchContainerTests + { + [Fact] + public void Constructor_ShouldInitializeProperties() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + + // Act + var container = new RedisStreamBatchContainer(streamEntry); + + // Assert + Assert.Equal("testNamespace", container.StreamId.GetNamespace()); + Assert.Equal("testKey", container.StreamId.GetKeyAsString()); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString()); + } + + [Fact] + public void GetEvents_ShouldReturnDeserializedEvents() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var events = container.GetEvents().ToList(); + + // Assert + Assert.Single(events); + Assert.Equal(1, events[0].Item1.Id); + Assert.Equal("Test", events[0].Item1.Name); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString()); + } + + [Fact] + public void ImportRequestContext_ShouldReturnFalse() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var result = container.ImportRequestContext(); + + // Assert + Assert.False(result); + } + + private class TestEvent + { + public int Id { get; set; } + public string Name { get; set; } + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs new file mode 100644 index 0000000..9f32d7b --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs @@ -0,0 +1,90 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Configuration; +using Orleans.Providers.Streams.Common; +using Orleans.Streams; +using Provider; +using StackExchange.Redis; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamFactoryTests + { + private readonly Mock _mockConnectionMultiplexer; + private readonly Mock _mockLoggerFactory; + private readonly Mock _mockServiceProvider; + private readonly Mock _mockStreamFailureHandler; + private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; + private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions; + private readonly string _providerName = "TestProvider"; + + public RedisStreamFactoryTests() + { + _mockConnectionMultiplexer = new Mock(); + _mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny(), It.IsAny())).Returns(new Mock().Object); + + _mockLoggerFactory = new Mock(); + _mockServiceProvider = new Mock(); + _mockStreamFailureHandler = new Mock(); + _simpleQueueCacheOptions = new SimpleQueueCacheOptions(); + _hashRingStreamQueueMapperOptions = new HashRingStreamQueueMapperOptions(); + } + + [Fact] + public void Constructor_ShouldThrowArgumentNullException_WhenAnyArgumentIsNull() + { + Assert.Throws(() => new RedisStreamFactory(null, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, null, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, null, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, null, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, null, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, null)); + } + + [Fact] + public async Task CreateAdapter_ShouldReturnRedisStreamAdapterInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var adapter = await factory.CreateAdapter(); + + Assert.NotNull(adapter); + Assert.IsType(adapter); + } + + [Fact] + public async Task GetDeliveryFailureHandler_ShouldReturnStreamFailureHandler() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var handler = await factory.GetDeliveryFailureHandler(new QueueId()); + + Assert.NotNull(handler); + Assert.Equal(_mockStreamFailureHandler.Object, handler); + } + + [Fact] + public void GetQueueAdapterCache_ShouldReturnSimpleQueueAdapterCacheInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var cache = factory.GetQueueAdapterCache(); + + Assert.NotNull(cache); + Assert.IsType(cache); + } + + [Fact] + public void GetStreamQueueMapper_ShouldReturnHashRingBasedStreamQueueMapperInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var mapper = factory.GetStreamQueueMapper(); + + Assert.NotNull(mapper); + Assert.IsType(mapper); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs new file mode 100644 index 0000000..442e3ce --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs @@ -0,0 +1,104 @@ +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Streams; +using Provider; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamReceiverTests + { + private readonly Mock _mockDatabase; + private readonly Mock> _mockLogger; + private readonly QueueId _queueId; + private readonly RedisStreamReceiver _receiver; + + public RedisStreamReceiverTests() + { + _mockDatabase = new Mock(); + _mockLogger = new Mock>(); + _queueId = QueueId.GetQueueId("testQueue", 0, 0); // Added the missing 'hash' parameter + _receiver = new RedisStreamReceiver(_queueId, _mockDatabase.Object, _mockLogger.Object); + } + + [Fact] + public async Task GetQueueMessagesAsync_ReturnsBatches() + { + // Arrange + var streamEntries = new[] + { + new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")}), + new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")}) + }; + _mockDatabase.Setup(db => db.StreamReadGroupAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(streamEntries); + + // Act + var result = await _receiver.GetQueueMessagesAsync(10); + + // Assert + Assert.NotNull(result); + Assert.Equal(2, result.Count); + } + + [Fact] + public async Task Initialize_CreatesConsumerGroup() + { + // Arrange + _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(true); + + // Act + await _receiver.Initialize(TimeSpan.FromSeconds(5)); + + // Assert + _mockDatabase.Verify(db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), Times.Once); + } + + [Fact] + public async Task MessagesDeliveredAsync_AcknowledgesMessages() + { + // Arrange + var messages = new List + { + new RedisStreamBatchContainer(new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")})), + new RedisStreamBatchContainer(new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")})) + }; + _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(2); + + // Act + await _receiver.MessagesDeliveredAsync(messages); + + // Assert + _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once); + _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once); + } + + [Fact] + public async Task Shutdown_WaitsForPendingTasks() + { + // Arrange + var tcs = new TaskCompletionSource(); + _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .Returns(tcs.Task); + + // Act + var getMessagesTask = _receiver.GetQueueMessagesAsync(10); + await _receiver.Shutdown(TimeSpan.FromSeconds(5)); + + // Assert + Assert.True(getMessagesTask.IsCompleted); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs new file mode 100644 index 0000000..e5912c0 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs @@ -0,0 +1,132 @@ +using Xunit; +using StackExchange.Redis; +using Provider; +using Orleans.Streams; +using System; +using Moq; + +namespace Provider.Tests +{ + public class RedisStreamSequenceTokenTests + { + [Fact] + public void Constructor_ShouldInitializeProperties_FromRedisValue() + { + // Arrange + var redisValue = new RedisValue("123-456"); + + // Act + var token = new RedisStreamSequenceToken(redisValue); + + // Assert + Assert.Equal(123, token.SequenceNumber); + Assert.Equal(456, token.EventIndex); + } + + [Fact] + public void Constructor_ShouldInitializeProperties_FromParameters() + { + // Arrange + long sequenceNumber = 123; + int eventIndex = 456; + + // Act + var token = new RedisStreamSequenceToken(sequenceNumber, eventIndex); + + // Assert + Assert.Equal(sequenceNumber, token.SequenceNumber); + Assert.Equal(eventIndex, token.EventIndex); + } + + [Fact] + public void CompareTo_ShouldReturnZero_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.Equal(0, result); + } + + [Fact] + public void CompareTo_ShouldReturnPositive_ForGreaterToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 455); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result > 0); + } + + [Fact] + public void CompareTo_ShouldReturnNegative_ForLesserToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 455); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result < 0); + } + + [Fact] + public void Equals_ShouldReturnTrue_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.True(result); + } + + [Fact] + public void Equals_ShouldReturnFalse_ForDifferentTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 457); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.False(result); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentNullException_ForNullToken() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + + // Act & Assert + Assert.Throws(() => token.CompareTo(null)); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentException_ForInvalidTokenType() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + var invalidToken = new Mock().Object; + + // Act & Assert + Assert.Throws(() => token.CompareTo(invalidToken)); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj new file mode 100644 index 0000000..e52d684 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj @@ -0,0 +1,26 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + +