diff --git a/.tekton/generate-test-files.js b/.tekton/generate-test-files.js index 034e63ef4e..290fe816f7 100644 --- a/.tekton/generate-test-files.js +++ b/.tekton/generate-test-files.js @@ -79,7 +79,7 @@ const groups = { subname: 'test:ci:tracing:protocols' }, 'test:ci:collector:tracing:general': { - sidecars: ['postgres', 'oracledb'], + sidecars: ['postgres', 'oracledb', 'zookeeper', 'kafka', 'kafka-topics', 'schema-registry'], condition: ' && true' }, 'test:ci:collector:tracing:misc': { diff --git a/.tekton/tasks/test-groups/collector-tracing-general-task.yaml b/.tekton/tasks/test-groups/collector-tracing-general-task.yaml index 7986e28fd6..7f13a753a1 100644 --- a/.tekton/tasks/test-groups/collector-tracing-general-task.yaml +++ b/.tekton/tasks/test-groups/collector-tracing-general-task.yaml @@ -33,6 +33,71 @@ spec: value: "teamnodejs" - name: "APP_USER_PASSWORD" value: "teamnodejspassword" + - name: zookeeper + image: confluentinc/cp-zookeeper:7.9.1 + imagePullPolicy: IfNotPresent + env: + - name: "ZOOKEEPER_CLIENT_PORT" + value: "2181" + readinessProbe: + tcpSocket: + port: 2181 + initialDelaySeconds: 5 + periodSeconds: 2 + timeoutSeconds: 30 + - name: kafka + image: confluentinc/cp-kafka:7.9.1 + imagePullPolicy: IfNotPresent + env: + - name: "KAFKA_BROKER_ID" + value: "1" + - name: "KAFKA_ZOOKEEPER_CONNECT" + value: "localhost:2181" + - name: "KAFKA_ADVERTISED_LISTENERS" + value: "PLAINTEXT://127.0.0.1:29092,EXTERNAL://localhost:9092" + - name: "KAFKA_LISTENERS" + value: "EXTERNAL://:9092,PLAINTEXT://:29092" + - name: "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" + value: "1" + - name: "KAFKA_DEFAULT_REPLICATION_FACTOR" + value: "1" + - name: "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" + value: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,EXTERNAL:PLAINTEXT" + readinessProbe: + tcpSocket: + port: 9092 + initialDelaySeconds: 90 + periodSeconds: 2 + timeoutSeconds: 60 + - name: kafka-topics + image: confluentinc/cp-kafka:7.9.1 + imagePullPolicy: IfNotPresent + args: + - 
"-lc" + - "kafka_server=\"localhost:9092\"; kafka_topics=\"test test-topic test-batch-topic rdkafka-topic confluent-kafka-topic\"; until kafka-topics --bootstrap-server \"$kafka_server\" --list >/dev/null 2>&1; do echo 'waiting for Kafka...'; sleep 2; done; for t in $kafka_topics; do kafka-topics --bootstrap-server \"$kafka_server\" --create --if-not-exists --topic \"$t\"; done; echo 'All topics created.'" + command: + - "/bin/bash" + - name: schema-registry + image: confluentinc/cp-schema-registry:7.9.0 + imagePullPolicy: IfNotPresent + env: + - name: "SCHEMA_REGISTRY_HOST_NAME" + value: "schema-registry" + - name: "SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL" + value: "localhost:2181" + - name: "SCHEMA_REGISTRY_LISTENERS" + value: "http://0.0.0.0:8081" + - name: "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS" + value: "PLAINTEXT://localhost:29092" + - name: "SCHEMA_REGISTRY_DEBUG" + value: "true" + readinessProbe: + httpGet: + path: / + port: 8081 + initialDelaySeconds: 5 + periodSeconds: 2 + timeoutSeconds: 60 envFrom: - configMapRef: name: environment-properties diff --git a/currencies.json b/currencies.json index 7cdab99d28..ebac92615b 100644 --- a/currencies.json +++ b/currencies.json @@ -867,5 +867,16 @@ "ignoreUpdates": false, "note": "", "core": false + }, + { + "name": "@confluentinc/kafka-javascript", + "policy": "45-days", + "lastSupportedVersion": "", + "latestVersion": "", + "cloudNative": false, + "isBeta": false, + "ignoreUpdates": false, + "note": "", + "core": false + } ] diff --git a/docker-compose-base.yaml b/docker-compose-base.yaml index 1e03cdc946..3d528377e2 100644 --- a/docker-compose-base.yaml +++ b/docker-compose-base.yaml @@ -122,7 +122,7 @@ services: set -euo pipefail kafka_server="kafka:29092" - kafka_topics="test test-topic test-batch-topic rdkafka-topic" + kafka_topics="test test-topic test-batch-topic rdkafka-topic confluent-kafka-topic" echo "Waiting for Kafka to be ready on $$kafka_server..." 
diff --git a/package-lock.json b/package-lock.json index 0b67d1afa6..a247c4aa7e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32,6 +32,7 @@ "@azure/storage-blob-v12.27.0": "npm:@azure/storage-blob@12.27.0", "@commitlint/cli": "14.1.0", "@commitlint/config-conventional": "14.1.0", + "@confluentinc/kafka-javascript": "1.7.0", "@elastic/ecs-pino-format": "^1.5.0", "@elastic/elasticsearch": "9.2.0", "@elastic/elasticsearch-v7.17.0": "npm:@elastic/elasticsearch@7.17.0", @@ -11278,6 +11279,27 @@ "node": ">=v12" } }, + "node_modules/@confluentinc/kafka-javascript": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/@confluentinc/kafka-javascript/-/kafka-javascript-1.7.0.tgz", + "integrity": "sha512-nmb7+TYjokCFOX2qUgSqhSW7dCy6/raYYneqR/dLDQwUPak4yuRLGfrvGHTvoH0VC4BUUiRu8niJS//5c3SC/A==", + "dev": true, + "hasInstallScript": true, + "license": "MIT", + "workspaces": [ + ".", + "schemaregistry", + "schemaregistry-examples" + ], + "dependencies": { + "@mapbox/node-pre-gyp": "^1.0.11", + "bindings": "^1.3.1", + "nan": "^2.22.0" + }, + "engines": { + "node": ">=18.0.0" + } + }, "node_modules/@couchbase/couchbase-darwin-arm64-napi": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/@couchbase/couchbase-darwin-arm64-napi/-/couchbase-darwin-arm64-napi-4.6.0.tgz", @@ -11402,6 +11424,31 @@ "kuler": "^2.0.0" } }, + "node_modules/@drazke/instrumentation-confluent-kafka-javascript": { + "version": "1.0.1", + "resolved": "https://github.com/kirrg001/instrumentation-confluent-kafka-javascript/releases/download/1.0.1/drazke-instrumentation-confluent-kafka-javascript-1.0.1.tgz", + "integrity": "sha512-82ALolwp/F4RKqnGgHHu7B4seZC78Sib53v23rdJIVj/b5PH99gdppLYKu2BxROBZDzrq7fldsYVXFcwje+biw==", + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/instrumentation": "^0.207.0", + "@opentelemetry/semantic-conventions": "^1.34.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": "^1.3.0" + 
} + }, + "node_modules/@drazke/instrumentation-confluent-kafka-javascript/node_modules/@opentelemetry/semantic-conventions": { + "version": "1.38.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.38.0.tgz", + "integrity": "sha512-kocjix+/sSggfJhwXqClZ3i9Y/MI0fp7b+g7kCRm6psy2dsf8uApTRclwG18h8Avm7C9+fnt+O36PspJ/OzoWg==", + "license": "Apache-2.0", + "engines": { + "node": ">=14" + } + }, "node_modules/@elastic/ecs-helpers": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/@elastic/ecs-helpers/-/ecs-helpers-2.1.1.tgz", @@ -13661,6 +13708,243 @@ "node": ">=12" } }, + "node_modules/@mapbox/node-pre-gyp": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@mapbox/node-pre-gyp/-/node-pre-gyp-1.0.11.tgz", + "integrity": "sha512-Yhlar6v9WQgUp/He7BdgzOz8lqMQ8sU+jkCq7Wx8Myc5YFJLbEe7lgui/V7G1qB1DJykHSGwreceSaD60Y0PUQ==", + "dev": true, + "license": "BSD-3-Clause", + "dependencies": { + "detect-libc": "^2.0.0", + "https-proxy-agent": "^5.0.0", + "make-dir": "^3.1.0", + "node-fetch": "^2.6.7", + "nopt": "^5.0.0", + "npmlog": "^5.0.1", + "rimraf": "^3.0.2", + "semver": "^7.3.5", + "tar": "^6.1.11" + }, + "bin": { + "node-pre-gyp": "bin/node-pre-gyp" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/abbrev": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz", + "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", + "dev": true, + "license": "ISC" + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/are-we-there-yet": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/are-we-there-yet/-/are-we-there-yet-2.0.0.tgz", + "integrity": "sha512-Ci/qENmwHnsYo9xKIcUJN5LeDKdJ6R1Z1j9V/J5wyq8nh/mYPEpIKJbBZXtZjG04HiK7zV/p6Vs9952MrMeUIw==", + "deprecated": "This package is no longer supported.", + "dev": true, + "license": "ISC", + "dependencies": { + "delegates": "^1.0.0", + 
"readable-stream": "^3.6.0" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/detect-libc": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-2.1.2.tgz", + "integrity": "sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=8" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/gauge": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/gauge/-/gauge-3.0.2.tgz", + "integrity": "sha512-+5J6MS/5XksCuXq++uFRsnUd7Ovu1XenbeuIuNRJxYWjgQbPuFhT14lAvsWfqfAmnwluf1OwMjz39HjfLPci0Q==", + "deprecated": "This package is no longer supported.", + "dev": true, + "license": "ISC", + "dependencies": { + "aproba": "^1.0.3 || ^2.0.0", + "color-support": "^1.1.2", + "console-control-strings": "^1.0.0", + "has-unicode": "^2.0.1", + "object-assign": "^4.1.1", + "signal-exit": "^3.0.0", + "string-width": "^4.2.3", + "strip-ansi": "^6.0.1", + "wide-align": "^1.1.2" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/https-proxy-agent": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", + "integrity": "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==", + "dev": true, + "license": "MIT", + "dependencies": { + "agent-base": "6", + "debug": "4" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/make-dir": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz", + "integrity": "sha512-g3FeP20LNwhALb/6Cz6Dd4F2ngze0jz7tbzrD2wAV+o9FeNHe4rL+yK2md0J/fiSf1sa1ADhXqi5+oVwOM/eGw==", + "dev": true, + "license": "MIT", + "dependencies": { + "semver": "^6.0.0" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": 
"https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/make-dir/node_modules/semver": { + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", + "dev": true, + "license": "ISC", + "bin": { + "semver": "bin/semver.js" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/node-fetch": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", + "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", + "dev": true, + "license": "MIT", + "dependencies": { + "whatwg-url": "^5.0.0" + }, + "engines": { + "node": "4.x || >=6.0.0" + }, + "peerDependencies": { + "encoding": "^0.1.0" + }, + "peerDependenciesMeta": { + "encoding": { + "optional": true + } + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/nopt": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/nopt/-/nopt-5.0.0.tgz", + "integrity": "sha512-Tbj67rffqceeLpcRXrT7vKAN8CwfPeIBgM7E6iBkmKLV7bEMwpGgYLGv0jACUsECaa/vuxP0IjEont6umdMgtQ==", + "dev": true, + "license": "ISC", + "dependencies": { + "abbrev": "1" + }, + "bin": { + "nopt": "bin/nopt.js" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/npmlog": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/npmlog/-/npmlog-5.0.1.tgz", + "integrity": "sha512-AqZtDUWOMKs1G/8lwylVjrdYgqA4d9nu8hc+0gzRxlDb1I10+FHBGMXs6aiQHFdCUUlqH99MUMuLfzWDNDtfxw==", + "deprecated": "This package is no longer supported.", + "dev": true, + "license": "ISC", + "dependencies": { + "are-we-there-yet": "^2.0.0", + "console-control-strings": "^1.1.0", + "gauge": "^3.0.0", + "set-blocking": "^2.0.0" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": 
"https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dev": true, + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/safe-buffer": { + "version": "5.2.1", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", + "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", + "dev": true, + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/feross" + }, + { + "type": "patreon", + "url": "https://www.patreon.com/feross" + }, + { + "type": "consulting", + "url": "https://feross.org/support" + } + ], + "license": "MIT" + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "dev": true, + "license": "MIT", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/tr46": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz", + "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==", + "dev": true, + "license": "MIT" + }, + "node_modules/@mapbox/node-pre-gyp/node_modules/webidl-conversions": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", + "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==", + "dev": true, + "license": "BSD-2-Clause" + }, + 
"node_modules/@mapbox/node-pre-gyp/node_modules/whatwg-url": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz", + "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==", + "dev": true, + "license": "MIT", + "dependencies": { + "tr46": "~0.0.3", + "webidl-conversions": "^3.0.0" + } + }, "node_modules/@mongodb-js/saslprep": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/@mongodb-js/saslprep/-/saslprep-1.3.0.tgz", @@ -22067,7 +22351,7 @@ "version": "1.5.0", "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz", "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==", - "optional": true, + "devOptional": true, "dependencies": { "file-uri-to-path": "1.0.0" } @@ -29530,7 +29814,7 @@ "version": "1.0.0", "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz", "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==", - "optional": true + "devOptional": true }, "node_modules/filelist": { "version": "1.0.4", @@ -48197,6 +48481,7 @@ "version": "5.0.2", "license": "MIT", "dependencies": { + "@drazke/instrumentation-confluent-kafka-javascript": "https://github.com/kirrg001/instrumentation-confluent-kafka-javascript/releases/download/1.0.1/drazke-instrumentation-confluent-kafka-javascript-1.0.1.tgz", "@opentelemetry/api": ">=1.3.0 <1.10.0", "@opentelemetry/context-async-hooks": "1.25.0", "@opentelemetry/instrumentation-fs": "0.28.0", diff --git a/package.json b/package.json index 886d263b4a..11fb9cca80 100644 --- a/package.json +++ b/package.json @@ -99,6 +99,7 @@ "@azure/storage-blob-v12.27.0": "npm:@azure/storage-blob@12.27.0", "@commitlint/cli": "14.1.0", "@commitlint/config-conventional": "14.1.0", + "@confluentinc/kafka-javascript": "1.7.0", "@elastic/ecs-pino-format": "^1.5.0", 
"@elastic/elasticsearch": "9.2.0", "@elastic/elasticsearch-v7.17.0": "npm:@elastic/elasticsearch@7.17.0", diff --git a/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.js b/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.js new file mode 100644 index 0000000000..676f3d90e4 --- /dev/null +++ b/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.js @@ -0,0 +1,143 @@ +/* + * (c) Copyright IBM Corp. 2025 + */ + +'use strict'; + +process.on('SIGTERM', () => { + process.disconnect(); + process.exit(0); +}); + +const instana = require('@instana/collector')(); +const express = require('express'); +const bodyParser = require('body-parser'); +const fetch = require('node-fetch-v2'); +const delay = require('../../../../core/test/test_util/delay'); +const port = require('../../test_util/app-port')(); +const logPrefix = `Confluent Kafka Consumer App (${process.pid}):\t`; +const KafkaLib = require('@confluentinc/kafka-javascript'); + +const app = express(); +app.use(bodyParser.json()); + +const broker = process.env.KAFKA; +const topic = process.env.CONFLUENT_KAFKA_TOPIC; +const clientType = (process.env.KAFKA_CLIENT_TYPE || 'kafkajs').toLowerCase(); + +let consumer; +let connected = false; + +async function setupConsumer() { + if (!KafkaLib) return; + + let KafkaClient; + let useRdKafka = false; + + if (clientType === 'rdkafka') { + KafkaClient = KafkaLib.KafkaConsumer || KafkaLib.RdKafka.KafkaConsumer; + useRdKafka = true; + log(`Using ${clientType} (node-rdkafka) client.`); + } else { + KafkaClient = KafkaLib.KafkaJS.Kafka || KafkaLib.Kafka; + log(`Using ${clientType} (kafkajs) client.`); + } + + if (!KafkaClient) { + log(`Error: Kafka client for type ${clientType} not found in KafkaLib.`); + return; + } + + try { + if (useRdKafka) { + consumer = new KafkaClient({ + 'group.id': `confluent-consumer-${process.pid}`, + 'metadata.broker.list': broker + }); + + consumer.connect(); + + consumer.on('ready', () 
=> { + log('RdKafka Consumer ready.'); + consumer.subscribe([topic]); + consumer.consume(); + connected = true; + }); + + consumer.on('data', async msg => { + const span = instana.currentSpan(); + span && span.disableAutoEnd && span.disableAutoEnd(); + + const value = msg.value && msg.value.toString(); + log('Consumed (data) message', msg.topic, value); + + await delay(50); + await fetch(`http://127.0.0.1:${process.env.INSTANA_AGENT_PORT}/ping`); + span && span.end && span.end(); + }); + + consumer.on('event.error', err => { + log('RdKafka Consumer error:', err.message); + }); + } else { + const kafka = new KafkaClient({ + kafkaJS: { + clientId: 'confluent-consumer', + brokers: [broker], + groupId: `confluent-consumer-${process.pid}` + } + }); + + consumer = kafka.consumer(); + + await consumer.connect(); + log('Consumer connected, subscribing to topic:', topic); + await consumer.subscribe({ topic }); + log('Consumer subscribed to topic:', topic); + + log('Starting consumer.run()...'); + await consumer.run({ + eachMessage: async ({ topic: t, message }) => { + log('eachMessage callback called for topic:', t); + const span = instana.currentSpan(); + span && span.disableAutoEnd && span.disableAutoEnd(); + + const value = message && message.value ? 
message.value.toString() : undefined; + log('Consumed (kafkajs) message', t, value); + + await delay(50); + await fetch(`http://127.0.0.1:${process.env.INSTANA_AGENT_PORT}/ping`); + span && span.end && span.end(); + } + }); + + setTimeout(() => { + log('KafkaJS Consumer ready.'); + connected = true; + }, 2 * 1000); + } + } catch (e) { + log('Consumer setup error', e && e.message); + } +} + +app.get('/', (_req, res) => { + if (connected) res.send('OK'); + else res.sendStatus(503); +}); + +app.get('/messages', (_req, res) => { + res.sendStatus(200); +}); + +app.listen(port, () => { + log(`Listening on port: ${port}`); + setupConsumer().catch(() => {}); +}); + +function log() { + const args = Array.prototype.slice.call(arguments); + args[0] = logPrefix + args[0]; + // eslint-disable-next-line no-console + console.log.apply(console, args); +} diff --git a/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.mjs b/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.mjs new file mode 100644 index 0000000000..79e9e8bfa2 --- /dev/null +++ b/packages/collector/test/tracing/opentelemetry/confluent-kafka-consumer-app.mjs @@ -0,0 +1,146 @@ +/* + * (c) Copyright IBM Corp. 
2025 + */ + +'use strict'; + +process.on('SIGTERM', () => { + process.disconnect(); + process.exit(0); +}); + +import instana from '../../../src/index.js'; +import express from 'express'; +import bodyParser from 'body-parser'; +import fetch from 'node-fetch-v2'; +import delay from '../../../../core/test/test_util/delay.js'; +import getAppPort from '../../test_util/app-port.js'; +const port = getAppPort(); +const logPrefix = `Confluent Kafka Consumer App (${process.pid}):\t`; + +console.log('Starting confluent-kafka consumer app (ESM).'); +import KafkaLib from '@confluentinc/kafka-javascript'; + +const app = express(); +app.use(bodyParser.json()); + +const broker = process.env.KAFKA; +const topic = process.env.CONFLUENT_KAFKA_TOPIC; +const clientType = (process.env.KAFKA_CLIENT_TYPE || 'kafkajs').toLowerCase(); + +let consumer; +let connected = false; + +async function setupConsumer() { + if (!KafkaLib) return; + + let KafkaClient; + let useRdKafka = false; + + if (clientType === 'rdkafka') { + KafkaClient = KafkaLib.KafkaConsumer || KafkaLib.RdKafka.KafkaConsumer; + useRdKafka = true; + log(`Using ${clientType} (node-rdkafka) client.`); + } else { + KafkaClient = KafkaLib.KafkaJS.Kafka || KafkaLib.Kafka; + log(`Using ${clientType} (kafkajs) client.`); + } + + if (!KafkaClient) { + log(`Error: Kafka client for type ${clientType} not found in KafkaLib.`); + return; + } + + try { + if (useRdKafka) { + consumer = new KafkaClient({ + 'group.id': `confluent-consumer-${process.pid}`, + 'metadata.broker.list': broker + }); + + consumer.connect(); + + consumer.on('ready', () => { + log('RdKafka Consumer ready.'); + consumer.subscribe([topic]); + consumer.consume(); + connected = true; + }); + + consumer.on('data', async msg => { + const span = instana.currentSpan(); + span && span.disableAutoEnd && span.disableAutoEnd(); + + const value = msg.value && msg.value.toString(); + log('Consumed (data) message', msg.topic, value); + + await delay(50); + await 
fetch(`http://127.0.0.1:${process.env.INSTANA_AGENT_PORT}/ping`); + span && span.end && span.end(); + }); + + consumer.on('event.error', err => { + log('RdKafka Consumer error:', err.message); + }); + } else { + const kafka = new KafkaClient({ + kafkaJS: { + clientId: 'confluent-consumer', + brokers: [broker], + groupId: `confluent-consumer-${process.pid}` + } + }); + + consumer = kafka.consumer(); + + await consumer.connect(); + log('Consumer connected, subscribing to topic:', topic); + await consumer.subscribe({ topic }); + log('Consumer subscribed to topic:', topic); + + log('Starting consumer.run()...'); + await consumer.run({ + eachMessage: async ({ topic: t, message }) => { + log('eachMessage callback called for topic:', t); + const span = instana.currentSpan(); + span && span.disableAutoEnd && span.disableAutoEnd(); + + const value = message && message.value ? message.value.toString() : undefined; + log('Consumed (kafkajs) message', t, value); + + await delay(50); + await fetch(`http://127.0.0.1:${process.env.INSTANA_AGENT_PORT}/ping`); + span && span.end && span.end(); + } + }); + + setTimeout(() => { + log('KafkaJS Consumer ready.'); + connected = true; + }, 2 * 1000); + } + } catch (e) { + log('Consumer setup error', e && e.message); + } +} + +app.get('/', (_req, res) => { + if (connected) res.send('OK'); + else res.sendStatus(503); +}); + +app.get('/messages', (_req, res) => { + res.sendStatus(200); +}); + +app.listen(port, () => { + log(`Listening on port: ${port}`); + setupConsumer().catch(() => {}); +}); + +function log() { + const args = Array.prototype.slice.call(arguments); + args[0] = logPrefix + args[0]; + // eslint-disable-next-line no-console + console.log.apply(console, args); +} diff --git a/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.js b/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.js new file mode 100644 index 0000000000..be21046969 --- /dev/null +++ 
b/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.js @@ -0,0 +1,152 @@ +/* + * (c) Copyright IBM Corp. 2025 + */ + +'use strict'; + +// NOTE: c8 bug https://github.com/bcoe/c8/issues/166 +process.on('SIGTERM', () => { + process.disconnect(); + process.exit(0); +}); + +require('@instana/collector')(); +const express = require('express'); +const path = require('path'); +const bodyParser = require('body-parser'); +const port = require('../../test_util/app-port')(); +const logPrefix = `Confluent Kafka Producer App (${process.pid}):\t`; +const KafkaLib = require('@confluentinc/kafka-javascript'); + +const confluentKafkaPath = require.resolve('@confluentinc/kafka-javascript'); +const expectedLocalPath = path.resolve(__dirname, 'node_modules', '@confluentinc/kafka-javascript'); +if (!confluentKafkaPath.includes(expectedLocalPath)) { + throw new Error( + // eslint-disable-next-line max-len + `@confluentinc/kafka-javascript must be loaded from local node_modules. Expected path containing: ${expectedLocalPath}, but got: ${confluentKafkaPath}` + ); +} + +const app = express(); +app.use(bodyParser.json()); + +const broker = process.env.KAFKA; +const topic = process.env.CONFLUENT_KAFKA_TOPIC; +// Environment variable selecting the Kafka client flavor +const clientType = (process.env.KAFKA_CLIENT_TYPE || 'kafkajs').toLowerCase(); + +let producer; +let connected = false; +let useRdKafka = false; + +async function setupProducer() { + if (!KafkaLib) return; + + let KafkaClient; + + if (clientType === 'rdkafka') { + // When 'rdkafka' is selected, use the Producer (RdKafka) API + KafkaClient = KafkaLib.Producer || KafkaLib.RdKafka.Producer; + useRdKafka = true; + log(`Using ${clientType} (node-rdkafka) client.`); + } else { + // By default, or when 'kafkajs' is selected, use KafkaJS.Kafka + KafkaClient = KafkaLib.KafkaJS.Kafka || KafkaLib.Kafka; + log(`Using ${clientType} (kafkajs) client.`); + } + + if (!KafkaClient) { + log(`Error: Kafka client for type 
${clientType} not found in KafkaLib.`); + return; + } + + try { + if (useRdKafka) { + producer = new KafkaClient({ + 'metadata.broker.list': broker + }); + + producer.connect(); + + producer.on('ready', () => { + log('RdKafka Producer ready.'); + connected = true; + }); + + producer.on('event.error', err => { + log('RdKafka Producer error:', err.message); + }); + } else { + // kafkajs initialization + const kafka = new KafkaClient({ + 'bootstrap.servers': broker, + kafkaJS: { clientId: 'confluent-producer', brokers: [broker] } + }); + producer = kafka.producer(); + await producer.connect(); + log('KafkaJS Producer ready.'); + connected = true; + } + } catch (e) { + log('Producer setup error', e && e.message); + } +} + +app.get('/', (_req, res) => { + if (connected) res.send('OK'); + else res.sendStatus(503); +}); + +app.get('/produce', async (req, res) => { + log('/produce'); + + const message = `${Date.now()}-${process.pid}`; + if (!producer) { + return res.status(500).send({ error: 'producer not available' }); + } + + try { + if (useRdKafka) { + // RdKafka uses producer.produce + await producer.produce( + topic, + null, // Partition (null = any) + Buffer.from(message), // Message payload as Buffer + null, // Key + Date.now() // Timestamp + ); + + log('RdKafka message produced.'); + + // RdKafka needs flush() to send the message immediately + await new Promise((resolve, reject) => { + producer.flush(1000, err => { + if (err) return reject(err); + resolve(); + }); + }); + } else { + // kafkajs uses producer.send + await producer.send({ topic, messages: [{ value: message }] }); + log('KafkaJS message produced.'); + } + + // allow small time for delivery (only needed for the test flow) + await new Promise(resolve => setTimeout(resolve, 200)); + res.send({ produced: true, message }); + } catch (e) { + res.status(500).send({ error: e.message }); + } +}); + +app.listen(port, () => { + log(`Listening on port: ${port}`); + setupProducer().catch(() => 
{}); +}); + +function log() { + const args = Array.prototype.slice.call(arguments); + args[0] = logPrefix + args[0]; + // eslint-disable-next-line no-console + console.log.apply(console, args); +} diff --git a/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.mjs b/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.mjs new file mode 100644 index 0000000000..bc3ca5d244 --- /dev/null +++ b/packages/collector/test/tracing/opentelemetry/confluent-kafka-producer-app.mjs @@ -0,0 +1,155 @@ +/* + * (c) Copyright IBM Corp. 2025 + */ + +'use strict'; + +// NOTE: c8 bug https://github.com/bcoe/c8/issues/166 +process.on('SIGTERM', () => { + process.disconnect(); + process.exit(0); +}); + +import express from 'express'; +import bodyParser from 'body-parser'; +import getAppPort from '../../test_util/app-port.js'; +import path from 'path'; +import { fileURLToPath } from 'url'; +import { createRequire } from 'module'; +const port = getAppPort(); +const logPrefix = `Confluent Kafka Producer App (${process.pid}):\t`; +import KafkaLib from '@confluentinc/kafka-javascript'; + +const require = createRequire(import.meta.url); +const __dirname = path.dirname(fileURLToPath(import.meta.url)); +const confluentKafkaPath = require.resolve('@confluentinc/kafka-javascript'); +const expectedLocalPath = path.resolve(__dirname, 'node_modules', '@confluentinc/kafka-javascript'); +if (!confluentKafkaPath.includes(expectedLocalPath)) { + throw new Error( + `@confluentinc/kafka-javascript must be loaded from local node_modules. 
Expected path containing: ${expectedLocalPath}, but got: ${confluentKafkaPath}` + ); +} + +const app = express(); +app.use(bodyParser.json()); + +const broker = process.env.KAFKA; +const topic = process.env.CONFLUENT_KAFKA_TOPIC; +// Environment variable selecting the Kafka client flavor +const clientType = (process.env.KAFKA_CLIENT_TYPE || 'kafkajs').toLowerCase(); + +let producer; +let connected = false; +let useRdKafka = false; + +async function setupProducer() { + if (!KafkaLib) return; + + let KafkaClient; + + if (clientType === 'rdkafka') { + // When 'rdkafka' is selected, use the Producer (RdKafka) API + KafkaClient = KafkaLib.Producer || KafkaLib.RdKafka.Producer; + useRdKafka = true; + log(`Using ${clientType} (node-rdkafka) client.`); + } else { + // By default, or when 'kafkajs' is selected, use KafkaJS.Kafka + KafkaClient = KafkaLib.KafkaJS.Kafka || KafkaLib.Kafka; + log(`Using ${clientType} (kafkajs) client.`); + } + + if (!KafkaClient) { + log(`Error: Kafka client for type ${clientType} not found in KafkaLib.`); + return; + } + + try { + if (useRdKafka) { + producer = new KafkaClient({ + 'metadata.broker.list': broker + }); + + producer.connect(); + + producer.on('ready', () => { + log('RdKafka Producer ready.'); + connected = true; + }); + + producer.on('event.error', err => { + log('RdKafka Producer error:', err.message); + }); + } else { + // kafkajs initialization + const kafka = new KafkaClient({ + 'bootstrap.servers': broker, + kafkaJS: { clientId: 'confluent-producer', brokers: [broker] } + }); + producer = kafka.producer(); + await producer.connect(); + log('KafkaJS Producer ready.'); + connected = true; + } + } catch (e) { + log('Producer setup error', e && e.message); + } +} + +app.get('/', (_req, res) => { + if (connected) res.send('OK'); + else res.sendStatus(503); +}); + +app.get('/produce', async (req, res) => { + log('/produce'); + + const message = `${Date.now()}-${process.pid}`; + if (!producer) { + return res.status(500).send({ error: 
'producer not available' }); + } + + try { + if (useRdKafka) { + // RdKafka uses producer.produce + await producer.produce( + topic, + null, // Partition (null = any) + Buffer.from(message), // Message payload as Buffer + null, // Key + Date.now() // Timestamp + ); + + log('RdKafka message produced.'); + + // RdKafka needs flush() to send the message immediately + await new Promise((resolve, reject) => { + producer.flush(1000, err => { + if (err) return reject(err); + resolve(); + }); + }); + } else { + // kafkajs uses producer.send + await producer.send({ topic, messages: [{ value: message }] }); + log('KafkaJS message produced.'); + } + + // allow small time for delivery (only needed for the test flow) + await new Promise(resolve => setTimeout(resolve, 200)); + res.send({ produced: true, message }); + } catch (e) { + res.status(500).send({ error: e.message }); + } +}); + +app.listen(port, () => { + log(`Listening on port: ${port}`); + setupProducer().catch(() => {}); +}); + +function log() { + const args = Array.prototype.slice.call(arguments); + args[0] = logPrefix + args[0]; + // eslint-disable-next-line no-console + console.log.apply(console, args); +} diff --git a/packages/collector/test/tracing/opentelemetry/test.js b/packages/collector/test/tracing/opentelemetry/test.js index bf9dbcd733..8421560f34 100644 --- a/packages/collector/test/tracing/opentelemetry/test.js +++ b/packages/collector/test/tracing/opentelemetry/test.js @@ -26,10 +26,13 @@ const ProcessControls = require('../../test_util/ProcessControls'); const globalAgent = require('../../globalAgent'); const DELAY_TIMEOUT_IN_MS = 500; const mochaSuiteFn = supportedVersion(process.versions.node) ? 
describe : describe.skip; +const agentControls = globalAgent.instance; mochaSuiteFn('opentelemetry tests', function () { this.timeout(config.getTestTimeout() * 2); + globalAgent.setUpCleanUpHooks(); + before(() => { if (process.env.INSTANA_TEST_SKIP_INSTALLING_DEPS === 'true') { return; @@ -38,6 +41,8 @@ mochaSuiteFn('opentelemetry tests', function () { execSync('rm -rf package-lock.json', { cwd: __dirname, stdio: 'inherit' }); execSync('rm -rf package.json', { cwd: __dirname, stdio: 'inherit' }); execSync('rm -rf node_modules', { cwd: __dirname, stdio: 'inherit' }); + execSync('rm -rf collector.tgz', { cwd: __dirname, stdio: 'inherit' }); + execSync('rm -rf core.tgz', { cwd: __dirname, stdio: 'inherit' }); execSync('./preinstall.sh', { cwd: __dirname, stdio: 'inherit' }); }); @@ -62,20 +67,161 @@ mochaSuiteFn('opentelemetry tests', function () { stdio: 'inherit' }); + // eslint-disable-next-line no-console + console.log('Installed core.tgz'); + execSync('npm install --save --prefix ./ ./collector.tgz', { cwd: __dirname, stdio: 'inherit' }); + // eslint-disable-next-line no-console + console.log('Installed collector.tgz'); + execSync('npm install --save --prefix ./ @opentelemetry/api@1.9.0', { cwd: __dirname, stdio: 'inherit' }); + // eslint-disable-next-line no-console + console.log('Installed @opentelemetry/api@1.9.0'); + execSync('npm install --save --prefix ./ "@opentelemetry/api-v1.3.0@npm:@opentelemetry/api@1.3.0"', { cwd: __dirname, stdio: 'inherit' }); + + // eslint-disable-next-line no-console + console.log('Installed @opentelemetry/api-v1.3.0@npm:@opentelemetry/api@1.3.0'); + }); + + // node bin/start-test-containers.js --zookeeper --kafka --schema-registry --kafka-topics + describe('tracing/confluent-kafka', function () { + let producerControls; + const topic = 'confluent-kafka-topic'; + + before(async () => { + if (process.env.INSTANA_TEST_SKIP_INSTALLING_DEPS !== 'true') { + const rootPackageJson = require('../../../../../package.json'); + const 
confluentKafkaVersion = rootPackageJson.devDependencies['@confluentinc/kafka-javascript']; + // Install only kafka-javascript, use flags to prevent npm from removing existing packages + execSync(`npm i "@confluentinc/kafka-javascript@${confluentKafkaVersion}" --prefix ./ --save`, { + cwd: __dirname, + stdio: 'inherit' + }); + + // eslint-disable-next-line no-console + console.log('Installed kafka-javascript'); + } + + producerControls = new ProcessControls({ + appPath: path.join(__dirname, 'confluent-kafka-producer-app.js'), + useGlobalAgent: true, + cwd: __dirname, + enableOtelIntegration: true, + esmLoaderPath: path.join(__dirname, 'node_modules', '@instana', 'collector', 'esm-register.mjs'), + env: { + CONFLUENT_KAFKA_TOPIC: topic + } + }); + + await producerControls.startAndWaitForAgentConnection(); + }); + + beforeEach(async () => { + await agentControls.clearReceivedTraceData(); + }); + + after(async () => { + await producerControls.stop(); + }); + + afterEach(async () => { + await producerControls.clearIpcMessages(); + }); + + describe('consuming message', function () { + let consumerControls; + + before(async () => { + consumerControls = new ProcessControls({ + appPath: path.join(__dirname, 'confluent-kafka-consumer-app.js'), + useGlobalAgent: true, + enableOtelIntegration: true, + esmLoaderPath: path.join(__dirname, 'node_modules', '@instana', 'collector', 'esm-register.mjs'), + env: { + CONFLUENT_KAFKA_TOPIC: topic + } + }); + + await consumerControls.startAndWaitForAgentConnection(); + }); + + beforeEach(async () => { + await agentControls.clearReceivedTraceData(); + }); + + after(async () => { + await consumerControls.stop(); + }); + + afterEach(async () => { + await consumerControls.clearIpcMessages(); + }); + + const apiPath = '/produce'; + + it('produces and consumes a message', async () => { + const response = await producerControls.sendRequest({ + method: 'GET', + path: apiPath + }); + + expect(response.produced).to.equal(true); + + return retry(() 
=> { + return agentControls.getSpans().then(spans => { + expect(spans.length).to.equal(3); + + const httpEntry = verifyHttpRootEntry({ + spans, + apiPath: '/produce', + pid: String(producerControls.getPid()) + }); + + verifyExitSpan({ + spanName: 'otel', + spans, + parent: httpEntry, + withError: false, + pid: String(producerControls.getPid()), + dataProperty: 'tags', + extraTests: span => { + expect(span.data.tags.name).to.eql('confluent-kafka-topic'); + expect(span.data.tags['messaging.system']).to.equal('kafka'); + expect(span.data.tags['messaging.operation.name']).to.equal('produce'); + checkTelemetryResourceAttrs(span); + } + }); + + verifyEntrySpan({ + spanName: 'otel', + spans, + parent: httpEntry, + withError: false, + pid: String(producerControls.getPid()), + dataProperty: 'tags', + extraTests: span => { + expect(span.data.tags.name).to.eql('confluent-kafka-topic'); + expect(span.data.tags['messaging.system']).to.equal('kafka'); + expect(span.data.tags['messaging.operation.type']).to.equal('deliver'); + checkTelemetryResourceAttrs(span); + } + }); + }); + }); + }); + }); }); // TODO: Restify test is broken in v24. 
See Issue: https://github.com/restify/node-restify/issues/1984 @@ -88,7 +234,6 @@ mochaSuiteFn('opentelemetry tests', function () { restifyTest('restify', function () { describe('opentelemetry is enabled', function () { globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; let controls; @@ -231,9 +376,6 @@ mochaSuiteFn('opentelemetry tests', function () { }); describe('opentelemetry is disabled', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; - let controls; before(async () => { @@ -302,7 +444,6 @@ mochaSuiteFn('opentelemetry tests', function () { runFs('fs', function () { globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; let controls; @@ -427,8 +568,6 @@ mochaSuiteFn('opentelemetry tests', function () { } runSocketIo('socket.io', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; let socketIOServerPort; let serverControls; @@ -588,8 +727,6 @@ mochaSuiteFn('opentelemetry tests', function () { describe('tedious', function () { describe('opentelemetry is enabled', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; let controls; // We need to increase the waiting timeout here for the initial azure connection, @@ -698,8 +835,6 @@ mochaSuiteFn('opentelemetry tests', function () { }); describe('opentelemetry is disabled', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; let controls; before(async () => { @@ -749,9 +884,6 @@ mochaSuiteFn('opentelemetry tests', function () { this.timeout(1000 * 60); describe('opentelemetry is enabled', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; - let controls; before(async () => { @@ -826,9 +958,6 @@ mochaSuiteFn('opentelemetry tests', function () { }); describe('opentelemetry is disabled', function () { - globalAgent.setUpCleanUpHooks(); - const 
agentControls = globalAgent.instance; - let controls; before(async () => { @@ -901,9 +1030,6 @@ mochaSuiteFn('opentelemetry tests', function () { }); describe('when openTelemetry initialized first', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; - let controls; before(async () => { controls = new ProcessControls({ @@ -1001,9 +1127,6 @@ mochaSuiteFn('opentelemetry tests', function () { }); describe('when Collector initialized first', function () { - globalAgent.setUpCleanUpHooks(); - const agentControls = globalAgent.instance; - let controls; before(async () => { diff --git a/packages/core/package.json b/packages/core/package.json index 0c54ac7fa9..ca7dd4af42 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -57,6 +57,7 @@ }, "homepage": "https://github.com/instana/nodejs/blob/main/packages/core/README.md", "dependencies": { + "@drazke/instrumentation-confluent-kafka-javascript": "https://github.com/kirrg001/instrumentation-confluent-kafka-javascript/releases/download/1.0.1/drazke-instrumentation-confluent-kafka-javascript-1.0.1.tgz", "@opentelemetry/api": ">=1.3.0 <1.10.0", "@opentelemetry/context-async-hooks": "1.25.0", "@opentelemetry/instrumentation-fs": "0.28.0", diff --git a/packages/core/src/tracing/opentelemetry-instrumentations/confluent-kafka.js b/packages/core/src/tracing/opentelemetry-instrumentations/confluent-kafka.js new file mode 100644 index 0000000000..e2ff5c1388 --- /dev/null +++ b/packages/core/src/tracing/opentelemetry-instrumentations/confluent-kafka.js @@ -0,0 +1,31 @@ +/* + * (c) Copyright IBM Corp. 2025 + */ + +'use strict'; + +const constants = require('../constants'); + +const isEntrySpan = otelSpan => otelSpan.attributes?.['messaging.operation.type'] === 'deliver'; + +module.exports.init = () => { + // NOTE: Otel instrumentations sometimes require the target library TYPE to mock on top of their instrumentation. 
+ // This works, because "import type" does not load the target library. + // eslint-disable-next-line max-len + // EXAMPLE: https://github.com/open-telemetry/opentelemetry-js-contrib/blob/instrumentation-express-v0.57.1/packages/instrumentation-express/src/instrumentation.ts#L25 + const { ConfluentKafkaInstrumentation } = require('@drazke/instrumentation-confluent-kafka-javascript'); + + const instrumentation = new ConfluentKafkaInstrumentation(); + + if (!instrumentation.getConfig().enabled) { + instrumentation.enable(); + } +}; + +module.exports.getKind = otelSpan => { + if (isEntrySpan(otelSpan)) { + return constants.ENTRY; + } + + return constants.EXIT; +}; diff --git a/packages/core/src/tracing/opentelemetry-instrumentations/wrap.js b/packages/core/src/tracing/opentelemetry-instrumentations/wrap.js index b99141249b..222621e9c2 100644 --- a/packages/core/src/tracing/opentelemetry-instrumentations/wrap.js +++ b/packages/core/src/tracing/opentelemetry-instrumentations/wrap.js @@ -21,7 +21,8 @@ const instrumentations = { '@opentelemetry/instrumentation-restify': { name: 'restify' }, '@opentelemetry/instrumentation-socket.io': { name: 'socket.io' }, '@opentelemetry/instrumentation-tedious': { name: 'tedious' }, - '@opentelemetry/instrumentation-oracledb': { name: 'oracle' } + '@opentelemetry/instrumentation-oracledb': { name: 'oracle' }, + '@drazke/instrumentation-confluent-kafka-javascript': { name: 'confluent-kafka' } }; // NOTE: using a logger might create a recursive execution