diff --git a/.circleci/config.yml b/.circleci/config.yml index 1c6a9a31d..c63709853 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,8 +62,10 @@ commands: sudo apt install \ --yes --no-install-recommends \ openjdk-11-jdk-headless \ - ruby \ - golang-go + ruby + # Install Go 1.22 (required for prometheus client library) + curl -sL https://go.dev/dl/go1.22.10.linux-amd64.tar.gz | sudo tar -C /usr/local -xzf - + echo 'export PATH=/usr/local/go/bin:$PATH' >> $BASH_ENV make install-kotlin-linters - setup-rust-toolchain - run: diff --git a/glean_parser/go_server.py b/glean_parser/go_server.py index 2eb4cbac2..233d2a7c1 100644 --- a/glean_parser/go_server.py +++ b/glean_parser/go_server.py @@ -11,15 +11,18 @@ generates does not use the Glean SDK. It is meant to be used to collect events in server-side environments. In these environments SDK assumptions to measurement window and connectivity don't hold. + Generated code takes care of assembling pings with metrics, and serializing to messages -conforming to Glean schema. +conforming to Glean schema. Two transport modes are supported: +- Cloud Logging (go_server): Logs to stdout in MozLog format for ingestion via GCP log routing +- HTTP through Ingestion Edge (go_server_http): Publishes to standard Ingestion Edge endpoint Warning: this outputter supports limited set of metrics, see `SUPPORTED_METRIC_TYPES` below. Generated code creates two methods for each ping (`RecordPingX` and `RecordPingXWithoutUserInfo`) -that are used for submitting (logging) them. -If pings have `event` metrics assigned, they can be passed to these methods. +that are used for submitting events. If pings have `event` metrics assigned, they can be +passed to these methods. """ from collections import defaultdict @@ -81,7 +84,10 @@ def clean_string(s: str) -> str: def output_go( - objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] + objs: metrics.ObjectTree, + output_dir: Path, + options: Optional[Dict[str, Any]], + transport: str = "logging", ) -> None: """ Given a tree of objects, output Go code to `output_dir`. @@ -92,6 +98,8 @@ def output_go( :param objects: A tree of objects (metrics and pings) as returned from `parser.parse_objects`. :param output_dir: Path to an output directory to write to. + :param transport: Transport mode - either "logging" (Cloud Logging) or + "http" (HTTP Edge direct publishing). Default is "logging". """ template = util.get_jinja2_template( @@ -149,6 +157,37 @@ def output_go( with filepath.open("w", encoding="utf-8") as fd: fd.write( template.render( - parser_version=__version__, pings=ping_to_metrics, events=event_metrics + parser_version=__version__, + pings=ping_to_metrics, + events=event_metrics, + transport=transport, ) ) + + +def output_go_logger( + objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None +) -> None: + """ + Given a tree of objects, output Go code using Cloud Logging transport. + + :param objects: A tree of objects (metrics and pings) as returned from + `parser.parse_objects`. + :param output_dir: Path to an output directory to write to. + :param options: options dictionary (currently unused for Go). + """ + output_go(objs, output_dir, options, transport="logging") + + +def output_go_http( + objs: metrics.ObjectTree, output_dir: Path, options: Optional[Dict[str, Any]] = None +) -> None: + """ + Given a tree of objects, output Go code using HTTP Edge transport. + + :param objects: A tree of objects (metrics and pings) as returned from + `parser.parse_objects`. + :param output_dir: Path to an output directory to write to. + :param options: options dictionary (currently unused for Go). + """ + output_go(objs, output_dir, options, transport="http") diff --git a/glean_parser/templates/go_server.jinja2 b/glean_parser/templates/go_server.jinja2 index f3ebb1481..2f0a1a185 100644 --- a/glean_parser/templates/go_server.jinja2 +++ b/glean_parser/templates/go_server.jinja2 @@ -9,6 +9,20 @@ package glean // required imports import ( +{% if transport == "http" %} + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +{% else %} "encoding/json" "errors" "fmt" @@ -17,8 +31,55 @@ import ( "time" "github.com/google/uuid" +{% endif %} ) +{% if transport == "http" %} +// Prometheus metrics for monitoring HTTP publishing +var ( + // Tracks API calls (RecordEventsPing calls - may queue if workers are slow) + gleanPublishTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "glean_http_publish_total", + Help: "Total number of Glean events queued for publishing (API calls)", + }, + []string{"app_id", "document_type", "status"}, + ) + + // Tracks actual HTTP requests completed (the real throughput metric) + gleanHTTPRequestsCompleted = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "glean_http_requests_completed", + Help: "Actual HTTP requests completed over the network", + }, + []string{"app_id", "document_type", "status"}, + ) +) + +// httpRequest tracks metadata for async HTTP operations +type httpRequest struct { + url string + payload []byte + userAgent string + documentType string + doneCh chan error +} + +// GleanEventsPublisher publishes Glean events via HTTP to ingestion endpoint +type GleanEventsPublisher struct { + AppID string // Application ID to identify application per Glean standards + AppDisplayVersion string // Version of application emitting the event + AppChannel string // Channel to differentiate logs from prod/beta/staging/devel + + endpoint string // HTTP endpoint URL (e.g., https://incoming.telemetry.mozilla.org/submit) + client *http.Client + ctx context.Context + pendingReqs chan *httpRequest + wg sync.WaitGroup + once sync.Once + maxConcurrency int // Number of concurrent HTTP workers +} +{% else %} // log type string used to identify logs to process in the Moz Data Pipeline var gleanEventMozlogType string = "glean-server-event" @@ -32,6 +93,7 @@ type GleanEventsLogger struct { AppChannel string // Channel to differentiate logs from prod/beta/staging/devel Writer io.Writer // Writer to output to. Normal operation expects os.Stdout } +{% endif %} // exported type for public method parameters type RequestInfo struct { @@ -63,6 +125,7 @@ type pingInfo struct { EndTime string `json:"end_time"` } +{% if transport != "http" %} type ping struct { DocumentNamespace string `json:"document_namespace"` DocumentType string `json:"document_type"` @@ -72,6 +135,7 @@ type ping struct { IpAddress string `json:"ip_address,omitempty"` Payload string `json:"payload"` } +{% endif -%} type metrics map[string]map[string]interface{} @@ -89,14 +153,20 @@ type gleanEvent struct { Extra map[string]string `json:"extra"` } +{% if transport != "http" %} type logEnvelope struct { Timestamp string Logger string Type string Fields ping } +{% endif -%} +{% if transport == "http" %} +func (g *GleanEventsPublisher) createClientInfo() clientInfo { +{% else %} func (g GleanEventsLogger) createClientInfo() clientInfo { +{% endif %} // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ TelemetrySDKBuild: "glean_parser v{{ parser_version }}", @@ -120,6 +190,7 @@ func createPingInfo() pingInfo { } } +{% if transport != "http" %} func (g GleanEventsLogger) createPing(documentType string, config RequestInfo, payload pingPayload) (ping, error) { payloadJson, err := json.Marshal(payload) if err != nil { @@ -180,6 +251,182 @@ func (g GleanEventsLogger) record( fmt.Fprintln(g.Writer, string(envelopeJson)) return nil } +{% else %} +// NewGleanEventsPublisher creates a new HTTP-based Glean events publisher +func NewGleanEventsPublisher(ctx context.Context, endpoint, appID, appDisplayVersion, appChannel string, concurrency int) (*GleanEventsPublisher, error) { + if concurrency <= 0 { + concurrency = 10 // Default concurrency + } + + publisher := &GleanEventsPublisher{ + AppID: appID, + AppDisplayVersion: appDisplayVersion, + AppChannel: appChannel, + endpoint: endpoint, + ctx: ctx, + pendingReqs: make(chan *httpRequest, 10000), + maxConcurrency: concurrency, + client: &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 1000, + MaxIdleConnsPerHost: concurrency * 2, + IdleConnTimeout: 90 * time.Second, + DisableCompression: false, + }, + }, + } + + // Start background goroutines to process HTTP requests concurrently + for i := 0; i < concurrency; i++ { + publisher.wg.Add(1) + go publisher.processHTTPRequests() + } + + return publisher, nil +} + +// processHTTPRequests runs in background to handle HTTP requests asynchronously +func (g *GleanEventsPublisher) processHTTPRequests() { + defer g.wg.Done() + + for req := range g.pendingReqs { + err := g.sendHTTPRequest(req) + + status := "success" + if err != nil { + status = "error" + log.Printf("HTTP publish error: %v", err) + } + + gleanPublishTotal.WithLabelValues( + g.AppID, + req.documentType, + status, + ).Inc() + + // Track actual HTTP request completion (the real throughput) + gleanHTTPRequestsCompleted.WithLabelValues( + g.AppID, + req.documentType, + status, + ).Inc() + + // Notify caller if they're waiting + if req.doneCh != nil { + req.doneCh <- err + close(req.doneCh) + } + } +} + +// sendHTTPRequest performs the actual HTTP POST request +func (g *GleanEventsPublisher) sendHTTPRequest(req *httpRequest) error { + httpReq, err := http.NewRequestWithContext(g.ctx, "POST", req.url, bytes.NewReader(req.payload)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + httpReq.Header.Set("Content-Type", "application/json") + if req.userAgent != "" { + httpReq.Header.Set("User-Agent", req.userAgent) + } + + resp, err := g.client.Do(httpReq) + if err != nil { + return fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("HTTP request failed with status %d", resp.StatusCode) + } + + return nil +} + +// construct the Glean payload and publish via HTTP asynchronously +func (g *GleanEventsPublisher) publish( + documentType string, + requestInfo RequestInfo, + metrics metrics, + events []gleanEvent, +) error { + var telemetryPayload = pingPayload{ + ClientInfo: g.createClientInfo(), + PingInfo: createPingInfo(), + Metrics: metrics, + Events: events, + } + + // Marshal the inner payload + payloadJSON, err := json.Marshal(telemetryPayload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + // Generate document ID + documentID, err := uuid.NewRandom() + if err != nil { + return fmt.Errorf("failed to generate document ID: %w", err) + } + + // Construct URL following ingestion-edge format: + // POST /submit//// + url := fmt.Sprintf("%s/%s/%s/%s/%s", + g.endpoint, + g.AppID, + documentType, + "1", // document version + documentID.String(), + ) + + // Create HTTP request + req := &httpRequest{ + url: url, + payload: payloadJSON, + userAgent: requestInfo.UserAgent, + documentType: documentType, + doneCh: nil, // Non-blocking async mode + } + + // Send request to background workers (non-blocking if channel has space) + select { + case g.pendingReqs <- req: + // Successfully queued + return nil + case <-g.ctx.Done(): + // Context cancelled + return g.ctx.Err() + } +} + +// QueueDepth returns the current number of pending HTTP requests in the queue +func (g *GleanEventsPublisher) QueueDepth() int { + return len(g.pendingReqs) +} + +// Flush waits for all pending HTTP requests to complete +// Call this before shutdown to ensure no messages are lost +func (g *GleanEventsPublisher) Flush() { + log.Println("Flushing pending HTTP requests...") + + // Close channel to signal workers to finish + g.once.Do(func() { + close(g.pendingReqs) + }) + + // Wait for all workers to finish + g.wg.Wait() + log.Println("All pending HTTP requests flushed.") +} + +// Close performs graceful shutdown, flushing all pending messages +func (g *GleanEventsPublisher) Close() error { + g.Flush() + return nil +} +{% endif %} {# if any ping has an event metric, create methods and types for them #} {% if events %} @@ -188,7 +435,7 @@ func newGleanEvent(category, name string, extra map[string]string) gleanEvent { Category: category, Name: name, Timestamp: time.Now().UnixMilli(), - Extra: extra, + Extra: extra, } } {# each event has a type and method to create a gleanEvent #} @@ -250,7 +497,11 @@ type {{ ping|ping_type_name }} struct { } // Record and submit `{{ ping }}` ping +{% if transport == "http" %} +func (g *GleanEventsPublisher) Record{{ ping|ping_type_name }}( +{% else %} func (g GleanEventsLogger) Record{{ ping|ping_type_name }}( +{% endif %} requestInfo RequestInfo, params {{ ping|ping_type_name }}, ) error { @@ -276,11 +527,19 @@ func (g GleanEventsLogger) Record{{ ping|ping_type_name }}( events = append(events, params.Event.gleanEvent()) } {% endif %} +{% if transport == "http" %} + return g.publish("{{ ping }}", requestInfo, metrics, events) +{% else %} return g.record("{{ ping }}", requestInfo, metrics, events) +{% endif %} } // Record and submit `{{ ping }}` ping omitting user request info +{% if transport == "http" %} +func (g *GleanEventsPublisher) Record{{ ping|ping_type_name}}WithoutUserInfo( +{% else %} func (g GleanEventsLogger) Record{{ ping|ping_type_name}}WithoutUserInfo( +{% endif %} params {{ ping|ping_type_name}}, ) error { return g.Record{{ ping|ping_type_name }}(defaultRequestInfo, params) diff --git a/glean_parser/translate.py b/glean_parser/translate.py index 303ea15a6..0502dcd4f 100644 --- a/glean_parser/translate.py +++ b/glean_parser/translate.py @@ -56,7 +56,8 @@ def __init__( OUTPUTTERS = { - "go_server": Outputter(go_server.output_go, []), + "go_server": Outputter(go_server.output_go_logger, []), + "go_server_http": Outputter(go_server.output_go_http, []), "javascript": Outputter(javascript.output_javascript, []), "typescript": Outputter(javascript.output_typescript, []), "javascript_server": Outputter(javascript_server.output_javascript, []), diff --git a/tests/data/server_custom_ping_only_compare.go b/tests/data/server_custom_ping_only_compare.go index 0d0a095cd..ff2dce608 100644 --- a/tests/data/server_custom_ping_only_compare.go +++ b/tests/data/server_custom_ping_only_compare.go @@ -71,7 +71,6 @@ type ping struct { IpAddress string `json:"ip_address,omitempty"` Payload string `json:"payload"` } - type metrics map[string]map[string]interface{} type pingPayload struct { @@ -94,7 +93,6 @@ type logEnvelope struct { Type string Fields ping } - func (g GleanEventsLogger) createClientInfo() clientInfo { // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ @@ -184,7 +182,7 @@ func newGleanEvent(category, name string, extra map[string]string) gleanEvent { Category: category, Name: name, Timestamp: time.Now().UnixMilli(), - Extra: extra, + Extra: extra, } } diff --git a/tests/data/server_events_and_custom_ping_compare.go b/tests/data/server_events_and_custom_ping_compare.go index c194d3791..ffc8dcf7d 100644 --- a/tests/data/server_events_and_custom_ping_compare.go +++ b/tests/data/server_events_and_custom_ping_compare.go @@ -71,7 +71,6 @@ type ping struct { IpAddress string `json:"ip_address,omitempty"` Payload string `json:"payload"` } - type metrics map[string]map[string]interface{} type pingPayload struct { @@ -94,7 +93,6 @@ type logEnvelope struct { Type string Fields ping } - func (g GleanEventsLogger) createClientInfo() clientInfo { // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ @@ -184,7 +182,7 @@ func newGleanEvent(category, name string, extra map[string]string) gleanEvent { Category: category, Name: name, Timestamp: time.Now().UnixMilli(), - Extra: extra, + Extra: extra, } } diff --git a/tests/data/server_events_only_compare.go b/tests/data/server_events_only_compare.go index 5e0ad6d2b..b366ea483 100644 --- a/tests/data/server_events_only_compare.go +++ b/tests/data/server_events_only_compare.go @@ -71,7 +71,6 @@ type ping struct { IpAddress string `json:"ip_address,omitempty"` Payload string `json:"payload"` } - type metrics map[string]map[string]interface{} type pingPayload struct { @@ -94,7 +93,6 @@ type logEnvelope struct { Type string Fields ping } - func (g GleanEventsLogger) createClientInfo() clientInfo { // Fields with default values are required in the Glean schema, but not used in server context return clientInfo{ @@ -184,7 +182,7 @@ func newGleanEvent(category, name string, extra map[string]string) gleanEvent { Category: category, Name: name, Timestamp: time.Now().UnixMilli(), - Extra: extra, + Extra: extra, } } diff --git a/tests/data/server_events_only_http_compare.go b/tests/data/server_events_only_http_compare.go new file mode 100644 index 000000000..5aa536e68 --- /dev/null +++ b/tests/data/server_events_only_http_compare.go @@ -0,0 +1,392 @@ +package glean + +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// AUTOGENERATED BY {current_version}. DO NOT EDIT. + +// required imports +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Prometheus metrics for monitoring HTTP publishing +var ( + // Tracks API calls (RecordEventsPing calls - may queue if workers are slow) + gleanPublishTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "glean_http_publish_total", + Help: "Total number of Glean events queued for publishing (API calls)", + }, + []string{"app_id", "document_type", "status"}, + ) + + // Tracks actual HTTP requests completed (the real throughput metric) + gleanHTTPRequestsCompleted = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "glean_http_requests_completed", + Help: "Actual HTTP requests completed over the network", + }, + []string{"app_id", "document_type", "status"}, + ) +) + +// httpRequest tracks metadata for async HTTP operations +type httpRequest struct { + url string + payload []byte + userAgent string + documentType string + doneCh chan error +} + +// GleanEventsPublisher publishes Glean events via HTTP to ingestion endpoint +type GleanEventsPublisher struct { + AppID string // Application ID to identify application per Glean standards + AppDisplayVersion string // Version of application emitting the event + AppChannel string // Channel to differentiate logs from prod/beta/staging/devel + + endpoint string // HTTP endpoint URL (e.g., https://incoming.telemetry.mozilla.org/submit) + client *http.Client + ctx context.Context + pendingReqs chan *httpRequest + wg sync.WaitGroup + once sync.Once + maxConcurrency int // Number of concurrent HTTP workers +} + +// exported type for public method parameters +type RequestInfo struct { + UserAgent string + IpAddress string +} + +// default empty values will be omitted in json from ping struct definition +var defaultRequestInfo = RequestInfo{ + UserAgent: "", + IpAddress: "", +} + +// structs to construct the glean ping +type clientInfo struct { + TelemetrySDKBuild string `json:"telemetry_sdk_build"` + FirstRunDate string `json:"first_run_date"` + OS string `json:"os"` + OSVersion string `json:"os_version"` + Architecture string `json:"architecture"` + AppBuild string `json:"app_build"` + AppDisplayVersion string `json:"app_display_version"` + AppChannel string `json:"app_channel"` +} + +type pingInfo struct { + Seq int `json:"seq"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` +} + +type metrics map[string]map[string]interface{} + +type pingPayload struct { + ClientInfo clientInfo `json:"client_info"` + PingInfo pingInfo `json:"ping_info"` + Metrics metrics `json:"metrics"` + Events []gleanEvent `json:"events"` +} + +type gleanEvent struct { + Category string `json:"category"` + Name string `json:"name"` + Timestamp int64 `json:"timestamp"` + Extra map[string]string `json:"extra"` +} + +func (g *GleanEventsPublisher) createClientInfo() clientInfo { + // Fields with default values are required in the Glean schema, but not used in server context + return clientInfo{ + TelemetrySDKBuild: "{current_version}", + FirstRunDate: "Unknown", + OS: "Unknown", + OSVersion: "Unknown", + Architecture: "Unknown", + AppBuild: "Unknown", + AppDisplayVersion: g.AppDisplayVersion, + AppChannel: g.AppChannel, + } +} + +func createPingInfo() pingInfo { + now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + return pingInfo{ + Seq: 0, + StartTime: now, + EndTime: now, + } +} + +// NewGleanEventsPublisher creates a new HTTP-based Glean events publisher +func NewGleanEventsPublisher(ctx context.Context, endpoint, appID, appDisplayVersion, appChannel string, concurrency int) (*GleanEventsPublisher, error) { + if concurrency <= 0 { + concurrency = 10 // Default concurrency + } + + publisher := &GleanEventsPublisher{ + AppID: appID, + AppDisplayVersion: appDisplayVersion, + AppChannel: appChannel, + endpoint: endpoint, + ctx: ctx, + pendingReqs: make(chan *httpRequest, 10000), + maxConcurrency: concurrency, + client: &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + MaxIdleConns: 1000, + MaxIdleConnsPerHost: concurrency * 2, + IdleConnTimeout: 90 * time.Second, + DisableCompression: false, + }, + }, + } + + // Start background goroutines to process HTTP requests concurrently + for i := 0; i < concurrency; i++ { + publisher.wg.Add(1) + go publisher.processHTTPRequests() + } + + return publisher, nil +} + +// processHTTPRequests runs in background to handle HTTP requests asynchronously +func (g *GleanEventsPublisher) processHTTPRequests() { + defer g.wg.Done() + + for req := range g.pendingReqs { + err := g.sendHTTPRequest(req) + + status := "success" + if err != nil { + status = "error" + log.Printf("HTTP publish error: %v", err) + } + + gleanPublishTotal.WithLabelValues( + g.AppID, + req.documentType, + status, + ).Inc() + + // Track actual HTTP request completion (the real throughput) + gleanHTTPRequestsCompleted.WithLabelValues( + g.AppID, + req.documentType, + status, + ).Inc() + + // Notify caller if they're waiting + if req.doneCh != nil { + req.doneCh <- err + close(req.doneCh) + } + } +} + +// sendHTTPRequest performs the actual HTTP POST request +func (g *GleanEventsPublisher) sendHTTPRequest(req *httpRequest) error { + httpReq, err := http.NewRequestWithContext(g.ctx, "POST", req.url, bytes.NewReader(req.payload)) + if err != nil { + return fmt.Errorf("failed to create HTTP request: %w", err) + } + + httpReq.Header.Set("Content-Type", "application/json") + if req.userAgent != "" { + httpReq.Header.Set("User-Agent", req.userAgent) + } + + resp, err := g.client.Do(httpReq) + if err != nil { + return fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("HTTP request failed with status %d", resp.StatusCode) + } + + return nil +} + +// construct the Glean payload and publish via HTTP asynchronously +func (g *GleanEventsPublisher) publish( + documentType string, + requestInfo RequestInfo, + metrics metrics, + events []gleanEvent, +) error { + var telemetryPayload = pingPayload{ + ClientInfo: g.createClientInfo(), + PingInfo: createPingInfo(), + Metrics: metrics, + Events: events, + } + + // Marshal the inner payload + payloadJSON, err := json.Marshal(telemetryPayload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + // Generate document ID + documentID, err := uuid.NewRandom() + if err != nil { + return fmt.Errorf("failed to generate document ID: %w", err) + } + + // Construct URL following ingestion-edge format: + // POST /submit//// + url := fmt.Sprintf("%s/%s/%s/%s/%s", + g.endpoint, + g.AppID, + documentType, + "1", // document version + documentID.String(), + ) + + // Create HTTP request + req := &httpRequest{ + url: url, + payload: payloadJSON, + userAgent: requestInfo.UserAgent, + documentType: documentType, + doneCh: nil, // Non-blocking async mode + } + + // Send request to background workers (non-blocking if channel has space) + select { + case g.pendingReqs <- req: + // Successfully queued + return nil + case <-g.ctx.Done(): + // Context cancelled + return g.ctx.Err() + } +} + +// QueueDepth returns the current number of pending HTTP requests in the queue +func (g *GleanEventsPublisher) QueueDepth() int { + return len(g.pendingReqs) +} + +// Flush waits for all pending HTTP requests to complete +// Call this before shutdown to ensure no messages are lost +func (g *GleanEventsPublisher) Flush() { + log.Println("Flushing pending HTTP requests...") + + // Close channel to signal workers to finish + g.once.Do(func() { + close(g.pendingReqs) + }) + + // Wait for all workers to finish + g.wg.Wait() + log.Println("All pending HTTP requests flushed.") +} + +// Close performs graceful shutdown, flushing all pending messages +func (g *GleanEventsPublisher) Close() error { + g.Flush() + return nil +} + +func newGleanEvent(category, name string, extra map[string]string) gleanEvent { + return gleanEvent{ + Category: category, + Name: name, + Timestamp: time.Now().UnixMilli(), + Extra: extra, + } +} + +type BackendTestEventEvent struct { + EventFieldString string // A string extra field + EventFieldQuantity int64 // A quantity extra field + EventFieldBool bool // A boolean extra field +} + +func (e BackendTestEventEvent) gleanEvent() gleanEvent { + return newGleanEvent( + "backend", + "test_event", + map[string]string{ + "event_field_string": e.EventFieldString, + "event_field_quantity": fmt.Sprintf("%d", e.EventFieldQuantity), + "event_field_bool": fmt.Sprintf("%t", e.EventFieldBool), + }, + ) +} + +type EventsPingEvent interface { + isEventsPingEvent() + gleanEvent() gleanEvent +} + +func (e BackendTestEventEvent) isEventsPingEvent() {} + +type EventsPing struct { + MetricName string // Test string metric + MetricRequestBool bool // boolean + MetricRequestCount int64 // Test quantity metric + MetricRequestDatetime time.Time // Test datetime metric + MetricRequestStringList []string // Test string_list metric + Event EventsPingEvent // valid event for this ping +} + +// Record and submit `events` ping +func (g *GleanEventsPublisher) RecordEventsPing( + requestInfo RequestInfo, + params EventsPing, +) error { + metrics := metrics{ + "string": { + "metric.name": params.MetricName, + }, + "boolean": { + "metric.request_bool": params.MetricRequestBool, + }, + "quantity": { + "metric.request_count": params.MetricRequestCount, + }, + "datetime": { + "metric.request_datetime": params.MetricRequestDatetime.Format("2006-01-02T15:04:05.000Z"), + }, + "string_list": { + "metric.request_string_list": params.MetricRequestStringList, + }, + } + + events := []gleanEvent{} + if params.Event != nil { + events = append(events, params.Event.gleanEvent()) + } + return g.publish("events", requestInfo, metrics, events) +} + +// Record and submit `events` ping omitting user request info +func (g *GleanEventsPublisher) RecordEventsPingWithoutUserInfo( + params EventsPing, +) error { + return g.RecordEventsPing(defaultRequestInfo, params) +} diff --git a/tests/test-go/test_http.go.tmpl b/tests/test-go/test_http.go.tmpl new file mode 100644 index 000000000..e9f1e914b --- /dev/null +++ b/tests/test-go/test_http.go.tmpl @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + "glean/glean" + "io" + "net/http" + "net/http/httptest" + "time" + /* IMPORTS */ +) + +var receivedPayload string + +func main() { + // Create mock HTTP server to receive pings + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + receivedPayload = string(body) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + ctx := context.Background() + publisher, err := glean.NewGleanEventsPublisher( + ctx, + server.URL+"/submit", + "glean.test", + "0.0.1", + "nightly", + 10, + ) + if err != nil { + panic(err) + } + defer publisher.Close() + + /* CODE */ + + // Flush all pending requests + publisher.Flush() + + // Print the received payload for validation + fmt.Print(receivedPayload) +} diff --git a/tests/test_go_server.py b/tests/test_go_server.py index 8790ff4ea..984727f99 100644 --- a/tests/test_go_server.py +++ b/tests/test_go_server.py @@ -400,3 +400,109 @@ def test_run_logging_custom_ping_with_event(tmp_path): assert validate_ping.validate_ping(input, output, schema_url=schema_url) == 0, ( output.getvalue() ) + + +def test_parser_go_server_http_events_only(tmp_path): + """Test that parser works for HTTP transport with events ping""" + translate.translate( + ROOT / "data" / "go_server_events_only_metrics.yaml", + "go_server_http", + tmp_path, + ) + + assert set(x.name for x in tmp_path.iterdir()) == set(["server_events.go"]) + + # Make sure generated file matches expected + with (tmp_path / "server_events.go").open("r", encoding="utf-8") as fd: + content = fd.read() + with (ROOT / "data" / "server_events_only_http_compare.go").open( + "r", encoding="utf-8" + ) as cd: + compare_raw = cd.read() + + glean_version = f"glean_parser v{glean_parser.__version__}" + # use replace instead of format since Go uses { } + compare = compare_raw.replace("{current_version}", glean_version) + assert content == compare + + +def run_http_publisher(code_dir, code, imports=""): + """ + Run the Go HTTP publisher and capture the received payload. + """ + + tmpl_code = "" + with open(ROOT / "test-go" / "test_http.go.tmpl", "r") as fp: + tmpl_code = fp.read() + + tmpl_code = tmpl_code.replace("/* CODE */", code).replace("/* IMPORTS */", imports) + + with open(code_dir / "test.go", "w") as fp: + fp.write(tmpl_code) + + subprocess.call(["go", "mod", "init", "glean"], cwd=code_dir) + subprocess.call(["go", "mod", "tidy"], cwd=code_dir) + + return subprocess.check_output(["go", "run", "test.go"], cwd=code_dir).decode( + "utf-8" + ) + + +@pytest.mark.go_dependency +def test_run_http_events_ping(tmp_path): + glean_module_path = tmp_path / "glean" + + translate.translate( + [ + ROOT / "data" / "go_server_events_only_metrics.yaml", + ], + "go_server_http", + glean_module_path, + ) + + code = """ + publisher.RecordEventsPing( + glean.RequestInfo{ + UserAgent: "glean-test/1.0", + IpAddress: "127.0.0.1", + }, + glean.EventsPing{ + MetricName: "string value", + MetricRequestBool: true, + MetricRequestCount: 10, + MetricRequestDatetime: time.Now(), + MetricRequestStringList: []string{"list", "of", "strings"}, + Event: glean.BackendTestEventEvent{ + EventFieldString: "event extra string value", + EventFieldQuantity: 100, + EventFieldBool: false, + }, + }, + ) + """ + + received_output = run_http_publisher(tmp_path, code) + received_payload = json.loads(received_output.strip()) + + # Verify basic structure of received payload + assert "0.0.1" == received_payload["client_info"]["app_display_version"] + assert "nightly" == received_payload["client_info"]["app_channel"] + assert "seq" in received_payload["ping_info"] + assert "metrics" in received_payload + assert "events" in received_payload + assert len(received_payload["events"]) == 1 + assert received_payload["events"][0]["category"] == "backend" + assert received_payload["events"][0]["name"] == "test_event" + + # Validate against Glean schema + schema_url = ( + "https://raw.githubusercontent.com/mozilla-services/" + "mozilla-pipeline-schemas/main/" + "schemas/glean/glean/glean.1.schema.json" + ) + + input = io.StringIO(received_output.strip()) + output = io.StringIO() + assert validate_ping.validate_ping(input, output, schema_url=schema_url) == 0, ( + output.getvalue() + )