net9-update-refactorings #1
52
.gitea/workflows/ci-pipeline.yml
Normal file
52
.gitea/workflows/ci-pipeline.yml
Normal file
@ -0,0 +1,52 @@
|
||||
name: CI Build
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ master ]
|
||||
paths-ignore:
|
||||
- '**/*.md'
|
||||
- '**/*.gitignore'
|
||||
- '**/*.gitattributes'
|
||||
pull_request:
|
||||
branches: [ master ]
|
||||
workflow_dispatch:
|
||||
permissions:
|
||||
contents: write
|
||||
packages: write
|
||||
|
||||
env:
|
||||
DOTNET_NOLOGO: true # Disable the .NET logo
|
||||
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true # Disable the .NET first time experience
|
||||
DOTNET_CLI_TELEMETRY_OPTOUT: true # Disable sending .NET CLI telemetry
|
||||
MINOR_VERSION_OVERRIDE: 0
|
||||
|
||||
GITEA_SERVER_URL: ${{ vars.V_GITEA_SERVER_URL }}
|
||||
GITEA_PACKAGE_OWNER: ${{ vars.V_GITEA_PACKAGE_OWNER }}
|
||||
GITEA_NUGET_SOURCE_NAME: ${{ vars.V_GITEA_NUGET_SOURCE_NAME }}
|
||||
GITEA_PACKAGE_OWNER_USER: ${{ secrets.S_GITEA_PACKAGE_OWNER_USER }}
|
||||
GITEA_PACKAGE_OWNER_PASSWORD: ${{ secrets.S_GITEA_PACKAGE_OWNER_PASSWORD }}
|
||||
|
||||
|
||||
jobs:
|
||||
build-ci-api:
|
||||
runs-on: [linux,self-hosted]
|
||||
name: CI Build API
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: https://github.com/actions/checkout@v4
|
||||
|
||||
- name: Setup .NET 9
|
||||
uses: https://github.com/actions/setup-dotnet@v4
|
||||
with:
|
||||
dotnet-version: 9.x
|
||||
|
||||
- name: Make Build File Executable
|
||||
shell: bash
|
||||
run: |
|
||||
chmod +x ./build.cmd
|
||||
chmod +x ./build.sh
|
||||
|
||||
- name: Run Nuke Build
|
||||
shell: bash
|
||||
run: |
|
||||
./build.cmd -Target Publish
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@ -360,4 +360,7 @@ MigrationBackup/
|
||||
.ionide/
|
||||
|
||||
# Fody - auto-generated XML schema
|
||||
FodyWeavers.xsd
|
||||
FodyWeavers.xsd
|
||||
|
||||
# Mac Finder
|
||||
*.DS_Store
|
||||
|
||||
13
.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
generated
vendored
Normal file
13
.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,13 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Rider ignored files
|
||||
/contentModel.xml
|
||||
/.idea.RedisStreamsInOrleans.iml
|
||||
/modules.xml
|
||||
/projectSettingsUpdater.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
4
.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
generated
Normal file
4
.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
generated
Normal file
@ -0,0 +1,4 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Encoding" addBOMForNewFiles="with BOM under Windows, with no BOM otherwise" />
|
||||
</project>
|
||||
8
.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
generated
Normal file
8
.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
generated
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="UserContentModel">
|
||||
<attachedFolders />
|
||||
<explicitIncludes />
|
||||
<explicitExcludes />
|
||||
</component>
|
||||
</project>
|
||||
6
.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
generated
Normal file
6
.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
146
.nuke/build.schema.json
Normal file
146
.nuke/build.schema.json
Normal file
@ -0,0 +1,146 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-04/schema#",
|
||||
"definitions": {
|
||||
"Host": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"AppVeyor",
|
||||
"AzurePipelines",
|
||||
"Bamboo",
|
||||
"Bitbucket",
|
||||
"Bitrise",
|
||||
"GitHubActions",
|
||||
"GitLab",
|
||||
"Jenkins",
|
||||
"Rider",
|
||||
"SpaceAutomation",
|
||||
"TeamCity",
|
||||
"Terminal",
|
||||
"TravisCI",
|
||||
"VisualStudio",
|
||||
"VSCode"
|
||||
]
|
||||
},
|
||||
"ExecutableTarget": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"Clean",
|
||||
"Compile",
|
||||
"CreateAndPushGitTag",
|
||||
"Pack",
|
||||
"Publish",
|
||||
"Test"
|
||||
]
|
||||
},
|
||||
"Verbosity": {
|
||||
"type": "string",
|
||||
"description": "",
|
||||
"enum": [
|
||||
"Verbose",
|
||||
"Normal",
|
||||
"Minimal",
|
||||
"Quiet"
|
||||
]
|
||||
},
|
||||
"NukeBuild": {
|
||||
"properties": {
|
||||
"Continue": {
|
||||
"type": "boolean",
|
||||
"description": "Indicates to continue a previously failed build attempt"
|
||||
},
|
||||
"Help": {
|
||||
"type": "boolean",
|
||||
"description": "Shows the help text for this build assembly"
|
||||
},
|
||||
"Host": {
|
||||
"description": "Host for execution. Default is 'automatic'",
|
||||
"$ref": "#/definitions/Host"
|
||||
},
|
||||
"NoLogo": {
|
||||
"type": "boolean",
|
||||
"description": "Disables displaying the NUKE logo"
|
||||
},
|
||||
"Partition": {
|
||||
"type": "string",
|
||||
"description": "Partition to use on CI"
|
||||
},
|
||||
"Plan": {
|
||||
"type": "boolean",
|
||||
"description": "Shows the execution plan (HTML)"
|
||||
},
|
||||
"Profile": {
|
||||
"type": "array",
|
||||
"description": "Defines the profiles to load",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"Root": {
|
||||
"type": "string",
|
||||
"description": "Root directory during build execution"
|
||||
},
|
||||
"Skip": {
|
||||
"type": "array",
|
||||
"description": "List of targets to be skipped. Empty list skips all dependencies",
|
||||
"items": {
|
||||
"$ref": "#/definitions/ExecutableTarget"
|
||||
}
|
||||
},
|
||||
"Target": {
|
||||
"type": "array",
|
||||
"description": "List of targets to be invoked. Default is '{default_target}'",
|
||||
"items": {
|
||||
"$ref": "#/definitions/ExecutableTarget"
|
||||
}
|
||||
},
|
||||
"Verbosity": {
|
||||
"description": "Logging verbosity during build execution. Default is 'Normal'",
|
||||
"$ref": "#/definitions/Verbosity"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"allOf": [
|
||||
{
|
||||
"properties": {
|
||||
"BuildNumber": {
|
||||
"type": "string",
|
||||
"description": "Build number override"
|
||||
},
|
||||
"Configuration": {
|
||||
"type": "string",
|
||||
"description": "Configuration to build - Default is 'Debug' (local) or 'Release' (server)",
|
||||
"enum": [
|
||||
"Debug",
|
||||
"Release"
|
||||
]
|
||||
},
|
||||
"GiteaNugetSourceName": {
|
||||
"type": "string",
|
||||
"description": "Gitea Nuget package source name"
|
||||
},
|
||||
"GiteaPackageOwnerPassword": {
|
||||
"type": "string",
|
||||
"description": "Gitea package owner password",
|
||||
"default": "Secrets must be entered via 'nuke :secrets [profile]'"
|
||||
},
|
||||
"GiteaPackageOwnerUser": {
|
||||
"type": "string",
|
||||
"description": "Gitea package owner user",
|
||||
"default": "Secrets must be entered via 'nuke :secrets [profile]'"
|
||||
},
|
||||
"NuGetApiKey": {
|
||||
"type": "string",
|
||||
"description": "NuGet API key"
|
||||
},
|
||||
"Solution": {
|
||||
"type": "string",
|
||||
"description": "Path to a solution file that is automatically loaded"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"$ref": "#/definitions/NukeBuild"
|
||||
}
|
||||
]
|
||||
}
|
||||
4
.nuke/parameters.json
Normal file
4
.nuke/parameters.json
Normal file
@ -0,0 +1,4 @@
|
||||
{
|
||||
"$schema": "build.schema.json",
|
||||
"Solution": "RedisStreamsInOrleans.sln"
|
||||
}
|
||||
@ -1,40 +0,0 @@
|
||||
using Orleans.Runtime;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace Provider
|
||||
{
|
||||
public class RedisStreamBatchContainer : IBatchContainer
|
||||
{
|
||||
public StreamId StreamId { get; }
|
||||
|
||||
public StreamSequenceToken SequenceToken { get; }
|
||||
public StreamEntry StreamEntry { get; }
|
||||
public RedisStreamBatchContainer(StreamEntry streamEntry)
|
||||
{
|
||||
StreamEntry = streamEntry;
|
||||
var streamNamespace = StreamEntry.Values[0].Value;
|
||||
var steamKey = StreamEntry.Values[1].Value;
|
||||
StreamId = StreamId.Create(streamNamespace!, steamKey!);
|
||||
SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
|
||||
}
|
||||
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
|
||||
{
|
||||
List<Tuple<T, StreamSequenceToken>> events = new();
|
||||
var eventType = typeof(T).Name;
|
||||
if (eventType == StreamEntry.Values[2].Value)
|
||||
{
|
||||
var data = StreamEntry.Values[3].Value;
|
||||
var @event = JsonSerializer.Deserialize<T>(data!);
|
||||
events.Add(new(@event!, SequenceToken));
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
public bool ImportRequestContext()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
85
README.md
85
README.md
@ -1,3 +1,82 @@
|
||||
# RedisStreamsInOrleans
|
||||
This is a project that demonstrates how to create a custom Redis Stream Provider for Orleans.
|
||||
https://swacblooms.com/custom-redis-streams-provider-for-orleans/
|
||||
# Universley.OrleansContrib.StreamsProvider.Redis
|
||||
|
||||
## Summary
|
||||
This library provides an integration of Redis Streams with Microsoft Orleans, allowing you to use Redis as a streaming provider within your Orleans applications. It enables seamless communication and data streaming between Orleans grains and external clients using Redis Streams.
|
||||
|
||||
## How to Use the Redis Provider with Orleans
|
||||
|
||||
### 1. With Grain as a Client
|
||||
To use Redis Streams with a grain as a client, follow these steps:
|
||||
|
||||
1. Install the necessary NuGet packages:
|
||||
```sh
|
||||
dotnet add package Orleans.Streaming.Redis
|
||||
```
|
||||
|
||||
2. Configure the Redis stream provider in your Orleans silo configuration:
|
||||
```csharp
|
||||
var host = new SiloHostBuilder()
|
||||
.AddRedisStreams("RedisProvider", options =>
|
||||
{
|
||||
options.ConnectionString = "your_redis_connection_string";
|
||||
})
|
||||
.Build();
|
||||
```
|
||||
|
||||
3. In your grain, use the stream provider to send and receive messages:
|
||||
```csharp
|
||||
public class MyGrain : Grain, IMyGrain
|
||||
{
|
||||
private IAsyncStream<string> _stream;
|
||||
|
||||
public override async Task OnActivateAsync()
|
||||
{
|
||||
var streamProvider = GetStreamProvider("RedisProvider");
|
||||
_stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "streamNamespace");
|
||||
await base.OnActivateAsync();
|
||||
}
|
||||
|
||||
public async Task SendMessage(string message)
|
||||
{
|
||||
await _stream.OnNextAsync(message);
|
||||
}
|
||||
|
||||
public async Task ReceiveMessages()
|
||||
{
|
||||
var handle = await _stream.SubscribeAsync((message, token) =>
|
||||
{
|
||||
Console.WriteLine($"Received message: {message}");
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 2. With External Client
|
||||
To use Redis Streams with an external client, follow these steps:
|
||||
|
||||
1. Install the StackExchange.Redis package:
|
||||
```sh
|
||||
dotnet add package StackExchange.Redis
|
||||
```
|
||||
|
||||
2. Connect to the Redis server and interact with the stream:
|
||||
```csharp
|
||||
using StackExchange.Redis;
|
||||
|
||||
var redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
|
||||
var db = redis.GetDatabase();
|
||||
|
||||
// Add a message to the stream
|
||||
var messageId = await db.StreamAddAsync("mystream", "message", "Hello, Redis!");
|
||||
|
||||
// Read messages from the stream
|
||||
var messages = await db.StreamReadAsync("mystream", "0-0");
|
||||
foreach (var message in messages)
|
||||
{
|
||||
Console.WriteLine($"Message ID: {message.Id}, Values: {string.Join(", ", message.Values)}");
|
||||
}
|
||||
```
|
||||
|
||||
## Credit
|
||||
This library is based on the original repository by [sammychinedu2ky](https://github.com/sammychinedu2ky/RedisStreamsInOrleans).
|
||||
@ -3,11 +3,15 @@ Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 17
|
||||
VisualStudioVersion = 17.10.34928.147
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "example\Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "example\Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Universley.OrleansContrib.StreamsProvider.Redis", "Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RedisStreamsProvider.UnitTests", "RedisStreamsProvider.UnitTests\RedisStreamsProvider.UnitTests.csproj", "{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}"
|
||||
EndProject
|
||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "_build", "build\_build.csproj", "{EE8F826D-AFB3-4708-BBA3-894C1B145C44}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
@ -15,6 +19,8 @@ Global
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{EE8F826D-AFB3-4708-BBA3-894C1B145C44}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
@ -27,6 +33,10 @@ Global
|
||||
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{DF927C2B-A141-4476-86CF-3B4DC8ECB4DE}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
||||
45
RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs
Normal file
45
RedisStreamsProvider.UnitTests/RedisStreamAdapterTests.cs
Normal file
@ -0,0 +1,45 @@
|
||||
using Moq;
|
||||
using StackExchange.Redis;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Streams;
|
||||
using Orleans.Configuration;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamAdapterTests
|
||||
{
|
||||
private readonly Mock<IDatabase> _mockDatabase;
|
||||
private readonly Mock<HashRingBasedStreamQueueMapper> _mockQueueMapper;
|
||||
private readonly Mock<ILoggerFactory> _mockLoggerFactory;
|
||||
private readonly RedisStreamAdapter _adapter;
|
||||
|
||||
public RedisStreamAdapterTests()
|
||||
{
|
||||
_mockDatabase = new Mock<IDatabase>();
|
||||
var options = new HashRingStreamQueueMapperOptions { TotalQueueCount = 1 };
|
||||
_mockQueueMapper = new Mock<HashRingBasedStreamQueueMapper>(options, "queueNamePrefix");
|
||||
_mockLoggerFactory = new Mock<ILoggerFactory>();
|
||||
|
||||
_adapter = new RedisStreamAdapter(_mockDatabase.Object, "TestProvider", _mockQueueMapper.Object, _mockLoggerFactory.Object);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_ShouldInitializeProperties()
|
||||
{
|
||||
Assert.Equal("TestProvider", _adapter.Name);
|
||||
Assert.False(_adapter.IsRewindable);
|
||||
Assert.Equal(StreamProviderDirection.ReadWrite, _adapter.Direction);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CreateReceiver_ShouldReturnRedisStreamReceiver()
|
||||
{
|
||||
var queueId = QueueId.GetQueueId("queueName", 0, 1);
|
||||
var receiver = _adapter.CreateReceiver(queueId);
|
||||
|
||||
Assert.NotNull(receiver);
|
||||
Assert.IsType<RedisStreamReceiver>(receiver);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,79 @@
|
||||
using StackExchange.Redis;
|
||||
using System.Text.Json;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamBatchContainerTests
|
||||
{
|
||||
[Fact]
|
||||
public void Constructor_ShouldInitializeProperties()
|
||||
{
|
||||
// Arrange
|
||||
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
|
||||
{
|
||||
new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey"),
|
||||
new NameValueEntry("type", "TestEvent"),
|
||||
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
|
||||
});
|
||||
|
||||
// Act
|
||||
var container = new RedisStreamBatchContainer(streamEntry);
|
||||
|
||||
// Assert
|
||||
Assert.Equal("testNamespace", container.StreamId.GetNamespace());
|
||||
Assert.Equal("testKey", container.StreamId.GetKeyAsString());
|
||||
Assert.Equal(streamEntry.Id.ToString().Split('-').First(), container.SequenceToken.SequenceNumber.ToString());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetEvents_ShouldReturnDeserializedEvents()
|
||||
{
|
||||
// Arrange
|
||||
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
|
||||
{
|
||||
new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey"),
|
||||
new NameValueEntry("type", "TestEvent"),
|
||||
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
|
||||
});
|
||||
var container = new RedisStreamBatchContainer(streamEntry);
|
||||
|
||||
// Act
|
||||
var events = container.GetEvents<TestEvent>().ToList();
|
||||
|
||||
// Assert
|
||||
Assert.Single(events);
|
||||
Assert.Equal(1, events[0].Item1.Id);
|
||||
Assert.Equal("Test", events[0].Item1.Name);
|
||||
Assert.Equal(streamEntry.Id.ToString().Split('-').First(), events[0].Item2.SequenceNumber.ToString());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ImportRequestContext_ShouldReturnFalse()
|
||||
{
|
||||
// Arrange
|
||||
var streamEntry = new StreamEntry("1-0", new NameValueEntry[]
|
||||
{
|
||||
new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey"),
|
||||
new NameValueEntry("type", "TestEvent"),
|
||||
new NameValueEntry("data", JsonSerializer.Serialize(new TestEvent { Id = 1, Name = "Test" }))
|
||||
});
|
||||
var container = new RedisStreamBatchContainer(streamEntry);
|
||||
|
||||
// Act
|
||||
var result = container.ImportRequestContext();
|
||||
|
||||
// Assert
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
private class TestEvent
|
||||
{
|
||||
public int Id { get; set; }
|
||||
public string Name { get; set; }
|
||||
}
|
||||
}
|
||||
}
|
||||
88
RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs
Normal file
88
RedisStreamsProvider.UnitTests/RedisStreamFactoryTests.cs
Normal file
@ -0,0 +1,88 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Orleans.Configuration;
|
||||
using Orleans.Providers.Streams.Common;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamFactoryTests
|
||||
{
|
||||
private readonly Mock<IConnectionMultiplexer> _mockConnectionMultiplexer;
|
||||
private readonly Mock<ILoggerFactory> _mockLoggerFactory;
|
||||
private readonly Mock<IServiceProvider> _mockServiceProvider;
|
||||
private readonly Mock<IStreamFailureHandler> _mockStreamFailureHandler;
|
||||
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
|
||||
private readonly HashRingStreamQueueMapperOptions _hashRingStreamQueueMapperOptions;
|
||||
private readonly string _providerName = "TestProvider";
|
||||
|
||||
public RedisStreamFactoryTests()
|
||||
{
|
||||
_mockConnectionMultiplexer = new Mock<IConnectionMultiplexer>();
|
||||
_mockConnectionMultiplexer.Setup(x => x.GetDatabase(It.IsAny<int>(), It.IsAny<object>())).Returns(new Mock<IDatabase>().Object);
|
||||
|
||||
_mockLoggerFactory = new Mock<ILoggerFactory>();
|
||||
_mockServiceProvider = new Mock<IServiceProvider>();
|
||||
_mockStreamFailureHandler = new Mock<IStreamFailureHandler>();
|
||||
_simpleQueueCacheOptions = new SimpleQueueCacheOptions();
|
||||
_hashRingStreamQueueMapperOptions = new HashRingStreamQueueMapperOptions();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_ShouldThrowArgumentNullException_WhenAnyArgumentIsNull()
|
||||
{
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(null, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, null, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, null, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, null, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions));
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, null, _hashRingStreamQueueMapperOptions));
|
||||
Assert.Throws<ArgumentNullException>(() => new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, null));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CreateAdapter_ShouldReturnRedisStreamAdapterInstance()
|
||||
{
|
||||
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
|
||||
|
||||
var adapter = await factory.CreateAdapter();
|
||||
|
||||
Assert.NotNull(adapter);
|
||||
Assert.IsType<RedisStreamAdapter>(adapter);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetDeliveryFailureHandler_ShouldReturnStreamFailureHandler()
|
||||
{
|
||||
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
|
||||
|
||||
var handler = await factory.GetDeliveryFailureHandler(new QueueId());
|
||||
|
||||
Assert.NotNull(handler);
|
||||
Assert.Equal(_mockStreamFailureHandler.Object, handler);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetQueueAdapterCache_ShouldReturnSimpleQueueAdapterCacheInstance()
|
||||
{
|
||||
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
|
||||
|
||||
var cache = factory.GetQueueAdapterCache();
|
||||
|
||||
Assert.NotNull(cache);
|
||||
Assert.IsType<SimpleQueueAdapterCache>(cache);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetStreamQueueMapper_ShouldReturnHashRingBasedStreamQueueMapperInstance()
|
||||
{
|
||||
var factory = new RedisStreamFactory(_mockConnectionMultiplexer.Object, _mockLoggerFactory.Object, _providerName, _mockStreamFailureHandler.Object, _simpleQueueCacheOptions, _hashRingStreamQueueMapperOptions);
|
||||
|
||||
var mapper = factory.GetStreamQueueMapper();
|
||||
|
||||
Assert.NotNull(mapper);
|
||||
Assert.IsType<HashRingBasedStreamQueueMapper>(mapper);
|
||||
}
|
||||
}
|
||||
}
|
||||
123
RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs
Normal file
123
RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs
Normal file
@ -0,0 +1,123 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Moq;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamReceiverTests
|
||||
{
|
||||
private readonly Mock<IDatabase> _mockDatabase;
|
||||
private readonly Mock<ILogger<RedisStreamReceiver>> _mockLogger;
|
||||
private readonly QueueId _queueId;
|
||||
private readonly RedisStreamReceiver _receiver;
|
||||
|
||||
public RedisStreamReceiverTests()
|
||||
{
|
||||
_mockDatabase = new Mock<IDatabase>();
|
||||
_mockLogger = new Mock<ILogger<RedisStreamReceiver>>();
|
||||
_queueId = QueueId.GetQueueId("testQueue", 0, 0); // Added the missing 'hash' parameter
|
||||
_receiver = new RedisStreamReceiver(_queueId, _mockDatabase.Object, _mockLogger.Object);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetQueueMessagesAsync_ReturnsBatches()
|
||||
{
|
||||
// Arrange
|
||||
var streamEntries = new[]
|
||||
{
|
||||
new StreamEntry("1-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
]),
|
||||
new StreamEntry("2-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
])
|
||||
};
|
||||
_mockDatabase.Setup(db => db.StreamReadGroupAsync(
|
||||
It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue?>(),
|
||||
It.IsAny<int?>(), It.IsAny<bool>(), CommandFlags.None))
|
||||
.ReturnsAsync(streamEntries);
|
||||
|
||||
// Act
|
||||
var result = await _receiver.GetQueueMessagesAsync(10);
|
||||
|
||||
// Assert
|
||||
Assert.NotNull(result);
|
||||
Assert.Equal(2, result.Count);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Initialize_CreatesConsumerGroup()
|
||||
{
|
||||
// Arrange
|
||||
_mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), It.IsAny<bool>(), CommandFlags.None))
|
||||
.ReturnsAsync(true);
|
||||
|
||||
// Act
|
||||
await _receiver.Initialize(TimeSpan.FromSeconds(5));
|
||||
|
||||
// Assert
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None),
|
||||
Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MessagesDeliveredAsync_AcknowledgesMessages()
|
||||
{
|
||||
// Arrange
|
||||
var messages = new List<IBatchContainer>
|
||||
{
|
||||
new RedisStreamBatchContainer(new StreamEntry("1-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
])),
|
||||
new RedisStreamBatchContainer(new StreamEntry("2-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
]))
|
||||
};
|
||||
_mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), CommandFlags.None))
|
||||
.ReturnsAsync(2);
|
||||
|
||||
// Act
|
||||
await _receiver.MessagesDeliveredAsync(messages);
|
||||
|
||||
// Assert
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Shutdown_WaitsForPendingTasks()
|
||||
{
|
||||
// Arrange
|
||||
var tcs = new TaskCompletionSource<StreamEntry[]>();
|
||||
_mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<int>(), CommandFlags.None))
|
||||
.Returns(tcs.Task);
|
||||
|
||||
// Act
|
||||
var getMessagesTask = _receiver.GetQueueMessagesAsync(10);
|
||||
await _receiver.Shutdown(TimeSpan.FromSeconds(5));
|
||||
|
||||
// Assert
|
||||
Assert.True(getMessagesTask.IsCompleted);
|
||||
}
|
||||
}
|
||||
}
|
||||
130
RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs
Normal file
130
RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs
Normal file
@ -0,0 +1,130 @@
|
||||
using Moq;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamSequenceTokenTests
|
||||
{
|
||||
[Fact]
|
||||
public void Constructor_ShouldInitializeProperties_FromRedisValue()
|
||||
{
|
||||
// Arrange
|
||||
var redisValue = new RedisValue("123-456");
|
||||
|
||||
// Act
|
||||
var token = new RedisStreamSequenceToken(redisValue);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(123, token.SequenceNumber);
|
||||
Assert.Equal(456, token.EventIndex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_ShouldInitializeProperties_FromParameters()
|
||||
{
|
||||
// Arrange
|
||||
long sequenceNumber = 123;
|
||||
int eventIndex = 456;
|
||||
|
||||
// Act
|
||||
var token = new RedisStreamSequenceToken(sequenceNumber, eventIndex);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(sequenceNumber, token.SequenceNumber);
|
||||
Assert.Equal(eventIndex, token.EventIndex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompareTo_ShouldReturnZero_ForEqualTokens()
|
||||
{
|
||||
// Arrange
|
||||
var token1 = new RedisStreamSequenceToken(123, 456);
|
||||
var token2 = new RedisStreamSequenceToken(123, 456);
|
||||
|
||||
// Act
|
||||
var result = token1.CompareTo(token2);
|
||||
|
||||
// Assert
|
||||
Assert.Equal(0, result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompareTo_ShouldReturnPositive_ForGreaterToken()
|
||||
{
|
||||
// Arrange
|
||||
var token1 = new RedisStreamSequenceToken(123, 456);
|
||||
var token2 = new RedisStreamSequenceToken(123, 455);
|
||||
|
||||
// Act
|
||||
var result = token1.CompareTo(token2);
|
||||
|
||||
// Assert
|
||||
Assert.True(result > 0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompareTo_ShouldReturnNegative_ForLesserToken()
|
||||
{
|
||||
// Arrange
|
||||
var token1 = new RedisStreamSequenceToken(123, 455);
|
||||
var token2 = new RedisStreamSequenceToken(123, 456);
|
||||
|
||||
// Act
|
||||
var result = token1.CompareTo(token2);
|
||||
|
||||
// Assert
|
||||
Assert.True(result < 0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Equals_ShouldReturnTrue_ForEqualTokens()
|
||||
{
|
||||
// Arrange
|
||||
var token1 = new RedisStreamSequenceToken(123, 456);
|
||||
var token2 = new RedisStreamSequenceToken(123, 456);
|
||||
|
||||
// Act
|
||||
var result = token1.Equals(token2);
|
||||
|
||||
// Assert
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Equals_ShouldReturnFalse_ForDifferentTokens()
|
||||
{
|
||||
// Arrange
|
||||
var token1 = new RedisStreamSequenceToken(123, 456);
|
||||
var token2 = new RedisStreamSequenceToken(123, 457);
|
||||
|
||||
// Act
|
||||
var result = token1.Equals(token2);
|
||||
|
||||
// Assert
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompareTo_ShouldThrowArgumentNullException_ForNullToken()
|
||||
{
|
||||
// Arrange
|
||||
var token = new RedisStreamSequenceToken(123, 456);
|
||||
|
||||
// Act & Assert
|
||||
Assert.Throws<ArgumentNullException>(() => token.CompareTo(null));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompareTo_ShouldThrowArgumentException_ForInvalidTokenType()
|
||||
{
|
||||
// Arrange
|
||||
var token = new RedisStreamSequenceToken(123, 456);
|
||||
var invalidToken = new Mock<StreamSequenceToken>().Object;
|
||||
|
||||
// Act & Assert
|
||||
Assert.Throws<ArgumentException>(() => token.CompareTo(invalidToken));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,26 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
<IsPackable>false</IsPackable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.2" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
|
||||
<PackageReference Include="Moq" Version="4.20.72" />
|
||||
<PackageReference Include="xunit" Version="2.9.2" />
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Using Include="Xunit" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@ -1,10 +1,9 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Runtime;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace Provider
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
public class RedisStreamAdapter : IQueueAdapter
|
||||
{
|
||||
@ -16,10 +15,10 @@ namespace Provider
|
||||
|
||||
public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory)
|
||||
{
|
||||
_database = database;
|
||||
_providerName = providerName;
|
||||
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper;
|
||||
_loggerFactory = loggerFactory;
|
||||
_database = database ?? throw new ArgumentNullException(nameof(database));
|
||||
_providerName = providerName ?? throw new ArgumentNullException(nameof(providerName));
|
||||
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper ?? throw new ArgumentNullException(nameof(hashRingBasedStreamQueueMapper));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_logger = loggerFactory.CreateLogger<RedisStreamAdapter>();
|
||||
}
|
||||
|
||||
@ -0,0 +1,75 @@
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
[GenerateSerializer]
|
||||
[Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")]
|
||||
public class RedisStreamBatchContainer : IBatchContainer
|
||||
{
|
||||
[Id(0)]
|
||||
public StreamId StreamId { get; }
|
||||
|
||||
[Id(1)]
|
||||
public StreamSequenceToken SequenceToken { get; }
|
||||
|
||||
[Id(2)]
|
||||
public string EventType { get; }
|
||||
|
||||
[Id(3)]
|
||||
public string Data { get; }
|
||||
|
||||
[Id(4)]
|
||||
public string StreamEntryId { get; }
|
||||
|
||||
public RedisStreamBatchContainer(StreamEntry streamEntry)
|
||||
{
|
||||
var streamNamespace = streamEntry.Values[0].Value;
|
||||
var steamKey = streamEntry.Values[1].Value;
|
||||
var eventType = streamEntry.Values[2].Value;
|
||||
var data = streamEntry.Values[3].Value;
|
||||
StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id));
|
||||
|
||||
// Check incoming data
|
||||
if (string.IsNullOrWhiteSpace(streamNamespace))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(streamNamespace));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(steamKey))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(steamKey));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(eventType))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(eventType));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(data))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(data));
|
||||
}
|
||||
|
||||
StreamId = StreamId.Create(streamNamespace!, steamKey!);
|
||||
SequenceToken = new RedisStreamSequenceToken(streamEntry.Id);
|
||||
EventType = eventType!;
|
||||
Data = data!;
|
||||
}
|
||||
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
|
||||
{
|
||||
List<Tuple<T, StreamSequenceToken>> events = new();
|
||||
var eventType = typeof(T).Name;
|
||||
if (eventType == EventType)
|
||||
{
|
||||
var data = Data;
|
||||
var @event = JsonSerializer.Deserialize<T>(data);
|
||||
events.Add(new(@event!, SequenceToken));
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
public bool ImportRequestContext()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,18 +5,18 @@ using Orleans.Providers.Streams.Common;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace Provider
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
public class RedisStreamFactory : IQueueAdapterFactory
|
||||
{
|
||||
private readonly IDatabase _database;
|
||||
private readonly IConnectionMultiplexer _connectionMultiplexer;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly string _providerName;
|
||||
private readonly IStreamFailureHandler _streamFailureHandler;
|
||||
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
|
||||
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
|
||||
|
||||
public RedisStreamFactory(IDatabase database,
|
||||
public RedisStreamFactory(IConnectionMultiplexer connectionMultiplexer,
|
||||
ILoggerFactory loggerFactory,
|
||||
string providerName,
|
||||
IStreamFailureHandler streamFailureHandler,
|
||||
@ -24,28 +24,33 @@ namespace Provider
|
||||
HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions
|
||||
)
|
||||
{
|
||||
_database = database;
|
||||
_loggerFactory = loggerFactory;
|
||||
_providerName = providerName;
|
||||
_streamFailureHandler = streamFailureHandler;
|
||||
_simpleQueueCacheOptions = simpleQueueCacheOptions;
|
||||
if (hashRingStreamQueueMapperOptions is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(hashRingStreamQueueMapperOptions));
|
||||
}
|
||||
|
||||
_connectionMultiplexer = connectionMultiplexer ?? throw new ArgumentNullException(nameof(connectionMultiplexer));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_providerName = providerName ?? throw new ArgumentNullException(nameof(providerName));
|
||||
_streamFailureHandler = streamFailureHandler ?? throw new ArgumentNullException(nameof(streamFailureHandler));
|
||||
_simpleQueueCacheOptions = simpleQueueCacheOptions ?? throw new ArgumentNullException(nameof(simpleQueueCacheOptions));
|
||||
_hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName);
|
||||
}
|
||||
|
||||
public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName)
|
||||
{
|
||||
var database = provider.GetRequiredService<IDatabase>();
|
||||
var connMuliplexer = provider.GetRequiredService<IConnectionMultiplexer>();
|
||||
var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
|
||||
var simpleQueueCacheOptions = provider.GetOptionsByName<SimpleQueueCacheOptions>(providerName);
|
||||
var hashRingStreamQueueMapperOptions = provider.GetOptionsByName<HashRingStreamQueueMapperOptions>(providerName);
|
||||
var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger<RedisStreamFailureHandler>());
|
||||
return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
|
||||
return new RedisStreamFactory(connMuliplexer, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
|
||||
|
||||
}
|
||||
|
||||
public Task<IQueueAdapter> CreateAdapter()
|
||||
{
|
||||
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
|
||||
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_connectionMultiplexer.GetDatabase(), _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
|
||||
}
|
||||
|
||||
public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
|
||||
@ -1,8 +1,7 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Runtime;
|
||||
using Orleans.Streams;
|
||||
|
||||
namespace Provider
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
public class RedisStreamFailureHandler : IStreamFailureHandler
|
||||
{
|
||||
@ -10,7 +9,7 @@ namespace Provider
|
||||
|
||||
public RedisStreamFailureHandler(ILogger<RedisStreamFailureHandler> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public bool ShouldFaultSubsriptionOnError => true;
|
||||
@ -3,7 +3,7 @@ using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
|
||||
|
||||
namespace Provider
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
public class RedisStreamReceiver : IQueueAdapterReceiver
|
||||
{
|
||||
@ -16,8 +16,8 @@ namespace Provider
|
||||
public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger<RedisStreamReceiver> logger)
|
||||
{
|
||||
_queueId = queueId;
|
||||
_database = database;
|
||||
_logger = logger;
|
||||
_database = database ?? throw new ArgumentNullException(nameof(database));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task<IList<IBatchContainer>?> GetQueueMessagesAsync(int maxCount)
|
||||
@ -69,7 +69,7 @@ namespace Provider
|
||||
var container = message as RedisStreamBatchContainer;
|
||||
if (container != null)
|
||||
{
|
||||
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
|
||||
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId);
|
||||
pendingTasks = ack;
|
||||
await ack;
|
||||
}
|
||||
@ -1,15 +1,15 @@
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
|
||||
namespace Provider
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
[GenerateSerializer]
|
||||
public class RedisStreamSequenceToken : StreamSequenceToken
|
||||
{
|
||||
[Id(0)]
|
||||
public override long SequenceNumber { get; protected set; }
|
||||
public sealed override long SequenceNumber { get; protected set; }
|
||||
[Id(1)]
|
||||
public override int EventIndex { get; protected set; }
|
||||
public sealed override int EventIndex { get; protected set; }
|
||||
|
||||
public RedisStreamSequenceToken(RedisValue id)
|
||||
{
|
||||
@ -37,7 +37,7 @@ namespace Provider
|
||||
throw new ArgumentException("Invalid token type", nameof(other));
|
||||
}
|
||||
|
||||
public override bool Equals(StreamSequenceToken other)
|
||||
public override bool Equals(StreamSequenceToken? other)
|
||||
{
|
||||
var token = other as RedisStreamSequenceToken;
|
||||
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;
|
||||
@ -1,15 +1,15 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Sdk" Version="8.1.0" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
|
||||
<PackageReference Include="Microsoft.Orleans.Sdk" Version="9.0.1" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
7
build.cmd
Executable file
7
build.cmd
Executable file
@ -0,0 +1,7 @@
|
||||
:; set -eo pipefail
|
||||
:; SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
|
||||
:; ${SCRIPT_DIR}/build.sh "$@"
|
||||
:; exit $?
|
||||
|
||||
@ECHO OFF
|
||||
powershell -ExecutionPolicy ByPass -NoProfile -File "%~dp0build.ps1" %*
|
||||
74
build.ps1
Normal file
74
build.ps1
Normal file
@ -0,0 +1,74 @@
|
||||
[CmdletBinding()]
|
||||
Param(
|
||||
[Parameter(Position=0,Mandatory=$false,ValueFromRemainingArguments=$true)]
|
||||
[string[]]$BuildArguments
|
||||
)
|
||||
|
||||
Write-Output "PowerShell $($PSVersionTable.PSEdition) version $($PSVersionTable.PSVersion)"
|
||||
|
||||
Set-StrictMode -Version 2.0; $ErrorActionPreference = "Stop"; $ConfirmPreference = "None"; trap { Write-Error $_ -ErrorAction Continue; exit 1 }
|
||||
$PSScriptRoot = Split-Path $MyInvocation.MyCommand.Path -Parent
|
||||
|
||||
###########################################################################
|
||||
# CONFIGURATION
|
||||
###########################################################################
|
||||
|
||||
$BuildProjectFile = "$PSScriptRoot\build\_build.csproj"
|
||||
$TempDirectory = "$PSScriptRoot\\.nuke\temp"
|
||||
|
||||
$DotNetGlobalFile = "$PSScriptRoot\\global.json"
|
||||
$DotNetInstallUrl = "https://dot.net/v1/dotnet-install.ps1"
|
||||
$DotNetChannel = "STS"
|
||||
|
||||
$env:DOTNET_CLI_TELEMETRY_OPTOUT = 1
|
||||
$env:DOTNET_NOLOGO = 1
|
||||
|
||||
###########################################################################
|
||||
# EXECUTION
|
||||
###########################################################################
|
||||
|
||||
function ExecSafe([scriptblock] $cmd) {
|
||||
& $cmd
|
||||
if ($LASTEXITCODE) { exit $LASTEXITCODE }
|
||||
}
|
||||
|
||||
# If dotnet CLI is installed globally and it matches requested version, use for execution
|
||||
if ($null -ne (Get-Command "dotnet" -ErrorAction SilentlyContinue) -and `
|
||||
$(dotnet --version) -and $LASTEXITCODE -eq 0) {
|
||||
$env:DOTNET_EXE = (Get-Command "dotnet").Path
|
||||
}
|
||||
else {
|
||||
# Download install script
|
||||
$DotNetInstallFile = "$TempDirectory\dotnet-install.ps1"
|
||||
New-Item -ItemType Directory -Path $TempDirectory -Force | Out-Null
|
||||
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
|
||||
(New-Object System.Net.WebClient).DownloadFile($DotNetInstallUrl, $DotNetInstallFile)
|
||||
|
||||
# If global.json exists, load expected version
|
||||
if (Test-Path $DotNetGlobalFile) {
|
||||
$DotNetGlobal = $(Get-Content $DotNetGlobalFile | Out-String | ConvertFrom-Json)
|
||||
if ($DotNetGlobal.PSObject.Properties["sdk"] -and $DotNetGlobal.sdk.PSObject.Properties["version"]) {
|
||||
$DotNetVersion = $DotNetGlobal.sdk.version
|
||||
}
|
||||
}
|
||||
|
||||
# Install by channel or version
|
||||
$DotNetDirectory = "$TempDirectory\dotnet-win"
|
||||
if (!(Test-Path variable:DotNetVersion)) {
|
||||
ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Channel $DotNetChannel -NoPath }
|
||||
} else {
|
||||
ExecSafe { & powershell $DotNetInstallFile -InstallDir $DotNetDirectory -Version $DotNetVersion -NoPath }
|
||||
}
|
||||
$env:DOTNET_EXE = "$DotNetDirectory\dotnet.exe"
|
||||
$env:PATH = "$DotNetDirectory;$env:PATH"
|
||||
}
|
||||
|
||||
Write-Output "Microsoft (R) .NET SDK version $(& $env:DOTNET_EXE --version)"
|
||||
|
||||
if (Test-Path env:NUKE_ENTERPRISE_TOKEN) {
|
||||
& $env:DOTNET_EXE nuget remove source "nuke-enterprise" > $null
|
||||
& $env:DOTNET_EXE nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password $env:NUKE_ENTERPRISE_TOKEN > $null
|
||||
}
|
||||
|
||||
ExecSafe { & $env:DOTNET_EXE build $BuildProjectFile /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet }
|
||||
ExecSafe { & $env:DOTNET_EXE run --project $BuildProjectFile --no-build -- $BuildArguments }
|
||||
67
build.sh
Executable file
67
build.sh
Executable file
@ -0,0 +1,67 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
bash --version 2>&1 | head -n 1
|
||||
|
||||
set -eo pipefail
|
||||
SCRIPT_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
|
||||
|
||||
###########################################################################
|
||||
# CONFIGURATION
|
||||
###########################################################################
|
||||
|
||||
BUILD_PROJECT_FILE="$SCRIPT_DIR/build/_build.csproj"
|
||||
TEMP_DIRECTORY="$SCRIPT_DIR//.nuke/temp"
|
||||
|
||||
DOTNET_GLOBAL_FILE="$SCRIPT_DIR//global.json"
|
||||
DOTNET_INSTALL_URL="https://dot.net/v1/dotnet-install.sh"
|
||||
DOTNET_CHANNEL="STS"
|
||||
|
||||
export DOTNET_CLI_TELEMETRY_OPTOUT=1
|
||||
export DOTNET_NOLOGO=1
|
||||
|
||||
###########################################################################
|
||||
# EXECUTION
|
||||
###########################################################################
|
||||
|
||||
function FirstJsonValue {
|
||||
perl -nle 'print $1 if m{"'"$1"'": "([^"]+)",?}' <<< "${@:2}"
|
||||
}
|
||||
|
||||
# If dotnet CLI is installed globally and it matches requested version, use for execution
|
||||
if [ -x "$(command -v dotnet)" ] && dotnet --version &>/dev/null; then
|
||||
export DOTNET_EXE="$(command -v dotnet)"
|
||||
else
|
||||
# Download install script
|
||||
DOTNET_INSTALL_FILE="$TEMP_DIRECTORY/dotnet-install.sh"
|
||||
mkdir -p "$TEMP_DIRECTORY"
|
||||
curl -Lsfo "$DOTNET_INSTALL_FILE" "$DOTNET_INSTALL_URL"
|
||||
chmod +x "$DOTNET_INSTALL_FILE"
|
||||
|
||||
# If global.json exists, load expected version
|
||||
if [[ -f "$DOTNET_GLOBAL_FILE" ]]; then
|
||||
DOTNET_VERSION=$(FirstJsonValue "version" "$(cat "$DOTNET_GLOBAL_FILE")")
|
||||
if [[ "$DOTNET_VERSION" == "" ]]; then
|
||||
unset DOTNET_VERSION
|
||||
fi
|
||||
fi
|
||||
|
||||
# Install by channel or version
|
||||
DOTNET_DIRECTORY="$TEMP_DIRECTORY/dotnet-unix"
|
||||
if [[ -z ${DOTNET_VERSION+x} ]]; then
|
||||
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --channel "$DOTNET_CHANNEL" --no-path
|
||||
else
|
||||
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "$DOTNET_VERSION" --no-path
|
||||
fi
|
||||
export DOTNET_EXE="$DOTNET_DIRECTORY/dotnet"
|
||||
export PATH="$DOTNET_DIRECTORY:$PATH"
|
||||
fi
|
||||
|
||||
echo "Microsoft (R) .NET SDK version $("$DOTNET_EXE" --version)"
|
||||
|
||||
if [[ ! -z ${NUKE_ENTERPRISE_TOKEN+x} && "$NUKE_ENTERPRISE_TOKEN" != "" ]]; then
|
||||
"$DOTNET_EXE" nuget remove source "nuke-enterprise" &>/dev/null || true
|
||||
"$DOTNET_EXE" nuget add source "https://f.feedz.io/nuke/enterprise/nuget" --name "nuke-enterprise" --username "PAT" --password "$NUKE_ENTERPRISE_TOKEN" --store-password-in-clear-text &>/dev/null || true
|
||||
fi
|
||||
|
||||
"$DOTNET_EXE" build "$BUILD_PROJECT_FILE" /nodeReuse:false /p:UseSharedCompilation=false -nologo -clp:NoSummary --verbosity quiet
|
||||
"$DOTNET_EXE" run --project "$BUILD_PROJECT_FILE" --no-build -- "$@"
|
||||
11
build/.editorconfig
Normal file
11
build/.editorconfig
Normal file
@ -0,0 +1,11 @@
|
||||
[*.cs]
|
||||
dotnet_style_qualification_for_field = false:warning
|
||||
dotnet_style_qualification_for_property = false:warning
|
||||
dotnet_style_qualification_for_method = false:warning
|
||||
dotnet_style_qualification_for_event = false:warning
|
||||
dotnet_style_require_accessibility_modifiers = never:warning
|
||||
|
||||
csharp_style_expression_bodied_methods = true:silent
|
||||
csharp_style_expression_bodied_properties = true:warning
|
||||
csharp_style_expression_bodied_indexers = true:warning
|
||||
csharp_style_expression_bodied_accessors = true:warning
|
||||
177
build/Build.cs
Normal file
177
build/Build.cs
Normal file
@ -0,0 +1,177 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using Nuke.Common;
|
||||
using Nuke.Common.CI;
|
||||
using Nuke.Common.Execution;
|
||||
using Nuke.Common.IO;
|
||||
using Nuke.Common.ProjectModel;
|
||||
using Nuke.Common.Tooling;
|
||||
using Nuke.Common.Tools.DotNet;
|
||||
using Nuke.Common.Tools.Git;
|
||||
using Nuke.Common.Tools.GitVersion;
|
||||
using Nuke.Common.Utilities.Collections;
|
||||
using Serilog;
|
||||
using static Nuke.Common.EnvironmentInfo;
|
||||
using static Nuke.Common.IO.PathConstruction;
|
||||
|
||||
class Build : NukeBuild
|
||||
{
|
||||
/// Support plugins are available for:
|
||||
/// - JetBrains ReSharper https://nuke.build/resharper
|
||||
/// - JetBrains Rider https://nuke.build/rider
|
||||
/// - Microsoft VisualStudio https://nuke.build/visualstudio
|
||||
/// - Microsoft VSCode https://nuke.build/vscode
|
||||
|
||||
public static int Main() => Execute<Build>(x => x.Pack);
|
||||
|
||||
private const string LibraryProjectName = "Universley.OrleansContrib.StreamsProvider.Redis";
|
||||
|
||||
[Parameter("Configuration to build - Default is 'Debug' (local) or 'Release' (server)")]
|
||||
readonly Configuration Configuration = IsLocalBuild ? Configuration.Debug : Configuration.Release;
|
||||
|
||||
[Solution] readonly Solution Solution;
|
||||
|
||||
AbsolutePath SourceDirectory => RootDirectory;
|
||||
AbsolutePath ArtifactsDirectory => RootDirectory / "artifacts";
|
||||
|
||||
AbsolutePath NuGetPackagesDirectory => ArtifactsDirectory / "nuget";
|
||||
AbsolutePath TestResultDirectory => ArtifactsDirectory / "test-results";
|
||||
|
||||
// Nuget API key can be also set as an environment variable
|
||||
[Parameter("NuGet API key")]
|
||||
readonly string NuGetApiKey;
|
||||
|
||||
[Parameter("Build number override")]
|
||||
readonly string BuildNumber
|
||||
= Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER") ?? $"{0}";
|
||||
|
||||
[Parameter("Gitea Nuget package source name")]
|
||||
readonly string GiteaNugetSourceName = Environment.GetEnvironmentVariable("GITEA_NUGET_SOURCE_NAME");
|
||||
|
||||
[Parameter("Gitea package owner user")]
|
||||
[Secret]
|
||||
readonly string GiteaPackageOwnerUser = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_USER");
|
||||
[Parameter("Gitea package owner password")]
|
||||
[Secret]
|
||||
readonly string GiteaPackageOwnerPassword = Environment.GetEnvironmentVariable("GITEA_PACKAGE_OWNER_PASSWORD");
|
||||
|
||||
Target Clean => _ => _
|
||||
.Before(Compile)
|
||||
.Executes(() =>
|
||||
{
|
||||
SourceDirectory
|
||||
.GlobDirectories("**/bin", "**/obj")
|
||||
.Except(RootDirectory.GlobDirectories("build/**/bin", "build/**/obj"))
|
||||
.ForEach(ap => Directory.Delete(ap, true));
|
||||
ArtifactsDirectory.CreateOrCleanDirectory();
|
||||
});
|
||||
|
||||
Target Compile => _ => _
|
||||
.Executes(() =>
|
||||
{
|
||||
var semVer = Version;
|
||||
var semFileVer = Version;
|
||||
var informationalVersion = Version;
|
||||
|
||||
Log.Logger.Information("AssemblySemVer: {semVer}", semVer);
|
||||
|
||||
DotNetTasks.DotNetBuild(s => s
|
||||
.SetProjectFile(Solution)
|
||||
.SetConfiguration(Configuration)
|
||||
.SetAssemblyVersion(semVer)
|
||||
.SetFileVersion(semFileVer)
|
||||
.SetInformationalVersion(informationalVersion));
|
||||
});
|
||||
|
||||
Target Test => _ => _
|
||||
.DependsOn(Compile)
|
||||
.Executes(() =>
|
||||
{
|
||||
DotNetTasks.DotNetTest(s => s
|
||||
.SetProjectFile(Solution)
|
||||
.SetConfiguration(Configuration)
|
||||
.SetLoggers($"trx;LogFileName={TestResultDirectory / "testresults.trx"}")
|
||||
);
|
||||
});
|
||||
|
||||
Target Pack => _ => _
|
||||
.DependsOn(Test)
|
||||
.Executes(() =>
|
||||
{
|
||||
|
||||
DotNetTasks.DotNetPack(s => s
|
||||
.SetProject(SourceDirectory / "Universley.OrleansContrib.StreamsProvider.Redis" / $"{LibraryProjectName}.csproj")
|
||||
.SetConfiguration(Configuration)
|
||||
.SetOutputDirectory(NuGetPackagesDirectory)
|
||||
.SetVersion(Version)
|
||||
.SetNoBuild(true)
|
||||
);
|
||||
});
|
||||
|
||||
Target Publish => _ => _
|
||||
.DependsOn(Pack, CreateAndPushGitTag)
|
||||
.Executes(() =>
|
||||
{
|
||||
try
|
||||
{
|
||||
DotNetTasks.DotNetNuGetAddSource(c => c
|
||||
.EnableStorePasswordInClearText()
|
||||
.SetUsername(GiteaPackageOwnerUser)
|
||||
.SetPassword(GiteaPackageOwnerPassword)
|
||||
.SetName("Gitea")
|
||||
.SetSource($"{GiteaNugetSourceName}/index.json"));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Log.Warning("Error adding Gitea source: {e}", e);
|
||||
}
|
||||
|
||||
DotNetTasks.DotNetNuGetPush(s => s
|
||||
.SetSource($"{GiteaNugetSourceName}/symbols")
|
||||
.SetApiKey(NuGetApiKey)
|
||||
.SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.snupkg"));
|
||||
|
||||
DotNetTasks.DotNetNuGetPush(s => s
|
||||
.SetSource($"{GiteaNugetSourceName}/index.json")
|
||||
.SetTargetPath(NuGetPackagesDirectory / $"{LibraryProjectName}.{Version}.nupkg"));
|
||||
});
|
||||
|
||||
Target CreateAndPushGitTag => _ => _
|
||||
.Executes(() =>
|
||||
{
|
||||
var gitTag = $"{Version}";
|
||||
GitTasks.Git($"tag {gitTag}");
|
||||
GitTasks.Git($"push origin {gitTag}");
|
||||
});
|
||||
|
||||
private string Version
|
||||
{
|
||||
get
|
||||
{
|
||||
if (version == null)
|
||||
{
|
||||
version = GetVersion();
|
||||
}
|
||||
return version;
|
||||
}
|
||||
}
|
||||
|
||||
private string version = null;
|
||||
|
||||
private string GetVersion()
|
||||
{
|
||||
Log.Information("GitHub run number is {number}", Environment.GetEnvironmentVariable("GITHUB_RUN_NUMBER"));
|
||||
var buildNumber = BuildNumber;
|
||||
if (string.IsNullOrEmpty(buildNumber))
|
||||
{
|
||||
return "1.0.0";
|
||||
}
|
||||
|
||||
var currentDateTime = DateTimeOffset.UtcNow;
|
||||
|
||||
var assembledVersion = $"{currentDateTime.Year}.{currentDateTime.Month}.{buildNumber}";
|
||||
Log.Information("Assembled version: {assembledVersion}", assembledVersion);
|
||||
return assembledVersion;
|
||||
}
|
||||
}
|
||||
16
build/Configuration.cs
Normal file
16
build/Configuration.cs
Normal file
@ -0,0 +1,16 @@
|
||||
using System;
|
||||
using System.ComponentModel;
|
||||
using System.Linq;
|
||||
using Nuke.Common.Tooling;
|
||||
|
||||
[TypeConverter(typeof(TypeConverter<Configuration>))]
|
||||
public class Configuration : Enumeration
|
||||
{
|
||||
public static Configuration Debug = new Configuration { Value = nameof(Debug) };
|
||||
public static Configuration Release = new Configuration { Value = nameof(Release) };
|
||||
|
||||
public static implicit operator string(Configuration configuration)
|
||||
{
|
||||
return configuration.Value;
|
||||
}
|
||||
}
|
||||
8
build/Directory.Build.props
Normal file
8
build/Directory.Build.props
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
|
||||
<!-- This file prevents unintended imports of unrelated MSBuild files -->
|
||||
<!-- Uncomment to include parent Directory.Build.props file -->
|
||||
<!--<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />-->
|
||||
|
||||
</Project>
|
||||
8
build/Directory.Build.targets
Normal file
8
build/Directory.Build.targets
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
|
||||
<!-- This file prevents unintended imports of unrelated MSBuild files -->
|
||||
<!-- Uncomment to include parent Directory.Build.targets file -->
|
||||
<!--<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />-->
|
||||
|
||||
</Project>
|
||||
22
build/_build.csproj
Normal file
22
build/_build.csproj
Normal file
@ -0,0 +1,22 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<RootNamespace></RootNamespace>
|
||||
<NoWarn>CS0649;CS0169;CA1050;CA1822;CA2211;IDE1006</NoWarn>
|
||||
<NukeRootDirectory>..</NukeRootDirectory>
|
||||
<NukeScriptDirectory>..</NukeScriptDirectory>
|
||||
<NukeTelemetryVersion>1</NukeTelemetryVersion>
|
||||
<IsPackable>false</IsPackable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Nuke.Common" Version="9.0.4" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageDownload Include="GitVersion.Tool" Version="[6.1.0]" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
31
build/_build.csproj.DotSettings
Normal file
31
build/_build.csproj.DotSettings
Normal file
@ -0,0 +1,31 @@
|
||||
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=HeapView_002EDelegateAllocation/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VariableHidesOuterVariable/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ClassNeverInstantiated_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=MemberCanBeMadeStatic_002ELocal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=InterpolatedStringExpressionIsNotIFormattable/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/DEFAULT_INTERNAL_MODIFIER/@EntryValue">Implicit</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/DEFAULT_PRIVATE_MODIFIER/@EntryValue">Implicit</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/METHOD_OR_OPERATOR_BODY/@EntryValue">ExpressionBody</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpCodeStyle/ThisQualifier/INSTANCE_MEMBERS_QUALIFY_MEMBERS/@EntryValue">0</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/ANONYMOUS_METHOD_DECLARATION_BRACES/@EntryValue">NEXT_LINE</s:String>
|
||||
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/KEEP_USER_LINEBREAKS/@EntryValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_AFTER_INVOCATION_LPAR/@EntryValue">False</s:Boolean>
|
||||
<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_ATTRIBUTE_LENGTH_FOR_SAME_LINE/@EntryValue">120</s:Int64>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">IF_OWNER_IS_SINGLE_LINE</s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/WRAP_ARGUMENTS_STYLE/@EntryValue">WRAP_IF_LONG</s:String>
|
||||
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_ANONYMOUSMETHOD_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=4a98fdf6_002D7d98_002D4f5a_002Dafeb_002Dea44ad98c70c/@EntryIndexedValue"><Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=f9fce829_002De6f4_002D4cb2_002D80f1_002D5497c44f51df/@EntryIndexedValue"><Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy></s:String>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpAttributeForSingleLineMethodUpgrade/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EPredefinedNamingRulesToUserRulesUpgrade/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
|
||||
@ -2,19 +2,19 @@
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Orleans.Client" Version="8.1.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
|
||||
<PackageReference Include="Microsoft.Orleans.Client" Version="9.0.1" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Provider\Provider.csproj" />
|
||||
<ProjectReference Include="..\..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@ -2,17 +2,16 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Configuration;
|
||||
using Orleans.Runtime;
|
||||
using Provider;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
using IHost host = new HostBuilder()
|
||||
.UseOrleansClient(clientBuilder =>
|
||||
{
|
||||
clientBuilder.Services.AddSingleton<IDatabase>(sp =>
|
||||
clientBuilder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
|
||||
{
|
||||
IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase();
|
||||
return db;
|
||||
return ConnectionMultiplexer.Connect("localhost");
|
||||
});
|
||||
clientBuilder.UseLocalhostClustering();
|
||||
|
||||
@ -38,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream");
|
||||
var streamId = StreamId.Create("numbergenerator", "consecutive");
|
||||
var stream = streamProvider.GetStream<int>(streamId);
|
||||
|
||||
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
|
||||
var newStream = streamProvider.GetStream<int>(newStreamId);
|
||||
|
||||
await stream.SubscribeAsync((i, token) =>
|
||||
{
|
||||
logger.LogInformation("Received back number {Number}", i);
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
||||
var task = Task.Run(async () =>
|
||||
{
|
||||
var num = 0;
|
||||
@ -2,21 +2,21 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Configuration;
|
||||
using Orleans.Runtime;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
var builder = new HostBuilder()
|
||||
.UseOrleans(silo =>
|
||||
{
|
||||
silo.UseLocalhostClustering();
|
||||
silo.Services.AddSingleton<IDatabase>(sp =>
|
||||
silo.Services.AddSingleton<IConnectionMultiplexer>(sp =>
|
||||
{
|
||||
return ConnectionMultiplexer.Connect("localhost").GetDatabase();
|
||||
return ConnectionMultiplexer.Connect("localhost");
|
||||
});
|
||||
silo.ConfigureLogging(logging => logging.AddConsole());
|
||||
silo.AddMemoryGrainStorage("PubSubStore");
|
||||
silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null);
|
||||
silo.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null);
|
||||
silo.AddMemoryGrainStorageAsDefault();
|
||||
}).UseConsoleLifetime();
|
||||
builder.ConfigureServices(services =>
|
||||
@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
|
||||
_logger.LogInformation("Received number {Number}", item);
|
||||
await Task.Delay(2000);
|
||||
|
||||
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
|
||||
var newStream = this.GetStreamProvider("RedisStream").GetStream<int>(newStreamId);
|
||||
_logger.LogInformation("Sending number {Number} to new stream", item);
|
||||
await newStream.OnNextAsync(item);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,19 +2,19 @@
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<TargetFramework>net9.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Orleans.Server" Version="8.1.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
|
||||
<PackageReference Include="Microsoft.Orleans.Server" Version="9.0.1" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.0.1" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.8.24" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Provider\Provider.csproj" />
|
||||
<ProjectReference Include="..\..\Universley.OrleansContrib.StreamsProvider.Redis\Universley.OrleansContrib.StreamsProvider.Redis.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
Loading…
x
Reference in New Issue
Block a user