Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void SetUp()
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>());
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>(), new NullSubscriberObserver());
}

// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation
Expand Down
4 changes: 2 additions & 2 deletions source/Halibut.Tests/Transport/SecureClientFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
var connection = Substitute.For<IConnection>();
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log));
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log, new NullSubscriberObserver()));

await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
}
Expand Down Expand Up @@ -102,7 +102,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
{
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger);
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger, new NullSubscriberObserver());
}
}
}
13 changes: 10 additions & 3 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class HalibutRuntime : IHalibutRuntime
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
readonly IControlMessageObserver controlMessageObserver;
readonly ISslConfigurationProvider sslConfigurationProvider;
readonly ISubscriberObserver subscriberObserver;

internal HalibutRuntime(
IServiceFactory serviceFactory,
Expand All @@ -63,8 +64,8 @@ internal HalibutRuntime(
IConnectionsObserver connectionsObserver,
IControlMessageObserver controlMessageObserver,
ISecureConnectionObserver secureConnectionObserver,
ISslConfigurationProvider sslConfigurationProvider
)
ISslConfigurationProvider sslConfigurationProvider,
ISubscriberObserver subscriberObserver)
{
this.serverCertificate = serverCertificate;
this.trustProvider = trustProvider;
Expand All @@ -79,6 +80,7 @@ ISslConfigurationProvider sslConfigurationProvider
TimeoutsAndLimits = halibutTimeoutsAndLimits;
this.connectionsObserver = connectionsObserver;
this.secureConnectionObserver = secureConnectionObserver;
this.subscriberObserver = subscriberObserver;
this.controlMessageObserver = controlMessageObserver;
this.sslConfigurationProvider = sslConfigurationProvider;

Expand Down Expand Up @@ -121,7 +123,12 @@ public int Listen(int port)

ExchangeProtocolBuilder ExchangeProtocolBuilder()
{
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log);
return (stream, log) => new MessageExchangeProtocol(
new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log),
TimeoutsAndLimits,
activeTcpConnectionsLimiter,
log,
subscriberObserver);
}

public int Listen(IPEndPoint endpoint)
Expand Down
13 changes: 11 additions & 2 deletions source/Halibut/HalibutRuntimeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class HalibutRuntimeBuilder
IControlMessageObserver? controlMessageObserver;
MessageStreamWrappers queueMessageStreamWrappers = new();
ISslConfigurationProvider? sslConfigurationProvider;
ISubscriberObserver? subscriberObserver;

public HalibutRuntimeBuilder WithQueueMessageStreamWrappers(MessageStreamWrappers queueMessageStreamWrappers)
{
Expand All @@ -58,6 +59,12 @@ public HalibutRuntimeBuilder WithSslConfigurationProvider(ISslConfigurationProvi
return this;
}

public HalibutRuntimeBuilder WithSubscriptionObserver(ISubscriberObserver subscriberObserver)
{
this.subscriberObserver = subscriberObserver;
return this;
}

internal HalibutRuntimeBuilder WithStreamFactory(IStreamFactory streamFactory)
{
this.streamFactory = streamFactory;
Expand Down Expand Up @@ -194,6 +201,7 @@ public HalibutRuntime Build()
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
var sslConfigurationProvider = this.sslConfigurationProvider ?? SslConfiguration.Default;
var subscriberObserver = this.subscriberObserver ?? new NullSubscriberObserver();

var halibutRuntime = new HalibutRuntime(
serviceFactory,
Expand All @@ -210,7 +218,8 @@ public HalibutRuntime Build()
connectionsObserver,
controlMessageObserver,
secureConnectionObserver,
sslConfigurationProvider
sslConfigurationProvider,
subscriberObserver
);

if (onUnauthorizedClientConnect is not null)
Expand All @@ -221,4 +230,4 @@ public HalibutRuntime Build()
return halibutRuntime;
}
}
}
}
24 changes: 24 additions & 0 deletions source/Halibut/Transport/Observability/ISubscriberObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading;
using System.Threading.Tasks;

namespace Halibut.Transport.Observability
{
public interface ISubscriberObserver
{
Task SubscriberConnected(string uri, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading;
using System.Threading.Tasks;

namespace Halibut.Transport.Observability
{
public class NullSubscriberObserver : ISubscriberObserver
{
public Task SubscriberConnected(string uri, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Halibut.Diagnostics;
using Halibut.Exceptions;
using Halibut.ServiceModel;
using Halibut.Transport.Observability;

namespace Halibut.Transport.Protocol
{
Expand All @@ -21,15 +22,18 @@ public class MessageExchangeProtocol
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
readonly ILog log;
readonly ISubscriberObserver subscriberObserver;
bool identified;
volatile bool acceptClientRequests = true;

public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log)
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log,
ISubscriberObserver subscriberObserver)
{
this.stream = stream;
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
this.log = log;
this.subscriberObserver = subscriberObserver;
}

public async Task<ResponseMessage> ExchangeAsClientAsync(RequestMessage request, CancellationToken cancellationToken)
Expand Down Expand Up @@ -112,6 +116,7 @@ public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessag
//if the remote identity is a subscriber, we might need to limit their active TCP connections
if (identity.IdentityType == RemoteIdentityType.Subscriber)
{
await subscriberObserver.SubscriberConnected(identity.SubscriptionId.ToString(), cancellationToken);
limitedConnectionLease = activeTcpConnectionsLimiter.LeaseActiveTcpConnection(identity.SubscriptionId);
}

Expand Down