Fixed Serialization to allow to stream events from Grains
This commit is contained in:
parent
1625068884
commit
7ca04eb538
3
.gitignore
vendored
3
.gitignore
vendored
@ -361,3 +361,6 @@ MigrationBackup/
|
||||
|
||||
# Fody - auto-generated XML schema
|
||||
FodyWeavers.xsd
|
||||
|
||||
# Mac Finder
|
||||
*.DS_Store
|
||||
|
||||
13
.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
generated
vendored
Normal file
13
.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
generated
vendored
Normal file
@ -0,0 +1,13 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Rider ignored files
|
||||
/contentModel.xml
|
||||
/.idea.RedisStreamsInOrleans.iml
|
||||
/modules.xml
|
||||
/projectSettingsUpdater.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
4
.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
generated
Normal file
4
.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
generated
Normal file
@ -0,0 +1,4 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Encoding" addBOMForNewFiles="with BOM under Windows, with no BOM otherwise" />
|
||||
</project>
|
||||
8
.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
generated
Normal file
8
.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
generated
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="UserContentModel">
|
||||
<attachedFolders />
|
||||
<explicitIncludes />
|
||||
<explicitExcludes />
|
||||
</component>
|
||||
</project>
|
||||
6
.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
generated
Normal file
6
.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
generated
Normal file
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
@ -2,6 +2,7 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Configuration;
|
||||
using Orleans.Streams;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
@ -36,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream");
|
||||
var streamId = StreamId.Create("numbergenerator", "consecutive");
|
||||
var stream = streamProvider.GetStream<int>(streamId);
|
||||
|
||||
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
|
||||
var newStream = streamProvider.GetStream<int>(newStreamId);
|
||||
|
||||
await stream.SubscribeAsync((i, token) =>
|
||||
{
|
||||
logger.LogInformation("Received back number {Number}", i);
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
||||
var task = Task.Run(async () =>
|
||||
{
|
||||
var num = 0;
|
||||
|
||||
@ -4,28 +4,64 @@ using System.Text.Json;
|
||||
|
||||
namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
{
|
||||
[GenerateSerializer]
|
||||
[Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")]
|
||||
public class RedisStreamBatchContainer : IBatchContainer
|
||||
{
|
||||
[Id(0)]
|
||||
public StreamId StreamId { get; }
|
||||
|
||||
[Id(1)]
|
||||
public StreamSequenceToken SequenceToken { get; }
|
||||
public StreamEntry StreamEntry { get; }
|
||||
|
||||
[Id(2)]
|
||||
public string EventType { get; }
|
||||
|
||||
[Id(3)]
|
||||
public string Data { get; }
|
||||
|
||||
[Id(4)]
|
||||
public string StreamEntryId { get; }
|
||||
|
||||
public RedisStreamBatchContainer(StreamEntry streamEntry)
|
||||
{
|
||||
StreamEntry = streamEntry;
|
||||
var streamNamespace = StreamEntry.Values[0].Value;
|
||||
var steamKey = StreamEntry.Values[1].Value;
|
||||
var streamNamespace = streamEntry.Values[0].Value;
|
||||
var steamKey = streamEntry.Values[1].Value;
|
||||
var eventType = streamEntry.Values[2].Value;
|
||||
var data = streamEntry.Values[3].Value;
|
||||
StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id));
|
||||
|
||||
// Check incoming data
|
||||
if (string.IsNullOrWhiteSpace(streamNamespace))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(streamNamespace));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(steamKey))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(steamKey));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(eventType))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(eventType));
|
||||
}
|
||||
if (string.IsNullOrWhiteSpace(data))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(data));
|
||||
}
|
||||
|
||||
StreamId = StreamId.Create(streamNamespace!, steamKey!);
|
||||
SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
|
||||
SequenceToken = new RedisStreamSequenceToken(streamEntry.Id);
|
||||
EventType = eventType!;
|
||||
Data = data!;
|
||||
}
|
||||
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
|
||||
{
|
||||
List<Tuple<T, StreamSequenceToken>> events = new();
|
||||
var eventType = typeof(T).Name;
|
||||
if (eventType == StreamEntry.Values[2].Value)
|
||||
if (eventType == EventType)
|
||||
{
|
||||
var data = StreamEntry.Values[3].Value;
|
||||
var @event = JsonSerializer.Deserialize<T>(data!);
|
||||
var data = Data;
|
||||
var @event = JsonSerializer.Deserialize<T>(data);
|
||||
events.Add(new(@event!, SequenceToken));
|
||||
}
|
||||
return events;
|
||||
|
||||
@ -69,7 +69,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
var container = message as RedisStreamBatchContainer;
|
||||
if (container != null)
|
||||
{
|
||||
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
|
||||
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId);
|
||||
pendingTasks = ack;
|
||||
await ack;
|
||||
}
|
||||
|
||||
@ -7,9 +7,9 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
public class RedisStreamSequenceToken : StreamSequenceToken
|
||||
{
|
||||
[Id(0)]
|
||||
public override long SequenceNumber { get; protected set; }
|
||||
public sealed override long SequenceNumber { get; protected set; }
|
||||
[Id(1)]
|
||||
public override int EventIndex { get; protected set; }
|
||||
public sealed override int EventIndex { get; protected set; }
|
||||
|
||||
public RedisStreamSequenceToken(RedisValue id)
|
||||
{
|
||||
@ -37,7 +37,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
|
||||
throw new ArgumentException("Invalid token type", nameof(other));
|
||||
}
|
||||
|
||||
public override bool Equals(StreamSequenceToken other)
|
||||
public override bool Equals(StreamSequenceToken? other)
|
||||
{
|
||||
var token = other as RedisStreamSequenceToken;
|
||||
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;
|
||||
|
||||
@ -27,10 +27,18 @@ namespace RedisStreamsProvider.UnitTests
|
||||
// Arrange
|
||||
var streamEntries = new[]
|
||||
{
|
||||
new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey")}),
|
||||
new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey")})
|
||||
new StreamEntry("1-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
]),
|
||||
new StreamEntry("2-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
])
|
||||
};
|
||||
_mockDatabase.Setup(db => db.StreamReadGroupAsync(
|
||||
It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue?>(),
|
||||
@ -49,14 +57,17 @@ namespace RedisStreamsProvider.UnitTests
|
||||
public async Task Initialize_CreatesConsumerGroup()
|
||||
{
|
||||
// Arrange
|
||||
_mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<bool>(), CommandFlags.None))
|
||||
_mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), It.IsAny<bool>(), CommandFlags.None))
|
||||
.ReturnsAsync(true);
|
||||
|
||||
// Act
|
||||
await _receiver.Initialize(TimeSpan.FromSeconds(5));
|
||||
|
||||
// Assert
|
||||
_mockDatabase.Verify(db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), Times.Once);
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None),
|
||||
Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
@ -65,20 +76,31 @@ namespace RedisStreamsProvider.UnitTests
|
||||
// Arrange
|
||||
var messages = new List<IBatchContainer>
|
||||
{
|
||||
new RedisStreamBatchContainer(new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey")})),
|
||||
new RedisStreamBatchContainer(new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
|
||||
new NameValueEntry("key", "testKey")}))
|
||||
new RedisStreamBatchContainer(new StreamEntry("1-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
])),
|
||||
new RedisStreamBatchContainer(new StreamEntry("2-0", [
|
||||
new("namespace", "testNamespace"),
|
||||
new("key", "testKey"),
|
||||
new("eventType", "testEventType" ),
|
||||
new( "data", "testData" )
|
||||
]))
|
||||
};
|
||||
_mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), CommandFlags.None))
|
||||
_mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), CommandFlags.None))
|
||||
.ReturnsAsync(2);
|
||||
|
||||
// Act
|
||||
await _receiver.MessagesDeliveredAsync(messages);
|
||||
|
||||
// Assert
|
||||
_mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
|
||||
_mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
|
||||
_mockDatabase.Verify(
|
||||
db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
@ -86,7 +108,8 @@ namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
// Arrange
|
||||
var tcs = new TaskCompletionSource<StreamEntry[]>();
|
||||
_mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<int>(), CommandFlags.None))
|
||||
_mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny<RedisKey>(), It.IsAny<RedisValue>(),
|
||||
It.IsAny<RedisValue>(), It.IsAny<RedisValue>(), It.IsAny<int>(), CommandFlags.None))
|
||||
.Returns(tcs.Task);
|
||||
|
||||
// Act
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
using StackExchange.Redis;
|
||||
using Moq;
|
||||
using Orleans.Streams;
|
||||
using Moq;
|
||||
using StackExchange.Redis;
|
||||
using Universley.OrleansContrib.StreamsProvider.Redis;
|
||||
|
||||
namespace Provider.Tests
|
||||
namespace RedisStreamsProvider.UnitTests
|
||||
{
|
||||
public class RedisStreamSequenceTokenTests
|
||||
{
|
||||
|
||||
@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
|
||||
_logger.LogInformation("Received number {Number}", item);
|
||||
await Task.Delay(2000);
|
||||
|
||||
var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
|
||||
var newStream = this.GetStreamProvider("RedisStream").GetStream<int>(newStreamId);
|
||||
_logger.LogInformation("Sending number {Number} to new stream", item);
|
||||
await newStream.OnNextAsync(item);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
|
||||
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=HeapView_002EDelegateAllocation/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=VariableHidesOuterVariable/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ClassNeverInstantiated_002EGlobal/@EntryIndexedValue">DO_NOT_SHOW</s:String>
|
||||
@ -17,11 +17,15 @@
|
||||
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_ANONYMOUSMETHOD_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" /></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=4a98fdf6_002D7d98_002D4f5a_002Dafeb_002Dea44ad98c70c/@EntryIndexedValue"><Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy></s:String>
|
||||
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/UserRules/=f9fce829_002De6f4_002D4cb2_002D80f1_002D5497c44f51df/@EntryIndexedValue"><Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy></s:String>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpAttributeForSingleLineMethodUpgrade/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
|
||||
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EPredefinedNamingRulesToUserRulesUpgrade/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user