diff --git a/docs/sections/misc.md b/docs/sections/misc.md index 585c6775..caf63734 100644 --- a/docs/sections/misc.md +++ b/docs/sections/misc.md @@ -51,9 +51,69 @@ This includes also failed ones if not filtered further using `where()` condition ## Events The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle. +These events are useful for monitoring, logging, and integrating with external services. + +### Queue.Job.created +Fired when a new job is added to the queue (producer side). + +```php +use Cake\Event\EventInterface; +use Cake\Event\EventManager; + +EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) { + $job = $event->getData('job'); + // Track job creation for monitoring +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that was created + +### Queue.Job.started +Fired when a worker begins processing a job (consumer side). + +```php +EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) { + $job = $event->getData('job'); + // Start tracing/monitoring span +}); +``` + +Event data: +- `job`: The `QueuedJob` entity being processed + +### Queue.Job.completed +Fired when a job finishes successfully. + +```php +EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) { + $job = $event->getData('job'); + // Mark trace as successful +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that completed + +### Queue.Job.failed +Fired when a job fails (on every failure attempt). + +```php +EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) { + $job = $event->getData('job'); + $failureMessage = $event->getData('failureMessage'); + $exception = $event->getData('exception'); + // Mark trace as failed, log error +}); +``` + +Event data: +- `job`: The `QueuedJob` entity that failed +- `failureMessage`: The error message from the failure +- `exception`: The exception object (if available) ### Queue.Job.maxAttemptsExhausted -This event is triggered when a job has failed and exhausted all of its configured retry attempts. +Fired when a job has failed and exhausted all of its configured retry attempts. ```php use Cake\Event\EventInterface; @@ -81,10 +141,51 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn }); ``` -The event data contains: +Event data: - `job`: The `QueuedJob` entity that failed - `failureMessage`: The error message from the last failure +### Sentry Integration + +The plugin includes a `CakeSentryEventBridge` listener that bridges Queue events to +[lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring. + +To enable Sentry integration, register the bridge in your `Application::bootstrap()`: + +```php +use Cake\Event\EventManager; +use Queue\Event\CakeSentryEventBridge; + +// Enable Sentry queue monitoring +EventManager::instance()->on(new CakeSentryEventBridge()); +``` + +This automatically dispatches the `CakeSentry.Queue.*` events that the Sentry plugin listens to: +- `CakeSentry.Queue.enqueue` - when a job is created +- `CakeSentry.Queue.beforeExecute` - when a job starts processing +- `CakeSentry.Queue.afterExecute` - when a job completes or fails + +#### Trace Propagation + +For distributed tracing to work across producer and consumer, add Sentry trace headers +to job data when creating jobs: + +```php +$data = [ + 'your_data' => 'here', +]; + +// Add Sentry trace headers if Sentry SDK is available +if (class_exists(\Sentry\SentrySdk::class)) { + $data['_sentry_trace'] = \Sentry\getTraceparent(); + $data['_sentry_baggage'] = \Sentry\getBaggage(); +} + +$queuedJobsTable->createJob('MyTask', $data); +``` + +The bridge will automatically extract these headers and pass them to the Sentry plugin. + ## Notes `` is the complete class name without the Task suffix (e.g. Example or PluginName.Example). diff --git a/src/Event/CakeSentryEventBridge.php b/src/Event/CakeSentryEventBridge.php new file mode 100644 index 00000000..4cc40230 --- /dev/null +++ b/src/Event/CakeSentryEventBridge.php @@ -0,0 +1,146 @@ +on(new \Queue\Event\CakeSentryEventBridge()); + * ``` + * + * @see https://github.com/LordSimal/cakephp-sentry + */ +class CakeSentryEventBridge implements EventListenerInterface { + + /** + * @var float|null Start time for execution time calculation + */ + protected ?float $startTime = null; + + /** + * @inheritDoc + */ + public function implementedEvents(): array { + return [ + 'Queue.Job.created' => 'handleCreated', + 'Queue.Job.started' => 'handleStarted', + 'Queue.Job.completed' => 'handleCompleted', + 'Queue.Job.failed' => 'handleFailed', + ]; + } + + /** + * Handle job created event - dispatches CakeSentry.Queue.enqueue + * + * @param \Cake\Event\EventInterface $event + * + * @return void + */ + public function handleCreated(EventInterface $event): void { + /** @var \Queue\Model\Entity\QueuedJob $job */ + $job = $event->getData('job'); + + $sentryEvent = new Event('CakeSentry.Queue.enqueue', $this, [ + 'class' => $job->job_task, + 'id' => (string)$job->id, + 'queue' => $job->job_task, + 'data' => $job->data ?? [], + ]); + EventManager::instance()->dispatch($sentryEvent); + } + + /** + * Handle job started event - dispatches CakeSentry.Queue.beforeExecute + * + * @param \Cake\Event\EventInterface $event + * + * @return void + */ + public function handleStarted(EventInterface $event): void { + $this->startTime = microtime(true); + + /** @var \Queue\Model\Entity\QueuedJob $job */ + $job = $event->getData('job'); + $jobData = is_array($job->data) ? $job->data : []; + + $sentryEvent = new Event('CakeSentry.Queue.beforeExecute', $this, [ + 'class' => $job->job_task, + 'sentry_trace' => $jobData['_sentry_trace'] ?? '', + 'sentry_baggage' => $jobData['_sentry_baggage'] ?? '', + ]); + EventManager::instance()->dispatch($sentryEvent); + } + + /** + * Handle job completed event - dispatches CakeSentry.Queue.afterExecute + * + * @param \Cake\Event\EventInterface $event + * + * @return void + */ + public function handleCompleted(EventInterface $event): void { + /** @var \Queue\Model\Entity\QueuedJob $job */ + $job = $event->getData('job'); + + $sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $this->buildAfterExecuteData($job)); + EventManager::instance()->dispatch($sentryEvent); + } + + /** + * Handle job failed event - dispatches CakeSentry.Queue.afterExecute with exception + * + * @param \Cake\Event\EventInterface $event + * + * @return void + */ + public function handleFailed(EventInterface $event): void { + /** @var \Queue\Model\Entity\QueuedJob $job */ + $job = $event->getData('job'); + $exception = $event->getData('exception'); + + $data = $this->buildAfterExecuteData($job); + if ($exception !== null) { + $data['exception'] = $exception; + } + + $sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $data); + EventManager::instance()->dispatch($sentryEvent); + } + + /** + * Build common data for afterExecute event. + * + * @param \Queue\Model\Entity\QueuedJob $job + * + * @return array + */ + protected function buildAfterExecuteData(QueuedJob $job): array { + $executionTime = 0; + if ($this->startTime !== null) { + $executionTime = (int)((microtime(true) - $this->startTime) * 1000); + $this->startTime = null; + } + + return [ + 'id' => (string)$job->id, + 'queue' => $job->job_task, + 'data' => $job->data ?? [], + 'execution_time' => $executionTime, + 'retry_count' => $job->attempts, + ]; + } + +} diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index e65704c0..302ffe8d 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -6,7 +6,9 @@ use ArrayObject; use Cake\Core\Configure; use Cake\Core\Plugin; +use Cake\Event\Event; use Cake\Event\EventInterface; +use Cake\Event\EventManager; use Cake\I18n\DateTime; use Cake\ORM\Query\SelectQuery; use Cake\ORM\Table; @@ -216,8 +218,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array } $queuedJob = $this->newEntity($queuedJob); + $queuedJob = $this->saveOrFail($queuedJob); - return $this->saveOrFail($queuedJob); + $event = new Event('Queue.Job.created', $this, [ + 'job' => $queuedJob, + ]); + EventManager::instance()->dispatch($event); + + return $queuedJob; } /** diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index f1e626a4..0b0ef045 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -218,6 +218,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false); $taskName = $queuedJob->job_task; + // Dispatch started event + $event = new Event('Queue.Job.started', $this, [ + 'job' => $queuedJob, + ]); + EventManager::instance()->dispatch($event); + $return = $failureMessage = null; try { $this->time = time(); @@ -247,6 +253,14 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { $this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid); $this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.'); + // Dispatch failed event + $event = new Event('Queue.Job.failed', $this, [ + 'job' => $queuedJob, + 'failureMessage' => $failureMessage, + 'exception' => $e ?? null, + ]); + EventManager::instance()->dispatch($event); + // Dispatch event when job has exhausted all retries if ($failedStatus === 'aborted') { $event = new Event('Queue.Job.maxAttemptsExhausted', $this, [ @@ -260,6 +274,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void { } $this->QueuedJobs->markJobDone($queuedJob); + + // Dispatch completed event + $event = new Event('Queue.Job.completed', $this, [ + 'job' => $queuedJob, + ]); + EventManager::instance()->dispatch($event); + $this->io->out('Job Finished.'); $this->currentJob = null; } diff --git a/tests/TestCase/Event/CakeSentryEventBridgeTest.php b/tests/TestCase/Event/CakeSentryEventBridgeTest.php new file mode 100644 index 00000000..076f3938 --- /dev/null +++ b/tests/TestCase/Event/CakeSentryEventBridgeTest.php @@ -0,0 +1,152 @@ +bridge = new CakeSentryEventBridge(); + } + + /** + * @return void + */ + public function testImplementedEvents(): void { + $events = $this->bridge->implementedEvents(); + + $this->assertArrayHasKey('Queue.Job.created', $events); + $this->assertArrayHasKey('Queue.Job.started', $events); + $this->assertArrayHasKey('Queue.Job.completed', $events); + $this->assertArrayHasKey('Queue.Job.failed', $events); + } + + /** + * @return void + */ + public function testHandleCreatedDispatchesCakeSentryEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + EventManager::instance()->on($this->bridge); + + $job = new QueuedJob([ + 'id' => 123, + 'job_task' => 'Queue.Example', + 'data' => ['test' => 'data'], + ]); + + $event = new Event('Queue.Job.created', null, ['job' => $job]); + EventManager::instance()->dispatch($event); + + $this->assertEventFired('CakeSentry.Queue.enqueue'); + } + + /** + * @return void + */ + public function testHandleStartedDispatchesCakeSentryEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + EventManager::instance()->on($this->bridge); + + $job = new QueuedJob([ + 'id' => 123, + 'job_task' => 'Queue.Example', + 'data' => ['test' => 'data'], + ]); + + $event = new Event('Queue.Job.started', null, ['job' => $job]); + EventManager::instance()->dispatch($event); + + $this->assertEventFired('CakeSentry.Queue.beforeExecute'); + } + + /** + * @return void + */ + public function testHandleCompletedDispatchesCakeSentryEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + EventManager::instance()->on($this->bridge); + + $job = new QueuedJob([ + 'id' => 123, + 'job_task' => 'Queue.Example', + 'data' => ['test' => 'data'], + 'attempts' => 1, + ]); + + $event = new Event('Queue.Job.completed', null, ['job' => $job]); + EventManager::instance()->dispatch($event); + + $this->assertEventFired('CakeSentry.Queue.afterExecute'); + } + + /** + * @return void + */ + public function testHandleFailedDispatchesCakeSentryEvent(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + EventManager::instance()->on($this->bridge); + + $job = new QueuedJob([ + 'id' => 123, + 'job_task' => 'Queue.Example', + 'data' => ['test' => 'data'], + 'attempts' => 1, + ]); + $exception = new RuntimeException('Test failure'); + + $event = new Event('Queue.Job.failed', null, [ + 'job' => $job, + 'failureMessage' => 'Test failure', + 'exception' => $exception, + ]); + EventManager::instance()->dispatch($event); + + $this->assertEventFired('CakeSentry.Queue.afterExecute'); + } + + /** + * @return void + */ + public function testSentryTraceHeadersArePassedThrough(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + EventManager::instance()->on($this->bridge); + + $job = new QueuedJob([ + 'id' => 123, + 'job_task' => 'Queue.Example', + 'data' => [ + '_sentry_trace' => 'test-trace-id', + '_sentry_baggage' => 'test-baggage', + 'other_data' => 'value', + ], + ]); + + $event = new Event('Queue.Job.started', null, ['job' => $job]); + EventManager::instance()->dispatch($event); + + $this->assertEventFired('CakeSentry.Queue.beforeExecute'); + } + +} diff --git a/tests/TestCase/Model/Table/QueuedJobsTableTest.php b/tests/TestCase/Model/Table/QueuedJobsTableTest.php index 739a5c2b..913cc348 100644 --- a/tests/TestCase/Model/Table/QueuedJobsTableTest.php +++ b/tests/TestCase/Model/Table/QueuedJobsTableTest.php @@ -11,6 +11,8 @@ use Cake\Core\Configure; use Cake\Datasource\ConnectionManager; +use Cake\Event\EventList; +use Cake\Event\EventManager; use Cake\I18n\DateTime; use Cake\ORM\TableRegistry; use Cake\TestSuite\TestCase; @@ -752,6 +754,20 @@ public function testGetStats() { $this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1); } + /** + * Test that Queue.Job.created event is fired when a job is created. + * + * @return void + */ + public function testJobCreatedEventFired(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + $this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']); + + $this->assertEventFired('Queue.Job.created'); + } + /** * Helper method for skipping tests that need a real connection. * diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index e150a391..4e3766d7 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -15,6 +15,7 @@ use Queue\Model\Entity\QueuedJob; use Queue\Model\Table\QueuedJobsTable; use Queue\Queue\Processor; +use Queue\Queue\Task\ExampleTask; use Queue\Queue\Task\RetryExampleTask; use ReflectionClass; use RuntimeException; @@ -287,6 +288,106 @@ public function testWorkerTimeoutHandlingIntegration() { } } + /** + * Test that Queue.Job.started event is fired when job begins processing. + * + * @return void + */ + public function testJobStartedEventFired(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('Queue.Job.started'); + } + + /** + * Test that Queue.Job.completed event is fired when job completes successfully. + * + * @return void + */ + public function testJobCompletedEventFired(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(ExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('Queue.Job.completed'); + } + + /** + * Test that Queue.Job.failed event is fired when job fails. + * + * @return void + */ + public function testJobFailedEventFired(): void { + $eventList = new EventList(); + EventManager::instance()->setEventList($eventList); + + // Create a job + $QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs'); + $job = $QueuedJobs->createJob('Queue.RetryExample', ['test' => 'data'], ['priority' => 1]); + + // Create processor with mock task that fails + $out = new ConsoleOutput(); + $err = new ConsoleOutput(); + $processor = $this->getMockBuilder(Processor::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['loadTask']) + ->getMock(); + + $mockTask = $this->getMockBuilder(RetryExampleTask::class) + ->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()]) + ->onlyMethods(['run']) + ->getMock(); + $mockTask->method('run')->willThrowException(new RuntimeException('Task failed')); + + $processor->method('loadTask')->willReturn($mockTask); + + $this->invokeMethod($processor, 'runJob', [$job, 'test-pid']); + + $this->assertEventFired('Queue.Job.failed'); + } + /** * Test setPhpTimeout with new config names *