diff --git a/.gitea/workflows/ci-pipeline.yml b/.gitea/workflows/ci-pipeline.yml
new file mode 100644
index 0000000..9670a18
--- /dev/null
+++ b/.gitea/workflows/ci-pipeline.yml
@@ -0,0 +1,52 @@
+name: CI Build
+
+on:
+ push:
+ branches: [ master ]
+ paths-ignore:
+ - '**/*.md'
+ - '**/*.gitignore'
+ - '**/*.gitattributes'
+ pull_request:
+ branches: [ master ]
+ workflow_dispatch:
+permissions:
+ contents: write
+ packages: write
+
+env:
+ DOTNET_NOLOGO: true # Disable the .NET logo
+ DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true # Disable the .NET first time experience
+ DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry
+ MINOR_VERSION_OVERRIDE: 0
+
+ GITEA_SERVER_URL: ${{ vars.V_GITEA_SERVER_URL }}
+ GITEA_PACKAGE_OWNER: ${{ vars.V_GITEA_PACKAGE_OWNER }}
+ GITEA_NUGET_SOURCE_NAME: ${{ vars.V_GITEA_NUGET_SOURCE_NAME }}
+ GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }}
+ GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }}
+
+
+jobs:
+ build-ci-api:
+ runs-on: [linux,self-hosted]
+ name: CI Build API
+ steps:
+ - name: Checkout
+ uses: https://github.com/actions/checkout@v4
+
+ - name: Setup .NET 9
+ uses: https://github.com/actions/setup-dotnet@v4
+ with:
+ dotnet-version: 9.x
+
+ - name: Make Build File Executable
+ shell: bash
+ run: |
+ chmod +x ./build.cmd
+ chmod +x ./build.sh
+
+ - name: Run Nuke Build
+ shell: bash
+ run: |
+ ./build.cmd -Target Publish
diff --git a/.gitignore b/.gitignore
index 9491a2f..59b290a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -360,4 +360,7 @@ MigrationBackup/
.ionide/
# Fody - auto-generated XML schema
-FodyWeavers.xsd
\ No newline at end of file
+FodyWeavers.xsd
+
+# Mac Finder
+*.DS_Store
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
new file mode 100644
index 0000000..806d58f
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
@@ -0,0 +1,13 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Rider ignored files
+/contentModel.xml
+/.idea.RedisStreamsInOrleans.iml
+/modules.xml
+/projectSettingsUpdater.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
new file mode 100644
index 0000000..df87cf9
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
new file mode 100644
index 0000000..7b08163
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.nuke/build.schema.json b/.nuke/build.schema.json
new file mode 100644
index 0000000..94f28e8
--- /dev/null
+++ b/.nuke/build.schema.json
@@ -0,0 +1,146 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "definitions": {
+ "Host": {
+ "type": "string",
+ "enum": [
+ "AppVeyor",
+ "AzurePipelines",
+ "Bamboo",
+ "Bitbucket",
+ "Bitrise",
+ "GitHubActions",
+ "GitLab",
+ "Jenkins",
+ "Rider",
+ "SpaceAutomation",
+ "TeamCity",
+ "Terminal",
+ "TravisCI",
+ "VisualStudio",
+ "VSCode"
+ ]
+ },
+ "ExecutableTarget": {
+ "type": "string",
+ "enum": [
+ "Clean",
+ "Compile",
+ "CreateAndPushGitTag",
+ "Pack",
+ "Publish",
+ "Test"
+ ]
+ },
+ "Verbosity": {
+ "type": "string",
+ "description": "",
+ "enum": [
+ "Verbose",
+ "Normal",
+ "Minimal",
+ "Quiet"
+ ]
+ },
+ "NukeBuild": {
+ "properties": {
+ "Continue": {
+ "type": "boolean",
+ "description": "Indicates to continue a previously failed build attempt"
+ },
+ "Help": {
+ "type": "boolean",
+ "description": "Shows the help text for this build assembly"
+ },
+ "Host": {
+ "description": "Host for execution. Default is 'automatic'",
+ "$ref": "#/definitions/Host"
+ },
+ "NoLogo": {
+ "type": "boolean",
+ "description": "Disables displaying the NUKE logo"
+ },
+ "Partition": {
+ "type": "string",
+ "description": "Partition to use on CI"
+ },
+ "Plan": {
+ "type": "boolean",
+ "description": "Shows the execution plan (HTML)"
+ },
+ "Profile": {
+ "type": "array",
+ "description": "Defines the profiles to load",
+ "items": {
+ "type": "string"
+ }
+ },
+ "Root": {
+ "type": "string",
+ "description": "Root directory during build execution"
+ },
+ "Skip": {
+ "type": "array",
+ "description": "List of targets to be skipped. Empty list skips all dependencies",
+ "items": {
+ "$ref": "#/definitions/ExecutableTarget"
+ }
+ },
+ "Target": {
+ "type": "array",
+ "description": "List of targets to be invoked. Default is '{default_target}'",
+ "items": {
+ "$ref": "#/definitions/ExecutableTarget"
+ }
+ },
+ "Verbosity": {
+ "description": "Logging verbosity during build execution. Default is 'Normal'",
+ "$ref": "#/definitions/Verbosity"
+ }
+ }
+ }
+ },
+ "allOf": [
+ {
+ "properties": {
+ "BuildNumber": {
+ "type": "string",
+ "description": "Build number override"
+ },
+ "Configuration": {
+ "type": "string",
+ "description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)",
+ "enum": [
+ "Debug",
+ "Release"
+ ]
+ },
+ "GiteaNugetSourceName": {
+ "type": "string",
+ "description": "Gitea Nuget package source name"
+ },
+ "GiteaPackageOwnerPassword": {
+ "type": "string",
+ "description": "Gitea package owner password",
+ "default": "Secrets must be entered via 'nuke :secrets [profile]'"
+ },
+ "GiteaPackageOwnerUser": {
+ "type": "string",
+ "description": "Gitea package owner user",
+ "default": "Secrets must be entered via 'nuke :secrets [profile]'"
+ },
+ "NuGetApiKey": {
+ "type": "string",
+ "description": "NuGet API key"
+ },
+ "Solution": {
+ "type": "string",
+ "description": "Path to a solution file that is automatically loaded"
+ }
+ }
+ },
+ {
+ "$ref": "#/definitions/NukeBuild"
+ }
+ ]
+}
diff --git a/.nuke/parameters.json b/.nuke/parameters.json
new file mode 100644
index 0000000..edf20b1
--- /dev/null
+++ b/.nuke/parameters.json
@@ -0,0 +1,4 @@
+{
+ "$schema": "build.schema.json",
+ "Solution": "RedisStreamsInOrleans.sln"
+}
diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs
deleted file mode 100644
index c799067..0000000
--- a/Provider/RedisStreamBatchContainer.cs
+++ /dev/null
@@ -1,40 +0,0 @@
-using Orleans.Runtime;
-using Orleans.Streams;
-using StackExchange.Redis;
-using System.Text.Json;
-
-namespace Provider
-{
- public class RedisStreamBatchContainer : IBatchContainer
- {
- public StreamId StreamId { get; }
-
- public StreamSequenceToken SequenceToken { get; }
- public StreamEntry StreamEntry { get; }
- public RedisStreamBatchContainer(StreamEntry streamEntry)
- {
- StreamEntry = streamEntry;
- var streamNamespace = StreamEntry.Values[0].Value;
- var steamKey = StreamEntry.Values[1].Value;
- StreamId = StreamId.Create(streamNamespace!, steamKey!);
- SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
- }
- public IEnumerable> GetEvents()
- {
- List> events = new();
- var eventType = typeof(T).Name;
- if (eventType == StreamEntry.Values[2].Value)
- {
- var data = StreamEntry.Values[3].Value;
- var @event = JsonSerializer.Deserialize(data!);
- events.Add(new(@event!, SequenceToken));
- }
- return events;
- }
-
- public bool ImportRequestContext()
- {
- return false;
- }
- }
-}
diff --git a/README.md b/README.md
index b38d442..8840fd4 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,82 @@
-# RedisStreamsInOrleans
-This is a project that demonstrates how to create a custom Redis Stream Provider for Orleans.
-https://swacblooms.com/custom-redis-streams-provider-for-orleans/
\ No newline at end of file
+# Universley.OrleansContrib.StreamsProvider.Redis
+
+## Summary
+This library provides an integration of Redis Streams with Microsoft Orleans, allowing you to use Redis as a streaming provider within your Orleans applications. It enables seamless communication and data streaming between Orleans grains and external clients using Redis Streams.
+
+## How to Use the Redis Provider with Orleans
+
+### 1. With Grain as a Client
+To use Redis Streams with a grain as a client, follow these steps:
+
+1. Install the necessary NuGet packages:
+ ```sh
+ dotnet add package Orleans.Streaming.Redis
+ ```
+
+2. Configure the Redis stream provider in your Orleans silo configuration:
+ ```csharp
+ var host = new SiloHostBuilder()
+ .AddRedisStreams("RedisProvider", options =>
+ {
+ options.ConnectionString = "your_redis_connection_string";
+ })
+ .Build();
+ ```
+
+3. In your grain, use the stream provider to send and receive messages:
+ ```csharp
+ public class MyGrain : Grain, IMyGrain
+ {
+ private IAsyncStream _stream;
+
+ public override async Task OnActivateAsync()
+ {
+ var streamProvider = GetStreamProvider("RedisProvider");
+ _stream = streamProvider.GetStream(this.GetPrimaryKey(), "streamNamespace");
+ await base.OnActivateAsync();
+ }
+
+ public async Task SendMessage(string message)
+ {
+ await _stream.OnNextAsync(message);
+ }
+
+ public async Task ReceiveMessages()
+ {
+ var handle = await _stream.SubscribeAsync((message, token) =>
+ {
+ Console.WriteLine($"Received message: {message}");
+ return Task.CompletedTask;
+ });
+ }
+ }
+ ```
+
+### 2. With External Client
+To use Redis Streams with an external client, follow these steps:
+
+1. Install the StackExchange.Redis package:
+ ```sh
+ dotnet add package StackExchange.Redis
+ ```
+
+2. Connect to the Redis server and interact with the stream:
+ ```csharp
+ using StackExchange.Redis;
+
+ var redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
+ var db = redis.GetDatabase();
+
+ // Add a message to the stream
+ var messageId = await db.StreamAddAsync("mystream", "message", "Hello, Redis!");
+
+ // Read messages from the stream
+ var messages = await db.StreamReadAsync("mystream", "0-0");
+ foreach (var message in messages)
+ {
+ Console.WriteLine($"Message ID: {message.Id}, Values: {string.Join(", ", message.Values)}");
+ }
+ ```
+
+## Credit
+This library is based on the original repository by [sammychinedu2ky](https://github.com/sammychinedu2ky/RedisStreamsInOrleans).
\ No newline at end of file
diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln
index 3feeb5e..886d220 100644
--- a/RedisStreamsInOrleans.sln
+++ b/RedisStreamsInOrleans.sln
@@ -3,11 +3,15 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.10.34928.147
MinimumVisualStudioVersion = 10.0.40219.1
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "example\Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "example\Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -15,6 +19,8 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -27,6 +33,10 @@ Global
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs
new file mode 100644
index 0000000..87fb0f1
--- /dev/null
+++ b/RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs
@@ -0,0 +1,45 @@
+using Moq;
+using StackExchange.Redis;
+using Microsoft.Extensions.Logging;
+using Orleans.Streams;
+using Orleans.Configuration;
+using Universley.OrleansContrib.StreamsProvider.Redis;
+
+namespace RedisStreamsProvider.UnitTests
+{
+ public class RedisStreamAdapterTests
+ {
+ private readonly Mock _mockDatabase;
+ private readonly Mock _mockQueueMapper;
+ private readonly Mock _mockLoggerFactory;
+ private readonly RedisStreamAdapter _adapter;
+
+ public RedisStreamAdapterTests()
+ {
+ _mockDatabase = new Mock();
+ var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 };
+ _mockQueueMapper = new Mock(options, "queueNamePrefix");
+ _mockLoggerFactory = new Mock();
+
+ _adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object);
+ }
+
+ [Fact]
+ public void Constructor_ShouldInitializeProperties()
+ {
+ Assert.Equal("TestProvider", _adapter.Name);
+ Assert.False(_adapter.IsRewindable);
+ Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction);
+ }
+
+ [Fact]
+ public void CreateReceiver_ShouldReturnRedisStreamReceiver()
+ {
+ var queueId = QueueId.GetQueueId("queueName", 0, 1);
+ var receiver = _adapter.CreateReceiver(queueId);
+
+ Assert.NotNull(receiver);
+ Assert.IsType(receiver);
+ }
+ }
+}
diff --git a/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs
new file mode 100644
index 0000000..5e2d307
--- /dev/null
+++ b/RedisStreamsProvider.UnitTests/RedisStreamBatchContainerTests.cs
@@ -0,0 +1,79 @@
+using StackExchange.Redis;
+using System.Text.Json;
+using Universley.OrleansContrib.StreamsProvider.Redis;
+
+namespace RedisStreamsProvider.UnitTests
+{
+ public class RedisStreamBatchContainerTests
+ {
+ [Fact]
+ public void Constructor_ShouldInitializeProperties()
+ {
+ // Arrange
+ var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
+ {
+ new NameValueEntry("namespace", "testNamespace"),
+ new NameValueEntry("key", "testKey"),
+ new NameValueEntry("type", "TestEvent"),
+ new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
+ });
+
+ // Act
+ var container = new RedisStreamBatchContainer(streamEntry);
+
+ // Assert
+ Assert.Equal("testNamespace", container.StreamId.GetNamespace());
+ Assert.Equal("testKey", container.StreamId.GetKeyAsString());
+ Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString());
+ }
+
+ [Fact]
+ public void GetEvents_ShouldReturnDeserializedEvents()
+ {
+ // Arrange
+ var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
+ {
+ new NameValueEntry("namespace", "testNamespace"),
+ new NameValueEntry("key", "testKey"),
+ new NameValueEntry("type", "TestEvent"),
+ new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
+ });
+ var container = new RedisStreamBatchContainer(streamEntry);
+
+ // Act
+ var events = container.GetEvents().ToList();
+
+ // Assert
+ Assert.Single(events);
+ Assert.Equal(1, events[0].Item1.Id);
+ Assert.Equal("Test", events[0].Item1.Name);
+ Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString());
+ }
+
+ [Fact]
+ public void ImportRequestContext_ShouldReturnFalse()
+ {
+ // Arrange
+ var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
+ {
+ new NameValueEntry("namespace", "testNamespace"),
+ new NameValueEntry("key", "testKey"),
+ new NameValueEntry("type", "TestEvent"),
+ new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
+ });
+ var container = new RedisStreamBatchContainer(streamEntry);
+
+ // Act
+ var result = container.ImportRequestContext();
+
+ // Assert
+ Assert.False(result);
+ }
+
+ private class TestEvent
+ {
+ public int Id { get; set; }
+ public string Name { get; set; }
+ }
+ }
+}
diff --git a/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs
new file mode 100644
index 0000000..665827b
--- /dev/null
+++ b/RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs
@@ -0,0 +1,88 @@
+using Microsoft.Extensions.Logging;
+using Moq;
+using Orleans.Configuration;
+using Orleans.Providers.Streams.Common;
+using Orleans.Streams;
+using StackExchange.Redis;
+using Universley.OrleansContrib.StreamsProvider.Redis;
+
+namespace RedisStreamsProvider.UnitTests
+{
+ public class RedisStreamFactoryTests
+ {
+ private readonly Mock _mockConnectionMultiplexer;
+ private readonly Mock _mockLoggerFactory;
+ private readonly Mock _mockServiceProvider;
+ private readonly Mock _mockStreamFailureHandler;
+ private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
+ private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions;
+ private readonly string _providerName = "TestProvider";
+
+ public RedisStreamFactoryTests()
+ {
+ _mockConnectionMultiplexer = new Mock();
+ _mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny(), It.IsAny