From 5ec7e886dec94090cc48850222ab4e4536142ab2 Mon Sep 17 00:00:00 2001 From: mscherer Date: Sun, 28 Dec 2025 16:48:42 +0100 Subject: [PATCH] Add CakeSentry.Queue.* events for Sentry integration. Dispatch events compatible with lordsimal/cakephp-sentry: - CakeSentry.Queue.enqueue: when job is added to queue - CakeSentry.Queue.beforeExecute: when job processing starts - CakeSentry.Queue.afterExecute: when job completes or fails This enables queue monitoring via Sentry without requiring the queue plugin to depend on Sentry SDK directly. --- docs/sections/misc.md | 56 ++++++++++ src/Model/Table/QueuedJobsTable.php | 14 ++- src/Queue/Processor.php | 34 ++++++ .../Model/Table/QueuedJobsTableTest.php | 16 +++ tests/TestCase/Queue/ProcessorTest.php | 101 ++++++++++++++++++ 5 files changed, 220 insertions(+), 1 deletion(-) diff --git a/docs/sections/misc.md b/docs/sections/misc.md index 585c6775..068efdae 100644 --- a/docs/sections/misc.md +++ b/docs/sections/misc.md @@ -85,6 +85,62 @@ The event data contains: - `job`: The `QueuedJob` entity that failed - `failureMessage`: The error message from the last failure +### Sentry Integration (CakeSentry Events) + +The Queue plugin dispatches events compatible with [lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring and tracing. +These events allow automatic integration with [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature. + +The following events are dispatched: + +#### CakeSentry.Queue.enqueue +Fired when a job is added to the queue (producer side). + +Event data: +- `class`: The job task name +- `id`: The job ID +- `queue`: The queue/task name +- `data`: The job payload + +#### CakeSentry.Queue.beforeExecute +Fired when a worker begins processing a job (consumer side). + +Event data: +- `class`: The job task name +- `sentry_trace`: Sentry trace header from job data (if present) +- `sentry_baggage`: Sentry baggage header from job data (if present) + +#### CakeSentry.Queue.afterExecute +Fired when a job finishes (success or failure). + +Event data: +- `id`: The job ID +- `queue`: The queue/task name +- `data`: The job payload +- `execution_time`: Execution time in milliseconds +- `retry_count`: Number of attempts +- `exception`: The exception object (only on failure) + +#### Trace Propagation + +For distributed tracing to work across producer and consumer, you can add Sentry trace headers to job data when creating jobs: + +```php +// When creating a job with trace propagation +$data = [ + 'your_data' => 'here', +]; + +// Add Sentry trace headers if Sentry SDK is available +if (class_exists(\Sentry\SentrySdk::class)) { + $data['_sentry_trace'] = \Sentry\getTraceparent(); + $data['_sentry_baggage'] = \Sentry\getBaggage(); +} + +$queuedJobsTable->createJob('MyTask', $data); +``` + +The Queue plugin will automatically pass these headers to the `CakeSentry.Queue.beforeExecute` event for trace continuation. + ## Notes `` is the complete class name without the Task suffix (e.g. Example or PluginName.Example). diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index e65704c0..517d5efc 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -6,7 +6,9 @@ use ArrayObject; use Cake\Core\Configure; use Cake\Core\Plugin; +use Cake\Event\Event; use Cake\Event\EventInterface; +use Cake\Event\EventManager; use Cake\I18n\DateTime; use Cake\ORM\Query\SelectQuery; use Cake\ORM\Table; @@ -216,8 +218,18 @@ public function createJob(string $jobTask, array|object|null $data = null, array } $queuedJob = $this->newEntity($queuedJob); + $queuedJob = $this->saveOrFail($queuedJob); + + // Dispatch CakeSentry event for queue tracing integration + $event = new Event('CakeSentry.Queue.enqueue', $this, [ + 'class' => $queuedJob->job_task, + 'id' => (string)$queuedJob->id, + 'queue' => $queuedJob->job_task, + 'data' => $queuedJob->data ?? [], + ]); + EventManager::instance()->dispatch($event); - return $this->saveOrFail($queuedJob); + return $queuedJob; } /** diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index f1e626a4..657ceac8 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -218,6 +218,16 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false); $taskName = $queuedJob->job_task; + // Dispatch CakeSentry beforeExecute event for queue tracing + $jobData = is_array($queuedJob->data) ? $queuedJob->data : []; + $event = new Event('CakeSentry.Queue.beforeExecute', $this, [ + 'class' => $queuedJob->job_task, + 'sentry_trace' => $jobData['_sentry_trace'] ?? '', + 'sentry_baggage' => $jobData['_sentry_baggage'] ?? '', + ]); + EventManager::instance()->dispatch($event); + + $startTime = microtime(true); $return = $failureMessage = null; try { $this->time = time(); @@ -241,12 +251,25 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->logError($taskName . ' (job ' . $queuedJob->id . ')' . "\n" . $failureMessage, $pid); } + $executionTime = (int)((microtime(true) - $startTime) * 1000); + if ($return === false) { $this->QueuedJobs->markJobFailed($queuedJob, $failureMessage); $failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->getTaskConf()); $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid); $this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.'); + // Dispatch CakeSentry afterExecute event for failure + $event = new Event('CakeSentry.Queue.afterExecute', $this, [ + 'id' => (string)$queuedJob->id, + 'queue' => $queuedJob->job_task, + 'data' => $jobData, + 'execution_time' => $executionTime, + 'retry_count' => $queuedJob->attempts, + 'exception' => $e ?? null, + ]); + EventManager::instance()->dispatch($event); + // Dispatch event when job has exhausted all retries if ($failedStatus === 'aborted') { $event = new Event('Queue.Job.maxAttemptsExhausted', $this, [ @@ -260,6 +283,17 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { } $this->QueuedJobs->markJobDone($queuedJob); + + // Dispatch CakeSentry afterExecute event for success + $event = new Event('CakeSentry.Queue.afterExecute', $this, [ + 'id' => (string)$queuedJob->id, + 'queue' => $queuedJob->job_task, + 'data' => $jobData, + 'execution_time' => $executionTime, + 'retry_count' => $queuedJob->attempts, + ]); + EventManager::instance()->dispatch($event); + $this->io->out('Job Finished.'); $this->currentJob = null; } diff --git a/tests/TestCase/Model/Table/QueuedJobsTableTest.php b/tests/TestCase/Model/Table/QueuedJobsTableTest.php index 739a5c2b..5e8f0e13 100644 --- a/tests/TestCase/Model/Table/QueuedJobsTableTest.php +++ b/tests/TestCase/Model/Table/QueuedJobsTableTest.php @@ -11,6 +11,8 @@ use Cake\Core\Configure; use Cake\Datasource\ConnectionManager; +use Cake\Event\EventList; +use Cake\Event\EventManager; use Cake\I18n\DateTime; use Cake\ORM\TableRegistry; use Cake\TestSuite\TestCase; @@ -752,6 +754,20 @@ public function testGetStats() { $this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1); } + /** + * Test that CakeSentry.Queue.enqueue event is fired when a job is created. + * + * @return void + */ + public function testEnqueueEventFired(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + $this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']); + + $this->assertEventFired('CakeSentry.Queue.enqueue'); + } + /** * Helper method for skipping tests that need a real connection. * diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index e150a391..4d421341 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -15,6 +15,7 @@ use Queue\Model\Entity\QueuedJob; use Queue\Model\Table\QueuedJobsTable; use Queue\Queue\Processor; +use Queue\Queue\Task\ExampleTask; use Queue\Queue\Task\RetryExampleTask; use ReflectionClass; use RuntimeException; @@ -287,6 +288,106 @@ public function testWorkerTimeoutHandlingIntegration() { } } + /** + * Test that CakeSentry.Queue.beforeExecute event is fired when job starts. + * + * @return void + */ + public function testCakeSentryBeforeExecuteEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('CakeSentry.Queue.beforeExecute'); + } + + /** + * Test that CakeSentry.Queue.afterExecute event is fired when job completes successfully. + * + * @return void + */ + public function testCakeSentryAfterExecuteSuccessEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('CakeSentry.Queue.afterExecute'); + } + + /** + * Test that CakeSentry.Queue.afterExecute event is fired with exception on failure. + * + * @return void + */ + public function testCakeSentryAfterExecuteFailureEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.RetryExample', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task that fails + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(RetryExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + $mockTask->method('run')->willThrowException(new RuntimeException('Task failed')); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('CakeSentry.Queue.afterExecute'); + } + /** * Test setPhpTimeout with new config names *