Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,28 @@ using (var tentaclePolling = new HalibutRuntime(services, Certificates.Alice))
}
```

Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.
Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.

## RPC over Redis

Halibut supports executing RPC commands between nodes using a shared Redis queue as the communication mechanism. In this mode, nodes do not communicate directly with each other - all communication flows through Redis. This is particularly useful for scenarios where multiple nodes can all access a shared Redis instance but cannot establish direct network connections to each other.

### How it works

When using RPC over Redis:

- The client queues RPC requests in Redis
- The server polls Redis for pending requests
- The server processes requests and writes responses back to Redis
- The client retrieves responses from Redis

This decoupled communication model allows nodes behind firewalls, in different networks, or with restricted connectivity to communicate as long as they can all reach the shared Redis instance.

### Usage

See the [SimpleLocalExecutionExample](source/Halibut.Tests/LocalExecutionModeFixture.cs) test for a complete example of how to set up and use RPC over Redis.

For more detailed information about the Redis queue implementation, refer to the [Redis Queue documentation](docs/RedisQueue.md).

## Failure modes

Expand Down
46 changes: 46 additions & 0 deletions source/Halibut.Tests/HalibutExamplesFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using Halibut.ServiceModel;
using Halibut.Tests.Support;
using Halibut.Tests.TestServices;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;

namespace Halibut.Tests
{
public class HalibutExamplesFixture : BaseTest
{
[Test]
public async Task SimplePollingExample()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this since we lack simple examples of how to use halibit.

{
var services = GetDelegateServiceFactory();
await using (var client = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.Octopus)
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
.Build())
await using (var pollingService = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.TentaclePolling)
.WithServiceFactory(services)
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
.Build())
{
var octopusPort = client.Listen();
client.Trust(Certificates.TentaclePollingPublicThumbprint);

pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken);

var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits));

await echo.SayHelloAsync("World");
}
}

static DelegateServiceFactory GetDelegateServiceFactory()
{
var services = new DelegateServiceFactory();
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
return services;
}
}
}
102 changes: 102 additions & 0 deletions source/Halibut.Tests/LocalExecutionModeFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#if NET8_0_OR_GREATER
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Diagnostics;
using Halibut.Logging;
using Halibut.Queue;
using Halibut.Queue.Redis;
using Halibut.Queue.Redis.RedisDataLossDetection;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.ServiceModel;
using Halibut.Tests.Queue.Redis.Utils;
using Halibut.Tests.Support;
using Halibut.Tests.Support.Logging;
using Halibut.Tests.TestServices;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;
using DisposableCollection = Halibut.Util.DisposableCollection;

namespace Halibut.Tests
{
public class LocalExecutionModeFixture : BaseTest
{
[RedisTest]
[Test]
public async Task SimpleLocalExecutionExample()
{
var services = GetDelegateServiceFactory();
var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
timeoutsAndLimits = new HalibutTimeoutsAndLimits();

// Use a shared queue factory so client and worker share the same queue
var queueFactory = new PendingRequestQueueFactoryAsync(timeoutsAndLimits, new LogFactory());

var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace));

var log = new TestContextLogCreator("Redis", LogLevel.Fatal);

var preSharedGuid = Guid.NewGuid();

await using var disposables = new DisposableCollection();

await using var client = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.Octopus)
.WithPendingRequestQueueFactory(RedisFactory())
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
.Build();

await using var worker = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.TentaclePolling)
.WithServiceFactory(services)
.WithPendingRequestQueueFactory(RedisFactory())
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
.Build();

// Start worker polling for local://test-worker
using var workerCts = new CancellationTokenSource();
var pollingTask = Task.Run(async () =>
{
//await Task.Delay(TimeSpan.FromSeconds(10));
await worker.PollLocalAsync(new Uri("local://test-worker"), workerCts.Token);
}, workerCts.Token);

// Client creates proxy to local://test-worker and makes request
var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(
new ServiceEndPoint("local://test-worker", null, client.TimeoutsAndLimits));

var result = await echo.SayHelloAsync("World");
result.Should().Be("World...");

await workerCts.CancelAsync();

Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory()
{
return msgSer =>
{
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid);
disposables.AddAsyncDisposable(redisFacade);
var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher"));
disposables.AddAsyncDisposable(watchForRedisLosingAllItsData);

return new RedisPendingRequestQueueFactory(msgSer,
new InMemoryStoreDataStreamsForDistributedQueues(),
watchForRedisLosingAllItsData,
new HalibutRedisTransport(redisFacade),
new HalibutTimeoutsAndLimitsForTestsBuilder().Build(),
logFactory);
};
}
}

static DelegateServiceFactory GetDelegateServiceFactory()
{
var services = new DelegateServiceFactory();
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
return services;
}
}
}
#endif
6 changes: 4 additions & 2 deletions source/Halibut/Diagnostics/InMemoryConnectionLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

namespace Halibut.Diagnostics
{
internal class InMemoryConnectionLog : ILog
public class InMemoryConnectionLog : ILog
{
public static readonly int MaxLogEvents = 100;

readonly string endpoint;
readonly Logging.ILog? logger;
readonly ConcurrentQueue<LogEvent> events = new();
Expand Down Expand Up @@ -57,7 +59,7 @@ void WriteInternal(LogEvent logEvent)

events.Enqueue(logEvent);

while (events.Count > 100 && events.TryDequeue(out _)) { }
while (events.Count > MaxLogEvents && events.TryDequeue(out _)) { }
}

static LogLevel GetLogLevel(LogEvent logEvent)
Expand Down
52 changes: 52 additions & 0 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c
pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy));
}

public async Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken)
{
if (localEndpoint.Scheme.ToLowerInvariant() != "local")
{
throw new ArgumentException($"Only 'local://' endpoints are supported. Provided: {localEndpoint.Scheme}://", nameof(localEndpoint));
}

var queue = GetQueue(localEndpoint);
var log = logs.ForEndpoint(localEndpoint);

log.Write(EventType.MessageExchange, $"Starting local polling for endpoint: {localEndpoint}");

while (!cancellationToken.IsCancellationRequested)
{
try
{
var request = await queue.DequeueAsync(cancellationToken);

if (request != null)
{
ResponseMessage response;
try
{
response = await invoker.InvokeAsync(request.RequestMessage);
}
catch (Exception ex)
{
log.WriteException(EventType.Error, $"Error executing local request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex);
response = ResponseMessage.FromException(request.RequestMessage, ex);
}

await queue.ApplyResponse(response, request.RequestMessage.ActivityId);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
log.Write(EventType.MessageExchange, $"Local polling cancelled for endpoint: {localEndpoint}");
break;
}
catch (Exception ex)
{
log.WriteException(EventType.Error, $"Error in local polling loop for endpoint: {localEndpoint}", ex);
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}

log.Write(EventType.MessageExchange, $"Local polling stopped for endpoint: {localEndpoint}");
}

public async Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken)
{
return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken);
Expand Down Expand Up @@ -244,6 +293,9 @@ async Task<ResponseMessage> SendOutgoingRequestAsync(RequestMessage request, Met
case "poll":
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
break;
case "local":
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
break;
default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme);
}

Expand Down
1 change: 1 addition & 0 deletions source/Halibut/IHalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable
int Listen(IPEndPoint endpoint);
void ListenWebSocket(string endpoint);
void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken);
Task PollLocalAsync(Uri localEndpoint, CancellationToken cancellationToken);

Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken);
Task<ServiceEndPoint> DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);
Expand Down