diff --git a/.gitea/workflows/ci-pipeline.yml b/.gitea/workflows/ci-pipeline.yml new file mode 100644 index 0000000..9670a18 --- /dev/null +++ b/.gitea/workflows/ci-pipeline.yml @@ -0,0 +1,52 @@ +name: CI Build + +on: + push: + branches: [ master ] + paths-ignore: + - '**/*.md' + - '**/*.gitignore' + - '**/*.gitattributes' + pull_request: + branches: [ master ] + workflow_dispatch: +permissions: + contents: write + packages: write + +env: + DOTNET_NOLOGO: true # Disable the .NET logo + DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true # Disable the .NET first time experience + DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry + MINOR_VERSION_OVERRIDE: 0 + + GITEA_SERVER_URL: ${{ vars.V_GITEA_SERVER_URL }} + GITEA_PACKAGE_OWNER: ${{ vars.V_GITEA_PACKAGE_OWNER }} + GITEA_NUGET_SOURCE_NAME: ${{ vars.V_GITEA_NUGET_SOURCE_NAME }} + GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }} + GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }} + + +jobs: + build-ci-api: + runs-on: [linux,self-hosted] + name: CI Build API + steps: + - name: Checkout + uses: https://github.com/actions/checkout@v4 + + - name: Setup .NET 9 + uses: https://github.com/actions/setup-dotnet@v4 + with: + dotnet-version: 9.x + + - name: Make Build File Executable + shell: bash + run: | + chmod +x ./build.cmd + chmod +x ./build.sh + + - name: Run Nuke Build + shell: bash + run: | + ./build.cmd -Target Publish diff --git a/.gitignore b/.gitignore index 9491a2f..59b290a 100644 --- a/.gitignore +++ b/.gitignore @@ -360,4 +360,7 @@ MigrationBackup/ .ionide/ # Fody - auto-generated XML schema -FodyWeavers.xsd \ No newline at end of file +FodyWeavers.xsd + +# Mac Finder +*.DS_Store diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore new file mode 100644 index 0000000..806d58f --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore @@ -0,0 +1,13 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Rider ignored files +/contentModel.xml +/.idea.RedisStreamsInOrleans.iml +/modules.xml +/projectSettingsUpdater.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml new file mode 100644 index 0000000..df87cf9 --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml new file mode 100644 index 0000000..7b08163 --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.nuke/build.schema.json b/.nuke/build.schema.json new file mode 100644 index 0000000..94f28e8 --- /dev/null +++ b/.nuke/build.schema.json @@ -0,0 +1,146 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "Host": { + "type": "string", + "enum": [ + "AppVeyor", + "AzurePipelines", + "Bamboo", + "Bitbucket", + "Bitrise", + "GitHubActions", + "GitLab", + "Jenkins", + "Rider", + "SpaceAutomation", + "TeamCity", + "Terminal", + "TravisCI", + "VisualStudio", + "VSCode" + ] + }, + "ExecutableTarget": { + "type": "string", + "enum": [ + "Clean", + "Compile", + "CreateAndPushGitTag", + "Pack", + "Publish", + "Test" + ] + }, + "Verbosity": { + "type": "string", + "description": "", + "enum": [ + "Verbose", + "Normal", + "Minimal", + "Quiet" + ] + }, + "NukeBuild": { + "properties": { + "Continue": { + "type": "boolean", + "description": "Indicates to continue a previously failed build attempt" + }, + "Help": { + "type": "boolean", + "description": "Shows the help text for this build assembly" + }, + "Host": { + "description": "Host for execution. Default is 'automatic'", + "$ref": "#/definitions/Host" + }, + "NoLogo": { + "type": "boolean", + "description": "Disables displaying the NUKE logo" + }, + "Partition": { + "type": "string", + "description": "Partition to use on CI" + }, + "Plan": { + "type": "boolean", + "description": "Shows the execution plan (HTML)" + }, + "Profile": { + "type": "array", + "description": "Defines the profiles to load", + "items": { + "type": "string" + } + }, + "Root": { + "type": "string", + "description": "Root directory during build execution" + }, + "Skip": { + "type": "array", + "description": "List of targets to be skipped. Empty list skips all dependencies", + "items": { + "$ref": "#/definitions/ExecutableTarget" + } + }, + "Target": { + "type": "array", + "description": "List of targets to be invoked. Default is '{default_target}'", + "items": { + "$ref": "#/definitions/ExecutableTarget" + } + }, + "Verbosity": { + "description": "Logging verbosity during build execution. Default is 'Normal'", + "$ref": "#/definitions/Verbosity" + } + } + } + }, + "allOf": [ + { + "properties": { + "BuildNumber": { + "type": "string", + "description": "Build number override" + }, + "Configuration": { + "type": "string", + "description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)", + "enum": [ + "Debug", + "Release" + ] + }, + "GiteaNugetSourceName": { + "type": "string", + "description": "Gitea Nuget package source name" + }, + "GiteaPackageOwnerPassword": { + "type": "string", + "description": "Gitea package owner password", + "default": "Secrets must be entered via 'nuke :secrets [profile]'" + }, + "GiteaPackageOwnerUser": { + "type": "string", + "description": "Gitea package owner user", + "default": "Secrets must be entered via 'nuke :secrets [profile]'" + }, + "NuGetApiKey": { + "type": "string", + "description": "NuGet API key" + }, + "Solution": { + "type": "string", + "description": "Path to a solution file that is automatically loaded" + } + } + }, + { + "$ref": "#/definitions/NukeBuild" + } + ] +} diff --git a/.nuke/parameters.json b/.nuke/parameters.json new file mode 100644 index 0000000..edf20b1 --- /dev/null +++ b/.nuke/parameters.json @@ -0,0 +1,4 @@ +{ + "$schema": "build.schema.json", + "Solution": "RedisStreamsInOrleans.sln" +} diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs deleted file mode 100644 index c799067..0000000 --- a/Provider/RedisStreamBatchContainer.cs +++ /dev/null @@ -1,40 +0,0 @@ -using Orleans.Runtime; -using Orleans.Streams; -using StackExchange.Redis; -using System.Text.Json; - -namespace Provider -{ - public class RedisStreamBatchContainer : IBatchContainer - { - public StreamId StreamId { get; } - - public StreamSequenceToken SequenceToken { get; } - public StreamEntry StreamEntry { get; } - public RedisStreamBatchContainer(StreamEntry streamEntry) - { - StreamEntry = streamEntry; - var streamNamespace = StreamEntry.Values[0].Value; - var steamKey = StreamEntry.Values[1].Value; - StreamId = StreamId.Create(streamNamespace!, steamKey!); - SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id); - } - public IEnumerable> GetEvents() - { - List> events = new(); - var eventType = typeof(T).Name; - if (eventType == StreamEntry.Values[2].Value) - { - var data = StreamEntry.Values[3].Value; - var @event = JsonSerializer.Deserialize(data!); - events.Add(new(@event!, SequenceToken)); - } - return events; - } - - public bool ImportRequestContext() - { - return false; - } - } -} diff --git a/README.md b/README.md index b38d442..8840fd4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,82 @@ -# RedisStreamsInOrleans -This is a project that demonstrates how to create a custom Redis Stream Provider for Orleans. -https://swacblooms.com/custom-redis-streams-provider-for-orleans/ \ No newline at end of file +# Universley.OrleansContrib.StreamsProvider.Redis + +## Summary +This library provides an integration of Redis Streams with Microsoft Orleans, allowing you to use Redis as a streaming provider within your Orleans applications. It enables seamless communication and data streaming between Orleans grains and external clients using Redis Streams. + +## How to Use the Redis Provider with Orleans + +### 1. With Grain as a Client +To use Redis Streams with a grain as a client, follow these steps: + +1. Install the necessary NuGet packages: + ```sh + dotnet add package Orleans.Streaming.Redis + ``` + +2. Configure the Redis stream provider in your Orleans silo configuration: + ```csharp + var host = new SiloHostBuilder() + .AddRedisStreams("RedisProvider", options => + { + options.ConnectionString = "your_redis_connection_string"; + }) + .Build(); + ``` + +3. In your grain, use the stream provider to send and receive messages: + ```csharp + public class MyGrain : Grain, IMyGrain + { + private IAsyncStream _stream; + + public override async Task OnActivateAsync() + { + var streamProvider = GetStreamProvider("RedisProvider"); + _stream = streamProvider.GetStream(this.GetPrimaryKey(), "streamNamespace"); + await base.OnActivateAsync(); + } + + public async Task SendMessage(string message) + { + await _stream.OnNextAsync(message); + } + + public async Task ReceiveMessages() + { + var handle = await _stream.SubscribeAsync((message, token) => + { + Console.WriteLine($"Received message: {message}"); + return Task.CompletedTask; + }); + } + } + ``` + +### 2. With External Client +To use Redis Streams with an external client, follow these steps: + +1. Install the StackExchange.Redis package: + ```sh + dotnet add package StackExchange.Redis + ``` + +2. Connect to the Redis server and interact with the stream: + ```csharp + using StackExchange.Redis; + + var redis = ConnectionMultiplexer.Connect("your_redis_connection_string"); + var db = redis.GetDatabase(); + + // Add a message to the stream + var messageId = await db.StreamAddAsync("mystream", "message", "Hello, Redis!"); + + // Read messages from the stream + var messages = await db.StreamReadAsync("mystream", "0-0"); + foreach (var message in messages) + { + Console.WriteLine($"Message ID: {message.Id}, Values: {string.Join(", ", message.Values)}"); + } + ``` + +## Credit +This library is based on the original repository by [sammychinedu2ky](https://github.com/sammychinedu2ky/RedisStreamsInOrleans). \ No newline at end of file diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln index 3feeb5e..886d220 100644 --- a/RedisStreamsInOrleans.sln +++ b/RedisStreamsInOrleans.sln @@ -3,11 +3,15 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.10.34928.147 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "example\Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "example\Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -15,6 +19,8 @@ Global Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -27,6 +33,10 @@ Global {70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs new file mode 100644 index 0000000..87fb0f1 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs @@ -0,0 +1,45 @@ +using Moq; +using StackExchange.Redis; +using Microsoft.Extensions.Logging; +using Orleans.Streams; +using Orleans.Configuration; +using Universley.OrleansContrib.StreamsProvider.Redis; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamAdapterTests + { + private readonly Mock _mockDatabase; + private readonly Mock _mockQueueMapper; + private readonly Mock _mockLoggerFactory; + private readonly RedisStreamAdapter _adapter; + + public RedisStreamAdapterTests() + { + _mockDatabase = new Mock(); + var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 }; + _mockQueueMapper = new Mock(options, "queueNamePrefix"); + _mockLoggerFactory = new Mock(); + + _adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object); + } + + [Fact] + public void Constructor_ShouldInitializeProperties() + { + Assert.Equal("TestProvider", _adapter.Name); + Assert.False(_adapter.IsRewindable); + Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction); + } + + [Fact] + public void CreateReceiver_ShouldReturnRedisStreamReceiver() + { + var queueId = QueueId.GetQueueId("queueName", 0, 1); + var receiver = _adapter.CreateReceiver(queueId); + + Assert.NotNull(receiver); + Assert.IsType(receiver); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs new file mode 100644 index 0000000..5e2d307 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs @@ -0,0 +1,79 @@ +using StackExchange.Redis; +using System.Text.Json; +using Universley.OrleansContrib.StreamsProvider.Redis; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamBatchContainerTests + { + [Fact] + public void Constructor_ShouldInitializeProperties() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + + // Act + var container = new RedisStreamBatchContainer(streamEntry); + + // Assert + Assert.Equal("testNamespace", container.StreamId.GetNamespace()); + Assert.Equal("testKey", container.StreamId.GetKeyAsString()); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString()); + } + + [Fact] + public void GetEvents_ShouldReturnDeserializedEvents() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var events = container.GetEvents().ToList(); + + // Assert + Assert.Single(events); + Assert.Equal(1, events[0].Item1.Id); + Assert.Equal("Test", events[0].Item1.Name); + Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString()); + } + + [Fact] + public void ImportRequestContext_ShouldReturnFalse() + { + // Arrange + var streamEntry = new StreamEntry("1-0", new NameValueEntry[] + { + new NameValueEntry("namespace", "testNamespace"), + new NameValueEntry("key", "testKey"), + new NameValueEntry("type", "TestEvent"), + new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" })) + }); + var container = new RedisStreamBatchContainer(streamEntry); + + // Act + var result = container.ImportRequestContext(); + + // Assert + Assert.False(result); + } + + private class TestEvent + { + public int Id { get; set; } + public string Name { get; set; } + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs new file mode 100644 index 0000000..665827b --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs @@ -0,0 +1,88 @@ +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Configuration; +using Orleans.Providers.Streams.Common; +using Orleans.Streams; +using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamFactoryTests + { + private readonly Mock _mockConnectionMultiplexer; + private readonly Mock _mockLoggerFactory; + private readonly Mock _mockServiceProvider; + private readonly Mock _mockStreamFailureHandler; + private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; + private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions; + private readonly string _providerName = "TestProvider"; + + public RedisStreamFactoryTests() + { + _mockConnectionMultiplexer = new Mock(); + _mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny(), It.IsAny())).Returns(new Mock().Object); + + _mockLoggerFactory = new Mock(); + _mockServiceProvider = new Mock(); + _mockStreamFailureHandler = new Mock(); + _simpleQueueCacheOptions = new SimpleQueueCacheOptions(); + _hashRingStreamQueueMapperOptions = new HashRingStreamQueueMapperOptions(); + } + + [Fact] + public void Constructor_ShouldThrowArgumentNullException_WhenAnyArgumentIsNull() + { + Assert.Throws(() => new RedisStreamFactory(null, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, null, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, null, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, null, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, null, _hashRingStreamQueueMapperOptions)); + Assert.Throws(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, null)); + } + + [Fact] + public async Task CreateAdapter_ShouldReturnRedisStreamAdapterInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var adapter = await factory.CreateAdapter(); + + Assert.NotNull(adapter); + Assert.IsType(adapter); + } + + [Fact] + public async Task GetDeliveryFailureHandler_ShouldReturnStreamFailureHandler() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var handler = await factory.GetDeliveryFailureHandler(new QueueId()); + + Assert.NotNull(handler); + Assert.Equal(_mockStreamFailureHandler.Object, handler); + } + + [Fact] + public void GetQueueAdapterCache_ShouldReturnSimpleQueueAdapterCacheInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var cache = factory.GetQueueAdapterCache(); + + Assert.NotNull(cache); + Assert.IsType(cache); + } + + [Fact] + public void GetStreamQueueMapper_ShouldReturnHashRingBasedStreamQueueMapperInstance() + { + var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions); + + var mapper = factory.GetStreamQueueMapper(); + + Assert.NotNull(mapper); + Assert.IsType(mapper); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs new file mode 100644 index 0000000..09efc47 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs @@ -0,0 +1,123 @@ +using Microsoft.Extensions.Logging; +using Moq; +using Orleans.Streams; +using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamReceiverTests + { + private readonly Mock _mockDatabase; + private readonly Mock> _mockLogger; + private readonly QueueId _queueId; + private readonly RedisStreamReceiver _receiver; + + public RedisStreamReceiverTests() + { + _mockDatabase = new Mock(); + _mockLogger = new Mock>(); + _queueId = QueueId.GetQueueId("testQueue", 0, 0); // Added the missing 'hash' parameter + _receiver = new RedisStreamReceiver(_queueId, _mockDatabase.Object, _mockLogger.Object); + } + + [Fact] + public async Task GetQueueMessagesAsync_ReturnsBatches() + { + // Arrange + var streamEntries = new[] + { + new StreamEntry("1-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ]), + new StreamEntry("2-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ]) + }; + _mockDatabase.Setup(db => db.StreamReadGroupAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(streamEntries); + + // Act + var result = await _receiver.GetQueueMessagesAsync(10); + + // Assert + Assert.NotNull(result); + Assert.Equal(2, result.Count); + } + + [Fact] + public async Task Initialize_CreatesConsumerGroup() + { + // Arrange + _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), CommandFlags.None)) + .ReturnsAsync(true); + + // Act + await _receiver.Initialize(TimeSpan.FromSeconds(5)); + + // Assert + _mockDatabase.Verify( + db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), + Times.Once); + } + + [Fact] + public async Task MessagesDeliveredAsync_AcknowledgesMessages() + { + // Arrange + var messages = new List + { + new RedisStreamBatchContainer(new StreamEntry("1-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ])), + new RedisStreamBatchContainer(new StreamEntry("2-0", [ + new("namespace", "testNamespace"), + new("key", "testKey"), + new("eventType", "testEventType" ), + new( "data", "testData" ) + ])) + }; + _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), + It.IsAny(), CommandFlags.None)) + .ReturnsAsync(2); + + // Act + await _receiver.MessagesDeliveredAsync(messages); + + // Assert + _mockDatabase.Verify( + db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once); + _mockDatabase.Verify( + db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once); + } + + [Fact] + public async Task Shutdown_WaitsForPendingTasks() + { + // Arrange + var tcs = new TaskCompletionSource(); + _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None)) + .Returns(tcs.Task); + + // Act + var getMessagesTask = _receiver.GetQueueMessagesAsync(10); + await _receiver.Shutdown(TimeSpan.FromSeconds(5)); + + // Assert + Assert.True(getMessagesTask.IsCompleted); + } + } +} \ No newline at end of file diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs new file mode 100644 index 0000000..1222146 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs @@ -0,0 +1,130 @@ +using Moq; +using Orleans.Streams; +using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; + +namespace RedisStreamsProvider.UnitTests +{ + public class RedisStreamSequenceTokenTests + { + [Fact] + public void Constructor_ShouldInitializeProperties_FromRedisValue() + { + // Arrange + var redisValue = new RedisValue("123-456"); + + // Act + var token = new RedisStreamSequenceToken(redisValue); + + // Assert + Assert.Equal(123, token.SequenceNumber); + Assert.Equal(456, token.EventIndex); + } + + [Fact] + public void Constructor_ShouldInitializeProperties_FromParameters() + { + // Arrange + long sequenceNumber = 123; + int eventIndex = 456; + + // Act + var token = new RedisStreamSequenceToken(sequenceNumber, eventIndex); + + // Assert + Assert.Equal(sequenceNumber, token.SequenceNumber); + Assert.Equal(eventIndex, token.EventIndex); + } + + [Fact] + public void CompareTo_ShouldReturnZero_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.Equal(0, result); + } + + [Fact] + public void CompareTo_ShouldReturnPositive_ForGreaterToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 455); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result > 0); + } + + [Fact] + public void CompareTo_ShouldReturnNegative_ForLesserToken() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 455); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.CompareTo(token2); + + // Assert + Assert.True(result < 0); + } + + [Fact] + public void Equals_ShouldReturnTrue_ForEqualTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 456); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.True(result); + } + + [Fact] + public void Equals_ShouldReturnFalse_ForDifferentTokens() + { + // Arrange + var token1 = new RedisStreamSequenceToken(123, 456); + var token2 = new RedisStreamSequenceToken(123, 457); + + // Act + var result = token1.Equals(token2); + + // Assert + Assert.False(result); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentNullException_ForNullToken() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + + // Act & Assert + Assert.Throws(() => token.CompareTo(null)); + } + + [Fact] + public void CompareTo_ShouldThrowArgumentException_ForInvalidTokenType() + { + // Arrange + var token = new RedisStreamSequenceToken(123, 456); + var invalidToken = new Mock().Object; + + // Act & Assert + Assert.Throws(() => token.CompareTo(invalidToken)); + } + } +} diff --git a/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj new file mode 100644 index 0000000..35e6cd5 --- /dev/null +++ b/RedisStreamsProvider.UnitTests/RedisStreamsProvider.UnitTests.csproj @@ -0,0 +1,26 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + diff --git a/Provider/RedisStreamAdapter.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamAdapter.cs similarity index 83% rename from Provider/RedisStreamAdapter.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamAdapter.cs index c46cf61..87a70e5 100644 --- a/Provider/RedisStreamAdapter.cs +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamAdapter.cs @@ -1,10 +1,9 @@ using Microsoft.Extensions.Logging; -using Orleans.Runtime; using Orleans.Streams; using StackExchange.Redis; using System.Text.Json; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamAdapter : IQueueAdapter { @@ -16,10 +15,10 @@ namespace Provider public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory) { - _database = database; - _providerName = providerName; - _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper; - _loggerFactory = loggerFactory; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper ?? throw new ArgumentNullException(nameof(hashRingBasedStreamQueueMapper)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); _logger = loggerFactory.CreateLogger(); } diff --git a/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamBatchContainer.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamBatchContainer.cs new file mode 100644 index 0000000..d403425 --- /dev/null +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamBatchContainer.cs @@ -0,0 +1,75 @@ +using Orleans.Streams; +using StackExchange.Redis; +using System.Text.Json; + +namespace Universley.OrleansContrib.StreamsProvider.Redis +{ + [GenerateSerializer] + [Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")] + public class RedisStreamBatchContainer : IBatchContainer + { + [Id(0)] + public StreamId StreamId { get; } + + [Id(1)] + public StreamSequenceToken SequenceToken { get; } + + [Id(2)] + public string EventType { get; } + + [Id(3)] + public string Data { get; } + + [Id(4)] + public string StreamEntryId { get; } + + public RedisStreamBatchContainer(StreamEntry streamEntry) + { + var streamNamespace = streamEntry.Values[0].Value; + var steamKey = streamEntry.Values[1].Value; + var eventType = streamEntry.Values[2].Value; + var data = streamEntry.Values[3].Value; + StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id)); + + // Check incoming data + if (string.IsNullOrWhiteSpace(streamNamespace)) + { + throw new ArgumentNullException(nameof(streamNamespace)); + } + if (string.IsNullOrWhiteSpace(steamKey)) + { + throw new ArgumentNullException(nameof(steamKey)); + } + if (string.IsNullOrWhiteSpace(eventType)) + { + throw new ArgumentNullException(nameof(eventType)); + } + if (string.IsNullOrWhiteSpace(data)) + { + throw new ArgumentNullException(nameof(data)); + } + + StreamId = StreamId.Create(streamNamespace!, steamKey!); + SequenceToken = new RedisStreamSequenceToken(streamEntry.Id); + EventType = eventType!; + Data = data!; + } + public IEnumerable> GetEvents() + { + List> events = new(); + var eventType = typeof(T).Name; + if (eventType == EventType) + { + var data = Data; + var @event = JsonSerializer.Deserialize(data); + events.Add(new(@event!, SequenceToken)); + } + return events; + } + + public bool ImportRequestContext() + { + return false; + } + } +} diff --git a/Provider/RedisStreamFactory.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFactory.cs similarity index 62% rename from Provider/RedisStreamFactory.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFactory.cs index a25c1ab..a705675 100644 --- a/Provider/RedisStreamFactory.cs +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFactory.cs @@ -5,18 +5,18 @@ using Orleans.Providers.Streams.Common; using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamFactory : IQueueAdapterFactory { - private readonly IDatabase _database; + private readonly IConnectionMultiplexer _connectionMultiplexer; private readonly ILoggerFactory _loggerFactory; private readonly string _providerName; private readonly IStreamFailureHandler _streamFailureHandler; private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions; private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper; - public RedisStreamFactory(IDatabase database, + public RedisStreamFactory(IConnectionMultiplexer connectionMultiplexer, ILoggerFactory loggerFactory, string providerName, IStreamFailureHandler streamFailureHandler, @@ -24,28 +24,33 @@ namespace Provider HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions ) { - _database = database; - _loggerFactory = loggerFactory; - _providerName = providerName; - _streamFailureHandler = streamFailureHandler; - _simpleQueueCacheOptions = simpleQueueCacheOptions; + if (hashRingStreamQueueMapperOptions is null) + { + throw new ArgumentNullException(nameof(hashRingStreamQueueMapperOptions)); + } + + _connectionMultiplexer = connectionMultiplexer ?? throw new ArgumentNullException(nameof(connectionMultiplexer)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _providerName = providerName ?? throw new ArgumentNullException(nameof(providerName)); + _streamFailureHandler = streamFailureHandler ?? throw new ArgumentNullException(nameof(streamFailureHandler)); + _simpleQueueCacheOptions = simpleQueueCacheOptions ?? throw new ArgumentNullException(nameof(simpleQueueCacheOptions)); _hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName); } public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName) { - var database = provider.GetRequiredService(); + var connMuliplexer = provider.GetRequiredService(); var loggerFactory = provider.GetRequiredService(); var simpleQueueCacheOptions = provider.GetOptionsByName(providerName); var hashRingStreamQueueMapperOptions = provider.GetOptionsByName(providerName); var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger()); - return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions); + return new RedisStreamFactory(connMuliplexer, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions); } public Task CreateAdapter() { - return Task.FromResult(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory)); + return Task.FromResult(new RedisStreamAdapter(_connectionMultiplexer.GetDatabase(), _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory)); } public Task GetDeliveryFailureHandler(QueueId queueId) diff --git a/Provider/RedisStreamFailureHandler.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFailureHandler.cs similarity index 89% rename from Provider/RedisStreamFailureHandler.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFailureHandler.cs index 6734c12..600b0ae 100644 --- a/Provider/RedisStreamFailureHandler.cs +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamFailureHandler.cs @@ -1,8 +1,7 @@ using Microsoft.Extensions.Logging; -using Orleans.Runtime; using Orleans.Streams; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamFailureHandler : IStreamFailureHandler { @@ -10,7 +9,7 @@ namespace Provider public RedisStreamFailureHandler(ILogger logger) { - _logger = logger; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public bool ShouldFaultSubsriptionOnError => true; diff --git a/Provider/RedisStreamReceiver.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamReceiver.cs similarity index 91% rename from Provider/RedisStreamReceiver.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamReceiver.cs index b1c61cb..f752799 100644 --- a/Provider/RedisStreamReceiver.cs +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamReceiver.cs @@ -3,7 +3,7 @@ using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { public class RedisStreamReceiver : IQueueAdapterReceiver { @@ -16,8 +16,8 @@ namespace Provider public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger logger) { _queueId = queueId; - _database = database; - _logger = logger; + _database = database ?? throw new ArgumentNullException(nameof(database)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task?> GetQueueMessagesAsync(int maxCount) @@ -69,7 +69,7 @@ namespace Provider var container = message as RedisStreamBatchContainer; if (container != null) { - var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id); + var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId); pendingTasks = ack; await ack; } diff --git a/Provider/RedisStreamSequenceToken.cs b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamSequenceToken.cs similarity index 83% rename from Provider/RedisStreamSequenceToken.cs rename to Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamSequenceToken.cs index cd2fcc5..388ef7f 100644 --- a/Provider/RedisStreamSequenceToken.cs +++ b/Universley.OrleansContrib.StreamsProvider.Redis/RedisStreamSequenceToken.cs @@ -1,15 +1,15 @@ using Orleans.Streams; using StackExchange.Redis; -namespace Provider +namespace Universley.OrleansContrib.StreamsProvider.Redis { [GenerateSerializer] public class RedisStreamSequenceToken : StreamSequenceToken { [Id(0)] - public override long SequenceNumber { get; protected set; } + public sealed override long SequenceNumber { get; protected set; } [Id(1)] - public override int EventIndex { get; protected set; } + public sealed override int EventIndex { get; protected set; } public RedisStreamSequenceToken(RedisValue id) { @@ -37,7 +37,7 @@ namespace Provider throw new ArgumentException("Invalid token type", nameof(other)); } - public override bool Equals(StreamSequenceToken other) + public override bool Equals(StreamSequenceToken? other) { var token = other as RedisStreamSequenceToken; return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex; diff --git a/Provider/Provider.csproj b/Universley.OrleansContrib.StreamsProvider.Redis/Universley.OrleansContrib.StreamsProvider.Redis.csproj similarity index 55% rename from Provider/Provider.csproj rename to Universley.OrleansContrib.StreamsProvider.Redis/Universley.OrleansContrib.StreamsProvider.Redis.csproj index bf92fe9..5703a5f 100644 --- a/Provider/Provider.csproj +++ b/Universley.OrleansContrib.StreamsProvider.Redis/Universley.OrleansContrib.StreamsProvider.Redis.csproj @@ -1,15 +1,15 @@  - net8.0 + net9.0 enable enable - - - + + + diff --git a/build.cmd b/build.cmd new file mode 100755 index 0000000..b08cc59 --- /dev/null +++ b/build.cmd @@ -0,0 +1,7 @@ +:; set -eo pipefail +:; SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd) +:; ${SCRIPT_DIR}/build.sh "$@" +:; exit $? + +@ECHO OFF +powershell -ExecutionPolicy ByPass -NoProfile -File "%~dp0build.ps1" %* diff --git a/build.ps1 b/build.ps1 new file mode 100644 index 0000000..4634dc0 --- /dev/null +++ b/build.ps1 @@ -0,0 +1,74 @@ +[CmdletBinding()] +Param( + [Parameter(Position=0,Mandatory=$false,ValueFromRemainingArguments=$true)] + [string[]]$BuildArguments +) + +Write-Output "PowerShell $($PSVersionTable.PSEdition) version $($PSVersionTable.PSVersion)" + +Set-StrictMode -Version 2.0; $ErrorActionPreference = "Stop"; $ConfirmPreference = "None"; trap { Write-Error $_ -ErrorAction Continue; exit 1 } +$PSScriptRoot = Split-Path $MyInvocation.MyCommand.Path -Parent + +########################################################################### +# CONFIGURATION +########################################################################### + +$BuildProjectFile = "$PSScriptRoot\build\_build.csproj" +$TempDirectory = "$PSScriptRoot\\.nuke\temp" + +$DotNetGlobalFile = "$PSScriptRoot\\global.json" +$DotNetInstallUrl = "https://dot.net/v1/dotnet-install.ps1" +$DotNetChannel = "STS" + +$env:DOTNET_CLI_TELEMETRY_OPTOUT = 1 +$env:DOTNET_NOLOGO = 1 + +########################################################################### +# EXECUTION +########################################################################### + +function ExecSafe([scriptblock] $cmd) { + & $cmd + if ($LASTEXITCODE) { exit $LASTEXITCODE } +} + +# If dotnet CLI is installed globally and it matches requested version, use for execution +if ($null -ne (Get-Command "dotnet" -ErrorAction SilentlyContinue) -and ` + $(dotnet --version) -and $LASTEXITCODE -eq 0) { + $env:DOTNET_EXE = (Get-Command "dotnet").Path +} +else { + # Download install script + $DotNetInstallFile = "$TempDirectory\dotnet-install.ps1" + New-Item -ItemType Directory -Path $TempDirectory -Force | Out-Null + [Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12 + (New-Object System.Net.WebClient).DownloadFile($DotNetInstallUrl, $DotNetInstallFile) + + # If global.json exists, load expected version + if (Test-Path $DotNetGlobalFile) { + $DotNetGlobal = $(Get-Content $DotNetGlobalFile | Out-String | ConvertFrom-Json) + if ($DotNetGlobal.PSObject.Properties["sdk"] -and $DotNetGlobal.sdk.PSObject.Properties["version"]) { + $DotNetVersion = $DotNetGlobal.sdk.version + } + } + + # Install by channel or version + $DotNetDirectory = "$TempDirectory\dotnet-win" + if (!(Test-Path variable:DotNetVersion)) { + ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Channel $DotNetChannel -NoPath } + } else { + ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Version $DotNetVersion -NoPath } + } + $env:DOTNET_EXE = "$DotNetDirectory\dotnet.exe" + $env:PATH = "$DotNetDirectory;$env:PATH" +} + +Write-Output "Microsoft (R) .NET SDK version $(& $env:DOTNET_EXE --version)" + +if (Test-Path env:NUKE_ENTERPRISE_TOKEN) { + & $env:DOTNET_EXE nuget remove source "nuke-enterprise" > $null + & $env:DOTNET_EXE nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password $env:NUKE_ENTERPRISE_TOKEN > $null +} + +ExecSafe { & $env:DOTNET_EXE build $BuildProjectFile /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet } +ExecSafe { & $env:DOTNET_EXE run --project $BuildProjectFile --no-build -- $BuildArguments } diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..fdff0c6 --- /dev/null +++ b/build.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash + +bash --version 2>&1 | head -n 1 + +set -eo pipefail +SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd) + +########################################################################### +# CONFIGURATION +########################################################################### + +BUILD_PROJECT_FILE="$SCRIPT_DIR/build/_build.csproj" +TEMP_DIRECTORY="$SCRIPT_DIR//.nuke/temp" + +DOTNET_GLOBAL_FILE="$SCRIPT_DIR//global.json" +DOTNET_INSTALL_URL="https://dot.net/v1/dotnet-install.sh" +DOTNET_CHANNEL="STS" + +export DOTNET_CLI_TELEMETRY_OPTOUT=1 +export DOTNET_NOLOGO=1 + +########################################################################### +# EXECUTION +########################################################################### + +function FirstJsonValue { + perl -nle 'print $1 if m{"'"$1"'": "([^"]+)",?}' <<< "${@:2}" +} + +# If dotnet CLI is installed globally and it matches requested version, use for execution +if [ -x "$(command -v dotnet)" ] && dotnet --version &>/dev/null; then + export DOTNET_EXE="$(command -v dotnet)" +else + # Download install script + DOTNET_INSTALL_FILE="$TEMP_DIRECTORY/dotnet-install.sh" + mkdir -p "$TEMP_DIRECTORY" + curl -Lsfo "$DOTNET_INSTALL_FILE" "$DOTNET_INSTALL_URL" + chmod +x "$DOTNET_INSTALL_FILE" + + # If global.json exists, load expected version + if [[ -f "$DOTNET_GLOBAL_FILE" ]]; then + DOTNET_VERSION=$(FirstJsonValue "version" "$(cat "$DOTNET_GLOBAL_FILE")") + if [[ "$DOTNET_VERSION" == "" ]]; then + unset DOTNET_VERSION + fi + fi + + # Install by channel or version + DOTNET_DIRECTORY="$TEMP_DIRECTORY/dotnet-unix" + if [[ -z ${DOTNET_VERSION+x} ]]; then + "$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --channel "$DOTNET_CHANNEL" --no-path + else + "$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "$DOTNET_VERSION" --no-path + fi + export DOTNET_EXE="$DOTNET_DIRECTORY/dotnet" + export PATH="$DOTNET_DIRECTORY:$PATH" +fi + +echo "Microsoft (R) .NET SDK version $("$DOTNET_EXE" --version)" + +if [[ ! -z ${NUKE_ENTERPRISE_TOKEN+x} && "$NUKE_ENTERPRISE_TOKEN" != "" ]]; then + "$DOTNET_EXE" nuget remove source "nuke-enterprise" &>/dev/null || true + "$DOTNET_EXE" nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password "$NUKE_ENTERPRISE_TOKEN" --store-password-in-clear-text &>/dev/null || true +fi + +"$DOTNET_EXE" build "$BUILD_PROJECT_FILE" /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet +"$DOTNET_EXE" run --project "$BUILD_PROJECT_FILE" --no-build -- "$@" diff --git a/build/.editorconfig b/build/.editorconfig new file mode 100644 index 0000000..31e43dc --- /dev/null +++ b/build/.editorconfig @@ -0,0 +1,11 @@ +[*.cs] +dotnet_style_qualification_for_field = false:warning +dotnet_style_qualification_for_property = false:warning +dotnet_style_qualification_for_method = false:warning +dotnet_style_qualification_for_event = false:warning +dotnet_style_require_accessibility_modifiers = never:warning + +csharp_style_expression_bodied_methods = true:silent +csharp_style_expression_bodied_properties = true:warning +csharp_style_expression_bodied_indexers = true:warning +csharp_style_expression_bodied_accessors = true:warning diff --git a/build/Build.cs b/build/Build.cs new file mode 100644 index 0000000..c39b478 --- /dev/null +++ b/build/Build.cs @@ -0,0 +1,177 @@ +using System; +using System.IO; +using System.Linq; +using Nuke.Common; +using Nuke.Common.CI; +using Nuke.Common.Execution; +using Nuke.Common.IO; +using Nuke.Common.ProjectModel; +using Nuke.Common.Tooling; +using Nuke.Common.Tools.DotNet; +using Nuke.Common.Tools.Git; +using Nuke.Common.Tools.GitVersion; +using Nuke.Common.Utilities.Collections; +using Serilog; +using static Nuke.Common.EnvironmentInfo; +using static Nuke.Common.IO.PathConstruction; + +class Build : NukeBuild +{ + /// Support plugins are available for: + /// - JetBrains ReSharper https://nuke.build/resharper + /// - JetBrains Rider https://nuke.build/rider + /// - Microsoft VisualStudio https://nuke.build/visualstudio + /// - Microsoft VSCode https://nuke.build/vscode + + public static int Main() => Execute(x => x.Pack); + + private const string LibraryProjectName = "Universley.OrleansContrib.StreamsProvider.Redis"; + + [Parameter("Configuration to build - Default is 'Debug' (local) or 'Release' (server)")] + readonly Configuration Configuration = IsLocalBuild ? Configuration.Debug : Configuration.Release; + + [Solution] readonly Solution Solution; + + AbsolutePath SourceDirectory => RootDirectory; + AbsolutePath ArtifactsDirectory => RootDirectory / "artifacts"; + + AbsolutePath NuGetPackagesDirectory => ArtifactsDirectory / "nuget"; + AbsolutePath TestResultDirectory => ArtifactsDirectory / "test-results"; + + // Nuget API key can be also set as an environment variable + [Parameter("NuGet API key")] + readonly string NuGetApiKey; + + [Parameter("Build number override")] + readonly string BuildNumber + = Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER") ?? $"{0}"; + + [Parameter("Gitea Nuget package source name")] + readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME"); + + [Parameter("Gitea package owner user")] + [Secret] + readonly string GiteaPackageOwnerUser = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_USER"); + [Parameter("Gitea package owner password")] + [Secret] + readonly string GiteaPackageOwnerPassword = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_PASSWORD"); + + Target Clean => _ => _ + .Before(Compile) + .Executes(() => + { + SourceDirectory + .GlobDirectories("**/bin", "**/obj") + .Except(RootDirectory.GlobDirectories("build/**/bin", "build/**/obj")) + .ForEach(ap => Directory.Delete(ap, true)); + ArtifactsDirectory.CreateOrCleanDirectory(); + }); + + Target Compile => _ => _ + .Executes(() => + { + var semVer = Version; + var semFileVer = Version; + var informationalVersion = Version; + + Log.Logger.Information("AssemblySemVer: {semVer}", semVer); + + DotNetTasks.DotNetBuild(s => s + .SetProjectFile(Solution) + .SetConfiguration(Configuration) + .SetAssemblyVersion(semVer) + .SetFileVersion(semFileVer) + .SetInformationalVersion(informationalVersion)); + }); + + Target Test => _ => _ + .DependsOn(Compile) + .Executes(() => + { + DotNetTasks.DotNetTest(s => s + .SetProjectFile(Solution) + .SetConfiguration(Configuration) + .SetLoggers($"trx;LogFileName={TestResultDirectory / "testresults.trx"}") + ); + }); + + Target Pack => _ => _ + .DependsOn(Test) + .Executes(() => + { + + DotNetTasks.DotNetPack(s => s + .SetProject(SourceDirectory / "Universley.OrleansContrib.StreamsProvider.Redis" / $"{LibraryProjectName}.csproj") + .SetConfiguration(Configuration) + .SetOutputDirectory(NuGetPackagesDirectory) + .SetVersion(Version) + .SetNoBuild(true) + ); + }); + + Target Publish => _ => _ + .DependsOn(Pack, CreateAndPushGitTag) + .Executes(() => + { + try + { + DotNetTasks.DotNetNuGetAddSource(c => c + .EnableStorePasswordInClearText() + .SetUsername(GiteaPackageOwnerUser) + .SetPassword(GiteaPackageOwnerPassword) + .SetName("Gitea") + .SetSource($"{GiteaNugetSourceName}/index.json")); + } + catch (Exception e) + { + Log.Warning("Error adding Gitea source: {e}", e); + } + + DotNetTasks.DotNetNuGetPush(s => s + .SetSource($"{GiteaNugetSourceName}/symbols") + .SetApiKey(NuGetApiKey) + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.snupkg")); + + DotNetTasks.DotNetNuGetPush(s => s + .SetSource($"{GiteaNugetSourceName}/index.json") + .SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.nupkg")); + }); + + Target CreateAndPushGitTag => _ => _ + .Executes(() => + { + var gitTag = $"{Version}"; + GitTasks.Git($"tag {gitTag}"); + GitTasks.Git($"push origin {gitTag}"); + }); + + private string Version + { + get + { + if (version == null) + { + version = GetVersion(); + } + return version; + } + } + + private string version = null; + + private string GetVersion() + { + Log.Information("GitHub run number is {number}", Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER")); + var buildNumber = BuildNumber; + if (string.IsNullOrEmpty(buildNumber)) + { + return "1.0.0"; + } + + var currentDateTime = DateTimeOffset.UtcNow; + + var assembledVersion = $"{currentDateTime.Year}.{currentDateTime.Month}.{buildNumber}"; + Log.Information("Assembled version: {assembledVersion}", assembledVersion); + return assembledVersion; + } +} diff --git a/build/Configuration.cs b/build/Configuration.cs new file mode 100644 index 0000000..9c08b1a --- /dev/null +++ b/build/Configuration.cs @@ -0,0 +1,16 @@ +using System; +using System.ComponentModel; +using System.Linq; +using Nuke.Common.Tooling; + +[TypeConverter(typeof(TypeConverter))] +public class Configuration : Enumeration +{ + public static Configuration Debug = new Configuration { Value = nameof(Debug) }; + public static Configuration Release = new Configuration { Value = nameof(Release) }; + + public static implicit operator string(Configuration configuration) + { + return configuration.Value; + } +} diff --git a/build/Directory.Build.props b/build/Directory.Build.props new file mode 100644 index 0000000..e147d63 --- /dev/null +++ b/build/Directory.Build.props @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/build/Directory.Build.targets b/build/Directory.Build.targets new file mode 100644 index 0000000..2532609 --- /dev/null +++ b/build/Directory.Build.targets @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/build/_build.csproj b/build/_build.csproj new file mode 100644 index 0000000..c73279e --- /dev/null +++ b/build/_build.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0 + + CS0649;CS0169;CA1050;CA1822;CA2211;IDE1006 + .. + .. + 1 + false + + + + + + + + + + + diff --git a/build/_build.csproj.DotSettings b/build/_build.csproj.DotSettings new file mode 100644 index 0000000..88a8824 --- /dev/null +++ b/build/_build.csproj.DotSettings @@ -0,0 +1,31 @@ + + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + DO_NOT_SHOW + Implicit + Implicit + ExpressionBody + 0 + NEXT_LINE + True + False + 120 + IF_OWNER_IS_SINGLE_LINE + WRAP_IF_LONG + False + <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> + <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /> + <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy> + <Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy> + True + True + True + True + True + True + True + True + True + True diff --git a/Client/Client.csproj b/example/Client/Client.csproj similarity index 57% rename from Client/Client.csproj rename to example/Client/Client.csproj index ce9902e..a3e8a16 100644 --- a/Client/Client.csproj +++ b/example/Client/Client.csproj @@ -2,19 +2,19 @@ Exe - net8.0 + net9.0 enable enable - - - + + + - + diff --git a/Client/Program.cs b/example/Client/Program.cs similarity index 73% rename from Client/Program.cs rename to example/Client/Program.cs index 3cef062..7b0426a 100644 --- a/Client/Program.cs +++ b/example/Client/Program.cs @@ -2,17 +2,16 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; -using Orleans.Runtime; -using Provider; +using Orleans.Streams; using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; using IHost host = new HostBuilder() .UseOrleansClient(clientBuilder => { - clientBuilder.Services.AddSingleton(sp => + clientBuilder.Services.AddSingleton(sp => { - IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase(); - return db; + return ConnectionMultiplexer.Connect("localhost"); }); clientBuilder.UseLocalhostClustering(); @@ -38,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream"); var streamId = StreamId.Create("numbergenerator", "consecutive"); var stream = streamProvider.GetStream(streamId); +var newStreamId = StreamId.Create("numbergenerator", "consecutive-back"); +var newStream = streamProvider.GetStream(newStreamId); + +await stream.SubscribeAsync((i, token) => +{ + logger.LogInformation("Received back number {Number}", i); + return Task.CompletedTask; +}); + var task = Task.Run(async () => { var num = 0; diff --git a/Server/Program.cs b/example/Server/Program.cs similarity index 78% rename from Server/Program.cs rename to example/Server/Program.cs index 51e15be..e2b823b 100644 --- a/Server/Program.cs +++ b/example/Server/Program.cs @@ -2,21 +2,21 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans.Configuration; -using Orleans.Runtime; using Orleans.Streams; using StackExchange.Redis; +using Universley.OrleansContrib.StreamsProvider.Redis; var builder = new HostBuilder() .UseOrleans(silo => { silo.UseLocalhostClustering(); - silo.Services.AddSingleton(sp => + silo.Services.AddSingleton(sp => { - return ConnectionMultiplexer.Connect("localhost").GetDatabase(); + return ConnectionMultiplexer.Connect("localhost"); }); silo.ConfigureLogging(logging => logging.AddConsole()); silo.AddMemoryGrainStorage("PubSubStore"); - silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null); + silo.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null); silo.AddMemoryGrainStorageAsDefault(); }).UseConsoleLifetime(); builder.ConfigureServices(services => @@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver _logger.LogInformation("Received number {Number}", item); await Task.Delay(2000); + var newStreamId = StreamId.Create("numbergenerator", "consecutive-back"); + var newStream = this.GetStreamProvider("RedisStream").GetStream(newStreamId); + _logger.LogInformation("Sending number {Number} to new stream", item); + await newStream.OnNextAsync(item); + } } diff --git a/Server/Server.csproj b/example/Server/Server.csproj similarity index 57% rename from Server/Server.csproj rename to example/Server/Server.csproj index a6e6fa8..d7da8fe 100644 --- a/Server/Server.csproj +++ b/example/Server/Server.csproj @@ -2,19 +2,19 @@ Exe - net8.0 + net9.0 enable enable - - - + + + - +