diff --git a/index.d.ts b/index.d.ts index 9dc3e8f..46ac69a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -170,6 +170,7 @@ export class Message { getRedeliveryCount(): number; getPartitionKey(): string; getOrderingKey(): string; + getProducerName(): string; } export class MessageId { diff --git a/src/Message.cc b/src/Message.cc index 184b8e7..8b1d081 100644 --- a/src/Message.cc +++ b/src/Message.cc @@ -46,7 +46,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp), InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount), InstanceMethod("getPartitionKey", &Message::GetPartitionKey), - InstanceMethod("getOrderingKey", &Message::GetOrderingKey)}); + InstanceMethod("getOrderingKey", &Message::GetOrderingKey), + InstanceMethod("getProducerName", &Message::GetProducerName)}); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -147,6 +148,14 @@ Napi::Value Message::GetOrderingKey(const Napi::CallbackInfo &info) { return Napi::String::New(env, pulsar_message_get_orderingKey(this->cMessage.get())); } +Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get())); +} + bool Message::ValidateCMessage(Napi::Env env) { if (this->cMessage.get()) { return true; diff --git a/src/Message.h b/src/Message.h index 6097f1c..417de92 100644 --- a/src/Message.h +++ b/src/Message.h @@ -45,6 +45,7 @@ class Message : public Napi::ObjectWrap { Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info); Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); Napi::Value GetOrderingKey(const Napi::CallbackInfo &info); + Napi::Value GetProducerName(const Napi::CallbackInfo &info); Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info); bool ValidateCMessage(Napi::Env env); diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 2c08418..e652c79 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -40,11 +40,13 @@ const Pulsar = require('../index'); } : {}), }); + const producerName = 'test-producer'; const topic = 'persistent://public/default/produce-consume'; const producer = await client.createProducer({ topic, sendTimeoutMs: 30000, batchingEnabled: true, + producerName, }); expect(producer).not.toBeNull(); @@ -70,6 +72,7 @@ const Pulsar = require('../index'); for (let i = 0; i < 10; i += 1) { const msg = await consumer.receive(); consumer.acknowledge(msg); + expect(msg.getProducerName()).toBe(producerName); results.push(msg.getData().toString()); } expect(lodash.difference(messages, results)).toEqual([]);