net9-update-refactorings #1

Merged
michael merged 17 commits from net9-update-refactorings into master 2025-01-30 06:33:51 +00:00
38 changed files with 1386 additions and 98 deletions

View File

@ -0,0 +1,52 @@
name: CI Build
on:
push:
branches: [ master ]
paths-ignore:
- '**/*.md'
- '**/*.gitignore'
- '**/*.gitattributes'
pull_request:
branches: [ master ]
workflow_dispatch:
permissions:
contents: write
packages: write
env:
DOTNET_NOLOGO: true # Disable the .NET logo
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true # Disable the .NET first time experience
DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry
MINOR_VERSION_OVERRIDE: 0
GITEA_SERVER_URL: ${{ vars.V_GITEA_SERVER_URL }}
GITEA_PACKAGE_OWNER: ${{ vars.V_GITEA_PACKAGE_OWNER }}
GITEA_NUGET_SOURCE_NAME: ${{ vars.V_GITEA_NUGET_SOURCE_NAME }}
GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }}
GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }}
jobs:
build-ci-api:
runs-on: [linux,self-hosted]
name: CI Build API
steps:
- name: Checkout
uses: https://github.com/actions/checkout@v4
- name: Setup .NET 9
uses: https://github.com/actions/setup-dotnet@v4
with:
dotnet-version: 9.x
- name: Make Build File Executable
shell: bash
run: |
chmod +x ./build.cmd
chmod +x ./build.sh
- name: Run Nuke Build
shell: bash
run: |
./build.cmd -Target Publish

5
.gitignore vendored
View File

@ -360,4 +360,7 @@ MigrationBackup/
.ionide/
# Fody - auto-generated XML schema
FodyWeavers.xsd
FodyWeavers.xsd
# Mac Finder
*.DS_Store

13
.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,13 @@
# Default ignored files
/shelf/
/workspace.xml
# Rider ignored files
/contentModel.xml
/.idea.RedisStreamsInOrleans.iml
/modules.xml
/projectSettingsUpdater.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding" addBOMForNewFiles="with BOM under Windows, with no BOM otherwise" />
</project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="UserContentModel">
<attachedFolders />
<explicitIncludes />
<explicitExcludes />
</component>
</project>

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

146
.nuke/build.schema.json Normal file
View File

@ -0,0 +1,146 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"definitions": {
"Host": {
"type": "string",
"enum": [
"AppVeyor",
"AzurePipelines",
"Bamboo",
"Bitbucket",
"Bitrise",
"GitHubActions",
"GitLab",
"Jenkins",
"Rider",
"SpaceAutomation",
"TeamCity",
"Terminal",
"TravisCI",
"VisualStudio",
"VSCode"
]
},
"ExecutableTarget": {
"type": "string",
"enum": [
"Clean",
"Compile",
"CreateAndPushGitTag",
"Pack",
"Publish",
"Test"
]
},
"Verbosity": {
"type": "string",
"description": "",
"enum": [
"Verbose",
"Normal",
"Minimal",
"Quiet"
]
},
"NukeBuild": {
"properties": {
"Continue": {
"type": "boolean",
"description": "Indicates to continue a previously failed build attempt"
},
"Help": {
"type": "boolean",
"description": "Shows the help text for this build assembly"
},
"Host": {
"description": "Host for execution. Default is 'automatic'",
"$ref": "#/definitions/Host"
},
"NoLogo": {
"type": "boolean",
"description": "Disables displaying the NUKE logo"
},
"Partition": {
"type": "string",
"description": "Partition to use on CI"
},
"Plan": {
"type": "boolean",
"description": "Shows the execution plan (HTML)"
},
"Profile": {
"type": "array",
"description": "Defines the profiles to load",
"items": {
"type": "string"
}
},
"Root": {
"type": "string",
"description": "Root directory during build execution"
},
"Skip": {
"type": "array",
"description": "List of targets to be skipped. Empty list skips all dependencies",
"items": {
"$ref": "#/definitions/ExecutableTarget"
}
},
"Target": {
"type": "array",
"description": "List of targets to be invoked. Default is '{default_target}'",
"items": {
"$ref": "#/definitions/ExecutableTarget"
}
},
"Verbosity": {
"description": "Logging verbosity during build execution. Default is 'Normal'",
"$ref": "#/definitions/Verbosity"
}
}
}
},
"allOf": [
{
"properties": {
"BuildNumber": {
"type": "string",
"description": "Build number override"
},
"Configuration": {
"type": "string",
"description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)",
"enum": [
"Debug",
"Release"
]
},
"GiteaNugetSourceName": {
"type": "string",
"description": "Gitea Nuget package source name"
},
"GiteaPackageOwnerPassword": {
"type": "string",
"description": "Gitea package owner password",
"default": "Secrets must be entered via 'nuke :secrets [profile]'"
},
"GiteaPackageOwnerUser": {
"type": "string",
"description": "Gitea package owner user",
"default": "Secrets must be entered via 'nuke :secrets [profile]'"
},
"NuGetApiKey": {
"type": "string",
"description": "NuGet API key"
},
"Solution": {
"type": "string",
"description": "Path to a solution file that is automatically loaded"
}
}
},
{
"$ref": "#/definitions/NukeBuild"
}
]
}

4
.nuke/parameters.json Normal file
View File

@ -0,0 +1,4 @@
{
"$schema": "build.schema.json",
"Solution": "RedisStreamsInOrleans.sln"
}

View File

@ -1,40 +0,0 @@
using Orleans.Runtime;
using Orleans.Streams;
using StackExchange.Redis;
using System.Text.Json;
namespace Provider
{
public class RedisStreamBatchContainer : IBatchContainer
{
public StreamId StreamId { get; }
public StreamSequenceToken SequenceToken { get; }
public StreamEntry StreamEntry { get; }
public RedisStreamBatchContainer(StreamEntry streamEntry)
{
StreamEntry = streamEntry;
var streamNamespace = StreamEntry.Values[0].Value;
var steamKey = StreamEntry.Values[1].Value;
StreamId = StreamId.Create(streamNamespace!, steamKey!);
SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
}
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
List<Tuple<T, StreamSequenceToken>> events = new();
var eventType = typeof(T).Name;
if (eventType == StreamEntry.Values[2].Value)
{
var data = StreamEntry.Values[3].Value;
var @event = JsonSerializer.Deserialize<T>(data!);
events.Add(new(@event!, SequenceToken));
}
return events;
}
public bool ImportRequestContext()
{
return false;
}
}
}

View File

@ -1,3 +1,82 @@
# RedisStreamsInOrleans
This is a project that demonstrates how to create a custom Redis Stream Provider for Orleans.
https://swacblooms.com/custom-redis-streams-provider-for-orleans/
# Universley.OrleansContrib.StreamsProvider.Redis
## Summary
This library provides an integration of Redis Streams with Microsoft Orleans, allowing you to use Redis as a streaming provider within your Orleans applications. It enables seamless communication and data streaming between Orleans grains and external clients using Redis Streams.
## How to Use the Redis Provider with Orleans
### 1. With Grain as a Client
To use Redis Streams with a grain as a client, follow these steps:
1. Install the necessary NuGet packages:
```sh
dotnet add package Orleans.Streaming.Redis
```
2. Configure the Redis stream provider in your Orleans silo configuration:
```csharp
var host = new SiloHostBuilder()
.AddRedisStreams("RedisProvider", options =>
{
options.ConnectionString = "your_redis_connection_string";
})
.Build();
```
3. In your grain, use the stream provider to send and receive messages:
```csharp
public class MyGrain : Grain, IMyGrain
{
private IAsyncStream<string> _stream;
public override async Task OnActivateAsync()
{
var streamProvider = GetStreamProvider("RedisProvider");
_stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "streamNamespace");
await base.OnActivateAsync();
}
public async Task SendMessage(string message)
{
await _stream.OnNextAsync(message);
}
public async Task ReceiveMessages()
{
var handle = await _stream.SubscribeAsync((message, token) =>
{
Console.WriteLine($"Received message: {message}");
return Task.CompletedTask;
});
}
}
```
### 2. With External Client
To use Redis Streams with an external client, follow these steps:
1. Install the StackExchange.Redis package:
```sh
dotnet add package StackExchange.Redis
```
2. Connect to the Redis server and interact with the stream:
```csharp
using StackExchange.Redis;
var redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
var db = redis.GetDatabase();
// Add a message to the stream
var messageId = await db.StreamAddAsync("mystream", "message", "Hello, Redis!");
// Read messages from the stream
var messages = await db.StreamReadAsync("mystream", "0-0");
foreach (var message in messages)
{
Console.WriteLine($"Message ID: {message.Id}, Values: {string.Join(", ", message.Values)}");
}
```
## Credit
This library is based on the original repository by [sammychinedu2ky](https://github.com/sammychinedu2ky/RedisStreamsInOrleans).

View File

@ -3,11 +3,15 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.10.34928.147
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "example\Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "example\Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -15,6 +19,8 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -27,6 +33,10 @@ Global
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -0,0 +1,45 @@
using Moq;
using StackExchange.Redis;
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using Orleans.Configuration;
using Universley.OrleansContrib.StreamsProvider.Redis;
namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamAdapterTests
{
private readonly Mock<IDatabase> _mockDatabase;
private readonly Mock<HashRingBasedStreamQueueMapper> _mockQueueMapper;
private readonly Mock<ILoggerFactory> _mockLoggerFactory;
private readonly RedisStreamAdapter _adapter;
public RedisStreamAdapterTests()
{
_mockDatabase = new Mock<IDatabase>();
var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 };
_mockQueueMapper = new Mock<HashRingBasedStreamQueueMapper>(options, "queueNamePrefix");
_mockLoggerFactory = new Mock<ILoggerFactory>();
_adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object);
}
[Fact]
public void Constructor_ShouldInitializeProperties()
{
Assert.Equal("TestProvider", _adapter.Name);
Assert.False(_adapter.IsRewindable);
Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction);
}
[Fact]
public void CreateReceiver_ShouldReturnRedisStreamReceiver()
{
var queueId = QueueId.GetQueueId("queueName", 0, 1);
var receiver = _adapter.CreateReceiver(queueId);
Assert.NotNull(receiver);
Assert.IsType<RedisStreamReceiver>(receiver);
}
}
}

View File

@ -0,0 +1,79 @@
using StackExchange.Redis;
using System.Text.Json;
using Universley.OrleansContrib.StreamsProvider.Redis;
namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamBatchContainerTests
{
[Fact]
public void Constructor_ShouldInitializeProperties()
{
// Arrange
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
{
new NameValueEntry("namespace", "testNamespace"),
new NameValueEntry("key", "testKey"),
new NameValueEntry("type", "TestEvent"),
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
});
// Act
var container = new RedisStreamBatchContainer(streamEntry);
// Assert
Assert.Equal("testNamespace", container.StreamId.GetNamespace());
Assert.Equal("testKey", container.StreamId.GetKeyAsString());
Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString());
}
[Fact]
public void GetEvents_ShouldReturnDeserializedEvents()
{
// Arrange
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
{
new NameValueEntry("namespace", "testNamespace"),
new NameValueEntry("key", "testKey"),
new NameValueEntry("type", "TestEvent"),
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
});
var container = new RedisStreamBatchContainer(streamEntry);
// Act
var events = container.GetEvents<TestEvent>().ToList();
// Assert
Assert.Single(events);
Assert.Equal(1, events[0].Item1.Id);
Assert.Equal("Test", events[0].Item1.Name);
Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString());
}
[Fact]
public void ImportRequestContext_ShouldReturnFalse()
{
// Arrange
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
{
new NameValueEntry("namespace", "testNamespace"),
new NameValueEntry("key", "testKey"),
new NameValueEntry("type", "TestEvent"),
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
});
var container = new RedisStreamBatchContainer(streamEntry);
// Act
var result = container.ImportRequestContext();
// Assert
Assert.False(result);
}
private class TestEvent
{
public int Id { get; set; }
public string Name { get; set; }
}
}
}

View File

@ -0,0 +1,88 @@
using Microsoft.Extensions.Logging;
using Moq;
using Orleans.Configuration;
using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamFactoryTests
{
private readonly Mock<IConnectionMultiplexer> _mockConnectionMultiplexer;
private readonly Mock<ILoggerFactory> _mockLoggerFactory;
private readonly Mock<IServiceProvider> _mockServiceProvider;
private readonly Mock<IStreamFailureHandler> _mockStreamFailureHandler;
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions;
private readonly string _providerName = "TestProvider";
public RedisStreamFactoryTests()
{
_mockConnectionMultiplexer = new Mock<IConnectionMultiplexer>();
_mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny<int>(), It.IsAny<object>())).Returns(new Mock<IDatabase>().Object);
_mockLoggerFactory = new Mock<ILoggerFactory>();
_mockServiceProvider = new Mock<IServiceProvider>();
_mockStreamFailureHandler = new Mock<IStreamFailureHandler>();
_simpleQueueCacheOptions = new SimpleQueueCacheOptions();
_hashRingStreamQueueMapperOptions = new HashRingStreamQueueMapperOptions();
}
[Fact]
public void Constructor_ShouldThrowArgumentNullException_WhenAnyArgumentIsNull()
{
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(null, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, null, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, null, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, null, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, null, _hashRingStreamQueueMapperOptions));
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, null));
}
[Fact]
public async Task CreateAdapter_ShouldReturnRedisStreamAdapterInstance()
{
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
var adapter = await factory.CreateAdapter();
Assert.NotNull(adapter);
Assert.IsType<RedisStreamAdapter>(adapter);
}
[Fact]
public async Task GetDeliveryFailureHandler_ShouldReturnStreamFailureHandler()
{
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
var handler = await factory.GetDeliveryFailureHandler(new QueueId());
Assert.NotNull(handler);
Assert.Equal(_mockStreamFailureHandler.Object, handler);
}
[Fact]
public void GetQueueAdapterCache_ShouldReturnSimpleQueueAdapterCacheInstance()
{
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
var cache = factory.GetQueueAdapterCache();
Assert.NotNull(cache);
Assert.IsType<SimpleQueueAdapterCache>(cache);
}
[Fact]
public void GetStreamQueueMapper_ShouldReturnHashRingBasedStreamQueueMapperInstance()
{
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
var mapper = factory.GetStreamQueueMapper();
Assert.NotNull(mapper);
Assert.IsType<HashRingBasedStreamQueueMapper>(mapper);
}
}
}

View File

@ -0,0 +1,123 @@
using Microsoft.Extensions.Logging;
using Moq;
using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamReceiverTests
{
private readonly Mock<IDatabase> _mockDatabase;
private readonly Mock<ILogger<RedisStreamReceiver>> _mockLogger;
private readonly QueueId _queueId;
private readonly RedisStreamReceiver _receiver;
public RedisStreamReceiverTests()
{
_mockDatabase = new Mock<IDatabase>();
_mockLogger = new Mock<ILogger<RedisStreamReceiver>>();
_queueId = QueueId.GetQueueId("testQueue", 0, 0); // Added the missing 'hash' parameter
_receiver = new RedisStreamReceiver(_queueId, _mockDatabase.Object, _mockLogger.Object);
}
[Fact]
public async Task GetQueueMessagesAsync_ReturnsBatches()
{
// Arrange
var streamEntries = new[]
{
new StreamEntry("1-0", [
new("namespace", "testNamespace"),
new("key", "testKey"),
new("eventType", "testEventType" ),
new( "data", "testData" )
]),
new StreamEntry("2-0", [
new("namespace", "testNamespace"),
new("key", "testKey"),
new("eventType", "testEventType" ),
new( "data", "testData" )
])
};
_mockDatabase.Setup(db => db.StreamReadGroupAsync(
It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue?>(),
It.IsAny<int?>(), It.IsAny<bool>(), CommandFlags.None))
.ReturnsAsync(streamEntries);
// Act
var result = await _receiver.GetQueueMessagesAsync(10);
// Assert
Assert.NotNull(result);
Assert.Equal(2, result.Count);
}
[Fact]
public async Task Initialize_CreatesConsumerGroup()
{
// Arrange
_mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
It.IsAny<RedisValue>(), It.IsAny<bool>(), CommandFlags.None))
.ReturnsAsync(true);
// Act
await _receiver.Initialize(TimeSpan.FromSeconds(5));
// Assert
_mockDatabase.Verify(
db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None),
Times.Once);
}
[Fact]
public async Task MessagesDeliveredAsync_AcknowledgesMessages()
{
// Arrange
var messages = new List<IBatchContainer>
{
new RedisStreamBatchContainer(new StreamEntry("1-0", [
new("namespace", "testNamespace"),
new("key", "testKey"),
new("eventType", "testEventType" ),
new( "data", "testData" )
])),
new RedisStreamBatchContainer(new StreamEntry("2-0", [
new("namespace", "testNamespace"),
new("key", "testKey"),
new("eventType", "testEventType" ),
new( "data", "testData" )
]))
};
_mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
It.IsAny<RedisValue>(), CommandFlags.None))
.ReturnsAsync(2);
// Act
await _receiver.MessagesDeliveredAsync(messages);
// Assert
_mockDatabase.Verify(
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
_mockDatabase.Verify(
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
}
[Fact]
public async Task Shutdown_WaitsForPendingTasks()
{
// Arrange
var tcs = new TaskCompletionSource<StreamEntry[]>();
_mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<int>(), CommandFlags.None))
.Returns(tcs.Task);
// Act
var getMessagesTask = _receiver.GetQueueMessagesAsync(10);
await _receiver.Shutdown(TimeSpan.FromSeconds(5));
// Assert
Assert.True(getMessagesTask.IsCompleted);
}
}
}

View File

@ -0,0 +1,130 @@
using Moq;
using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamSequenceTokenTests
{
[Fact]
public void Constructor_ShouldInitializeProperties_FromRedisValue()
{
// Arrange
var redisValue = new RedisValue("123-456");
// Act
var token = new RedisStreamSequenceToken(redisValue);
// Assert
Assert.Equal(123, token.SequenceNumber);
Assert.Equal(456, token.EventIndex);
}
[Fact]
public void Constructor_ShouldInitializeProperties_FromParameters()
{
// Arrange
long sequenceNumber = 123;
int eventIndex = 456;
// Act
var token = new RedisStreamSequenceToken(sequenceNumber, eventIndex);
// Assert
Assert.Equal(sequenceNumber, token.SequenceNumber);
Assert.Equal(eventIndex, token.EventIndex);
}
[Fact]
public void CompareTo_ShouldReturnZero_ForEqualTokens()
{
// Arrange
var token1 = new RedisStreamSequenceToken(123, 456);
var token2 = new RedisStreamSequenceToken(123, 456);
// Act
var result = token1.CompareTo(token2);
// Assert
Assert.Equal(0, result);
}
[Fact]
public void CompareTo_ShouldReturnPositive_ForGreaterToken()
{
// Arrange
var token1 = new RedisStreamSequenceToken(123, 456);
var token2 = new RedisStreamSequenceToken(123, 455);
// Act
var result = token1.CompareTo(token2);
// Assert
Assert.True(result > 0);
}
[Fact]
public void CompareTo_ShouldReturnNegative_ForLesserToken()
{
// Arrange
var token1 = new RedisStreamSequenceToken(123, 455);
var token2 = new RedisStreamSequenceToken(123, 456);
// Act
var result = token1.CompareTo(token2);
// Assert
Assert.True(result < 0);
}
[Fact]
public void Equals_ShouldReturnTrue_ForEqualTokens()
{
// Arrange
var token1 = new RedisStreamSequenceToken(123, 456);
var token2 = new RedisStreamSequenceToken(123, 456);
// Act
var result = token1.Equals(token2);
// Assert
Assert.True(result);
}
[Fact]
public void Equals_ShouldReturnFalse_ForDifferentTokens()
{
// Arrange
var token1 = new RedisStreamSequenceToken(123, 456);
var token2 = new RedisStreamSequenceToken(123, 457);
// Act
var result = token1.Equals(token2);
// Assert
Assert.False(result);
}
[Fact]
public void CompareTo_ShouldThrowArgumentNullException_ForNullToken()
{
// Arrange
var token = new RedisStreamSequenceToken(123, 456);
// Act & Assert
Assert.Throws<ArgumentNullException>(() => token.CompareTo(null));
}
[Fact]
public void CompareTo_ShouldThrowArgumentException_ForInvalidTokenType()
{
// Arrange
var token = new RedisStreamSequenceToken(123, 456);
var invalidToken = new Mock<StreamSequenceToken>().Object;
// Act & Assert
Assert.Throws<ArgumentException>(() => token.CompareTo(invalidToken));
}
}
}

View File

@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
</ItemGroup>
</Project>

View File

@ -1,10 +1,9 @@
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;
using StackExchange.Redis;
using System.Text.Json;
namespace Provider
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
public class RedisStreamAdapter : IQueueAdapter
{
@ -16,10 +15,10 @@ namespace Provider
public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory)
{
_database = database;
_providerName = providerName;
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper;
_loggerFactory = loggerFactory;
_database = database ?? throw new ArgumentNullException(nameof(database));
_providerName = providerName ?? throw new ArgumentNullException(nameof(providerName));
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper ?? throw new ArgumentNullException(nameof(hashRingBasedStreamQueueMapper));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_logger = loggerFactory.CreateLogger<RedisStreamAdapter>();
}

View File

@ -0,0 +1,75 @@
using Orleans.Streams;
using StackExchange.Redis;
using System.Text.Json;
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
[GenerateSerializer]
[Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")]
public class RedisStreamBatchContainer : IBatchContainer
{
[Id(0)]
public StreamId StreamId { get; }
[Id(1)]
public StreamSequenceToken SequenceToken { get; }
[Id(2)]
public string EventType { get; }
[Id(3)]
public string Data { get; }
[Id(4)]
public string StreamEntryId { get; }
public RedisStreamBatchContainer(StreamEntry streamEntry)
{
var streamNamespace = streamEntry.Values[0].Value;
var steamKey = streamEntry.Values[1].Value;
var eventType = streamEntry.Values[2].Value;
var data = streamEntry.Values[3].Value;
StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id));
// Check incoming data
if (string.IsNullOrWhiteSpace(streamNamespace))
{
throw new ArgumentNullException(nameof(streamNamespace));
}
if (string.IsNullOrWhiteSpace(steamKey))
{
throw new ArgumentNullException(nameof(steamKey));
}
if (string.IsNullOrWhiteSpace(eventType))
{
throw new ArgumentNullException(nameof(eventType));
}
if (string.IsNullOrWhiteSpace(data))
{
throw new ArgumentNullException(nameof(data));
}
StreamId = StreamId.Create(streamNamespace!, steamKey!);
SequenceToken = new RedisStreamSequenceToken(streamEntry.Id);
EventType = eventType!;
Data = data!;
}
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
List<Tuple<T, StreamSequenceToken>> events = new();
var eventType = typeof(T).Name;
if (eventType == EventType)
{
var data = Data;
var @event = JsonSerializer.Deserialize<T>(data);
events.Add(new(@event!, SequenceToken));
}
return events;
}
public bool ImportRequestContext()
{
return false;
}
}
}

View File

@ -5,18 +5,18 @@ using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using StackExchange.Redis;
namespace Provider
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
public class RedisStreamFactory : IQueueAdapterFactory
{
private readonly IDatabase _database;
private readonly IConnectionMultiplexer _connectionMultiplexer;
private readonly ILoggerFactory _loggerFactory;
private readonly string _providerName;
private readonly IStreamFailureHandler _streamFailureHandler;
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
public RedisStreamFactory(IDatabase database,
public RedisStreamFactory(IConnectionMultiplexer connectionMultiplexer,
ILoggerFactory loggerFactory,
string providerName,
IStreamFailureHandler streamFailureHandler,
@ -24,28 +24,33 @@ namespace Provider
HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions
)
{
_database = database;
_loggerFactory = loggerFactory;
_providerName = providerName;
_streamFailureHandler = streamFailureHandler;
_simpleQueueCacheOptions = simpleQueueCacheOptions;
if (hashRingStreamQueueMapperOptions is null)
{
throw new ArgumentNullException(nameof(hashRingStreamQueueMapperOptions));
}
_connectionMultiplexer = connectionMultiplexer ?? throw new ArgumentNullException(nameof(connectionMultiplexer));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_providerName = providerName ?? throw new ArgumentNullException(nameof(providerName));
_streamFailureHandler = streamFailureHandler ?? throw new ArgumentNullException(nameof(streamFailureHandler));
_simpleQueueCacheOptions = simpleQueueCacheOptions ?? throw new ArgumentNullException(nameof(simpleQueueCacheOptions));
_hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName);
}
public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName)
{
var database = provider.GetRequiredService<IDatabase>();
var connMuliplexer = provider.GetRequiredService<IConnectionMultiplexer>();
var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var simpleQueueCacheOptions = provider.GetOptionsByName<SimpleQueueCacheOptions>(providerName);
var hashRingStreamQueueMapperOptions = provider.GetOptionsByName<HashRingStreamQueueMapperOptions>(providerName);
var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger<RedisStreamFailureHandler>());
return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
return new RedisStreamFactory(connMuliplexer, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
}
public Task<IQueueAdapter> CreateAdapter()
{
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_connectionMultiplexer.GetDatabase(), _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
}
public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)

View File

@ -1,8 +1,7 @@
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;
namespace Provider
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
public class RedisStreamFailureHandler : IStreamFailureHandler
{
@ -10,7 +9,7 @@ namespace Provider
public RedisStreamFailureHandler(ILogger<RedisStreamFailureHandler> logger)
{
_logger = logger;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public bool ShouldFaultSubsriptionOnError => true;

View File

@ -3,7 +3,7 @@ using Orleans.Streams;
using StackExchange.Redis;
namespace Provider
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
public class RedisStreamReceiver : IQueueAdapterReceiver
{
@ -16,8 +16,8 @@ namespace Provider
public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger<RedisStreamReceiver> logger)
{
_queueId = queueId;
_database = database;
_logger = logger;
_database = database ?? throw new ArgumentNullException(nameof(database));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<IList<IBatchContainer>?> GetQueueMessagesAsync(int maxCount)
@ -69,7 +69,7 @@ namespace Provider
var container = message as RedisStreamBatchContainer;
if (container != null)
{
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId);
pendingTasks = ack;
await ack;
}

View File

@ -1,15 +1,15 @@
using Orleans.Streams;
using StackExchange.Redis;
namespace Provider
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
[GenerateSerializer]
public class RedisStreamSequenceToken : StreamSequenceToken
{
[Id(0)]
public override long SequenceNumber { get; protected set; }
public sealed override long SequenceNumber { get; protected set; }
[Id(1)]
public override int EventIndex { get; protected set; }
public sealed override int EventIndex { get; protected set; }
public RedisStreamSequenceToken(RedisValue id)
{
@ -37,7 +37,7 @@ namespace Provider
throw new ArgumentException("Invalid token type", nameof(other));
}
public override bool Equals(StreamSequenceToken other)
public override bool Equals(StreamSequenceToken? other)
{
var token = other as RedisStreamSequenceToken;
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;

View File

@ -1,15 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
<PackageReference Include="Microsoft.Orleans.Sdk" Version="8.1.0" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
<PackageReference Include="Microsoft.Orleans.Sdk" Version="9.0.1" />
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
</ItemGroup>
</Project>

7
build.cmd Executable file
View File

@ -0,0 +1,7 @@
:; set -eo pipefail
:; SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
:; ${SCRIPT_DIR}/build.sh "$@"
:; exit $?
@ECHO OFF
powershell -ExecutionPolicy ByPass -NoProfile -File "%~dp0build.ps1" %*

74
build.ps1 Normal file
View File

@ -0,0 +1,74 @@
[CmdletBinding()]
Param(
[Parameter(Position=0,Mandatory=$false,ValueFromRemainingArguments=$true)]
[string[]]$BuildArguments
)
Write-Output "PowerShell $($PSVersionTable.PSEdition) version $($PSVersionTable.PSVersion)"
Set-StrictMode -Version 2.0; $ErrorActionPreference = "Stop"; $ConfirmPreference = "None"; trap { Write-Error $_ -ErrorAction Continue; exit 1 }
$PSScriptRoot = Split-Path $MyInvocation.MyCommand.Path -Parent
###########################################################################
# CONFIGURATION
###########################################################################
$BuildProjectFile = "$PSScriptRoot\build\_build.csproj"
$TempDirectory = "$PSScriptRoot\\.nuke\temp"
$DotNetGlobalFile = "$PSScriptRoot\\global.json"
$DotNetInstallUrl = "https://dot.net/v1/dotnet-install.ps1"
$DotNetChannel = "STS"
$env:DOTNET_CLI_TELEMETRY_OPTOUT = 1
$env:DOTNET_NOLOGO = 1
###########################################################################
# EXECUTION
###########################################################################
function ExecSafe([scriptblock] $cmd) {
& $cmd
if ($LASTEXITCODE) { exit $LASTEXITCODE }
}
# If dotnet CLI is installed globally and it matches requested version, use for execution
if ($null -ne (Get-Command "dotnet" -ErrorAction SilentlyContinue) -and `
$(dotnet --version) -and $LASTEXITCODE -eq 0) {
$env:DOTNET_EXE = (Get-Command "dotnet").Path
}
else {
# Download install script
$DotNetInstallFile = "$TempDirectory\dotnet-install.ps1"
New-Item -ItemType Directory -Path $TempDirectory -Force | Out-Null
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
(New-Object System.Net.WebClient).DownloadFile($DotNetInstallUrl, $DotNetInstallFile)
# If global.json exists, load expected version
if (Test-Path $DotNetGlobalFile) {
$DotNetGlobal = $(Get-Content $DotNetGlobalFile | Out-String | ConvertFrom-Json)
if ($DotNetGlobal.PSObject.Properties["sdk"] -and $DotNetGlobal.sdk.PSObject.Properties["version"]) {
$DotNetVersion = $DotNetGlobal.sdk.version
}
}
# Install by channel or version
$DotNetDirectory = "$TempDirectory\dotnet-win"
if (!(Test-Path variable:DotNetVersion)) {
ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Channel $DotNetChannel -NoPath }
} else {
ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Version $DotNetVersion -NoPath }
}
$env:DOTNET_EXE = "$DotNetDirectory\dotnet.exe"
$env:PATH = "$DotNetDirectory;$env:PATH"
}
Write-Output "Microsoft (R) .NET SDK version $(& $env:DOTNET_EXE --version)"
if (Test-Path env:NUKE_ENTERPRISE_TOKEN) {
& $env:DOTNET_EXE nuget remove source "nuke-enterprise" > $null
& $env:DOTNET_EXE nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password $env:NUKE_ENTERPRISE_TOKEN > $null
}
ExecSafe { & $env:DOTNET_EXE build $BuildProjectFile /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet }
ExecSafe { & $env:DOTNET_EXE run --project $BuildProjectFile --no-build -- $BuildArguments }

67
build.sh Executable file
View File

@ -0,0 +1,67 @@
#!/usr/bin/env bash
bash --version 2>&1 | head -n 1
set -eo pipefail
SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
###########################################################################
# CONFIGURATION
###########################################################################
BUILD_PROJECT_FILE="$SCRIPT_DIR/build/_build.csproj"
TEMP_DIRECTORY="$SCRIPT_DIR//.nuke/temp"
DOTNET_GLOBAL_FILE="$SCRIPT_DIR//global.json"
DOTNET_INSTALL_URL="https://dot.net/v1/dotnet-install.sh"
DOTNET_CHANNEL="STS"
export DOTNET_CLI_TELEMETRY_OPTOUT=1
export DOTNET_NOLOGO=1
###########################################################################
# EXECUTION
###########################################################################
function FirstJsonValue {
perl -nle 'print $1 if m{"'"$1"'": "([^"]+)",?}' <<< "${@:2}"
}
# If dotnet CLI is installed globally and it matches requested version, use for execution
if [ -x "$(command -v dotnet)" ] && dotnet --version &>/dev/null; then
export DOTNET_EXE="$(command -v dotnet)"
else
# Download install script
DOTNET_INSTALL_FILE="$TEMP_DIRECTORY/dotnet-install.sh"
mkdir -p "$TEMP_DIRECTORY"
curl -Lsfo "$DOTNET_INSTALL_FILE" "$DOTNET_INSTALL_URL"
chmod +x "$DOTNET_INSTALL_FILE"
# If global.json exists, load expected version
if [[ -f "$DOTNET_GLOBAL_FILE" ]]; then
DOTNET_VERSION=$(FirstJsonValue "version" "$(cat "$DOTNET_GLOBAL_FILE")")
if [[ "$DOTNET_VERSION" == "" ]]; then
unset DOTNET_VERSION
fi
fi
# Install by channel or version
DOTNET_DIRECTORY="$TEMP_DIRECTORY/dotnet-unix"
if [[ -z ${DOTNET_VERSION+x} ]]; then
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --channel "$DOTNET_CHANNEL" --no-path
else
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "$DOTNET_VERSION" --no-path
fi
export DOTNET_EXE="$DOTNET_DIRECTORY/dotnet"
export PATH="$DOTNET_DIRECTORY:$PATH"
fi
echo "Microsoft (R) .NET SDK version $("$DOTNET_EXE" --version)"
if [[ ! -z ${NUKE_ENTERPRISE_TOKEN+x} && "$NUKE_ENTERPRISE_TOKEN" != "" ]]; then
"$DOTNET_EXE" nuget remove source "nuke-enterprise" &>/dev/null || true
"$DOTNET_EXE" nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password "$NUKE_ENTERPRISE_TOKEN" --store-password-in-clear-text &>/dev/null || true
fi
"$DOTNET_EXE" build "$BUILD_PROJECT_FILE" /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet
"$DOTNET_EXE" run --project "$BUILD_PROJECT_FILE" --no-build -- "$@"

11
build/.editorconfig Normal file
View File

@ -0,0 +1,11 @@
[*.cs]
dotnet_style_qualification_for_field = false:warning
dotnet_style_qualification_for_property = false:warning
dotnet_style_qualification_for_method = false:warning
dotnet_style_qualification_for_event = false:warning
dotnet_style_require_accessibility_modifiers = never:warning
csharp_style_expression_bodied_methods = true:silent
csharp_style_expression_bodied_properties = true:warning
csharp_style_expression_bodied_indexers = true:warning
csharp_style_expression_bodied_accessors = true:warning

177
build/Build.cs Normal file
View File

@ -0,0 +1,177 @@
using System;
using System.IO;
using System.Linq;
using Nuke.Common;
using Nuke.Common.CI;
using Nuke.Common.Execution;
using Nuke.Common.IO;
using Nuke.Common.ProjectModel;
using Nuke.Common.Tooling;
using Nuke.Common.Tools.DotNet;
using Nuke.Common.Tools.Git;
using Nuke.Common.Tools.GitVersion;
using Nuke.Common.Utilities.Collections;
using Serilog;
using static Nuke.Common.EnvironmentInfo;
using static Nuke.Common.IO.PathConstruction;
class Build : NukeBuild
{
/// Support plugins are available for:
/// - JetBrains ReSharper https://nuke.build/resharper
/// - JetBrains Rider https://nuke.build/rider
/// - Microsoft VisualStudio https://nuke.build/visualstudio
/// - Microsoft VSCode https://nuke.build/vscode
public static int Main() => Execute<Build>(x => x.Pack);
private const string LibraryProjectName = "Universley.OrleansContrib.StreamsProvider.Redis";
[Parameter("Configuration to build - Default is 'Debug' (local) or 'Release' (server)")]
readonly Configuration Configuration = IsLocalBuild ? Configuration.Debug : Configuration.Release;
[Solution] readonly Solution Solution;
AbsolutePath SourceDirectory => RootDirectory;
AbsolutePath ArtifactsDirectory => RootDirectory / "artifacts";
AbsolutePath NuGetPackagesDirectory => ArtifactsDirectory / "nuget";
AbsolutePath TestResultDirectory => ArtifactsDirectory / "test-results";
// Nuget API key can be also set as an environment variable
[Parameter("NuGet API key")]
readonly string NuGetApiKey;
[Parameter("Build number override")]
readonly string BuildNumber
= Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER") ?? $"{0}";
[Parameter("Gitea Nuget package source name")]
readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME");
[Parameter("Gitea package owner user")]
[Secret]
readonly string GiteaPackageOwnerUser = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_USER");
[Parameter("Gitea package owner password")]
[Secret]
readonly string GiteaPackageOwnerPassword = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_PASSWORD");
Target Clean => _ => _
.Before(Compile)
.Executes(() =>
{
SourceDirectory
.GlobDirectories("**/bin", "**/obj")
.Except(RootDirectory.GlobDirectories("build/**/bin", "build/**/obj"))
.ForEach(ap => Directory.Delete(ap, true));
ArtifactsDirectory.CreateOrCleanDirectory();
});
Target Compile => _ => _
.Executes(() =>
{
var semVer = Version;
var semFileVer = Version;
var informationalVersion = Version;
Log.Logger.Information("AssemblySemVer: {semVer}", semVer);
DotNetTasks.DotNetBuild(s => s
.SetProjectFile(Solution)
.SetConfiguration(Configuration)
.SetAssemblyVersion(semVer)
.SetFileVersion(semFileVer)
.SetInformationalVersion(informationalVersion));
});
Target Test => _ => _
.DependsOn(Compile)
.Executes(() =>
{
DotNetTasks.DotNetTest(s => s
.SetProjectFile(Solution)
.SetConfiguration(Configuration)
.SetLoggers($"trx;LogFileName={TestResultDirectory / "testresults.trx"}")
);
});
Target Pack => _ => _
.DependsOn(Test)
.Executes(() =>
{
DotNetTasks.DotNetPack(s => s
.SetProject(SourceDirectory / "Universley.OrleansContrib.StreamsProvider.Redis" / $"{LibraryProjectName}.csproj")
.SetConfiguration(Configuration)
.SetOutputDirectory(NuGetPackagesDirectory)
.SetVersion(Version)
.SetNoBuild(true)
);
});
Target Publish => _ => _
.DependsOn(Pack, CreateAndPushGitTag)
.Executes(() =>
{
try
{
DotNetTasks.DotNetNuGetAddSource(c => c
.EnableStorePasswordInClearText()
.SetUsername(GiteaPackageOwnerUser)
.SetPassword(GiteaPackageOwnerPassword)
.SetName("Gitea")
.SetSource($"{GiteaNugetSourceName}/index.json"));
}
catch (Exception e)
{
Log.Warning("Error adding Gitea source: {e}", e);
}
DotNetTasks.DotNetNuGetPush(s => s
.SetSource($"{GiteaNugetSourceName}/symbols")
.SetApiKey(NuGetApiKey)
.SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.snupkg"));
DotNetTasks.DotNetNuGetPush(s => s
.SetSource($"{GiteaNugetSourceName}/index.json")
.SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.nupkg"));
});
Target CreateAndPushGitTag => _ => _
.Executes(() =>
{
var gitTag = $"{Version}";
GitTasks.Git($"tag {gitTag}");
GitTasks.Git($"push origin {gitTag}");
});
private string Version
{
get
{
if (version == null)
{
version = GetVersion();
}
return version;
}
}
private string version = null;
private string GetVersion()
{
Log.Information("GitHub run number is {number}", Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER"));
var buildNumber = BuildNumber;
if (string.IsNullOrEmpty(buildNumber))
{
return "1.0.0";
}
var currentDateTime = DateTimeOffset.UtcNow;
var assembledVersion = $"{currentDateTime.Year}.{currentDateTime.Month}.{buildNumber}";
Log.Information("Assembled version: {assembledVersion}", assembledVersion);
return assembledVersion;
}
}

16
build/Configuration.cs Normal file
View File

@ -0,0 +1,16 @@
using System;
using System.ComponentModel;
using System.Linq;
using Nuke.Common.Tooling;
[TypeConverter(typeof(TypeConverter<Configuration>))]
public class Configuration : Enumeration
{
public static Configuration Debug = new Configuration { Value = nameof(Debug) };
public static Configuration Release = new Configuration { Value = nameof(Release) };
public static implicit operator string(Configuration configuration)
{
return configuration.Value;
}
}

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<!-- This file prevents unintended imports of unrelated MSBuild files -->
<!-- Uncomment to include parent Directory.Build.props file -->
<!--<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />-->
</Project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<!-- This file prevents unintended imports of unrelated MSBuild files -->
<!-- Uncomment to include parent Directory.Build.targets file -->
<!--<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />-->
</Project>

22
build/_build.csproj Normal file
View File

@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace></RootNamespace>
<NoWarn>CS0649;CS0169;CA1050;CA1822;CA2211;IDE1006</NoWarn>
<NukeRootDirectory>..</NukeRootDirectory>
<NukeScriptDirectory>..</NukeScriptDirectory>
<NukeTelemetryVersion>1</NukeTelemetryVersion>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Nuke.Common" Version="9.0.4" />
</ItemGroup>
<ItemGroup>
<PackageDownload Include="GitVersion.Tool" Version="[6.1.0]" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,31 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=HeapView_002EDelegateAllocation/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VariableHidesOuterVariable/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ClassNeverInstantiated_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=MemberCanBeMadeStatic_002ELocal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=InterpolatedStringExpressionIsNotIFormattable/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/DEFAULT_INTERNAL_MODIFIER/@EntryValue">Implicit</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/DEFAULT_PRIVATE_MODIFIER/@EntryValue">Implicit</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/METHOD_OR_OPERATOR_BODY/@EntryValue">ExpressionBody</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/ThisQualifier/INSTANCE_MEMBERS_QUALIFY_MEMBERS/@EntryValue">0</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/ANONYMOUS_METHOD_DECLARATION_BRACES/@EntryValue">NEXT_LINE</s:String>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_USER_LINEBREAKS/@EntryValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_AFTER_INVOCATION_LPAR/@EntryValue">False</s:Boolean>
<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_ATTRIBUTE_LENGTH_FOR_SAME_LINE/@EntryValue">120</s:Int64>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">IF_OWNER_IS_SINGLE_LINE</s:String>
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_ARGUMENTS_STYLE/@EntryValue">WRAP_IF_LONG</s:String>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_ANONYMOUSMETHOD_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=4a98fdf6_002D7d98_002D4f5a_002Dafeb_002Dea44ad98c70c/@EntryIndexedValue">&lt;Policy&gt;&lt;Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"&gt;&lt;ElementKinds&gt;&lt;Kind Name="FIELD" /&gt;&lt;Kind Name="READONLY_FIELD" /&gt;&lt;/ElementKinds&gt;&lt;/Descriptor&gt;&lt;Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /&gt;&lt;/Policy&gt;</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=f9fce829_002De6f4_002D4cb2_002D80f1_002D5497c44f51df/@EntryIndexedValue">&lt;Policy&gt;&lt;Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"&gt;&lt;ElementKinds&gt;&lt;Kind Name="FIELD" /&gt;&lt;/ElementKinds&gt;&lt;/Descriptor&gt;&lt;Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /&gt;&lt;/Policy&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpAttributeForSingleLineMethodUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EPredefinedNamingRulesToUserRulesUpgrade/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

View File

@ -2,19 +2,19 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Client" Version="8.1.0" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
<PackageReference Include="Microsoft.Orleans.Client" Version="9.0.1" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Provider\Provider.csproj" />
<ProjectReference Include="..\..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
</ItemGroup>
</Project>

View File

@ -2,17 +2,16 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Runtime;
using Provider;
using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
using IHost host = new HostBuilder()
.UseOrleansClient(clientBuilder =>
{
clientBuilder.Services.AddSingleton<IDatabase>(sp =>
clientBuilder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
{
IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase();
return db;
return ConnectionMultiplexer.Connect("localhost");
});
clientBuilder.UseLocalhostClustering();
@ -38,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("numbergenerator", "consecutive");
var stream = streamProvider.GetStream<int>(streamId);
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
var newStream = streamProvider.GetStream<int>(newStreamId);
await stream.SubscribeAsync((i, token) =>
{
logger.LogInformation("Received back number {Number}", i);
return Task.CompletedTask;
});
var task = Task.Run(async () =>
{
var num = 0;

View File

@ -2,21 +2,21 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
var builder = new HostBuilder()
.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.Services.AddSingleton<IDatabase>(sp =>
silo.Services.AddSingleton<IConnectionMultiplexer>(sp =>
{
return ConnectionMultiplexer.Connect("localhost").GetDatabase();
return ConnectionMultiplexer.Connect("localhost");
});
silo.ConfigureLogging(logging => logging.AddConsole());
silo.AddMemoryGrainStorage("PubSubStore");
silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null);
silo.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null);
silo.AddMemoryGrainStorageAsDefault();
}).UseConsoleLifetime();
builder.ConfigureServices(services =>
@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
_logger.LogInformation("Received number {Number}", item);
await Task.Delay(2000);
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
var newStream = this.GetStreamProvider("RedisStream").GetStream<int>(newStreamId);
_logger.LogInformation("Sending number {Number} to new stream", item);
await newStream.OnNextAsync(item);
}
}

View File

@ -2,19 +2,19 @@
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Orleans.Server" Version="8.1.0" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
<PackageReference Include="Microsoft.Orleans.Server" Version="9.0.1" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Provider\Provider.csproj" />
<ProjectReference Include="..\..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
</ItemGroup>
</Project>