Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions docs/sections/misc.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,62 @@ The event data contains:
- `job`: The `QueuedJob` entity that failed
- `failureMessage`: The error message from the last failure

### Sentry Integration (CakeSentry Events)

The Queue plugin dispatches events compatible with [lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring and tracing.
These events allow automatic integration with [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature.

The following events are dispatched:

#### CakeSentry.Queue.enqueue
Fired when a job is added to the queue (producer side).

Event data:
- `class`: The job task name
- `id`: The job ID
- `queue`: The queue/task name
- `data`: The job payload

#### CakeSentry.Queue.beforeExecute
Fired when a worker begins processing a job (consumer side).

Event data:
- `class`: The job task name
- `sentry_trace`: Sentry trace header from job data (if present)
- `sentry_baggage`: Sentry baggage header from job data (if present)

#### CakeSentry.Queue.afterExecute
Fired when a job finishes (success or failure).

Event data:
- `id`: The job ID
- `queue`: The queue/task name
- `data`: The job payload
- `execution_time`: Execution time in milliseconds
- `retry_count`: Number of attempts
- `exception`: The exception object (only on failure)

#### Trace Propagation

For distributed tracing to work across producer and consumer, you can add Sentry trace headers to job data when creating jobs:

```php
// When creating a job with trace propagation
$data = [
'your_data' => 'here',
];

// Add Sentry trace headers if Sentry SDK is available
if (class_exists(\Sentry\SentrySdk::class)) {
$data['_sentry_trace'] = \Sentry\getTraceparent();
$data['_sentry_baggage'] = \Sentry\getBaggage();
}

$queuedJobsTable->createJob('MyTask', $data);
```

The Queue plugin will automatically pass these headers to the `CakeSentry.Queue.beforeExecute` event for trace continuation.

## Notes

`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).
Expand Down
14 changes: 13 additions & 1 deletion src/Model/Table/QueuedJobsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
use ArrayObject;
use Cake\Core\Configure;
use Cake\Core\Plugin;
use Cake\Event\Event;
use Cake\Event\EventInterface;
use Cake\Event\EventManager;
use Cake\I18n\DateTime;
use Cake\ORM\Query\SelectQuery;
use Cake\ORM\Table;
Expand Down Expand Up @@ -216,8 +218,18 @@ public function createJob(string $jobTask, array|object|null $data = null, array
}

$queuedJob = $this->newEntity($queuedJob);
$queuedJob = $this->saveOrFail($queuedJob);

// Dispatch CakeSentry event for queue tracing integration
$event = new Event('CakeSentry.Queue.enqueue', $this, [
'class' => $queuedJob->job_task,
'id' => (string)$queuedJob->id,
'queue' => $queuedJob->job_task,
'data' => $queuedJob->data ?? [],
]);
EventManager::instance()->dispatch($event);

return $this->saveOrFail($queuedJob);
return $queuedJob;
}

/**
Expand Down
34 changes: 34 additions & 0 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,16 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
$taskName = $queuedJob->job_task;

// Dispatch CakeSentry beforeExecute event for queue tracing
$jobData = is_array($queuedJob->data) ? $queuedJob->data : [];
$event = new Event('CakeSentry.Queue.beforeExecute', $this, [
'class' => $queuedJob->job_task,
'sentry_trace' => $jobData['_sentry_trace'] ?? '',
'sentry_baggage' => $jobData['_sentry_baggage'] ?? '',
]);
EventManager::instance()->dispatch($event);

$startTime = microtime(true);
$return = $failureMessage = null;
try {
$this->time = time();
Expand All @@ -241,12 +251,25 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->logError($taskName . ' (job ' . $queuedJob->id . ')' . "\n" . $failureMessage, $pid);
}

$executionTime = (int)((microtime(true) - $startTime) * 1000);

if ($return === false) {
$this->QueuedJobs->markJobFailed($queuedJob, $failureMessage);
$failedStatus = $this->QueuedJobs->getFailedStatus($queuedJob, $this->getTaskConf());
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');

// Dispatch CakeSentry afterExecute event for failure
$event = new Event('CakeSentry.Queue.afterExecute', $this, [
'id' => (string)$queuedJob->id,
'queue' => $queuedJob->job_task,
'data' => $jobData,
'execution_time' => $executionTime,
'retry_count' => $queuedJob->attempts,
'exception' => $e ?? null,
]);
EventManager::instance()->dispatch($event);

// Dispatch event when job has exhausted all retries
if ($failedStatus === 'aborted') {
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
Expand All @@ -260,6 +283,17 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
}

$this->QueuedJobs->markJobDone($queuedJob);

// Dispatch CakeSentry afterExecute event for success
$event = new Event('CakeSentry.Queue.afterExecute', $this, [
'id' => (string)$queuedJob->id,
'queue' => $queuedJob->job_task,
'data' => $jobData,
'execution_time' => $executionTime,
'retry_count' => $queuedJob->attempts,
]);
EventManager::instance()->dispatch($event);

$this->io->out('Job Finished.');
$this->currentJob = null;
}
Expand Down
16 changes: 16 additions & 0 deletions tests/TestCase/Model/Table/QueuedJobsTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

use Cake\Core\Configure;
use Cake\Datasource\ConnectionManager;
use Cake\Event\EventList;
use Cake\Event\EventManager;
use Cake\I18n\DateTime;
use Cake\ORM\TableRegistry;
use Cake\TestSuite\TestCase;
Expand Down Expand Up @@ -752,6 +754,20 @@ public function testGetStats() {
$this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1);
}

/**
* Test that CakeSentry.Queue.enqueue event is fired when a job is created.
*
* @return void
*/
public function testEnqueueEventFired(): void {
$eventList = new EventList();
EventManager::instance()->setEventList($eventList);

$this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']);

$this->assertEventFired('CakeSentry.Queue.enqueue');
}

/**
* Helper method for skipping tests that need a real connection.
*
Expand Down
101 changes: 101 additions & 0 deletions tests/TestCase/Queue/ProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Queue\Model\Entity\QueuedJob;
use Queue\Model\Table\QueuedJobsTable;
use Queue\Queue\Processor;
use Queue\Queue\Task\ExampleTask;
use Queue\Queue\Task\RetryExampleTask;
use ReflectionClass;
use RuntimeException;
Expand Down Expand Up @@ -287,6 +288,106 @@ public function testWorkerTimeoutHandlingIntegration() {
}
}

/**
* Test that CakeSentry.Queue.beforeExecute event is fired when job starts.
*
* @return void
*/
public function testCakeSentryBeforeExecuteEvent(): void {
$eventList = new EventList();
EventManager::instance()->setEventList($eventList);

// Create a job
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
$job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]);

// Create processor with mock task
$out = new ConsoleOutput();
$err = new ConsoleOutput();
$processor = $this->getMockBuilder(Processor::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['loadTask'])
->getMock();

$mockTask = $this->getMockBuilder(ExampleTask::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['run'])
->getMock();

$processor->method('loadTask')->willReturn($mockTask);

$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);

$this->assertEventFired('CakeSentry.Queue.beforeExecute');
}

/**
* Test that CakeSentry.Queue.afterExecute event is fired when job completes successfully.
*
* @return void
*/
public function testCakeSentryAfterExecuteSuccessEvent(): void {
$eventList = new EventList();
EventManager::instance()->setEventList($eventList);

// Create a job
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
$job = $QueuedJobs->createJob('Queue.Example', ['test' => 'data'], ['priority' => 1]);

// Create processor with mock task
$out = new ConsoleOutput();
$err = new ConsoleOutput();
$processor = $this->getMockBuilder(Processor::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['loadTask'])
->getMock();

$mockTask = $this->getMockBuilder(ExampleTask::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['run'])
->getMock();

$processor->method('loadTask')->willReturn($mockTask);

$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);

$this->assertEventFired('CakeSentry.Queue.afterExecute');
}

/**
* Test that CakeSentry.Queue.afterExecute event is fired with exception on failure.
*
* @return void
*/
public function testCakeSentryAfterExecuteFailureEvent(): void {
$eventList = new EventList();
EventManager::instance()->setEventList($eventList);

// Create a job
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
$job = $QueuedJobs->createJob('Queue.RetryExample', ['test' => 'data'], ['priority' => 1]);

// Create processor with mock task that fails
$out = new ConsoleOutput();
$err = new ConsoleOutput();
$processor = $this->getMockBuilder(Processor::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['loadTask'])
->getMock();

$mockTask = $this->getMockBuilder(RetryExampleTask::class)
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
->onlyMethods(['run'])
->getMock();
$mockTask->method('run')->willThrowException(new RuntimeException('Task failed'));

$processor->method('loadTask')->willReturn($mockTask);

$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);

$this->assertEventFired('CakeSentry.Queue.afterExecute');
}

/**
* Test setPhpTimeout with new config names
*
Expand Down