diff --git a/README.md b/README.md index c6519997c..de6f88756 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,28 @@ using (var tentaclePolling = new HalibutRuntime(services, Certificates.Alice)) } ``` -Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work. +Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work. + +## RPC over Redis + +Halibut supports executing RPC commands between nodes using a shared Redis queue as the communication mechanism. In this mode, nodes do not communicate directly with each other - all communication flows through Redis. This is particularly useful for scenarios where multiple nodes can all access a shared Redis instance but cannot establish direct network connections to each other. + +### How it works + +When using RPC over Redis: + +- The client queues RPC requests in Redis +- The server polls Redis for pending requests +- The server processes requests and writes responses back to Redis +- The client retrieves responses from Redis + +This decoupled communication model allows nodes behind firewalls, in different networks, or with restricted connectivity to communicate as long as they can all reach the shared Redis instance. + +### Usage + +See the [SimpleLocalExecutionExample](source/Halibut.Tests/LocalExecutionModeFixture.cs) test for a complete example of how to set up and use RPC over Redis. + +For more detailed information about the Redis queue implementation, refer to the [Redis Queue documentation](docs/RedisQueue.md). ## Failure modes diff --git a/source/Halibut.Tests/HalibutExamplesFixture.cs b/source/Halibut.Tests/HalibutExamplesFixture.cs new file mode 100644 index 000000000..83187a113 --- /dev/null +++ b/source/Halibut.Tests/HalibutExamplesFixture.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using Halibut.ServiceModel; +using Halibut.Tests.Support; +using Halibut.Tests.TestServices; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; + +namespace Halibut.Tests +{ + public class HalibutExamplesFixture : BaseTest + { + [Test] + public async Task SimplePollingExample() + { + var services = GetDelegateServiceFactory(); + await using (var client = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.Octopus) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) + .Build()) + await using (var pollingService = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.TentaclePolling) + .WithServiceFactory(services) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build()) + .Build()) + { + var octopusPort = client.Listen(); + client.Trust(Certificates.TentaclePollingPublicThumbprint); + + pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken); + + var echo = client.CreateAsyncClient(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits)); + + await echo.SayHelloAsync("World"); + } + } + + static DelegateServiceFactory GetDelegateServiceFactory() + { + var services = new DelegateServiceFactory(); + services.Register(() => new AsyncEchoService()); + return services; + } + } +} \ No newline at end of file diff --git a/source/Halibut.Tests/LocalExecutionModeFixture.cs b/source/Halibut.Tests/LocalExecutionModeFixture.cs new file mode 100644 index 000000000..ee91a968f --- /dev/null +++ b/source/Halibut.Tests/LocalExecutionModeFixture.cs @@ -0,0 +1,102 @@ +#if NET8_0_OR_GREATER +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Diagnostics; +using Halibut.Logging; +using Halibut.Queue; +using Halibut.Queue.Redis; +using Halibut.Queue.Redis.RedisDataLossDetection; +using Halibut.Queue.Redis.RedisHelpers; +using Halibut.ServiceModel; +using Halibut.Tests.Queue.Redis.Utils; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Logging; +using Halibut.Tests.TestServices; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; +using DisposableCollection = Halibut.Util.DisposableCollection; + +namespace Halibut.Tests +{ + public class LocalExecutionModeFixture : BaseTest + { + [RedisTest] + [Test] + public async Task SimpleLocalExecutionExample() + { + var services = GetDelegateServiceFactory(); + var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + timeoutsAndLimits = new HalibutTimeoutsAndLimits(); + + // Use a shared queue factory so client and worker share the same queue + var queueFactory = new PendingRequestQueueFactoryAsync(timeoutsAndLimits, new LogFactory()); + + var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace)); + + var log = new TestContextLogCreator("Redis", LogLevel.Fatal); + + var preSharedGuid = Guid.NewGuid(); + + await using var disposables = new DisposableCollection(); + + await using var client = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.Octopus) + .WithPendingRequestQueueFactory(RedisFactory()) + .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) + .Build(); + + await using var worker = new HalibutRuntimeBuilder() + .WithServerCertificate(Certificates.TentaclePolling) + .WithServiceFactory(services) + .WithPendingRequestQueueFactory(RedisFactory()) + .WithHalibutTimeoutsAndLimits(timeoutsAndLimits) + .Build(); + + // Start worker polling for local://test-worker + using var workerCts = new CancellationTokenSource(); + var pollingTask = Task.Run(async () => + { + //await Task.Delay(TimeSpan.FromSeconds(10)); + await worker.PollLocalAsync(new Uri("local://test-worker"), workerCts.Token); + }, workerCts.Token); + + // Client creates proxy to local://test-worker and makes request + var echo = client.CreateAsyncClient( + new ServiceEndPoint("local://test-worker", null, client.TimeoutsAndLimits)); + + var result = await echo.SayHelloAsync("World"); + result.Should().Be("World..."); + + await workerCts.CancelAsync(); + + Func RedisFactory() + { + return msgSer => + { + var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid); + disposables.AddAsyncDisposable(redisFacade); + var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher")); + disposables.AddAsyncDisposable(watchForRedisLosingAllItsData); + + return new RedisPendingRequestQueueFactory(msgSer, + new InMemoryStoreDataStreamsForDistributedQueues(), + watchForRedisLosingAllItsData, + new HalibutRedisTransport(redisFacade), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + logFactory); + }; + } + } + + static DelegateServiceFactory GetDelegateServiceFactory() + { + var services = new DelegateServiceFactory(); + services.Register(() => new AsyncEchoService()); + return services; + } + } +} +#endif diff --git a/source/Halibut/Diagnostics/InMemoryConnectionLog.cs b/source/Halibut/Diagnostics/InMemoryConnectionLog.cs index f76ed93af..521e55a4f 100644 --- a/source/Halibut/Diagnostics/InMemoryConnectionLog.cs +++ b/source/Halibut/Diagnostics/InMemoryConnectionLog.cs @@ -6,8 +6,10 @@ namespace Halibut.Diagnostics { - internal class InMemoryConnectionLog : ILog + public class InMemoryConnectionLog : ILog { + public static readonly int MaxLogEvents = 100; + readonly string endpoint; readonly Logging.ILog? logger; readonly ConcurrentQueue events = new(); @@ -57,7 +59,7 @@ void WriteInternal(LogEvent logEvent) events.Enqueue(logEvent); - while (events.Count > 100 && events.TryDequeue(out _)) { } + while (events.Count > MaxLogEvents && events.TryDequeue(out _)) { } } static LogLevel GetLogLevel(LogEvent logEvent) diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 4338e3cca..57cd687e4 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -199,6 +199,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy)); } + public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken) + { + if (localEndpoint.Scheme.ToLowerInvariant() != "local") + { + throw new ArgumentException($"Only 'local://' endpoints are supported. Provided: {localEndpoint.Scheme}://", nameof(localEndpoint)); + } + + var queue = GetQueue(localEndpoint); + var log = logs.ForEndpoint(localEndpoint); + + log.Write(EventType.MessageExchange, $"Starting local polling for endpoint: {localEndpoint}"); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + var request = await queue.DequeueAsync(cancellationToken); + + if (request != null) + { + ResponseMessage response; + try + { + response = await invoker.InvokeAsync(request.RequestMessage); + } + catch (Exception ex) + { + log.WriteException(EventType.Error, $"Error executing local request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex); + response = ResponseMessage.FromException(request.RequestMessage, ex); + } + + await queue.ApplyResponse(response, request.RequestMessage.ActivityId); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + log.Write(EventType.MessageExchange, $"Local polling cancelled for endpoint: {localEndpoint}"); + break; + } + catch (Exception ex) + { + log.WriteException(EventType.Error, $"Error in local polling loop for endpoint: {localEndpoint}", ex); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + } + } + + log.Write(EventType.MessageExchange, $"Local polling stopped for endpoint: {localEndpoint}"); + } + public async Task DiscoverAsync(Uri uri, CancellationToken cancellationToken) { return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken); @@ -244,6 +293,9 @@ async Task SendOutgoingRequestAsync(RequestMessage request, Met case "poll": response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); break; + case "local": + response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false); + break; default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme); } diff --git a/source/Halibut/IHalibutRuntime.cs b/source/Halibut/IHalibutRuntime.cs index 3d678db77..a4d8c7e22 100644 --- a/source/Halibut/IHalibutRuntime.cs +++ b/source/Halibut/IHalibutRuntime.cs @@ -17,6 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable int Listen(IPEndPoint endpoint); void ListenWebSocket(string endpoint); void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken); + Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken); Task DiscoverAsync(Uri uri, CancellationToken cancellationToken); Task DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);