Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 103 additions & 2 deletions docs/sections/misc.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,69 @@ This includes also failed ones if not filtered further using `where()` condition

## Events
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
These events are useful for monitoring, logging, and integrating with external services.

### Queue.Job.created
Fired when a new job is added to the queue (producer side).

```php
use Cake\Event\EventInterface;
use Cake\Event\EventManager;

EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
$job = $event->getData('job');
// Track job creation for monitoring
});
```

Event data:
- `job`: The `QueuedJob` entity that was created

### Queue.Job.started
Fired when a worker begins processing a job (consumer side).

```php
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
$job = $event->getData('job');
// Start tracing/monitoring span
});
```

Event data:
- `job`: The `QueuedJob` entity being processed

### Queue.Job.completed
Fired when a job finishes successfully.

```php
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
$job = $event->getData('job');
// Mark trace as successful
});
```

Event data:
- `job`: The `QueuedJob` entity that completed

### Queue.Job.failed
Fired when a job fails (on every failure attempt).

```php
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
$job = $event->getData('job');
$failureMessage = $event->getData('failureMessage');
$exception = $event->getData('exception');
// Mark trace as failed, log error
});
```

Event data:
- `job`: The `QueuedJob` entity that failed
- `failureMessage`: The error message from the failure
- `exception`: The exception object (if available)

### Queue.Job.maxAttemptsExhausted
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
Fired when a job has failed and exhausted all of its configured retry attempts.

```php
use Cake\Event\EventInterface;
Expand Down Expand Up @@ -81,10 +141,51 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
});
```

The event data contains:
Event data:
- `job`: The `QueuedJob` entity that failed
- `failureMessage`: The error message from the last failure

### Sentry Integration

The plugin includes a `CakeSentryEventBridge` listener that bridges Queue events to
[lordsimal/cakephp-sentry](https://github.com/LordSimal/cakephp-sentry) for queue monitoring.

To enable Sentry integration, register the bridge in your `Application::bootstrap()`:

```php
use Cake\Event\EventManager;
use Queue\Event\CakeSentryEventBridge;

// Enable Sentry queue monitoring
EventManager::instance()->on(new CakeSentryEventBridge());
```

This automatically dispatches the `CakeSentry.Queue.*` events that the Sentry plugin listens to:
- `CakeSentry.Queue.enqueue` - when a job is created
- `CakeSentry.Queue.beforeExecute` - when a job starts processing
- `CakeSentry.Queue.afterExecute` - when a job completes or fails

#### Trace Propagation

For distributed tracing to work across producer and consumer, add Sentry trace headers
to job data when creating jobs:

```php
$data = [
'your_data' => 'here',
];

// Add Sentry trace headers if Sentry SDK is available
if (class_exists(\Sentry\SentrySdk::class)) {
$data['_sentry_trace'] = \Sentry\getTraceparent();
$data['_sentry_baggage'] = \Sentry\getBaggage();
}

$queuedJobsTable->createJob('MyTask', $data);
```

The bridge will automatically extract these headers and pass them to the Sentry plugin.

## Notes

`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).
Expand Down
146 changes: 146 additions & 0 deletions src/Event/CakeSentryEventBridge.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php
declare(strict_types=1);

namespace Queue\Event;

use Cake\Event\Event;
use Cake\Event\EventInterface;
use Cake\Event\EventListenerInterface;
use Cake\Event\EventManager;
use Queue\Model\Entity\QueuedJob;

/**
* Event listener that bridges Queue.Job.* events to CakeSentry.Queue.* events.
*
* This enables integration with lordsimal/cakephp-sentry for queue monitoring
* without the Queue plugin depending on the Sentry SDK.
*
* Usage:
* ```php
* // In Application::bootstrap() or a plugin bootstrap
* EventManager::instance()->on(new \Queue\Event\CakeSentryEventBridge());
* ```
*
* @see https://github.com/LordSimal/cakephp-sentry
*/
class CakeSentryEventBridge implements EventListenerInterface {

/**
* @var float|null Start time for execution time calculation
*/
protected ?float $startTime = null;

/**
* @inheritDoc
*/
public function implementedEvents(): array {
return [
'Queue.Job.created' => 'handleCreated',
'Queue.Job.started' => 'handleStarted',
'Queue.Job.completed' => 'handleCompleted',
'Queue.Job.failed' => 'handleFailed',
];
}

/**
* Handle job created event - dispatches CakeSentry.Queue.enqueue
*
* @param \Cake\Event\EventInterface $event
*
* @return void
*/
public function handleCreated(EventInterface $event): void {
/** @var \Queue\Model\Entity\QueuedJob $job */
$job = $event->getData('job');

$sentryEvent = new Event('CakeSentry.Queue.enqueue', $this, [
'class' => $job->job_task,
'id' => (string)$job->id,
'queue' => $job->job_task,
'data' => $job->data ?? [],
]);
EventManager::instance()->dispatch($sentryEvent);
}

/**
* Handle job started event - dispatches CakeSentry.Queue.beforeExecute
*
* @param \Cake\Event\EventInterface $event
*
* @return void
*/
public function handleStarted(EventInterface $event): void {
$this->startTime = microtime(true);

/** @var \Queue\Model\Entity\QueuedJob $job */
$job = $event->getData('job');
$jobData = is_array($job->data) ? $job->data : [];

$sentryEvent = new Event('CakeSentry.Queue.beforeExecute', $this, [
'class' => $job->job_task,
'sentry_trace' => $jobData['_sentry_trace'] ?? '',
'sentry_baggage' => $jobData['_sentry_baggage'] ?? '',
]);
EventManager::instance()->dispatch($sentryEvent);
}

/**
* Handle job completed event - dispatches CakeSentry.Queue.afterExecute
*
* @param \Cake\Event\EventInterface $event
*
* @return void
*/
public function handleCompleted(EventInterface $event): void {
/** @var \Queue\Model\Entity\QueuedJob $job */
$job = $event->getData('job');

$sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $this->buildAfterExecuteData($job));
EventManager::instance()->dispatch($sentryEvent);
}

/**
* Handle job failed event - dispatches CakeSentry.Queue.afterExecute with exception
*
* @param \Cake\Event\EventInterface $event
*
* @return void
*/
public function handleFailed(EventInterface $event): void {
/** @var \Queue\Model\Entity\QueuedJob $job */
$job = $event->getData('job');
$exception = $event->getData('exception');

$data = $this->buildAfterExecuteData($job);
if ($exception !== null) {
$data['exception'] = $exception;
}

$sentryEvent = new Event('CakeSentry.Queue.afterExecute', $this, $data);
EventManager::instance()->dispatch($sentryEvent);
}

/**
* Build common data for afterExecute event.
*
* @param \Queue\Model\Entity\QueuedJob $job
*
* @return array<string, mixed>
*/
protected function buildAfterExecuteData(QueuedJob $job): array {
$executionTime = 0;
if ($this->startTime !== null) {
$executionTime = (int)((microtime(true) - $this->startTime) * 1000);
$this->startTime = null;
}

return [
'id' => (string)$job->id,
'queue' => $job->job_task,
'data' => $job->data ?? [],
'execution_time' => $executionTime,
'retry_count' => $job->attempts,
];
}

}
10 changes: 9 additions & 1 deletion src/Model/Table/QueuedJobsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
use ArrayObject;
use Cake\Core\Configure;
use Cake\Core\Plugin;
use Cake\Event\Event;
use Cake\Event\EventInterface;
use Cake\Event\EventManager;
use Cake\I18n\DateTime;
use Cake\ORM\Query\SelectQuery;
use Cake\ORM\Table;
Expand Down Expand Up @@ -216,8 +218,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
}

$queuedJob = $this->newEntity($queuedJob);
$queuedJob = $this->saveOrFail($queuedJob);

return $this->saveOrFail($queuedJob);
$event = new Event('Queue.Job.created', $this, [
'job' => $queuedJob,
]);
EventManager::instance()->dispatch($event);

return $queuedJob;
}

/**
Expand Down
21 changes: 21 additions & 0 deletions src/Queue/Processor.php
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
$taskName = $queuedJob->job_task;

// Dispatch started event
$event = new Event('Queue.Job.started', $this, [
'job' => $queuedJob,
]);
EventManager::instance()->dispatch($event);

$return = $failureMessage = null;
try {
$this->time = time();
Expand Down Expand Up @@ -247,6 +253,14 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');

// Dispatch failed event
$event = new Event('Queue.Job.failed', $this, [
'job' => $queuedJob,
'failureMessage' => $failureMessage,
'exception' => $e ?? null,
]);
EventManager::instance()->dispatch($event);

// Dispatch event when job has exhausted all retries
if ($failedStatus === 'aborted') {
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
Expand All @@ -260,6 +274,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
}

$this->QueuedJobs->markJobDone($queuedJob);

// Dispatch completed event
$event = new Event('Queue.Job.completed', $this, [
'job' => $queuedJob,
]);
EventManager::instance()->dispatch($event);

$this->io->out('Job Finished.');
$this->currentJob = null;
}
Expand Down
Loading