diff --git a/Client/Client.csproj b/Client/Client.csproj new file mode 100644 index 0000000..ce9902e --- /dev/null +++ b/Client/Client.csproj @@ -0,0 +1,20 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + diff --git a/Client/Program.cs b/Client/Program.cs new file mode 100644 index 0000000..3751555 --- /dev/null +++ b/Client/Program.cs @@ -0,0 +1,2 @@ +// See https://aka.ms/new-console-template for more information +Console.WriteLine("Hello, World!"); diff --git a/Provider/Provider.csproj b/Provider/Provider.csproj new file mode 100644 index 0000000..8eec680 --- /dev/null +++ b/Provider/Provider.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + + diff --git a/Provider/RedisStreamAdapter.cs b/Provider/RedisStreamAdapter.cs new file mode 100644 index 0000000..e8d5def --- /dev/null +++ b/Provider/RedisStreamAdapter.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Provider +{ + internal class RedisStreamAdapter + { + } +} diff --git a/Provider/RedisStreamBatchContainer.cs b/Provider/RedisStreamBatchContainer.cs new file mode 100644 index 0000000..6da99c4 --- /dev/null +++ b/Provider/RedisStreamBatchContainer.cs @@ -0,0 +1,27 @@ +using Orleans.Runtime; +using Orleans.Streams; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Provider +{ + public class RedisStreamBatchContainer : IBatchContainer + { + public StreamId StreamId => throw new NotImplementedException(); + private rea + public StreamSequenceToken SequenceToken => throw new NotImplementedException(); + + public IEnumerable> GetEvents() + { + throw new NotImplementedException(); + } + + public bool ImportRequestContext() + { + throw new NotImplementedException(); + } + } +} diff --git a/Provider/RedisStreamFactory.cs b/Provider/RedisStreamFactory.cs new file mode 100644 index 0000000..5d03a3f --- /dev/null +++ b/Provider/RedisStreamFactory.cs @@ -0,0 +1,7 @@ +namespace Provider +{ + public class RedisStreamFactory + { + + } +} diff --git a/Provider/RedisStreamFailureHandler.cs b/Provider/RedisStreamFailureHandler.cs new file mode 100644 index 0000000..11afbd1 --- /dev/null +++ b/Provider/RedisStreamFailureHandler.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Logging; +using Orleans.Runtime; +using Orleans.Streams; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Provider +{ + public class RedisStreamFailureHandler : IStreamFailureHandler + { + private ILogger _logger; + + public RedisStreamFailureHandler(ILogger logger) + { + _logger = logger; + } + + public bool ShouldFaultSubsriptionOnError => true; + + + public Task OnDeliveryFailure(GuidId subscriptionId, string streamProviderName, StreamId streamIdentity, StreamSequenceToken sequenceToken) + { + _logger.LogError("Delivery failure for subscription {SubscriptionId} on stream {StreamId} with token {Token}", subscriptionId, streamIdentity, sequenceToken); + return Task.CompletedTask; + } + + public Task OnSubscriptionFailure(GuidId subscriptionId, string streamProviderName, StreamId streamIdentity, StreamSequenceToken sequenceToken) + { + _logger.LogError("Subscription failure for subscription {SubscriptionId} on stream {StreamId} with token {Token}", subscriptionId, streamIdentity, sequenceToken); + return Task.CompletedTask; + } + } +} diff --git a/Provider/RedisStreamSequenceToken.cs b/Provider/RedisStreamSequenceToken.cs new file mode 100644 index 0000000..d180177 --- /dev/null +++ b/Provider/RedisStreamSequenceToken.cs @@ -0,0 +1,29 @@ +using Orleans; +using Orleans.Streams; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Provider +{ + [GenerateSerializer] + internal class RedisStreamSequenceToken : StreamSequenceToken + { + [Id(0)] + public override long SequenceNumber { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } + [Id(1)] + public override int EventIndex { get => throw new NotImplementedException(); protected set => throw new NotImplementedException(); } + + public override int CompareTo(StreamSequenceToken other) + { + throw new NotImplementedException(); + } + + public override bool Equals(StreamSequenceToken other) + { + throw new NotImplementedException(); + } + } +} diff --git a/RedisStreamsInOrleans.sln b/RedisStreamsInOrleans.sln new file mode 100644 index 0000000..3feeb5e --- /dev/null +++ b/RedisStreamsInOrleans.sln @@ -0,0 +1,37 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.10.34928.147 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Server", "Server\Server.csproj", "{35E441B1-DDF7-4497-B1D9-BBD9248690E9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Client", "Client\Client.csproj", "{B7477618-DE9C-4586-98D2-46CFF1CB0C74}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Provider", "Provider\Provider.csproj", "{70F8E685-F662-4225-A60C-D318E0C6ED18}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {35E441B1-DDF7-4497-B1D9-BBD9248690E9}.Release|Any CPU.Build.0 = Release|Any CPU + {B7477618-DE9C-4586-98D2-46CFF1CB0C74}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B7477618-DE9C-4586-98D2-46CFF1CB0C74}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B7477618-DE9C-4586-98D2-46CFF1CB0C74}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B7477618-DE9C-4586-98D2-46CFF1CB0C74}.Release|Any CPU.Build.0 = Release|Any CPU + {70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {70F8E685-F662-4225-A60C-D318E0C6ED18}.Debug|Any CPU.Build.0 = Debug|Any CPU + {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.ActiveCfg = Release|Any CPU + {70F8E685-F662-4225-A60C-D318E0C6ED18}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {9A8081FD-99C7-4302-A892-DC1DE30F4853} + EndGlobalSection +EndGlobal diff --git a/Server/Program.cs b/Server/Program.cs new file mode 100644 index 0000000..17e3f3b --- /dev/null +++ b/Server/Program.cs @@ -0,0 +1,23 @@ + + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; + +var builder = new HostBuilder() + .UseOrleans(silo => + { + silo.UseLocalhostClustering(); + silo.Services.AddSingleton(sp => + { + return ConnectionMultiplexer.Connect("localhost").GetDatabase(); + }); + silo.ConfigureLogging(logging => logging.AddConsole()); + silo.AddMemoryGrainStorage("PubSubStore"); + + silo.AddMemoryGrainStorageAsDefault(); + }).UseConsoleLifetime(); + +using IHost host = builder.Build(); +await host.RunAsync(); \ No newline at end of file diff --git a/Server/Server.csproj b/Server/Server.csproj new file mode 100644 index 0000000..a6e6fa8 --- /dev/null +++ b/Server/Server.csproj @@ -0,0 +1,20 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + +