From 85876688a9482c4a3da3defb355844d1e85d6ef4 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Mon, 20 Jan 2025 18:18:59 -0700 Subject: [PATCH 01/17] updated to .NET9 and cache ConnectionMultiplexer instead of IDatabase as per documentation --- Client/Client.csproj | 8 ++++---- Client/Program.cs | 5 ++--- Provider/Provider.csproj | 8 ++++---- Provider/RedisStreamFactory.cs | 12 ++++++------ Server/Program.cs | 4 ++-- Server/Server.csproj | 8 ++++---- 6 files changed, 22 insertions(+), 23 deletions(-) diff --git a/Client/Client.csproj b/Client/Client.csproj index ce9902e..45633c4 100644 --- a/Client/Client.csproj +++ b/Client/Client.csproj @@ -2,15 +2,15 @@ Exe - net8.0 + net9.0 enable enable - - - + + + diff --git a/Client/Program.cs b/Client/Program.cs index 3cef062..1b6a20f 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -9,10 +9,9 @@ using StackExchange.Redis; using IHost host = new HostBuilder() .UseOrleansClient(clientBuilder => { - clientBuilder.Services.AddSingleton(sp => + clientBuilder.Services.AddSingleton(sp => { - IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase(); - return db; + return ConnectionMultiplexer.Connect("localhost"); }); clientBuilder.UseLocalhostClustering(); diff --git a/Provider/Provider.csproj b/Provider/Provider.csproj index bf92fe9..5703a5f 100644 --- a/Provider/Provider.csproj +++ b/Provider/Provider.csproj @@ -1,15 +1,15 @@  - net8.0 + net9.0 enable enable - - - + + + diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index a25c1ab..9041332 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -9,14 +9,14 @@ namespace Provider { public class RedisStreamFactory : IQueueAdapterFactory { - private readonly IDatabase _database; + private readonly IConnectionMultiplexer _connectionMultiplexer; private readonly ILoggerFactory _loggerFactory; private readonly string _providerName; private readonly IStreamFailureHandler _streamFailureHandler; private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper; - public RedisStreamFactory(IDatabase database, + public RedisStreamFactory(IConnectionMultiplexer connectionMultiplexer, ILoggerFactory loggerFactory, string providerName, IStreamFailureHandler streamFailureHandler, @@ -24,7 +24,7 @@ namespace Provider HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions ) { - _database = database; + _connectionMultiplexer = connectionMultiplexer; _loggerFactory = loggerFactory; _providerName = providerName; _streamFailureHandler = streamFailureHandler; @@ -34,18 +34,18 @@ namespace Provider public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName) { - var database = provider.GetRequiredService(); + var connMuliplexer = provider.GetRequiredService(); var loggerFactory = provider.GetRequiredService(); var simpleQueueCacheOptions = provider.GetOptionsByName(providerName); var hashRingStreamQueueMapperOptions = provider.GetOptionsByName(providerName); var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger()); - return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions); + return new RedisStreamFactory(connMuliplexer, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions); } public Task CreateAdapter() { - return Task.FromResult(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory)); + return Task.FromResult(new RedisStreamAdapter(_connectionMultiplexer.GetDatabase(), _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory)); } public Task GetDeliveryFailureHandler(QueueId queueId) diff --git a/Server/Program.cs b/Server/Program.cs index 51e15be..61c9a86 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -10,9 +10,9 @@ var builder = new HostBuilder() .UseOrleans(silo => { silo.UseLocalhostClustering(); - silo.Services.AddSingleton(sp => + silo.Services.AddSingleton(sp => { - return ConnectionMultiplexer.Connect("localhost").GetDatabase(); + return ConnectionMultiplexer.Connect("localhost"); }); silo.ConfigureLogging(logging => logging.AddConsole()); silo.AddMemoryGrainStorage("PubSubStore"); diff --git a/Server/Server.csproj b/Server/Server.csproj index a6e6fa8..fe8d26c 100644 --- a/Server/Server.csproj +++ b/Server/Server.csproj @@ -2,15 +2,15 @@ Exe - net8.0 + net9.0 enable enable - - - + + + -- 2.34.1 From f59a6da2da0450307aa1dbd49864e116ee1c819a Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Mon, 20 Jan 2025 19:14:48 -0700 Subject: [PATCH 02/17] added basic unit tests --- Provider/RedisStreamAdapter.cs | 8 +- Provider/RedisStreamFactory.cs | 15 +- Provider/RedisStreamFailureHandler.cs | 2 +- Provider/RedisStreamReceiver.cs | 4 +- RedisStreamsInOrleans.sln | 6 + .../RedisStreamAdapterTests.cs | 48 +++++++ .../RedisStreamBatchContainerTests.cs | 80 +++++++++++ .../RedisStreamFactoryTests.cs | 90 ++++++++++++ .../RedisStreamReceiverTests.cs | 104 ++++++++++++++ .../RedisStreamSequenceTokenTests.cs | 132 ++++++++++++++++++ .../RedisStreamsProvider.UnitTests.csproj | 26 ++++ 11 files changed, 503 insertions(+), 12 deletions(-) create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs create mode 100644 RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj diff --git a/Provider/RedisStreamAdapter.cs b/Provider/RedisStreamAdapter.cs index c46cf61..bdcd29f 100644 --- a/Provider/RedisStreamAdapter.cs +++ b/Provider/RedisStreamAdapter.cs @@ -16,10 +16,10 @@ namespace Provider public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory) { - _database = database; - _providerName = providerName; - _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper; - _loggerFactory = loggerFactory; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper ?? throw new ArgumentNullException(nameof(hashRingBasedStreamQueueMapper)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); _logger = loggerFactory.CreateLogger(); } diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index 9041332..1daae03 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -24,11 +24,16 @@ namespace Provider HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions ) { - _connectionMultiplexer = connectionMultiplexer; - _loggerFactory = loggerFactory; - _providerName = providerName; - _streamFailureHandler = streamFailureHandler; - _simpleQueueCacheOptions = simpleQueueCacheOptions; + if (hashRingStreamQueueMapperOptions is null) + { + throw new ArgumentNullException(nameof(hashRingStreamQueueMapperOptions)); + } + + _connectionMultiplexer = connectionMultiplexer ?? throw new ArgumentNullException(nameof(connectionMultiplexer)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _streamFailureHandler = streamFailureHandler ?? throw new ArgumentNullException(nameof(streamFailureHandler)); + _simpleQueueCacheOptions = simpleQueueCacheOptions ?? throw new ArgumentNullException(nameof(simpleQueueCacheOptions)); _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); } diff --git a/Provider/RedisStreamFailureHandler.cs b/Provider/RedisStreamFailureHandler.cs index 6734c12..0cf0790 100644 --- a/Provider/RedisStreamFailureHandler.cs +++ b/Provider/RedisStreamFailureHandler.cs @@ -10,7 +10,7 @@ namespace Provider public RedisStreamFailureHandler(ILogger logger) { - _logger = logger; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public bool ShouldFaultSubsriptionOnError => true; diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs index b1c61cb..fb2ddf2 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Provider/RedisStreamReceiver.cs @@ -16,8 +16,8 @@ namespace Provider public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger logger) { _queueId = queueId; - _database = database; - _logger = logger; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task?> GetQueueMessagesAsync(int maxCount) diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 3feeb5e..9318f4d 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csp EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -27,6 +29,10 @@ Global {70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs new file mode 100644 index 0000000..4cb399a --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs @@ -0,0 +1,48 @@ +using Moq; +using StackExchange.Redis; +using Microsoft.Extensions.Logging; +using Orleans.Streams; +using Xunit; +using Provider; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.Configuration; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamAdapterTests + { + private readonly Mock _mockDatabase; + private readonly Mock _mockQueueMapper; + private readonly Mock _mockLoggerFactory; + private readonly RedisStreamAdapter _adapter; + + public RedisStreamAdapterTests() + { + _mockDatabase = new Mock(); + var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 }; + _mockQueueMapper = new Mock(options, "queueNamePrefix"); + _mockLoggerFactory = new Mock(); + + _adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object); + } + + [Fact] + public void Constructor_ShouldInitializeProperties() + { + Assert.Equal("TestProvider", _adapter.Name); + Assert.False(_adapter.IsRewindable); + Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction); + } + + [Fact] + public void CreateReceiver_ShouldReturnRedisStreamReceiver() + { + var queueId = QueueId.GetQueueId("queueName", 0, 1); + var receiver = _adapter.CreateReceiver(queueId); + + Assert.NotNull(receiver); + Assert.IsType(receiver); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs new file mode 100644 index 0000000..9b6c5d1 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs @@ -0,0 +1,80 @@ +using Provider; +using StackExchange.Redis; +using System.Text.Json; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamBatchContainerTests + { + [Fact] + public void Constructor_ShouldInitializeProperties() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + + // Act + var container = new RedisStreamBatchContainer(streamEntry); + + // Assert + Assert.Equal("testNamespace", container.StreamId.GetNamespace()); + Assert.Equal("testKey", container.StreamId.GetKeyAsString()); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString()); + } + + [Fact] + public void GetEvents_ShouldReturnDeserializedEvents() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var events = container.GetEvents().ToList(); + + // Assert + Assert.Single(events); + Assert.Equal(1, events[0].Item1.Id); + Assert.Equal("Test", events[0].Item1.Name); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString()); + } + + [Fact] + public void ImportRequestContext_ShouldReturnFalse() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var result = container.ImportRequestContext(); + + // Assert + Assert.False(result); + } + + private class TestEvent + { + public int Id { get; set; } + public string Name { get; set; } + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs new file mode 100644 index 0000000..9f32d7b --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs @@ -0,0 +1,90 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Configuration; +using Orleans.Providers.Streams.Common; +using Orleans.Streams; +using Provider; +using StackExchange.Redis; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamFactoryTests + { + private readonly Mock _mockConnectionMultiplexer; + private readonly Mock _mockLoggerFactory; + private readonly Mock _mockServiceProvider; + private readonly Mock _mockStreamFailureHandler; + private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; + private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions; + private readonly string _providerName = "TestProvider"; + + public RedisStreamFactoryTests() + { + _mockConnectionMultiplexer = new Mock(); + _mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny(), It.IsAny())).Returns(new Mock().Object); + + _mockLoggerFactory = new Mock(); + _mockServiceProvider = new Mock(); + _mockStreamFailureHandler = new Mock(); + _simpleQueueCacheOptions = new SimpleQueueCacheOptions(); + _hashRingStreamQueueMapperOptions = new HashRingStreamQueueMapperOptions(); + } + + [Fact] + public void Constructor_ShouldThrowArgumentNullException_WhenAnyArgumentIsNull() + { + Assert.Throws(() => new RedisStreamFactory(null, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, null, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, null, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, null, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, null, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, null)); + } + + [Fact] + public async Task CreateAdapter_ShouldReturnRedisStreamAdapterInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var adapter = await factory.CreateAdapter(); + + Assert.NotNull(adapter); + Assert.IsType(adapter); + } + + [Fact] + public async Task GetDeliveryFailureHandler_ShouldReturnStreamFailureHandler() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var handler = await factory.GetDeliveryFailureHandler(new QueueId()); + + Assert.NotNull(handler); + Assert.Equal(_mockStreamFailureHandler.Object, handler); + } + + [Fact] + public void GetQueueAdapterCache_ShouldReturnSimpleQueueAdapterCacheInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var cache = factory.GetQueueAdapterCache(); + + Assert.NotNull(cache); + Assert.IsType(cache); + } + + [Fact] + public void GetStreamQueueMapper_ShouldReturnHashRingBasedStreamQueueMapperInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var mapper = factory.GetStreamQueueMapper(); + + Assert.NotNull(mapper); + Assert.IsType(mapper); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs new file mode 100644 index 0000000..442e3ce --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs @@ -0,0 +1,104 @@ +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Streams; +using Provider; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Xunit; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamReceiverTests + { + private readonly Mock _mockDatabase; + private readonly Mock> _mockLogger; + private readonly QueueId _queueId; + private readonly RedisStreamReceiver _receiver; + + public RedisStreamReceiverTests() + { + _mockDatabase = new Mock(); + _mockLogger = new Mock>(); + _queueId = QueueId.GetQueueId("testQueue", 0, 0); // Added the missing 'hash' parameter + _receiver = new RedisStreamReceiver(_queueId, _mockDatabase.Object, _mockLogger.Object); + } + + [Fact] + public async Task GetQueueMessagesAsync_ReturnsBatches() + { + // Arrange + var streamEntries = new[] + { + new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")}), + new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")}) + }; + _mockDatabase.Setup(db => db.StreamReadGroupAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(streamEntries); + + // Act + var result = await _receiver.GetQueueMessagesAsync(10); + + // Assert + Assert.NotNull(result); + Assert.Equal(2, result.Count); + } + + [Fact] + public async Task Initialize_CreatesConsumerGroup() + { + // Arrange + _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(true); + + // Act + await _receiver.Initialize(TimeSpan.FromSeconds(5)); + + // Assert + _mockDatabase.Verify(db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), Times.Once); + } + + [Fact] + public async Task MessagesDeliveredAsync_AcknowledgesMessages() + { + // Arrange + var messages = new List + { + new RedisStreamBatchContainer(new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")})), + new RedisStreamBatchContainer(new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey")})) + }; + _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(2); + + // Act + await _receiver.MessagesDeliveredAsync(messages); + + // Assert + _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once); + _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once); + } + + [Fact] + public async Task Shutdown_WaitsForPendingTasks() + { + // Arrange + var tcs = new TaskCompletionSource(); + _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .Returns(tcs.Task); + + // Act + var getMessagesTask = _receiver.GetQueueMessagesAsync(10); + await _receiver.Shutdown(TimeSpan.FromSeconds(5)); + + // Assert + Assert.True(getMessagesTask.IsCompleted); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs new file mode 100644 index 0000000..e5912c0 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs @@ -0,0 +1,132 @@ +using Xunit; +using StackExchange.Redis; +using Provider; +using Orleans.Streams; +using System; +using Moq; + +namespace Provider.Tests +{ + public class RedisStreamSequenceTokenTests + { + [Fact] + public void Constructor_ShouldInitializeProperties_FromRedisValue() + { + // Arrange + var redisValue = new RedisValue("123-456"); + + // Act + var token = new RedisStreamSequenceToken(redisValue); + + // Assert + Assert.Equal(123, token.SequenceNumber); + Assert.Equal(456, token.EventIndex); + } + + [Fact] + public void Constructor_ShouldInitializeProperties_FromParameters() + { + // Arrange + long sequenceNumber = 123; + int eventIndex = 456; + + // Act + var token = new RedisStreamSequenceToken(sequenceNumber, eventIndex); + + // Assert + Assert.Equal(sequenceNumber, token.SequenceNumber); + Assert.Equal(eventIndex, token.EventIndex); + } + + [Fact] + public void CompareTo_ShouldReturnZero_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.Equal(0, result); + } + + [Fact] + public void CompareTo_ShouldReturnPositive_ForGreaterToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 455); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result > 0); + } + + [Fact] + public void CompareTo_ShouldReturnNegative_ForLesserToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 455); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result < 0); + } + + [Fact] + public void Equals_ShouldReturnTrue_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.True(result); + } + + [Fact] + public void Equals_ShouldReturnFalse_ForDifferentTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 457); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.False(result); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentNullException_ForNullToken() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + + // Act & Assert + Assert.Throws(() => token.CompareTo(null)); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentException_ForInvalidTokenType() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + var invalidToken = new Mock().Object; + + // Act & Assert + Assert.Throws(() => token.CompareTo(invalidToken)); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj new file mode 100644 index 0000000..e52d684 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj @@ -0,0 +1,26 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + -- 2.34.1 From b3035f5a1f480bb2203956b1ac7e8fb5e7b00798 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Mon, 20 Jan 2025 19:20:18 -0700 Subject: [PATCH 03/17] renamed Provider project to have less common namespace --- Client/Client.csproj | 2 +- Client/Program.cs | 3 +-- Provider/RedisStreamAdapter.cs | 3 +-- Provider/RedisStreamBatchContainer.cs | 5 ++--- Provider/RedisStreamFactory.cs | 2 +- Provider/RedisStreamFailureHandler.cs | 3 +-- Provider/RedisStreamReceiver.cs | 2 +- Provider/RedisStreamSequenceToken.cs | 2 +- ... Universley.OrleansContrib.StreamsProvider.Redis.csproj} | 0 RedisStreamsInOrleans.sln | 2 +- RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs | 5 +---- .../RedisStreamBatchContainerTests.cs | 3 +-- RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs | 4 +--- RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs | 6 +----- .../RedisStreamSequenceTokenTests.cs | 6 ++---- .../RedisStreamsProvider.UnitTests.csproj | 2 +- Server/Program.cs | 4 ++-- Server/Server.csproj | 2 +- 18 files changed, 20 insertions(+), 36 deletions(-) rename Provider/{Provider.csproj => Universley.OrleansContrib.StreamsProvider.Redis.csproj} (100%) diff --git a/Client/Client.csproj b/Client/Client.csproj index 45633c4..2c7b60f 100644 --- a/Client/Client.csproj +++ b/Client/Client.csproj @@ -14,7 +14,7 @@ - + diff --git a/Client/Program.cs b/Client/Program.cs index 1b6a20f..15ad374 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -2,9 +2,8 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; -using Orleans.Runtime; -using Provider; using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; using IHost host = new HostBuilder() .UseOrleansClient(clientBuilder => diff --git a/Provider/RedisStreamAdapter.cs b/Provider/RedisStreamAdapter.cs index bdcd29f..87a70e5 100644 --- a/Provider/RedisStreamAdapter.cs +++ b/Provider/RedisStreamAdapter.cs @@ -1,10 +1,9 @@ using Microsoft.Extensions.Logging; -using Orleans.Runtime; using Orleans.Streams; using StackExchange.Redis; using System.Text.Json; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamAdapter : IQueueAdapter { diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs index c799067..1da1f52 100644 --- a/Provider/RedisStreamBatchContainer.cs +++ b/Provider/RedisStreamBatchContainer.cs @@ -1,9 +1,8 @@ -using Orleans.Runtime; -using Orleans.Streams; +using Orleans.Streams; using StackExchange.Redis; using System.Text.Json; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamBatchContainer : IBatchContainer { diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs index 1daae03..a705675 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Provider/RedisStreamFactory.cs @@ -5,7 +5,7 @@ using Orleans.Providers.Streams.Common; using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamFactory : IQueueAdapterFactory { diff --git a/Provider/RedisStreamFailureHandler.cs b/Provider/RedisStreamFailureHandler.cs index 0cf0790..600b0ae 100644 --- a/Provider/RedisStreamFailureHandler.cs +++ b/Provider/RedisStreamFailureHandler.cs @@ -1,8 +1,7 @@ using Microsoft.Extensions.Logging; -using Orleans.Runtime; using Orleans.Streams; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamFailureHandler : IStreamFailureHandler { diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs index fb2ddf2..a6928d7 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Provider/RedisStreamReceiver.cs @@ -3,7 +3,7 @@ using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamReceiver : IQueueAdapterReceiver { diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs index cd2fcc5..e32c34e 100644 --- a/Provider/RedisStreamSequenceToken.cs +++ b/Provider/RedisStreamSequenceToken.cs @@ -1,7 +1,7 @@ using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { [GenerateSerializer] public class RedisStreamSequenceToken : StreamSequenceToken diff --git a/Provider/Provider.csproj b/Provider/Universley.OrleansContrib.StreamsProvider.Redis.csproj similarity index 100% rename from Provider/Provider.csproj rename to Provider/Universley.OrleansContrib.StreamsProvider.Redis.csproj diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 9318f4d..279ec62 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -7,7 +7,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csp EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Provider\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" EndProject diff --git a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs index 4cb399a..87fb0f1 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs @@ -2,11 +2,8 @@ using Moq; using StackExchange.Redis; using Microsoft.Extensions.Logging; using Orleans.Streams; -using Xunit; -using Provider; -using System.Collections.Generic; -using System.Threading.Tasks; using Orleans.Configuration; +using Universley.OrleansContrib.StreamsProvider.Redis; namespace RedisStreamsProvider.UnitTests { diff --git a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs index 9b6c5d1..5e2d307 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs @@ -1,7 +1,6 @@ -using Provider; using StackExchange.Redis; using System.Text.Json; -using Xunit; +using Universley.OrleansContrib.StreamsProvider.Redis; namespace RedisStreamsProvider.UnitTests { diff --git a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs index 9f32d7b..665827b 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs @@ -1,12 +1,10 @@ -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Moq; using Orleans.Configuration; using Orleans.Providers.Streams.Common; using Orleans.Streams; -using Provider; using StackExchange.Redis; -using Xunit; +using Universley.OrleansContrib.StreamsProvider.Redis; namespace RedisStreamsProvider.UnitTests { diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs index 442e3ce..86761d2 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs @@ -1,12 +1,8 @@ using Microsoft.Extensions.Logging; using Moq; using Orleans.Streams; -using Provider; using StackExchange.Redis; -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Xunit; +using Universley.OrleansContrib.StreamsProvider.Redis; namespace RedisStreamsProvider.UnitTests { diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs index e5912c0..9446117 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs @@ -1,9 +1,7 @@ -using Xunit; -using StackExchange.Redis; -using Provider; +using StackExchange.Redis; using Orleans.Streams; -using System; using Moq; +using Universley.OrleansContrib.StreamsProvider.Redis; namespace Provider.Tests { diff --git a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj index e52d684..6cff32f 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj +++ b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj @@ -16,7 +16,7 @@ - + diff --git a/Server/Program.cs b/Server/Program.cs index 61c9a86..e45ff42 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -2,9 +2,9 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; -using Orleans.Runtime; using Orleans.Streams; using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; var builder = new HostBuilder() .UseOrleans(silo => @@ -16,7 +16,7 @@ var builder = new HostBuilder() }); silo.ConfigureLogging(logging => logging.AddConsole()); silo.AddMemoryGrainStorage("PubSubStore"); - silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null); + silo.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null); silo.AddMemoryGrainStorageAsDefault(); }).UseConsoleLifetime(); builder.ConfigureServices(services => diff --git a/Server/Server.csproj b/Server/Server.csproj index fe8d26c..f34bf50 100644 --- a/Server/Server.csproj +++ b/Server/Server.csproj @@ -14,7 +14,7 @@ - + -- 2.34.1 From 162506888429398b4bada6b044b041ac3b5b8687 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Mon, 20 Jan 2025 19:54:49 -0700 Subject: [PATCH 04/17] added nuke build pipeline for nuget package generation --- .nuke/build.schema.json | 128 ++++++++++++++++++++++++++++++++ .nuke/parameters.json | 4 + RedisStreamsInOrleans.sln | 4 + build.cmd | 7 ++ build.ps1 | 74 ++++++++++++++++++ build.sh | 67 +++++++++++++++++ build/.editorconfig | 11 +++ build/Build.cs | 119 +++++++++++++++++++++++++++++ build/Configuration.cs | 16 ++++ build/Directory.Build.props | 8 ++ build/Directory.Build.targets | 8 ++ build/_build.csproj | 22 ++++++ build/_build.csproj.DotSettings | 27 +++++++ 13 files changed, 495 insertions(+) create mode 100644 .nuke/build.schema.json create mode 100644 .nuke/parameters.json create mode 100755 build.cmd create mode 100644 build.ps1 create mode 100755 build.sh create mode 100644 build/.editorconfig create mode 100644 build/Build.cs create mode 100644 build/Configuration.cs create mode 100644 build/Directory.Build.props create mode 100644 build/Directory.Build.targets create mode 100644 build/_build.csproj create mode 100644 build/_build.csproj.DotSettings diff --git a/.nuke/build.schema.json b/.nuke/build.schema.json new file mode 100644 index 0000000..e2e2cf9 --- /dev/null +++ b/.nuke/build.schema.json @@ -0,0 +1,128 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "Host": { + "type": "string", + "enum": [ + "AppVeyor", + "AzurePipelines", + "Bamboo", + "Bitbucket", + "Bitrise", + "GitHubActions", + "GitLab", + "Jenkins", + "Rider", + "SpaceAutomation", + "TeamCity", + "Terminal", + "TravisCI", + "VisualStudio", + "VSCode" + ] + }, + "ExecutableTarget": { + "type": "string", + "enum": [ + "Clean", + "Compile", + "Pack", + "Publish", + "Restore", + "Test" + ] + }, + "Verbosity": { + "type": "string", + "description": "", + "enum": [ + "Verbose", + "Normal", + "Minimal", + "Quiet" + ] + }, + "NukeBuild": { + "properties": { + "Continue": { + "type": "boolean", + "description": "Indicates to continue a previously failed build attempt" + }, + "Help": { + "type": "boolean", + "description": "Shows the help text for this build assembly" + }, + "Host": { + "description": "Host for execution. Default is 'automatic'", + "$ref": "#/definitions/Host" + }, + "NoLogo": { + "type": "boolean", + "description": "Disables displaying the NUKE logo" + }, + "Partition": { + "type": "string", + "description": "Partition to use on CI" + }, + "Plan": { + "type": "boolean", + "description": "Shows the execution plan (HTML)" + }, + "Profile": { + "type": "array", + "description": "Defines the profiles to load", + "items": { + "type": "string" + } + }, + "Root": { + "type": "string", + "description": "Root directory during build execution" + }, + "Skip": { + "type": "array", + "description": "List of targets to be skipped. Empty list skips all dependencies", + "items": { + "$ref": "#/definitions/ExecutableTarget" + } + }, + "Target": { + "type": "array", + "description": "List of targets to be invoked. Default is '{default_target}'", + "items": { + "$ref": "#/definitions/ExecutableTarget" + } + }, + "Verbosity": { + "description": "Logging verbosity during build execution. Default is 'Normal'", + "$ref": "#/definitions/Verbosity" + } + } + } + }, + "allOf": [ + { + "properties": { + "Configuration": { + "type": "string", + "description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)", + "enum": [ + "Debug", + "Release" + ] + }, + "NuGetApiKey": { + "type": "string", + "description": "NuGet API key" + }, + "Solution": { + "type": "string", + "description": "Path to a solution file that is automatically loaded" + } + } + }, + { + "$ref": "#/definitions/NukeBuild" + } + ] +} diff --git a/.nuke/parameters.json b/.nuke/parameters.json new file mode 100644 index 0000000..edf20b1 --- /dev/null +++ b/.nuke/parameters.json @@ -0,0 +1,4 @@ +{ + "$schema": "build.schema.json", + "Solution": "RedisStreamsInOrleans.sln" +} diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 279ec62..f5d8720 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -11,12 +11,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/build.cmd b/build.cmd new file mode 100755 index 0000000..b08cc59 --- /dev/null +++ b/build.cmd @@ -0,0 +1,7 @@ +:; set -eo pipefail +:; SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd) +:; ${SCRIPT_DIR}/build.sh "$@" +:; exit $? + +@ECHO OFF +powershell -ExecutionPolicy ByPass -NoProfile -File "%~dp0build.ps1" %* diff --git a/build.ps1 b/build.ps1 new file mode 100644 index 0000000..4634dc0 --- /dev/null +++ b/build.ps1 @@ -0,0 +1,74 @@ +[CmdletBinding()] +Param( + [Parameter(Position=0,Mandatory=$false,ValueFromRemainingArguments=$true)] + [string[]]$BuildArguments +) + +Write-Output "PowerShell $($PSVersionTable.PSEdition) version $($PSVersionTable.PSVersion)" + +Set-StrictMode -Version 2.0; $ErrorActionPreference = "Stop"; $ConfirmPreference = "None"; trap { Write-Error $_ -ErrorAction Continue; exit 1 } +$PSScriptRoot = Split-Path $MyInvocation.MyCommand.Path -Parent + +########################################################################### +# CONFIGURATION +########################################################################### + +$BuildProjectFile = "$PSScriptRoot\build\_build.csproj" +$TempDirectory = "$PSScriptRoot\\.nuke\temp" + +$DotNetGlobalFile = "$PSScriptRoot\\global.json" +$DotNetInstallUrl = "https://dot.net/v1/dotnet-install.ps1" +$DotNetChannel = "STS" + +$env:DOTNET_CLI_TELEMETRY_OPTOUT = 1 +$env:DOTNET_NOLOGO = 1 + +########################################################################### +# EXECUTION +########################################################################### + +function ExecSafe([scriptblock] $cmd) { + & $cmd + if ($LASTEXITCODE) { exit $LASTEXITCODE } +} + +# If dotnet CLI is installed globally and it matches requested version, use for execution +if ($null -ne (Get-Command "dotnet" -ErrorAction SilentlyContinue) -and ` + $(dotnet --version) -and $LASTEXITCODE -eq 0) { + $env:DOTNET_EXE = (Get-Command "dotnet").Path +} +else { + # Download install script + $DotNetInstallFile = "$TempDirectory\dotnet-install.ps1" + New-Item -ItemType Directory -Path $TempDirectory -Force | Out-Null + [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12 + (New-Object System.Net.WebClient).DownloadFile($DotNetInstallUrl, $DotNetInstallFile) + + # If global.json exists, load expected version + if (Test-Path $DotNetGlobalFile) { + $DotNetGlobal = $(Get-Content $DotNetGlobalFile | Out-String | ConvertFrom-Json) + if ($DotNetGlobal.PSObject.Properties["sdk"] -and $DotNetGlobal.sdk.PSObject.Properties["version"]) { + $DotNetVersion = $DotNetGlobal.sdk.version + } + } + + # Install by channel or version + $DotNetDirectory = "$TempDirectory\dotnet-win" + if (!(Test-Path variable:DotNetVersion)) { + ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Channel $DotNetChannel -NoPath } + } else { + ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Version $DotNetVersion -NoPath } + } + $env:DOTNET_EXE = "$DotNetDirectory\dotnet.exe" + $env:PATH = "$DotNetDirectory;$env:PATH" +} + +Write-Output "Microsoft (R) .NET SDK version $(& $env:DOTNET_EXE --version)" + +if (Test-Path env:NUKE_ENTERPRISE_TOKEN) { + & $env:DOTNET_EXE nuget remove source "nuke-enterprise" > $null + & $env:DOTNET_EXE nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password $env:NUKE_ENTERPRISE_TOKEN > $null +} + +ExecSafe { & $env:DOTNET_EXE build $BuildProjectFile /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet } +ExecSafe { & $env:DOTNET_EXE run --project $BuildProjectFile --no-build -- $BuildArguments } diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..fdff0c6 --- /dev/null +++ b/build.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash + +bash --version 2>&1 | head -n 1 + +set -eo pipefail +SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd) + +########################################################################### +# CONFIGURATION +########################################################################### + +BUILD_PROJECT_FILE="$SCRIPT_DIR/build/_build.csproj" +TEMP_DIRECTORY="$SCRIPT_DIR//.nuke/temp" + +DOTNET_GLOBAL_FILE="$SCRIPT_DIR//global.json" +DOTNET_INSTALL_URL="https://dot.net/v1/dotnet-install.sh" +DOTNET_CHANNEL="STS" + +export DOTNET_CLI_TELEMETRY_OPTOUT=1 +export DOTNET_NOLOGO=1 + +########################################################################### +# EXECUTION +########################################################################### + +function FirstJsonValue { + perl -nle 'print $1 if m{"'"$1"'": "([^"]+)",?}' <<< "${@:2}" +} + +# If dotnet CLI is installed globally and it matches requested version, use for execution +if [ -x "$(command -v dotnet)" ] && dotnet --version &>/dev/null; then + export DOTNET_EXE="$(command -v dotnet)" +else + # Download install script + DOTNET_INSTALL_FILE="$TEMP_DIRECTORY/dotnet-install.sh" + mkdir -p "$TEMP_DIRECTORY" + curl -Lsfo "$DOTNET_INSTALL_FILE" "$DOTNET_INSTALL_URL" + chmod +x "$DOTNET_INSTALL_FILE" + + # If global.json exists, load expected version + if [[ -f "$DOTNET_GLOBAL_FILE" ]]; then + DOTNET_VERSION=$(FirstJsonValue "version" "$(cat "$DOTNET_GLOBAL_FILE")") + if [[ "$DOTNET_VERSION" == "" ]]; then + unset DOTNET_VERSION + fi + fi + + # Install by channel or version + DOTNET_DIRECTORY="$TEMP_DIRECTORY/dotnet-unix" + if [[ -z ${DOTNET_VERSION+x} ]]; then + "$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --channel "$DOTNET_CHANNEL" --no-path + else + "$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "$DOTNET_VERSION" --no-path + fi + export DOTNET_EXE="$DOTNET_DIRECTORY/dotnet" + export PATH="$DOTNET_DIRECTORY:$PATH" +fi + +echo "Microsoft (R) .NET SDK version $("$DOTNET_EXE" --version)" + +if [[ ! -z ${NUKE_ENTERPRISE_TOKEN+x} && "$NUKE_ENTERPRISE_TOKEN" != "" ]]; then + "$DOTNET_EXE" nuget remove source "nuke-enterprise" &>/dev/null || true + "$DOTNET_EXE" nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password "$NUKE_ENTERPRISE_TOKEN" --store-password-in-clear-text &>/dev/null || true +fi + +"$DOTNET_EXE" build "$BUILD_PROJECT_FILE" /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet +"$DOTNET_EXE" run --project "$BUILD_PROJECT_FILE" --no-build -- "$@" diff --git a/build/.editorconfig b/build/.editorconfig new file mode 100644 index 0000000..31e43dc --- /dev/null +++ b/build/.editorconfig @@ -0,0 +1,11 @@ +[*.cs] +dotnet_style_qualification_for_field = false:warning +dotnet_style_qualification_for_property = false:warning +dotnet_style_qualification_for_method = false:warning +dotnet_style_qualification_for_event = false:warning +dotnet_style_require_accessibility_modifiers = never:warning + +csharp_style_expression_bodied_methods = true:silent +csharp_style_expression_bodied_properties = true:warning +csharp_style_expression_bodied_indexers = true:warning +csharp_style_expression_bodied_accessors = true:warning diff --git a/build/Build.cs b/build/Build.cs new file mode 100644 index 0000000..d0c241f --- /dev/null +++ b/build/Build.cs @@ -0,0 +1,119 @@ +using System; +using System.IO; +using System.Linq; +using Nuke.Common; +using Nuke.Common.CI; +using Nuke.Common.Execution; +using Nuke.Common.IO; +using Nuke.Common.ProjectModel; +using Nuke.Common.Tooling; +using Nuke.Common.Tools.DotNet; +using Nuke.Common.Tools.GitVersion; +using Nuke.Common.Utilities.Collections; +using static Nuke.Common.EnvironmentInfo; +using static Nuke.Common.IO.PathConstruction; + +class Build : NukeBuild +{ + /// Support plugins are available for: + /// - JetBrains ReSharper https://nuke.build/resharper + /// - JetBrains Rider https://nuke.build/rider + /// - Microsoft VisualStudio https://nuke.build/visualstudio + /// - Microsoft VSCode https://nuke.build/vscode + + public static int Main() => Execute(x => x.Pack); + + private const string NuGetSourceUrl = "https://api.nuget.org/v3/index.json"; + private const string LibraryProjectName = "Universley.OrleansContrib.StreamsProvider.Redis"; + + [Parameter("Configuration to build - Default is 'Debug' (local) or 'Release' (server)")] + readonly Configuration Configuration = IsLocalBuild ? Configuration.Debug : Configuration.Release; + + [Solution] readonly Solution Solution; + + [GitVersion(NoFetch = true)] readonly GitVersion GitVersion; + + AbsolutePath SourceDirectory => RootDirectory; + AbsolutePath ArtifactsDirectory => RootDirectory / "artifacts"; + + AbsolutePath NuGetPackagesDirectory => ArtifactsDirectory / "nuget"; + AbsolutePath TestResultDirectory => ArtifactsDirectory / "test-results"; + + // Nuget API key can be also set as an environment variable + [Parameter("NuGet API key")] + readonly string NuGetApiKey; + + Target Clean => _ => _ + .Before(Restore) + .Executes(() => + { + SourceDirectory.GlobDirectories("**/bin", "**/obj").ForEach(ap => Directory.Delete(ap, true)); + ArtifactsDirectory.CreateOrCleanDirectory(); + }); + + Target Restore => _ => _ + .Executes(() => + { + DotNetTasks.DotNetRestore(s => s + .SetProjectFile(Solution)); + }); + + Target Compile => _ => _ + .DependsOn(Restore) + .Executes(() => + { + var semVer = GitVersion.AssemblySemVer; + var semFileVer = GitVersion.AssemblySemFileVer; + var informationalVersion = GitVersion.InformationalVersion; + + DotNetTasks.DotNetBuild(s => s + .SetProjectFile(Solution) + .SetConfiguration(Configuration) + .SetAssemblyVersion(semVer) + .SetFileVersion(semFileVer) + .SetInformationalVersion(informationalVersion) + .EnableNoRestore()); + }); + + Target Test => _ => _ + .DependsOn(Compile) + .Executes(() => + { + DotNetTasks.DotNetTest(s => s + .SetProjectFile(Solution) + .SetConfiguration(Configuration) + .SetLoggers($"trx;LogFileName={TestResultDirectory / "testresults.trx"}") + ); + }); + + Target Pack => _ => _ + .DependsOn(Test) + .Executes(() => + { + + DotNetTasks.DotNetPack(s => s + .SetProject(SourceDirectory / "Provider" / $"{LibraryProjectName}.csproj") + .SetConfiguration(Configuration) + .SetOutputDirectory(NuGetPackagesDirectory) + .SetVersion(GitVersion.NuGetVersionV2) + .SetNoBuild(true) + ); + }); + + Target Publish => _ => _ + .DependsOn(Pack) + .Requires(() => !string.IsNullOrEmpty(NuGetApiKey)) + .Executes(() => + { + DotNetTasks.DotNetNuGetPush(s => s + .SetSource(NuGetSourceUrl) + .SetApiKey(NuGetApiKey) + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.snupkg")); + + DotNetTasks.DotNetNuGetPush(s => s + .SetSource(NuGetSourceUrl) + .SetApiKey(NuGetApiKey) + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.nupkg")); + }); + +} diff --git a/build/Configuration.cs b/build/Configuration.cs new file mode 100644 index 0000000..9c08b1a --- /dev/null +++ b/build/Configuration.cs @@ -0,0 +1,16 @@ +using System; +using System.ComponentModel; +using System.Linq; +using Nuke.Common.Tooling; + +[TypeConverter(typeof(TypeConverter))] +public class Configuration : Enumeration +{ + public static Configuration Debug = new Configuration { Value = nameof(Debug) }; + public static Configuration Release = new Configuration { Value = nameof(Release) }; + + public static implicit operator string(Configuration configuration) + { + return configuration.Value; + } +} diff --git a/build/Directory.Build.props b/build/Directory.Build.props new file mode 100644 index 0000000..e147d63 --- /dev/null +++ b/build/Directory.Build.props @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/build/Directory.Build.targets b/build/Directory.Build.targets new file mode 100644 index 0000000..2532609 --- /dev/null +++ b/build/Directory.Build.targets @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/build/_build.csproj b/build/_build.csproj new file mode 100644 index 0000000..c73279e --- /dev/null +++ b/build/_build.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0 + + CS0649;CS0169;CA1050;CA1822;CA2211;IDE1006 + .. + .. + 1 + false + + + + + + + + + + + diff --git a/build/_build.csproj.DotSettings b/build/_build.csproj.DotSettings new file mode 100644 index 0000000..a778f33 --- /dev/null +++ b/build/_build.csproj.DotSettings @@ -0,0 +1,27 @@ + + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + Implicit + Implicit + ExpressionBody + 0 + NEXT_LINE + True + False + 120 + IF_OWNER_IS_SINGLE_LINE + WRAP_IF_LONG + False + <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> + <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> + True + True + True + True + True + True + True + True -- 2.34.1 From 7ca04eb538ed47811819b0cbafa622d3788422c8 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Tue, 21 Jan 2025 20:40:56 -0700 Subject: [PATCH 05/17] Fixed Serialization to allow to stream events from Grains --- .gitignore | 5 +- .../.idea/.gitignore | 13 ++++ .../.idea/encodings.xml | 4 ++ .../.idea/indexLayout.xml | 8 +++ .../.idea.RedisStreamsInOrleans/.idea/vcs.xml | 6 ++ Client/Program.cs | 10 +++ Provider/RedisStreamBatchContainer.cs | 52 ++++++++++++--- Provider/RedisStreamReceiver.cs | 2 +- Provider/RedisStreamSequenceToken.cs | 6 +- .../RedisStreamReceiverTests.cs | 65 +++++++++++++------ .../RedisStreamSequenceTokenTests.cs | 6 +- Server/Program.cs | 5 ++ build/_build.csproj.DotSettings | 8 ++- 13 files changed, 151 insertions(+), 39 deletions(-) create mode 100644 .idea/.idea.RedisStreamsInOrleans/.idea/.gitignore create mode 100644 .idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml create mode 100644 .idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml create mode 100644 .idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml diff --git a/.gitignore b/.gitignore index 9491a2f..59b290a 100644 --- a/.gitignore +++ b/.gitignore @@ -360,4 +360,7 @@ MigrationBackup/ .ionide/ # Fody - auto-generated XML schema -FodyWeavers.xsd \ No newline at end of file +FodyWeavers.xsd + +# Mac Finder +*.DS_Store diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore new file mode 100644 index 0000000..806d58f --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore @@ -0,0 +1,13 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Rider ignored files +/contentModel.xml +/.idea.RedisStreamsInOrleans.iml +/modules.xml +/projectSettingsUpdater.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml new file mode 100644 index 0000000..df87cf9 --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml new file mode 100644 index 0000000..7b08163 --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Client/Program.cs b/Client/Program.cs index 15ad374..7b0426a 100644 --- a/Client/Program.cs +++ b/Client/Program.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; +using Orleans.Streams; using StackExchange.Redis; using Universley.OrleansContrib.StreamsProvider.Redis; @@ -36,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream"); var streamId = StreamId.Create("numbergenerator", "consecutive"); var stream = streamProvider.GetStream(streamId); +var newStreamId = StreamId.Create("numbergenerator", "consecutive-back"); +var newStream = streamProvider.GetStream(newStreamId); + +await stream.SubscribeAsync((i, token) => +{ + logger.LogInformation("Received back number {Number}", i); + return Task.CompletedTask; +}); + var task = Task.Run(async () => { var num = 0; diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs index 1da1f52..d403425 100644 --- a/Provider/RedisStreamBatchContainer.cs +++ b/Provider/RedisStreamBatchContainer.cs @@ -4,28 +4,64 @@ using System.Text.Json; namespace Universley.OrleansContrib.StreamsProvider.Redis { + [GenerateSerializer] + [Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")] public class RedisStreamBatchContainer : IBatchContainer { + [Id(0)] public StreamId StreamId { get; } + [Id(1)] public StreamSequenceToken SequenceToken { get; } - public StreamEntry StreamEntry { get; } + + [Id(2)] + public string EventType { get; } + + [Id(3)] + public string Data { get; } + + [Id(4)] + public string StreamEntryId { get; } + public RedisStreamBatchContainer(StreamEntry streamEntry) { - StreamEntry = streamEntry; - var streamNamespace = StreamEntry.Values[0].Value; - var steamKey = StreamEntry.Values[1].Value; + var streamNamespace = streamEntry.Values[0].Value; + var steamKey = streamEntry.Values[1].Value; + var eventType = streamEntry.Values[2].Value; + var data = streamEntry.Values[3].Value; + StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id)); + + // Check incoming data + if (string.IsNullOrWhiteSpace(streamNamespace)) + { + throw new ArgumentNullException(nameof(streamNamespace)); + } + if (string.IsNullOrWhiteSpace(steamKey)) + { + throw new ArgumentNullException(nameof(steamKey)); + } + if (string.IsNullOrWhiteSpace(eventType)) + { + throw new ArgumentNullException(nameof(eventType)); + } + if (string.IsNullOrWhiteSpace(data)) + { + throw new ArgumentNullException(nameof(data)); + } + StreamId = StreamId.Create(streamNamespace!, steamKey!); - SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id); + SequenceToken = new RedisStreamSequenceToken(streamEntry.Id); + EventType = eventType!; + Data = data!; } public IEnumerable> GetEvents() { List> events = new(); var eventType = typeof(T).Name; - if (eventType == StreamEntry.Values[2].Value) + if (eventType == EventType) { - var data = StreamEntry.Values[3].Value; - var @event = JsonSerializer.Deserialize(data!); + var data = Data; + var @event = JsonSerializer.Deserialize(data); events.Add(new(@event!, SequenceToken)); } return events; diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs index a6928d7..f752799 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Provider/RedisStreamReceiver.cs @@ -69,7 +69,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis var container = message as RedisStreamBatchContainer; if (container != null) { - var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); + var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId); pendingTasks = ack; await ack; } diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs index e32c34e..388ef7f 100644 --- a/Provider/RedisStreamSequenceToken.cs +++ b/Provider/RedisStreamSequenceToken.cs @@ -7,9 +7,9 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis public class RedisStreamSequenceToken : StreamSequenceToken { [Id(0)] - public override long SequenceNumber { get; protected set; } + public sealed override long SequenceNumber { get; protected set; } [Id(1)] - public override int EventIndex { get; protected set; } + public sealed override int EventIndex { get; protected set; } public RedisStreamSequenceToken(RedisValue id) { @@ -37,7 +37,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis throw new ArgumentException("Invalid token type", nameof(other)); } - public override bool Equals(StreamSequenceToken other) + public override bool Equals(StreamSequenceToken? other) { var token = other as RedisStreamSequenceToken; return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex; diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs index 86761d2..09efc47 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs @@ -27,15 +27,23 @@ namespace RedisStreamsProvider.UnitTests // Arrange var streamEntries = new[] { - new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), - new NameValueEntry("key", "testKey")}), - new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), - new NameValueEntry("key", "testKey")}) + new StreamEntry("1-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ]), + new StreamEntry("2-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ]) }; _mockDatabase.Setup(db => db.StreamReadGroupAsync( - It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny(), CommandFlags.None)) - .ReturnsAsync(streamEntries); + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(streamEntries); // Act var result = await _receiver.GetQueueMessagesAsync(10); @@ -49,14 +57,17 @@ namespace RedisStreamsProvider.UnitTests public async Task Initialize_CreatesConsumerGroup() { // Arrange - _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) - .ReturnsAsync(true); + _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(true); // Act await _receiver.Initialize(TimeSpan.FromSeconds(5)); // Assert - _mockDatabase.Verify(db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), Times.Once); + _mockDatabase.Verify( + db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), + Times.Once); } [Fact] @@ -65,20 +76,31 @@ namespace RedisStreamsProvider.UnitTests // Arrange var messages = new List { - new RedisStreamBatchContainer(new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), - new NameValueEntry("key", "testKey")})), - new RedisStreamBatchContainer(new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"), - new NameValueEntry("key", "testKey")})) + new RedisStreamBatchContainer(new StreamEntry("1-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ])), + new RedisStreamBatchContainer(new StreamEntry("2-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ])) }; - _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) - .ReturnsAsync(2); + _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), + It.IsAny(), CommandFlags.None)) + .ReturnsAsync(2); // Act await _receiver.MessagesDeliveredAsync(messages); // Assert - _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once); - _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once); + _mockDatabase.Verify( + db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once); + _mockDatabase.Verify( + db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once); } [Fact] @@ -86,8 +108,9 @@ namespace RedisStreamsProvider.UnitTests { // Arrange var tcs = new TaskCompletionSource(); - _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) - .Returns(tcs.Task); + _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .Returns(tcs.Task); // Act var getMessagesTask = _receiver.GetQueueMessagesAsync(10); @@ -97,4 +120,4 @@ namespace RedisStreamsProvider.UnitTests Assert.True(getMessagesTask.IsCompleted); } } -} +} \ No newline at end of file diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs index 9446117..1222146 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs +++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs @@ -1,9 +1,9 @@ -using StackExchange.Redis; +using Moq; using Orleans.Streams; -using Moq; +using StackExchange.Redis; using Universley.OrleansContrib.StreamsProvider.Redis; -namespace Provider.Tests +namespace RedisStreamsProvider.UnitTests { public class RedisStreamSequenceTokenTests { diff --git a/Server/Program.cs b/Server/Program.cs index e45ff42..e2b823b 100644 --- a/Server/Program.cs +++ b/Server/Program.cs @@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver _logger.LogInformation("Received number {Number}", item); await Task.Delay(2000); + var newStreamId = StreamId.Create("numbergenerator", "consecutive-back"); + var newStream = this.GetStreamProvider("RedisStream").GetStream(newStreamId); + _logger.LogInformation("Sending number {Number} to new stream", item); + await newStream.OnNextAsync(item); + } } diff --git a/build/_build.csproj.DotSettings b/build/_build.csproj.DotSettings index a778f33..88a8824 100644 --- a/build/_build.csproj.DotSettings +++ b/build/_build.csproj.DotSettings @@ -1,4 +1,4 @@ - + DO_NOT_SHOW DO_NOT_SHOW DO_NOT_SHOW @@ -17,11 +17,15 @@ False <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> + <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy> + <Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy> True True True True + True True True True - True + True + True -- 2.34.1 From e6d2c8ac37c81cd8426c36e2a013869db7a5878e Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 09:34:13 -0700 Subject: [PATCH 06/17] added gitea pipeline --- .gitea/workflows/ci-pipeline.yml | 52 ++++++++++++++++++++++++++++++++ RedisStreamsInOrleans.sln | 16 ++++++++-- build/Build.cs | 18 ++++++++--- 3 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 .gitea/workflows/ci-pipeline.yml diff --git a/.gitea/workflows/ci-pipeline.yml b/.gitea/workflows/ci-pipeline.yml new file mode 100644 index 0000000..18316bd --- /dev/null +++ b/.gitea/workflows/ci-pipeline.yml @@ -0,0 +1,52 @@ +name: CI Build + +on: + push: + branches: [ master ] + paths-ignore: + - '**/*.md' + - '**/*.gitignore' + - '**/*.gitattributes' + pull_request: + branches: [ master ] + workflow_dispatch: +permissions: + contents: write + packages: write + +env: + DOTNET_NOLOGO: true # Disable the .NET logo + DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true # Disable the .NET first time experience + DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry + MINOR_VERSION_OVERRIDE: 0 + + GITEA_SERVER_URL: ${{ secrets.S_GITEA_SERVER_URL }} + GITEA_PACKAGE_OWNER: ${{ secrets.S_GITEA_PACKAGE_OWNER }} + GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }} + GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }} + GITEA_NUGET_SOURCE_NAME: ${{ vars.S_GITEA_NUGET_SOURCE_NAME }} + + +jobs: + build-ci-api: + runs-on: [linux,self-hosted] + name: CI Build API + steps: + - name: Checkout + uses: https://github.com/actions/checkout@v4 + + - name: Setup .NET 9 + uses: https://github.com/actions/setup-dotnet@v4 + with: + dotnet-version: 9.x + + - name: Make Build File Executable + shell: bash + run: | + chmod +x ./build.cmd + chmod +x ./build.sh + + - name: Run Nuke Build + shell: bash + run: | + ./build.cmd -Target Publish \ No newline at end of file diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index f5d8720..973af9e 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -13,14 +13,21 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTe EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ci", "ci", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".gitea", ".gitea", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" + ProjectSection(SolutionItems) = preProject + .gitea\workflows\ci-pipeline.yml = .gitea\workflows\ci-pipeline.yml + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -37,10 +44,15 @@ Global {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} + EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9A8081FD-99C7-4302-A892-DC1DE30F4853} EndGlobalSection diff --git a/build/Build.cs b/build/Build.cs index d0c241f..5053307 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -8,6 +8,7 @@ using Nuke.Common.IO; using Nuke.Common.ProjectModel; using Nuke.Common.Tooling; using Nuke.Common.Tools.DotNet; +using Nuke.Common.Tools.Git; using Nuke.Common.Tools.GitVersion; using Nuke.Common.Utilities.Collections; using static Nuke.Common.EnvironmentInfo; @@ -23,7 +24,6 @@ class Build : NukeBuild public static int Main() => Execute(x => x.Pack); - private const string NuGetSourceUrl = "https://api.nuget.org/v3/index.json"; private const string LibraryProjectName = "Universley.OrleansContrib.StreamsProvider.Redis"; [Parameter("Configuration to build - Default is 'Debug' (local) or 'Release' (server)")] @@ -43,6 +43,9 @@ class Build : NukeBuild [Parameter("NuGet API key")] readonly string NuGetApiKey; + [Parameter("Gitea Nuget package source name")] + readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME"); + Target Clean => _ => _ .Before(Restore) .Executes(() => @@ -106,14 +109,21 @@ class Build : NukeBuild .Executes(() => { DotNetTasks.DotNetNuGetPush(s => s - .SetSource(NuGetSourceUrl) + .SetSource(GiteaNugetSourceName) .SetApiKey(NuGetApiKey) .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.snupkg")); DotNetTasks.DotNetNuGetPush(s => s - .SetSource(NuGetSourceUrl) - .SetApiKey(NuGetApiKey) + .SetSource(GiteaNugetSourceName) .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.nupkg")); }); + Target CreateAndPushGitTag => _ => _ + .Executes(() => + { + var gitTag = $"{GitVersion.NuGetVersionV2}"; + GitTasks.Git($"tag {gitTag}"); + GitTasks.Git($"push origin {gitTag}"); + }); +} } -- 2.34.1 From ae766c70cbe128e16a0ffe2c4b067bc62537e220 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 10:34:34 -0700 Subject: [PATCH 07/17] reverted the sln so it won't crash Rider --- RedisStreamsInOrleans.sln | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 973af9e..f5d8720 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -13,21 +13,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTe EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ci", "ci", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".gitea", ".gitea", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{02EA681E-C7D8-13C7-8484-4AC65E1B71E8}" - ProjectSection(SolutionItems) = preProject - .gitea\workflows\ci-pipeline.yml = .gitea\workflows\ci-pipeline.yml - EndProjectSection -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -44,15 +37,10 @@ Global {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU - {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection - GlobalSection(NestedProjects) = preSolution - {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} - EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9A8081FD-99C7-4302-A892-DC1DE30F4853} EndGlobalSection -- 2.34.1 From 791fe321afb7a6f3bca8d9cf3807e8fbf66a09fe Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 10:57:01 -0700 Subject: [PATCH 08/17] fixed syntax error --- .gitea/workflows/ci-pipeline.yml | 8 ++++---- build/Build.cs | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.gitea/workflows/ci-pipeline.yml b/.gitea/workflows/ci-pipeline.yml index 18316bd..9670a18 100644 --- a/.gitea/workflows/ci-pipeline.yml +++ b/.gitea/workflows/ci-pipeline.yml @@ -20,11 +20,11 @@ env: DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry MINOR_VERSION_OVERRIDE: 0 - GITEA_SERVER_URL: ${{ secrets.S_GITEA_SERVER_URL }} - GITEA_PACKAGE_OWNER: ${{ secrets.S_GITEA_PACKAGE_OWNER }} + GITEA_SERVER_URL: ${{ vars.V_GITEA_SERVER_URL }} + GITEA_PACKAGE_OWNER: ${{ vars.V_GITEA_PACKAGE_OWNER }} + GITEA_NUGET_SOURCE_NAME: ${{ vars.V_GITEA_NUGET_SOURCE_NAME }} GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }} GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }} - GITEA_NUGET_SOURCE_NAME: ${{ vars.S_GITEA_NUGET_SOURCE_NAME }} jobs: @@ -49,4 +49,4 @@ jobs: - name: Run Nuke Build shell: bash run: | - ./build.cmd -Target Publish \ No newline at end of file + ./build.cmd -Target Publish diff --git a/build/Build.cs b/build/Build.cs index 5053307..31a83c2 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -126,4 +126,3 @@ class Build : NukeBuild GitTasks.Git($"push origin {gitTag}"); }); } -} -- 2.34.1 From fda34f23b4935e660def9f9fc2f390860a0c6166 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 11:00:37 -0700 Subject: [PATCH 09/17] no need for nuget api key for gitea --- build/Build.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/build/Build.cs b/build/Build.cs index 31a83c2..e2fef0a 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -105,7 +105,6 @@ class Build : NukeBuild Target Publish => _ => _ .DependsOn(Pack) - .Requires(() => !string.IsNullOrEmpty(NuGetApiKey)) .Executes(() => { DotNetTasks.DotNetNuGetPush(s => s -- 2.34.1 From 4058972e824d467885dd8c8344b20b71b9964490 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 12:19:31 -0700 Subject: [PATCH 10/17] changed build pipeline --- build/Build.cs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/build/Build.cs b/build/Build.cs index e2fef0a..1d9efcf 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -11,6 +11,7 @@ using Nuke.Common.Tools.DotNet; using Nuke.Common.Tools.Git; using Nuke.Common.Tools.GitVersion; using Nuke.Common.Utilities.Collections; +using Serilog; using static Nuke.Common.EnvironmentInfo; using static Nuke.Common.IO.PathConstruction; @@ -47,35 +48,31 @@ class Build : NukeBuild readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME"); Target Clean => _ => _ - .Before(Restore) + .Before(Compile) .Executes(() => { - SourceDirectory.GlobDirectories("**/bin", "**/obj").ForEach(ap => Directory.Delete(ap, true)); + SourceDirectory + .GlobDirectories("**/bin", "**/obj") + .Except(RootDirectory.GlobDirectories("build/**/bin", "build/**/obj")) + .ForEach(ap => Directory.Delete(ap, true)); ArtifactsDirectory.CreateOrCleanDirectory(); }); - Target Restore => _ => _ - .Executes(() => - { - DotNetTasks.DotNetRestore(s => s - .SetProjectFile(Solution)); - }); - Target Compile => _ => _ - .DependsOn(Restore) .Executes(() => { var semVer = GitVersion.AssemblySemVer; var semFileVer = GitVersion.AssemblySemFileVer; var informationalVersion = GitVersion.InformationalVersion; + Log.Logger.Information("AssemblySemVer: {semVer}", semVer); + DotNetTasks.DotNetBuild(s => s .SetProjectFile(Solution) .SetConfiguration(Configuration) .SetAssemblyVersion(semVer) .SetFileVersion(semFileVer) - .SetInformationalVersion(informationalVersion) - .EnableNoRestore()); + .SetInformationalVersion(informationalVersion)); }); Target Test => _ => _ @@ -104,7 +101,7 @@ class Build : NukeBuild }); Target Publish => _ => _ - .DependsOn(Pack) + .DependsOn(Pack, CreateAndPushGitTag) .Executes(() => { DotNetTasks.DotNetNuGetPush(s => s -- 2.34.1 From db05731456a8c7cf3bd93f0f9e59386bca90fbc8 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 12:41:34 -0700 Subject: [PATCH 11/17] changed build versioning to get rid of GitVersion dependency --- build/Build.cs | 50 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/build/Build.cs b/build/Build.cs index 1d9efcf..934118e 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -32,8 +32,6 @@ class Build : NukeBuild [Solution] readonly Solution Solution; - [GitVersion(NoFetch = true)] readonly GitVersion GitVersion; - AbsolutePath SourceDirectory => RootDirectory; AbsolutePath ArtifactsDirectory => RootDirectory / "artifacts"; @@ -44,6 +42,10 @@ class Build : NukeBuild [Parameter("NuGet API key")] readonly string NuGetApiKey; + [Parameter("Build number override")] + readonly string BuildNumber + = Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER") ?? $"{0}"; + [Parameter("Gitea Nuget package source name")] readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME"); @@ -61,9 +63,9 @@ class Build : NukeBuild Target Compile => _ => _ .Executes(() => { - var semVer = GitVersion.AssemblySemVer; - var semFileVer = GitVersion.AssemblySemFileVer; - var informationalVersion = GitVersion.InformationalVersion; + var semVer = Version; + var semFileVer = Version; + var informationalVersion = Version; Log.Logger.Information("AssemblySemVer: {semVer}", semVer); @@ -95,7 +97,7 @@ class Build : NukeBuild .SetProject(SourceDirectory / "Provider" / $"{LibraryProjectName}.csproj") .SetConfiguration(Configuration) .SetOutputDirectory(NuGetPackagesDirectory) - .SetVersion(GitVersion.NuGetVersionV2) + .SetVersion(Version) .SetNoBuild(true) ); }); @@ -107,18 +109,48 @@ class Build : NukeBuild DotNetTasks.DotNetNuGetPush(s => s .SetSource(GiteaNugetSourceName) .SetApiKey(NuGetApiKey) - .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.snupkg")); + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.snupkg")); DotNetTasks.DotNetNuGetPush(s => s .SetSource(GiteaNugetSourceName) - .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{GitVersion.NuGetVersionV2}.nupkg")); + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.nupkg")); }); Target CreateAndPushGitTag => _ => _ .Executes(() => { - var gitTag = $"{GitVersion.NuGetVersionV2}"; + var gitTag = $"{Version}"; GitTasks.Git($"tag {gitTag}"); GitTasks.Git($"push origin {gitTag}"); }); + + private string Version + { + get + { + if (version == null) + { + version = GetVersion(); + } + return version; + } + } + + private string version = null; + + private string GetVersion() + { + Log.Information("GitHub run number is {number}", Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER")); + var buildNumber = BuildNumber; + if (string.IsNullOrEmpty(buildNumber)) + { + return "1.0.0"; + } + + var currentDateTime = DateTimeOffset.UtcNow; + + var assembledVersion = $"{currentDateTime.Year}.{currentDateTime.Month}.{buildNumber}"; + Log.Information("Assembled version: {assembledVersion}", assembledVersion); + return assembledVersion; + } } -- 2.34.1 From f92480615d67328bb35a15d2fb5a119ab3b0ae64 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 12:55:25 -0700 Subject: [PATCH 12/17] added package authentication for nuget push --- build/Build.cs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/build/Build.cs b/build/Build.cs index 934118e..1018557 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -49,6 +49,13 @@ class Build : NukeBuild [Parameter("Gitea Nuget package source name")] readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME"); + [Parameter("Gitea package owner user")] + [Secret] + readonly string GiteaPackageOwnerUser = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_USER"); + [Parameter("Gitea package owner password")] + [Secret] + readonly string GiteaPackageOwnerPassword = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_PASSWORD"); + Target Clean => _ => _ .Before(Compile) .Executes(() => @@ -106,13 +113,19 @@ class Build : NukeBuild .DependsOn(Pack, CreateAndPushGitTag) .Executes(() => { + DotNetTasks.DotNetNuGetAddSource(c => c + .SetUsername(GiteaPackageOwnerUser) + .SetPassword(GiteaPackageOwnerPassword) + .SetName("Gitea") + .SetSource($"{GiteaNugetSourceName}/index.json")); + DotNetTasks.DotNetNuGetPush(s => s - .SetSource(GiteaNugetSourceName) + .SetSource($"{GiteaNugetSourceName}/symbols") .SetApiKey(NuGetApiKey) .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.snupkg")); DotNetTasks.DotNetNuGetPush(s => s - .SetSource(GiteaNugetSourceName) + .SetSource($"{GiteaNugetSourceName}/index.json") .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.nupkg")); }); -- 2.34.1 From a3a853c1d041b2b394ee6cb1318a9b208e256dcf Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 22 Jan 2025 13:32:16 -0700 Subject: [PATCH 13/17] setting plain text password for temp nuget feed reference --- build/Build.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/build/Build.cs b/build/Build.cs index 1018557..b9ff664 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -114,6 +114,7 @@ class Build : NukeBuild .Executes(() => { DotNetTasks.DotNetNuGetAddSource(c => c + .EnableStorePasswordInClearText() .SetUsername(GiteaPackageOwnerUser) .SetPassword(GiteaPackageOwnerPassword) .SetName("Gitea") -- 2.34.1 From 30726d7aa1a5b157347f9d868b75c2e38fb6c5f3 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 29 Jan 2025 17:29:59 -0700 Subject: [PATCH 14/17] changed solution folder structure --- RedisStreamsInOrleans.sln | 6 +++--- .../RedisStreamsProvider.UnitTests.csproj | 2 +- .../RedisStreamAdapter.cs | 0 .../RedisStreamBatchContainer.cs | 0 .../RedisStreamFactory.cs | 0 .../RedisStreamFailureHandler.cs | 0 .../RedisStreamReceiver.cs | 0 .../RedisStreamSequenceToken.cs | 0 .../Universley.OrleansContrib.StreamsProvider.Redis.csproj | 0 {Client => example/Client}/Client.csproj | 2 +- {Client => example/Client}/Program.cs | 0 {Server => example/Server}/Program.cs | 0 {Server => example/Server}/Server.csproj | 2 +- 13 files changed, 6 insertions(+), 6 deletions(-) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamAdapter.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamBatchContainer.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamFactory.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamFailureHandler.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamReceiver.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/RedisStreamSequenceToken.cs (100%) rename {Provider => Universley.OrleansContrib.StreamsProvider.Redis}/Universley.OrleansContrib.StreamsProvider.Redis.csproj (100%) rename {Client => example/Client}/Client.csproj (78%) rename {Client => example/Client}/Program.cs (100%) rename {Server => example/Server}/Program.cs (100%) rename {Server => example/Server}/Server.csproj (78%) diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index f5d8720..886d220 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -3,11 +3,11 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.10.34928.147 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "example\Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "example\Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Provider\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" EndProject diff --git a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj index 6cff32f..35e6cd5 100644 --- a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj +++ b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj @@ -16,7 +16,7 @@ - + diff --git a/Provider/RedisStreamAdapter.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamAdapter.cs similarity index 100% rename from Provider/RedisStreamAdapter.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamAdapter.cs diff --git a/Provider/RedisStreamBatchContainer.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamBatchContainer.cs similarity index 100% rename from Provider/RedisStreamBatchContainer.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamBatchContainer.cs diff --git a/Provider/RedisStreamFactory.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFactory.cs similarity index 100% rename from Provider/RedisStreamFactory.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFactory.cs diff --git a/Provider/RedisStreamFailureHandler.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFailureHandler.cs similarity index 100% rename from Provider/RedisStreamFailureHandler.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFailureHandler.cs diff --git a/Provider/RedisStreamReceiver.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamReceiver.cs similarity index 100% rename from Provider/RedisStreamReceiver.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamReceiver.cs diff --git a/Provider/RedisStreamSequenceToken.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamSequenceToken.cs similarity index 100% rename from Provider/RedisStreamSequenceToken.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamSequenceToken.cs diff --git a/Provider/Universley.OrleansContrib.StreamsProvider.Redis.csproj b/Universley.OrleansContrib.StreamsProvider.Redis/Universley.OrleansContrib.StreamsProvider.Redis.csproj similarity index 100% rename from Provider/Universley.OrleansContrib.StreamsProvider.Redis.csproj rename to Universley.OrleansContrib.StreamsProvider.Redis/Universley.OrleansContrib.StreamsProvider.Redis.csproj diff --git a/Client/Client.csproj b/example/Client/Client.csproj similarity index 78% rename from Client/Client.csproj rename to example/Client/Client.csproj index 2c7b60f..a3e8a16 100644 --- a/Client/Client.csproj +++ b/example/Client/Client.csproj @@ -14,7 +14,7 @@ - + diff --git a/Client/Program.cs b/example/Client/Program.cs similarity index 100% rename from Client/Program.cs rename to example/Client/Program.cs diff --git a/Server/Program.cs b/example/Server/Program.cs similarity index 100% rename from Server/Program.cs rename to example/Server/Program.cs diff --git a/Server/Server.csproj b/example/Server/Server.csproj similarity index 78% rename from Server/Server.csproj rename to example/Server/Server.csproj index f34bf50..d7da8fe 100644 --- a/Server/Server.csproj +++ b/example/Server/Server.csproj @@ -14,7 +14,7 @@ - + -- 2.34.1 From 47bede52d1bf8d44dca9c9d70a9a884dbfac96a4 Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 29 Jan 2025 23:22:57 -0700 Subject: [PATCH 15/17] fixed Project file reference after the structure change --- .nuke/build.schema.json | 20 +++++++++++++++++++- build/Build.cs | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/.nuke/build.schema.json b/.nuke/build.schema.json index e2e2cf9..94f28e8 100644 --- a/.nuke/build.schema.json +++ b/.nuke/build.schema.json @@ -26,9 +26,9 @@ "enum": [ "Clean", "Compile", + "CreateAndPushGitTag", "Pack", "Publish", - "Restore", "Test" ] }, @@ -103,6 +103,10 @@ "allOf": [ { "properties": { + "BuildNumber": { + "type": "string", + "description": "Build number override" + }, "Configuration": { "type": "string", "description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)", @@ -111,6 +115,20 @@ "Release" ] }, + "GiteaNugetSourceName": { + "type": "string", + "description": "Gitea Nuget package source name" + }, + "GiteaPackageOwnerPassword": { + "type": "string", + "description": "Gitea package owner password", + "default": "Secrets must be entered via 'nuke :secrets [profile]'" + }, + "GiteaPackageOwnerUser": { + "type": "string", + "description": "Gitea package owner user", + "default": "Secrets must be entered via 'nuke :secrets [profile]'" + }, "NuGetApiKey": { "type": "string", "description": "NuGet API key" diff --git a/build/Build.cs b/build/Build.cs index b9ff664..e2c098a 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -101,7 +101,7 @@ class Build : NukeBuild { DotNetTasks.DotNetPack(s => s - .SetProject(SourceDirectory / "Provider" / $"{LibraryProjectName}.csproj") + .SetProject(SourceDirectory / "Universley.OrleansContrib.StreamsProvider.Redis" / $"{LibraryProjectName}.csproj") .SetConfiguration(Configuration) .SetOutputDirectory(NuGetPackagesDirectory) .SetVersion(Version) -- 2.34.1 From 7992e3de0fbc8e492a7ac10ecd27896e2af6e4cd Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 29 Jan 2025 23:26:19 -0700 Subject: [PATCH 16/17] killing the exception when trying to add a duplicate nuget source --- build/Build.cs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/build/Build.cs b/build/Build.cs index e2c098a..c39b478 100644 --- a/build/Build.cs +++ b/build/Build.cs @@ -113,12 +113,19 @@ class Build : NukeBuild .DependsOn(Pack, CreateAndPushGitTag) .Executes(() => { - DotNetTasks.DotNetNuGetAddSource(c => c - .EnableStorePasswordInClearText() - .SetUsername(GiteaPackageOwnerUser) - .SetPassword(GiteaPackageOwnerPassword) - .SetName("Gitea") - .SetSource($"{GiteaNugetSourceName}/index.json")); + try + { + DotNetTasks.DotNetNuGetAddSource(c => c + .EnableStorePasswordInClearText() + .SetUsername(GiteaPackageOwnerUser) + .SetPassword(GiteaPackageOwnerPassword) + .SetName("Gitea") + .SetSource($"{GiteaNugetSourceName}/index.json")); + } + catch (Exception e) + { + Log.Warning("Error adding Gitea source: {e}", e); + } DotNetTasks.DotNetNuGetPush(s => s .SetSource($"{GiteaNugetSourceName}/symbols") -- 2.34.1 From 09517e59a33617e4f5dba259582fffb67fc3ae4f Mon Sep 17 00:00:00 2001 From: Michael Samorokov Date: Wed, 29 Jan 2025 23:32:42 -0700 Subject: [PATCH 17/17] added README.md --- README.md | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index b38d442..8840fd4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,82 @@ -# RedisStreamsInOrleans -This is a project that demonstrates how to create a custom Redis Stream Provider for Orleans. -https://swacblooms.com/custom-redis-streams-provider-for-orleans/ \ No newline at end of file +# Universley.OrleansContrib.StreamsProvider.Redis + +## Summary +This library provides an integration of Redis Streams with Microsoft Orleans, allowing you to use Redis as a streaming provider within your Orleans applications. It enables seamless communication and data streaming between Orleans grains and external clients using Redis Streams. + +## How to Use the Redis Provider with Orleans + +### 1. With Grain as a Client +To use Redis Streams with a grain as a client, follow these steps: + +1. Install the necessary NuGet packages: + ```sh + dotnet add package Orleans.Streaming.Redis + ``` + +2. Configure the Redis stream provider in your Orleans silo configuration: + ```csharp + var host = new SiloHostBuilder() + .AddRedisStreams("RedisProvider", options => + { + options.ConnectionString = "your_redis_connection_string"; + }) + .Build(); + ``` + +3. In your grain, use the stream provider to send and receive messages: + ```csharp + public class MyGrain : Grain, IMyGrain + { + private IAsyncStream _stream; + + public override async Task OnActivateAsync() + { + var streamProvider = GetStreamProvider("RedisProvider"); + _stream = streamProvider.GetStream(this.GetPrimaryKey(), "streamNamespace"); + await base.OnActivateAsync(); + } + + public async Task SendMessage(string message) + { + await _stream.OnNextAsync(message); + } + + public async Task ReceiveMessages() + { + var handle = await _stream.SubscribeAsync((message, token) => + { + Console.WriteLine($"Received message: {message}"); + return Task.CompletedTask; + }); + } + } + ``` + +### 2. With External Client +To use Redis Streams with an external client, follow these steps: + +1. Install the StackExchange.Redis package: + ```sh + dotnet add package StackExchange.Redis + ``` + +2. Connect to the Redis server and interact with the stream: + ```csharp + using StackExchange.Redis; + + var redis = ConnectionMultiplexer.Connect("your_redis_connection_string"); + var db = redis.GetDatabase(); + + // Add a message to the stream + var messageId = await db.StreamAddAsync("mystream", "message", "Hello, Redis!"); + + // Read messages from the stream + var messages = await db.StreamReadAsync("mystream", "0-0"); + foreach (var message in messages) + { + Console.WriteLine($"Message ID: {message.Id}, Values: {string.Join(", ", message.Values)}"); + } + ``` + +## Credit +This library is based on the original repository by [sammychinedu2ky](https://github.com/sammychinedu2ky/RedisStreamsInOrleans). \ No newline at end of file -- 2.34.1