diff --git a/.gitignore b/.gitignore
index 9491a2f..59b290a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -360,4 +360,7 @@ MigrationBackup/
.ionide/
# Fody - auto-generated XML schema
-FodyWeavers.xsd
\ No newline at end of file
+FodyWeavers.xsd
+
+# Mac Finder
+*.DS_Store
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
new file mode 100644
index 0000000..806d58f
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/.gitignore
@@ -0,0 +1,13 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Rider ignored files
+/contentModel.xml
+/.idea.RedisStreamsInOrleans.iml
+/modules.xml
+/projectSettingsUpdater.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
new file mode 100644
index 0000000..df87cf9
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/encodings.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
new file mode 100644
index 0000000..7b08163
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/indexLayout.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/.idea.RedisStreamsInOrleans/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/Client/Program.cs b/Client/Program.cs
index 15ad374..7b0426a 100644
--- a/Client/Program.cs
+++ b/Client/Program.cs
@@ -2,6 +2,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
+using Orleans.Streams;
using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
@@ -36,6 +37,15 @@ var streamProvider = client.GetStreamProvider("RedisStream");
var streamId = StreamId.Create("numbergenerator", "consecutive");
var stream = streamProvider.GetStream(streamId);
+var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
+var newStream = streamProvider.GetStream(newStreamId);
+
+await stream.SubscribeAsync((i, token) =>
+{
+ logger.LogInformation("Received back number {Number}", i);
+ return Task.CompletedTask;
+});
+
var task = Task.Run(async () =>
{
var num = 0;
diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs
index 1da1f52..d403425 100644
--- a/Provider/RedisStreamBatchContainer.cs
+++ b/Provider/RedisStreamBatchContainer.cs
@@ -4,28 +4,64 @@ using System.Text.Json;
namespace Universley.OrleansContrib.StreamsProvider.Redis
{
+ [GenerateSerializer]
+ [Alias("Universley.OrleansContrib.StreamsProvider.Redis.RedisStreamBatchContainer")]
public class RedisStreamBatchContainer : IBatchContainer
{
+ [Id(0)]
public StreamId StreamId { get; }
+ [Id(1)]
public StreamSequenceToken SequenceToken { get; }
- public StreamEntry StreamEntry { get; }
+
+ [Id(2)]
+ public string EventType { get; }
+
+ [Id(3)]
+ public string Data { get; }
+
+ [Id(4)]
+ public string StreamEntryId { get; }
+
public RedisStreamBatchContainer(StreamEntry streamEntry)
{
- StreamEntry = streamEntry;
- var streamNamespace = StreamEntry.Values[0].Value;
- var steamKey = StreamEntry.Values[1].Value;
+ var streamNamespace = streamEntry.Values[0].Value;
+ var steamKey = streamEntry.Values[1].Value;
+ var eventType = streamEntry.Values[2].Value;
+ var data = streamEntry.Values[3].Value;
+ StreamEntryId = streamEntry.Id.ToString() ?? throw new ArgumentNullException(nameof(streamEntry.Id));
+
+ // Check incoming data
+ if (string.IsNullOrWhiteSpace(streamNamespace))
+ {
+ throw new ArgumentNullException(nameof(streamNamespace));
+ }
+ if (string.IsNullOrWhiteSpace(steamKey))
+ {
+ throw new ArgumentNullException(nameof(steamKey));
+ }
+ if (string.IsNullOrWhiteSpace(eventType))
+ {
+ throw new ArgumentNullException(nameof(eventType));
+ }
+ if (string.IsNullOrWhiteSpace(data))
+ {
+ throw new ArgumentNullException(nameof(data));
+ }
+
StreamId = StreamId.Create(streamNamespace!, steamKey!);
- SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
+ SequenceToken = new RedisStreamSequenceToken(streamEntry.Id);
+ EventType = eventType!;
+ Data = data!;
}
public IEnumerable> GetEvents()
{
List> events = new();
var eventType = typeof(T).Name;
- if (eventType == StreamEntry.Values[2].Value)
+ if (eventType == EventType)
{
- var data = StreamEntry.Values[3].Value;
- var @event = JsonSerializer.Deserialize(data!);
+ var data = Data;
+ var @event = JsonSerializer.Deserialize(data);
events.Add(new(@event!, SequenceToken));
}
return events;
diff --git a/Provider/RedisStreamReceiver.cs b/Provider/RedisStreamReceiver.cs
index a6928d7..f752799 100644
--- a/Provider/RedisStreamReceiver.cs
+++ b/Provider/RedisStreamReceiver.cs
@@ -69,7 +69,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
var container = message as RedisStreamBatchContainer;
if (container != null)
{
- var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
+ var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntryId);
pendingTasks = ack;
await ack;
}
diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs
index e32c34e..388ef7f 100644
--- a/Provider/RedisStreamSequenceToken.cs
+++ b/Provider/RedisStreamSequenceToken.cs
@@ -7,9 +7,9 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
public class RedisStreamSequenceToken : StreamSequenceToken
{
[Id(0)]
- public override long SequenceNumber { get; protected set; }
+ public sealed override long SequenceNumber { get; protected set; }
[Id(1)]
- public override int EventIndex { get; protected set; }
+ public sealed override int EventIndex { get; protected set; }
public RedisStreamSequenceToken(RedisValue id)
{
@@ -37,7 +37,7 @@ namespace Universley.OrleansContrib.StreamsProvider.Redis
throw new ArgumentException("Invalid token type", nameof(other));
}
- public override bool Equals(StreamSequenceToken other)
+ public override bool Equals(StreamSequenceToken? other)
{
var token = other as RedisStreamSequenceToken;
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;
diff --git a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs
index 86761d2..09efc47 100644
--- a/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs
+++ b/RedisStreamsProvider.UnitTests/RedisStreamReceiverTests.cs
@@ -27,15 +27,23 @@ namespace RedisStreamsProvider.UnitTests
// Arrange
var streamEntries = new[]
{
- new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
- new NameValueEntry("key", "testKey")}),
- new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
- new NameValueEntry("key", "testKey")})
+ new StreamEntry("1-0", [
+ new("namespace", "testNamespace"),
+ new("key", "testKey"),
+ new("eventType", "testEventType" ),
+ new( "data", "testData" )
+ ]),
+ new StreamEntry("2-0", [
+ new("namespace", "testNamespace"),
+ new("key", "testKey"),
+ new("eventType", "testEventType" ),
+ new( "data", "testData" )
+ ])
};
_mockDatabase.Setup(db => db.StreamReadGroupAsync(
- It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(),
- It.IsAny(), It.IsAny(), CommandFlags.None))
- .ReturnsAsync(streamEntries);
+ It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny(), CommandFlags.None))
+ .ReturnsAsync(streamEntries);
// Act
var result = await _receiver.GetQueueMessagesAsync(10);
@@ -49,14 +57,17 @@ namespace RedisStreamsProvider.UnitTests
public async Task Initialize_CreatesConsumerGroup()
{
// Arrange
- _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None))
- .ReturnsAsync(true);
+ _mockDatabase.Setup(db => db.StreamCreateConsumerGroupAsync(It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny(), CommandFlags.None))
+ .ReturnsAsync(true);
// Act
await _receiver.Initialize(TimeSpan.FromSeconds(5));
// Assert
- _mockDatabase.Verify(db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None), Times.Once);
+ _mockDatabase.Verify(
+ db => db.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true, CommandFlags.None),
+ Times.Once);
}
[Fact]
@@ -65,20 +76,31 @@ namespace RedisStreamsProvider.UnitTests
// Arrange
var messages = new List
{
- new RedisStreamBatchContainer(new StreamEntry("1-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
- new NameValueEntry("key", "testKey")})),
- new RedisStreamBatchContainer(new StreamEntry("2-0", new NameValueEntry[2] {new NameValueEntry("namespace", "testNamespace"),
- new NameValueEntry("key", "testKey")}))
+ new RedisStreamBatchContainer(new StreamEntry("1-0", [
+ new("namespace", "testNamespace"),
+ new("key", "testKey"),
+ new("eventType", "testEventType" ),
+ new( "data", "testData" )
+ ])),
+ new RedisStreamBatchContainer(new StreamEntry("2-0", [
+ new("namespace", "testNamespace"),
+ new("key", "testKey"),
+ new("eventType", "testEventType" ),
+ new( "data", "testData" )
+ ]))
};
- _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None))
- .ReturnsAsync(2);
+ _mockDatabase.Setup(db => db.StreamAcknowledgeAsync(It.IsAny(), It.IsAny(),
+ It.IsAny(), CommandFlags.None))
+ .ReturnsAsync(2);
// Act
await _receiver.MessagesDeliveredAsync(messages);
// Assert
- _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
- _mockDatabase.Verify(db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
+ _mockDatabase.Verify(
+ db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "1-0", CommandFlags.None), Times.Once);
+ _mockDatabase.Verify(
+ db => db.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", "2-0", CommandFlags.None), Times.Once);
}
[Fact]
@@ -86,8 +108,9 @@ namespace RedisStreamsProvider.UnitTests
{
// Arrange
var tcs = new TaskCompletionSource();
- _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None))
- .Returns(tcs.Task);
+ _mockDatabase.Setup(db => db.StreamReadGroupAsync(It.IsAny(), It.IsAny(),
+ It.IsAny(), It.IsAny(), It.IsAny(), CommandFlags.None))
+ .Returns(tcs.Task);
// Act
var getMessagesTask = _receiver.GetQueueMessagesAsync(10);
@@ -97,4 +120,4 @@ namespace RedisStreamsProvider.UnitTests
Assert.True(getMessagesTask.IsCompleted);
}
}
-}
+}
\ No newline at end of file
diff --git a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs
index 9446117..1222146 100644
--- a/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs
+++ b/RedisStreamsProvider.UnitTests/RedisStreamSequenceTokenTests.cs
@@ -1,9 +1,9 @@
-using StackExchange.Redis;
+using Moq;
using Orleans.Streams;
-using Moq;
+using StackExchange.Redis;
using Universley.OrleansContrib.StreamsProvider.Redis;
-namespace Provider.Tests
+namespace RedisStreamsProvider.UnitTests
{
public class RedisStreamSequenceTokenTests
{
diff --git a/Server/Program.cs b/Server/Program.cs
index e45ff42..e2b823b 100644
--- a/Server/Program.cs
+++ b/Server/Program.cs
@@ -66,6 +66,11 @@ public class NumberGeneratorGrain : Grain, INumberGeneratorGrain, IAsyncObserver
_logger.LogInformation("Received number {Number}", item);
await Task.Delay(2000);
+ var newStreamId = StreamId.Create("numbergenerator", "consecutive-back");
+ var newStream = this.GetStreamProvider("RedisStream").GetStream(newStreamId);
+ _logger.LogInformation("Sending number {Number} to new stream", item);
+ await newStream.OnNextAsync(item);
+
}
}
diff --git a/build/_build.csproj.DotSettings b/build/_build.csproj.DotSettings
index a778f33..88a8824 100644
--- a/build/_build.csproj.DotSettings
+++ b/build/_build.csproj.DotSettings
@@ -1,4 +1,4 @@
-
+
DO_NOT_SHOW
DO_NOT_SHOW
DO_NOT_SHOW
@@ -17,11 +17,15 @@
False
<Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
<Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
+ <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy>
+ <Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="AaBb" /></Policy>
True
True
True
True
+ True
True
True
True
- True
+ True
+ True