@@ -30,20 +30,24 @@ import (
3030 "io"
3131 "net"
3232 "strconv"
33+ "strings"
3334 "testing"
3435 "time"
3536
3637 envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
3738 extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
3839 envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
3940 "github.com/go-logr/logr"
41+ "github.com/stretchr/testify/require"
42+ "google.golang.org/grpc"
43+ "google.golang.org/grpc/credentials/insecure"
4044 "google.golang.org/protobuf/types/known/structpb"
4145
4246 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
4347 requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
4448)
4549
46- // --- Request Builders (High-Level DSL ) ---
50+ // --- Request Builders (Protocol Level ) ---
4751
4852// ReqLLM creates a sequence of gRPC messages representing a standard, streamed LLM inference request.
4953// It generates:
@@ -205,7 +209,7 @@ func GenerateRequestMetadata(filterMetadata []string) map[string]*structpb.Struc
205209 return requestMetadata
206210}
207211
208- // --- Response Builders ---
212+ // --- Response Builders (Protocol Level) ---
209213
210214// NewRequestBufferedResponse creates a complete set of responses for the Request phase.
211215// It simulates the EPP deciding to:
@@ -433,27 +437,92 @@ func StreamedRequest(
433437
434438// --- System Utilities ---
435439
440+ // StartExtProcServer handles the lifecycle of starting a gRPC server in the background and connecting to it.
441+ // It guarantees that the server is listening on the specified port before returning.
442+ //
443+ // serverRunner: A function that blocks until the server exits (e.g. Runnable.Start).
444+ // port: The port the server is configured to listen on.
445+ func StartExtProcServer (
446+ t * testing.T ,
447+ ctx context.Context ,
448+ serverRunner func (context.Context ) error ,
449+ port int ,
450+ logger logr.Logger ,
451+ ) (extProcPb.ExternalProcessor_ProcessClient , * grpc.ClientConn ) {
452+ t .Helper ()
453+
454+ // Force IPv4 to match GetFreePort's binding and avoid IPv6 race conditions in CI.
455+ serverAddr := fmt .Sprintf ("127.0.0.1:%d" , port )
456+
457+ // Channel to signal if the server dies immediately (e.g., port binding error/panic)
458+ serverErrChan := make (chan error , 1 )
459+
460+ // Start server in background.
461+ go func () {
462+ logger .Info ("Starting ExtProc server" , "address" , serverAddr )
463+ if err := serverRunner (ctx ); err != nil {
464+ // Ignore expected cancellations during teardown.
465+ if ! strings .Contains (err .Error (), "context canceled" ) {
466+ logger .Error (err , "Server stopped unexpectedly" )
467+ select {
468+ case serverErrChan <- err :
469+ default :
470+ }
471+ }
472+ }
473+ }()
474+
475+ // Wait for TCP readiness.
476+ // We must poll the port until the server successfully binds and listens.
477+ require .Eventually (t , func () bool {
478+ // Fast-fail if the server crashed immediately.
479+ select {
480+ case err := <- serverErrChan :
481+ t .Fatalf ("Server failed to start: %v" , err )
482+ default :
483+ }
484+
485+ // Check if the port is open.
486+ conn , err := net .DialTimeout ("tcp" , serverAddr , 50 * time .Millisecond )
487+ if err != nil {
488+ return false
489+ }
490+ conn .Close ()
491+ return true
492+ }, 5 * time .Second , 50 * time .Millisecond , "Server failed to bind port %s" , serverAddr )
493+
494+ // Connect client.
495+ // Blocking dial is safe because we know the port is open.
496+ conn , err := grpc .NewClient (serverAddr , grpc .WithTransportCredentials (insecure .NewCredentials ()))
497+ require .NoError (t , err , "failed to create grpc connection" )
498+
499+ extProcClient , err := extProcPb .NewExternalProcessorClient (conn ).Process (ctx )
500+ require .NoError (t , err , "failed to initialize ext_proc stream client" )
501+
502+ return extProcClient , conn
503+ }
504+
436505// GetFreePort finds an available IPv4 TCP port on localhost.
437506// It works by asking the OS to allocate a port by listening on port 0, capturing the assigned address, and then
438507// immediately closing the listener.
439508//
440509// Note: There is a theoretical race condition where another process grabs the port between the Close() call and the
441510// subsequent usage, but this is generally acceptable in hermetic test environments.
442- func GetFreePort () (* net. TCPAddr , error ) {
511+ func GetFreePort () (int , error ) {
443512 // Force IPv4 to prevent flakes on dual-stack CI environments
444513 listener , err := net .Listen ("tcp" , "127.0.0.1:0" )
445514 if err != nil {
446- return nil , fmt .Errorf ("failed to listen on a free port: %w" , err )
515+ return 0 , fmt .Errorf ("failed to listen on a free port: %w" , err )
447516 }
448517
449518 // Critical: Close the listener immediately so the caller can bind to it.
450519 defer listener .Close ()
451520
452521 addr , ok := listener .Addr ().(* net.TCPAddr )
453522 if ! ok {
454- return nil , errors .New ("failed to cast listener address to TCPAddr" )
523+ return 0 , errors .New ("failed to cast listener address to TCPAddr" )
455524 }
456- return addr , nil
525+ return addr . Port , nil
457526}
458527
459528// --- Internal Helpers ---
0 commit comments