Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ export class Message {
getRedeliveryCount(): number;
getPartitionKey(): string;
getOrderingKey(): string;
getProducerName(): string;
}

export class MessageId {
Expand Down
11 changes: 10 additions & 1 deletion src/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
InstanceMethod("getOrderingKey", &Message::GetOrderingKey)});
InstanceMethod("getOrderingKey", &Message::GetOrderingKey),
InstanceMethod("getProducerName", &Message::GetProducerName)});

constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
Expand Down Expand Up @@ -147,6 +148,14 @@ Napi::Value Message::GetOrderingKey(const Napi::CallbackInfo &info) {
return Napi::String::New(env, pulsar_message_get_orderingKey(this->cMessage.get()));
}

Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
return env.Null();
}
return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get()));
}

bool Message::ValidateCMessage(Napi::Env env) {
if (this->cMessage.get()) {
return true;
Expand Down
1 change: 1 addition & 0 deletions src/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Message : public Napi::ObjectWrap<Message> {
Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
Napi::Value GetProducerName(const Napi::CallbackInfo &info);
Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
bool ValidateCMessage(Napi::Env env);

Expand Down
3 changes: 3 additions & 0 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@
} : {}),
});

const producerName = 'test-producer';
const topic = 'persistent://public/default/produce-consume';
const producer = await client.createProducer({
topic,
sendTimeoutMs: 30000,
batchingEnabled: true,
producerName,
});
expect(producer).not.toBeNull();

Expand All @@ -70,6 +72,7 @@
for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
consumer.acknowledge(msg);
expect(msg.getProducerName()).toBe(producerName);
results.push(msg.getData().toString());
}
expect(lodash.difference(messages, results)).toEqual([]);
Expand Down Expand Up @@ -315,7 +318,7 @@
});

let consumer2Recv = 0;
while (true) {

Check warning on line 321 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

Unexpected constant condition
try {
const msg = await consumer2.receive(3000);
await new Promise((resolve) => setTimeout(resolve, 10));
Expand Down Expand Up @@ -360,7 +363,7 @@
topic,
startMessageId: Pulsar.MessageId.earliest(),
receiverQueueSize: 10,
listener: async (message, reader) => {

Check warning on line 366 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'reader' is defined but never used

Check warning on line 366 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
await new Promise((resolve) => setTimeout(resolve, 10));
reader1Recv += 1;
},
Expand Down Expand Up @@ -392,7 +395,7 @@
await client.close();
});

test('Message Listener error handling', async () => {

Check warning on line 398 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

Test has no assertions
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
});
Expand Down Expand Up @@ -428,7 +431,7 @@
subscription: 'sync',
subscriptionType: 'Shared',
subscriptionInitialPosition: 'Earliest',
listener: (message, messageConsumer) => {

Check warning on line 434 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'messageConsumer' is defined but never used

Check warning on line 434 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('consumer1 callback expected error');
},
});
Expand All @@ -438,7 +441,7 @@
subscription: 'async',
subscriptionType: 'Shared',
subscriptionInitialPosition: 'Earliest',
listener: async (message, messageConsumer) => {

Check warning on line 444 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'messageConsumer' is defined but never used

Check warning on line 444 in tests/end_to_end.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('consumer2 callback expected error');
},
});
Expand Down
Loading