Event queue system for Forge Kernel. Provides asynchronous event processing with multiple queue drivers, priority-based processing, retry mechanisms, and scalable worker processes.
ForgeEvents provides a comprehensive event queue system for Forge Kernel applications. It enables asynchronous event processing with support for multiple queues, multiple workers, priority-based processing, and robust retry mechanisms.
Generic Module: ForgeEvents is a generic module (type: 'generic', order: 99), providing event queue functionality that can be installed as needed. It optionally depends on ForgeDatabaseSql for the database queue driver.
ForgeEvents is built with scalability, reliability, and developer experience in mind.
ForgeEvents implements a clean event-driven architecture:
Queue-based processing provides reliability and scalability:
QueueInterface provides driver abstraction:
Workers use process forking for parallel processing:
ForgeEvents can be installed via ForgePackageManager.
# Install with wizard (interactive)
php forge.php package:install-module
# Install directly (skip wizard)
php forge.php package:install-module --module=ForgeEvents
# Install specific version
php forge.php package:install-module --module=ForgeEvents@0.2.3
ForgeEvents has optional dependencies:
If using the database driver, run the migration:
php forge.php db:migrate --module=ForgeEvents
This creates the queue_jobs table for storing queued events.
ForgeEvents supports three queue drivers, each suited for different use cases.
File-based queue storage, perfect for development and small applications:
storage/queues/// Set in environment
QUEUE_DRIVER=file
Database-based queue storage, ideal for production:
queue_jobs table// Set in environment
QUEUE_DRIVER=database
In-memory queue, perfect for testing:
// Set in environment
QUEUE_DRIVER=in-memory
All queue drivers implement QueueInterface:
interface QueueInterface
{
public function push(
string $payload,
int $priority = 0,
int $delayMilliseconds = 0,
int $maxRetries = 3,
string $queue = 'default'
): void;
public function pop(string $queue = 'default'): ?array;
public function count(): int;
public function clear(): void;
public function release(int $jobId, int $delay = 0): void;
public function getNextJobDelay(string $queue = 'default'): ?float;
}
Events in ForgeEvents are simple classes decorated with the #[Event] attribute.
use App\Modules\ForgeEvents\Attributes\Event;
use App\Modules\ForgeEvents\Enums\QueuePriority;
#[Event(
queue: "page_visits",
maxRetries: 5,
delay: "1m",
priority: QueuePriority::HIGH,
)]
final readonly class TestPageVisitedEvent
{
public function __construct(
public int $userId,
public string $visitedAt
) {}
}
The delay parameter accepts time strings:
#[Event(delay: "30s")] // 30 seconds
#[Event(delay: "1m")] // 1 minute
#[Event(delay: "2h")] // 2 hours
#[Event(delay: "1d")] // 1 day
use App\Modules\ForgeEvents\Enums\QueuePriority;
// HIGH priority (processed first)
#[Event(priority: QueuePriority::HIGH)]
// NORMAL priority (default)
#[Event(priority: QueuePriority::NORMAL)]
// LOW priority (processed last)
#[Event(priority: QueuePriority::LOW)]
ForgeEvents uses PHP 8 attributes to configure event behavior.
The #[Event] attribute defines event queue configuration:
#[Event(
queue: "emails",
maxRetries: 3,
retryDelay: 2000,
delay: "5m",
priority: QueuePriority::HIGH
)]
class EmailSentEvent
{
// ...
}
QueuePriority defines processing priority:
enum QueuePriority: int
{
case HIGH = 3; // Processed first
case NORMAL = 2; // Default priority
case LOW = 1; // Processed last
}
Jobs are sorted by priority (ascending), then by creation time.
Events are dispatched through the EventDispatcher service.
use App\Modules\ForgeEvents\Services\EventDispatcher;
use App\Events\TestPageVisitedEvent;
public function __construct(
private readonly EventDispatcher $dispatcher
) {}
public function index(): Response
{
$this->dispatcher->dispatch(
new TestPageVisitedEvent(
userId: 123456,
visitedAt: date("Y-m-d H:i:s")
)
);
return $this->view("pages/test/index");
}
Events are automatically serialized when queued:
// Serialized payload includes:
[
'event' => $event, // The event instance
'class' => $eventClass, // Event class name
'metadata' => $eventMetadata, // #[Event] attribute instance
'attempts' => 0 // Retry attempt count
]
Event listeners handle events when they are processed by workers.
Listeners are automatically discovered during application bootstrap:
use App\Modules\ForgeEvents\Attributes\EventListener;
use App\Events\TestPageVisitedEvent;
use Forge\Core\DI\Attributes\Service;
#[Service]
class PageVisitLogger
{
#[EventListener(TestPageVisitedEvent::class)]
public function handlePageVisit(TestPageVisitedEvent $event): void
{
// Log the page visit
error_log("User {$event->userId} visited at {$event->visitedAt}");
}
}
Multiple listeners can handle the same event:
#[Service]
class AnalyticsService
{
#[EventListener(TestPageVisitedEvent::class)]
public function trackVisit(TestPageVisitedEvent $event): void
{
// Track in analytics
}
}
#[Service]
class NotificationService
{
#[EventListener(TestPageVisitedEvent::class)]
public function sendNotification(TestPageVisitedEvent $event): void
{
// Send notification
}
}
All listeners for an event are called when the event is processed.
Listeners can also be registered manually:
$dispatcher->addListener(
TestPageVisitedEvent::class,
function (TestPageVisitedEvent $event) {
// Handle event
}
);
Queue workers process events from queues. They support multiple queues and multiple workers per queue.
# Start workers for all queues (1 worker per queue)
php forge.php queue:work
# Start with multiple workers per queue
php forge.php queue:work --workers=2
# Interactive wizard
php forge.php queue:work # (starts wizard)
Workers automatically process all queues defined in QUEUE_LIST:
# Environment configuration
QUEUE_LIST=[emails,cache_refresh,page_visits]
# Workers start for each queue:
# Worker for queue 'emails' started (PID 39099)
# Worker for queue 'cache_refresh' started (PID 39100)
# Worker for queue 'page_visits' started (PID 39101)
Scale processing by running multiple workers per queue:
php forge.php queue:work --workers=2
# Output:
# Worker for queue 'cache_refresh' started (PID 39446)
# Worker for queue 'emails' started (PID 39445)
# Worker for queue 'page_visits' started (PID 39447)
# Worker for queue 'emails' started (PID 39448)
# Worker for queue 'cache_refresh' started (PID 39449)
# Worker for queue 'page_visits' started (PID 39450)
Workers continuously poll queues for jobs:
Workers handle shutdown signals gracefully:
# Press Ctrl+C to stop workers
^CWorker for queue 'cache_refresh' (PID 39449) exiting gracefully.
Worker for queue 'emails' (PID 39448) exiting gracefully.
Worker for queue 'cache_refresh' (PID 39446) exiting gracefully.
Worker for queue 'page_visits' (PID 39447) exiting gracefully.
Worker for queue 'page_visits' (PID 39450) exiting gracefully.
Worker for queue 'emails' (PID 39445) exiting gracefully.
Queue processing involves priority-based job selection, delayed processing, and job reservation.
Jobs are selected based on priority and creation time:
Jobs with delay are scheduled for future processing:
process_at timestamp set based on delayprocess_at has passedJobs are reserved when being processed (database driver):
reserved_at timestamp set when job is poppedDatabaseQueue uses transactions for job reservation:
// Transaction ensures atomic job reservation
$this->queryBuilder->beginTransaction();
try {
// Find and reserve job
$job = $this->queryBuilder
->where('queue', '=', $queue)
->whereNull('reserved_at')
->lockForUpdate()
->first();
if ($job) {
$this->queryBuilder
->where('id', '=', $job['id'])
->update(['reserved_at' => $now]);
}
$this->queryBuilder->commit();
} catch (\Throwable $e) {
$this->queryBuilder->rollback();
throw $e;
}
DatabaseQueue includes SQLite-specific optimizations:
ForgeEvents automatically retries failed events with configurable attempts and delays.
When a listener throws an exception, the event is automatically retried:
#[Event(
maxRetries: 5, // Maximum retry attempts
retryDelay: 2000, // Delay between retries (ms)
)]
class MyEvent
{
// ...
}
// When event fails:
1. Exception caught
2. Attempts incremented
3. If attempts < maxRetries:
- Calculate retry delay
- Re-queue with LOW priority
- Set process_at for delayed retry
4. If attempts >= maxRetries:
- Mark job as failed (failed_at)
- Delete job from queue
Jobs that exceed max retries are marked as failed:
failed_at timestamp is setRetry delay can be configured, and workers use exponential backoff when no jobs available:
ForgeEvents is configured via environment variables.
Set the queue driver via QUEUE_DRIVER:
# File-based queue (development)
QUEUE_DRIVER=file
# Database queue (production)
QUEUE_DRIVER=database
# In-memory queue (testing)
QUEUE_DRIVER=in-memory
Define queues to process via QUEUE_LIST:
# Single queue
QUEUE_LIST=[default]
# Multiple queues
QUEUE_LIST=[emails,cache_refresh,page_visits]
Workers will start for each queue in the list.
# Queue Configuration
QUEUE_DRIVER=database
QUEUE_LIST=[emails,cache_refresh,page_visits]
The database queue driver uses the queue_jobs table.
CREATE TABLE queue_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
queue VARCHAR(255) NOT NULL DEFAULT 'default',
payload TEXT NOT NULL,
attempts INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 1,
priority INTEGER DEFAULT 100,
process_at TIMESTAMP NULL,
reserved_at TIMESTAMP NULL,
failed_at TIMESTAMP NULL,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- Index for queue and process_at (job selection)
CREATE INDEX idx_queue_process_at ON queue_jobs(queue, process_at);
-- Index for attempts (retry tracking)
CREATE INDEX idx_attempts ON queue_jobs(attempts);
The migration is automatically included with ForgeEvents:
php forge.php db:migrate --module=ForgeEvents
Comprehensive examples demonstrating ForgeEvents usage.
use App\Modules\ForgeEvents\Attributes\Event;
use App\Modules\ForgeEvents\Enums\QueuePriority;
#[Event(
queue: "page_visits",
maxRetries: 5,
delay: "1m",
priority: QueuePriority::HIGH,
)]
final readonly class TestPageVisitedEvent
{
public function __construct(
public int $userId,
public string $visitedAt
) {}
}
use App\Modules\ForgeEvents\Services\EventDispatcher;
use App\Events\TestPageVisitedEvent;
#[Service]
final class TestController
{
public function __construct(
private readonly EventDispatcher $dispatcher
) {}
public function index(): Response
{
$this->dispatcher->dispatch(
new TestPageVisitedEvent(
userId: $this->session->get("user_id"),
visitedAt: date("Y-m-d H:i:s")
)
);
return $this->view("pages/test/index");
}
}
use App\Modules\ForgeEvents\Attributes\EventListener;
use App\Events\TestPageVisitedEvent;
use Forge\Core\DI\Attributes\Service;
#[Service]
class PageVisitLogger
{
#[EventListener(TestPageVisitedEvent::class)]
public function handlePageVisit(TestPageVisitedEvent $event): void
{
error_log("User {$event->userId} visited at {$event->visitedAt}");
}
}
# Single worker per queue
php forge.php queue:work
# Multiple workers per queue
php forge.php queue:work --workers=2
Process queued events
Worker for queue 'emails' started (PID 39099)
Worker for queue 'cache_refresh' started (PID 39100)
Worker for queue 'page_visits' started (PID 39101)
Handling event: App\Events\TestPageVisitedEvent
No listeners for event: App\Events\TestPageVisitedEvent
Queue page_visits processed job 1
Handling event: App\Events\TestPageVisitedEvent
No listeners for event: App\Events\TestPageVisitedEvent
Queue page_visits processed job 2
php forge.php queue:work --workers=2
Worker for queue 'cache_refresh' started (PID 39446)
Worker for queue 'emails' started (PID 39445)
Worker for queue 'page_visits' started (PID 39447)
Worker for queue 'emails' started (PID 39448)
Worker for queue 'cache_refresh' started (PID 39449)
Worker for queue 'page_visits' started (PID 39450)
Guidelines for using ForgeEvents effectively.