ForgeEvents

Event queue system for Forge Kernel. Provides asynchronous event processing with multiple queue drivers, priority-based processing, retry mechanisms, and scalable worker processes.

Overview

ForgeEvents lets Forge Kernel applications dispatch events to queues and process them asynchronously in separate worker processes. It supports multiple queues, multiple workers per queue, priority-based processing, and automatic retries.

Key Features

Multiple queue drivers (file, database, in-memory)
Multiple queues support
Multiple workers per queue
Priority-based processing (HIGH, NORMAL, LOW)
Automatic retry with configurable delay
Delayed processing support
Graceful shutdown
Automatic listener discovery

What ForgeEvents Provides

  • Event-Driven Architecture: Clean separation of concerns with asynchronous event processing
  • Multiple Queue Drivers: File, database, and in-memory options for different use cases
  • Multiple Queues: Organize events by type (emails, cache_refresh, page_visits)
  • Multiple Workers: Scale processing with parallel workers per queue
  • Priority Processing: HIGH, NORMAL, LOW priority support for critical events
  • Retry Mechanism: Automatic retries with a configurable delay on failure
  • Delayed Processing: Schedule events for future processing with time strings
  • Graceful Shutdown: Proper cleanup on worker termination with signal handling
  • Automatic Listener Discovery: Listeners are automatically discovered and registered

Generic Module: ForgeEvents is a generic module (type: 'generic', order: 99), providing event queue functionality that can be installed as needed. It optionally depends on ForgeDatabaseSql for the database queue driver.

Architecture & Design Philosophy

ForgeEvents is built with scalability, reliability, and developer experience in mind.

Event-Driven Architecture

ForgeEvents implements a clean event-driven architecture:

  • Events are dispatched asynchronously to queues
  • Workers process events independently from the main application
  • Listeners handle events when they are processed
  • Complete decoupling between event producers and consumers

Queue-Based Processing

Queue-based processing provides reliability and scalability:

  • Events are persisted to queues before processing
  • Workers poll queues for available jobs
  • Failed jobs can be retried automatically
  • Multiple queues allow workload separation

Driver Abstraction

QueueInterface provides driver abstraction:

  • FileQueue for file-based storage (development/testing)
  • DatabaseQueue for database storage (production)
  • InMemoryQueue for in-memory processing (testing)
  • Easy to add custom queue drivers

Multi-Process Worker Architecture

Workers use process forking for parallel processing:

  • One process per queue
  • Multiple worker processes per queue (configurable)
  • Process isolation for reliability
  • Graceful shutdown with signal handling

Installation

ForgeEvents can be installed via ForgePackageManager.

Using ForgePackageManager

# Install with wizard (interactive)
php forge.php package:install-module

# Install directly (skip wizard)
php forge.php package:install-module --module=ForgeEvents

# Install specific version
php forge.php package:install-module --module=ForgeEvents@0.2.3

Dependencies

ForgeEvents has optional dependencies:

  • ForgeDatabaseSql: Required only if using the database queue driver
  • File and in-memory drivers have no dependencies

Database Migration

If using the database driver, run the migration:

php forge.php db:migrate --module=ForgeEvents

This creates the queue_jobs table for storing queued events.

Queue Drivers

ForgeEvents supports three queue drivers, each suited for different use cases.

FileQueue

File-based queue storage, perfect for development and small applications:

  • Stores jobs as files in storage/queues/
  • No database required
  • File locking for concurrent access
  • Priority-based file sorting
  • Good for development and testing

# Set in environment
QUEUE_DRIVER=file

DatabaseQueue

Database-based queue storage, ideal for production:

  • Stores jobs in queue_jobs table
  • Requires ForgeDatabaseSql module
  • Transaction support for reliability
  • SQLite-specific optimizations
  • Lock-based job reservation
  • Production-ready for high-traffic applications

# Set in environment
QUEUE_DRIVER=database

InMemoryQueue

In-memory queue, perfect for testing:

  • Stores jobs in memory (SplPriorityQueue)
  • No persistence (lost on process exit)
  • Fastest option for testing
  • Not suitable for production

# Set in environment
QUEUE_DRIVER=in-memory

QueueInterface Contract

All queue drivers implement QueueInterface:

interface QueueInterface
{
    public function push(
        string $payload,
        int $priority = 0,
        int $delayMilliseconds = 0,
        int $maxRetries = 3,
        string $queue = 'default'
    ): void;
    
    public function pop(string $queue = 'default'): ?array;
    public function count(): int;
    public function clear(): void;
    public function release(int $jobId, int $delay = 0): void;
    public function getNextJobDelay(string $queue = 'default'): ?float;
}
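
As a rough illustration of what a custom driver could look like, here is a minimal array-backed sketch. The interface is copied locally so the example runs on its own. ArrayQueue, the keys of the array returned by pop(), and the treatment of release()'s $delay as milliseconds are all assumptions made for this example, not part of ForgeEvents.

```php
<?php
// Local copy of the contract so the sketch is self-contained.
interface QueueInterface
{
    public function push(string $payload, int $priority = 0, int $delayMilliseconds = 0, int $maxRetries = 3, string $queue = 'default'): void;
    public function pop(string $queue = 'default'): ?array;
    public function count(): int;
    public function clear(): void;
    public function release(int $jobId, int $delay = 0): void;
    public function getNextJobDelay(string $queue = 'default'): ?float;
}

// Hypothetical array-backed driver (illustration only).
final class ArrayQueue implements QueueInterface
{
    private array $jobs = [];   // jobs keyed by id, in insertion order
    private int $nextId = 1;

    public function push(string $payload, int $priority = 0, int $delayMilliseconds = 0, int $maxRetries = 3, string $queue = 'default'): void
    {
        $id = $this->nextId++;
        $this->jobs[$id] = [
            'id' => $id,
            'queue' => $queue,
            'payload' => $payload,
            'priority' => $priority,
            'maxRetries' => $maxRetries,
            'availableAt' => microtime(true) + $delayMilliseconds / 1000,
            'reserved' => false,
        ];
    }

    public function pop(string $queue = 'default'): ?array
    {
        $now = microtime(true);
        $best = null;
        foreach ($this->jobs as $job) {
            if ($job['queue'] !== $queue || $job['reserved'] || $job['availableAt'] > $now) {
                continue;
            }
            // Higher priority wins; the strict comparison keeps the older job on ties.
            if ($best === null || $job['priority'] > $best['priority']) {
                $best = $job;
            }
        }
        if ($best === null) {
            return null;
        }
        $this->jobs[$best['id']]['reserved'] = true; // reserve so it is not popped twice
        return $best;
    }

    public function count(): int
    {
        return count($this->jobs); // includes reserved jobs in this sketch
    }

    public function clear(): void
    {
        $this->jobs = [];
    }

    public function release(int $jobId, int $delay = 0): void
    {
        if (!isset($this->jobs[$jobId])) {
            return;
        }
        $this->jobs[$jobId]['reserved'] = false;
        $this->jobs[$jobId]['availableAt'] = microtime(true) + $delay / 1000; // $delay assumed to be ms
    }

    public function getNextJobDelay(string $queue = 'default'): ?float
    {
        $now = microtime(true);
        $delays = [];
        foreach ($this->jobs as $job) {
            if ($job['queue'] === $queue && !$job['reserved']) {
                $delays[] = max(0.0, $job['availableAt'] - $now);
            }
        }
        return $delays === [] ? null : min($delays); // seconds until the next job is due
    }
}
```

A real driver would also need a way to delete a job after successful processing, which the contract shown above does not expose.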

Creating Events

Events in ForgeEvents are simple classes decorated with the #[Event] attribute.

Basic Event Structure

use App\Modules\ForgeEvents\Attributes\Event;
use App\Modules\ForgeEvents\Enums\QueuePriority;

#[Event(
    queue: "page_visits",
    maxRetries: 5,
    delay: "1m",
    priority: QueuePriority::HIGH,
)]
final readonly class TestPageVisitedEvent
{
    public function __construct(
        public int $userId,
        public string $visitedAt
    ) {}
}

Event Attribute Parameters

  • queue: Queue name (default: 'default')
  • maxRetries: Maximum retry attempts (default: 1)
  • retryDelay: Delay between retries in milliseconds (default: 1000)
  • delay: Initial delay before processing (e.g., '10m', '30s', '2h') (default: '0s')
  • priority: QueuePriority enum (HIGH, NORMAL, LOW) (default: NORMAL)

Delay Format

The delay parameter accepts time strings:

#[Event(delay: "30s")]  // 30 seconds
#[Event(delay: "1m")]   // 1 minute
#[Event(delay: "2h")]   // 2 hours
#[Event(delay: "1d")]   // 1 day
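
To make the format concrete, the following sketch converts a delay string into milliseconds, the unit used by QueueInterface::push(). The function name parseDelayToMilliseconds is hypothetical, and the sketch assumes only the four suffixes shown above (s, m, h, d) are valid.

```php
<?php
// Hypothetical helper, not part of ForgeEvents: converts a delay string
// such as "30s", "1m", "2h" or "1d" into milliseconds.
function parseDelayToMilliseconds(string $delay): int
{
    // Seconds per supported unit suffix.
    $secondsPerUnit = ['s' => 1, 'm' => 60, 'h' => 3600, 'd' => 86400];

    if (preg_match('/^(\d+)([smhd])$/', trim($delay), $matches) !== 1) {
        throw new InvalidArgumentException("Unsupported delay format: {$delay}");
    }

    return (int) $matches[1] * $secondsPerUnit[$matches[2]] * 1000;
}

echo parseDelayToMilliseconds('30s'), PHP_EOL; // 30000
echo parseDelayToMilliseconds('2h'), PHP_EOL;  // 7200000
```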

Priority Levels

use App\Modules\ForgeEvents\Enums\QueuePriority;

// HIGH priority (processed first)
#[Event(priority: QueuePriority::HIGH)]

// NORMAL priority (default)
#[Event(priority: QueuePriority::NORMAL)]

// LOW priority (processed last)
#[Event(priority: QueuePriority::LOW)]

Event Attributes

ForgeEvents uses PHP 8 attributes to configure event behavior.

#[Event]

The #[Event] attribute defines event queue configuration:

#[Event(
    queue: "emails",
    maxRetries: 3,
    retryDelay: 2000,
    delay: "5m",
    priority: QueuePriority::HIGH
)]
class EmailSentEvent
{
    // ...
}

QueuePriority Enum

QueuePriority defines processing priority:

enum QueuePriority: int
{
    case HIGH = 3;    // Processed first
    case NORMAL = 2;  // Default priority
    case LOW = 1;     // Processed last
}

Jobs are sorted by priority in descending order (HIGH first), then by creation time (oldest first).

Dispatching Events

Events are dispatched through the EventDispatcher service.

Basic Dispatching

use App\Modules\ForgeEvents\Services\EventDispatcher;
use App\Events\TestPageVisitedEvent;

public function __construct(
    private readonly EventDispatcher $dispatcher
) {}

public function index(): Response
{
    $this->dispatcher->dispatch(
        new TestPageVisitedEvent(
            userId: 123456,
            visitedAt: date("Y-m-d H:i:s")
        )
    );
    
    return $this->view("pages/test/index");
}

How Dispatching Works

  • EventDispatcher reads #[Event] attribute from event class
  • Event is serialized with metadata
  • Event is pushed to the specified queue
  • Priority and delay are applied
  • Processing happens asynchronously by workers

Event Serialization

Events are automatically serialized when queued:

// Serialized payload includes:
[
    'event' => $event,              // The event instance
    'class' => $eventClass,         // Event class name
    'metadata' => $eventMetadata,    // #[Event] attribute instance
    'attempts' => 0                 // Retry attempt count
]
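
The round trip can be sketched as follows, assuming the payload is encoded with PHP's native serialize(); the source does not say which encoding is used. PageVisited, encodePayload and decodePayload are stand-ins for this example, and the metadata is a plain array here rather than an #[Event] attribute instance.

```php
<?php
// Stand-in event class for the sketch (mirrors TestPageVisitedEvent).
final class PageVisited
{
    public function __construct(
        public readonly int $userId,
        public readonly string $visitedAt
    ) {}
}

// Assumption: the payload is encoded with PHP's native serialize().
function encodePayload(object $event, array $metadata): string
{
    return serialize([
        'event' => $event,
        'class' => $event::class,
        'metadata' => $metadata,
        'attempts' => 0,
    ]);
}

// Restores the payload array, including the event object.
function decodePayload(string $payload): array
{
    return unserialize($payload);
}

$payload = encodePayload(
    new PageVisited(123456, '2024-01-01 12:00:00'),
    ['queue' => 'page_visits']
);
$job = decodePayload($payload);
echo $job['class'], ' attempts=', $job['attempts'], PHP_EOL;
```

Because the event object itself is stored, event classes should contain only serializable data (scalars, arrays, simple value objects), not open resources or closures.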

Event Listeners

Event listeners handle events when they are processed by workers.

Automatic Listener Discovery

Listeners are automatically discovered during application bootstrap:

  • ServiceDiscoverSetup scans service directories
  • Methods with #[EventListener] attribute are registered
  • Listeners are resolved from the container
  • No manual registration required
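
The discovery step can be approximated with PHP's reflection API. This is a minimal sketch, not the ServiceDiscoverSetup implementation: the EventListener attribute is redefined locally so the example runs on its own, and its eventClass property, the sample classes, and discoverListeners are hypothetical.

```php
<?php
// Local stand-in so the sketch runs on its own; the real attribute lives in
// App\Modules\ForgeEvents\Attributes\EventListener.
#[Attribute(Attribute::TARGET_METHOD)]
final class EventListener
{
    public function __construct(public string $eventClass) {}
}

final class UserRegistered {}

final class WelcomeMailer
{
    #[EventListener(UserRegistered::class)]
    public function sendWelcome(UserRegistered $event): void {}

    public function unrelated(): void {}
}

/**
 * Returns a map of event class => list of [service class, method name].
 */
function discoverListeners(array $serviceClasses): array
{
    $map = [];
    foreach ($serviceClasses as $serviceClass) {
        $reflection = new ReflectionClass($serviceClass);
        foreach ($reflection->getMethods(ReflectionMethod::IS_PUBLIC) as $method) {
            // Only methods carrying the attribute are registered.
            foreach ($method->getAttributes(EventListener::class) as $attribute) {
                $listener = $attribute->newInstance();
                $map[$listener->eventClass][] = [$serviceClass, $method->getName()];
            }
        }
    }
    return $map;
}

$listeners = discoverListeners([WelcomeMailer::class]);
print_r($listeners);
```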

Creating Listeners

use App\Modules\ForgeEvents\Attributes\EventListener;
use App\Events\TestPageVisitedEvent;
use Forge\Core\DI\Attributes\Service;

#[Service]
class PageVisitLogger
{
    #[EventListener(TestPageVisitedEvent::class)]
    public function handlePageVisit(TestPageVisitedEvent $event): void
    {
        // Log the page visit
        error_log("User {$event->userId} visited at {$event->visitedAt}");
    }
}

Multiple Listeners

Multiple listeners can handle the same event:

#[Service]
class AnalyticsService
{
    #[EventListener(TestPageVisitedEvent::class)]
    public function trackVisit(TestPageVisitedEvent $event): void
    {
        // Track in analytics
    }
}

#[Service]
class NotificationService
{
    #[EventListener(TestPageVisitedEvent::class)]
    public function sendNotification(TestPageVisitedEvent $event): void
    {
        // Send notification
    }
}

All listeners for an event are called when the event is processed.

Manual Listener Registration

Listeners can also be registered manually:

$dispatcher->addListener(
    TestPageVisitedEvent::class,
    function (TestPageVisitedEvent $event) {
        // Handle event
    }
);

Queue Workers

Queue workers process events from queues. They support multiple queues and multiple workers per queue.

Starting Workers

# Start workers for all queues (1 worker per queue)
php forge.php queue:work

# Start with multiple workers per queue
php forge.php queue:work --workers=2

# Interactive wizard
php forge.php queue:work  # (starts wizard)

Multiple Queues

Workers automatically process all queues defined in QUEUE_LIST:

# Environment configuration
QUEUE_LIST=[emails,cache_refresh,page_visits]

# Workers start for each queue:
# Worker for queue 'emails' started (PID 39099)
# Worker for queue 'cache_refresh' started (PID 39100)
# Worker for queue 'page_visits' started (PID 39101)

Multiple Workers Per Queue

Scale processing by running multiple workers per queue:

php forge.php queue:work --workers=2

# Output:
# Worker for queue 'cache_refresh' started (PID 39446)
# Worker for queue 'emails' started (PID 39445)
# Worker for queue 'page_visits' started (PID 39447)
# Worker for queue 'emails' started (PID 39448)
# Worker for queue 'cache_refresh' started (PID 39449)
# Worker for queue 'page_visits' started (PID 39450)

Worker Architecture

  • One parent process per queue
  • Multiple worker processes per queue (forked)
  • Each worker polls its queue independently
  • Process isolation for reliability

Worker Loop

Workers continuously poll queues for jobs:

  • Poll queue for available jobs
  • Process job if available
  • Exponential backoff when no jobs available
  • Garbage collection every 50 jobs
  • Check for shutdown signals

Graceful Shutdown

Workers handle shutdown signals gracefully:

# Press Ctrl+C to stop workers
^CWorker for queue 'cache_refresh' (PID 39449) exiting gracefully.
Worker for queue 'emails' (PID 39448) exiting gracefully.
Worker for queue 'cache_refresh' (PID 39446) exiting gracefully.
Worker for queue 'page_visits' (PID 39447) exiting gracefully.
Worker for queue 'page_visits' (PID 39450) exiting gracefully.
Worker for queue 'emails' (PID 39445) exiting gracefully.

  • SIGINT (Ctrl+C) and SIGTERM trigger graceful shutdown
  • Current job is released back to queue
  • Workers exit cleanly
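
One common way to implement this kind of shutdown in PHP is a flag set by a signal handler and checked between jobs. The sketch below assumes the pcntl extension is available; ShutdownFlag and installShutdownHandlers are hypothetical names, and this is not necessarily how ForgeEvents wires its handlers.

```php
<?php
// Hypothetical flag object shared between the signal handler and the worker loop.
final class ShutdownFlag
{
    public bool $requested = false;
}

// Installs SIGINT/SIGTERM handlers that only set the flag.
// Returns false when the pcntl extension is not available.
function installShutdownHandlers(ShutdownFlag $flag): bool
{
    if (!function_exists('pcntl_signal')) {
        return false;
    }

    pcntl_async_signals(true); // deliver signals without manual dispatch calls
    foreach ([SIGINT, SIGTERM] as $signal) {
        pcntl_signal($signal, function () use ($flag): void {
            $flag->requested = true;
        });
    }

    return true;
}

$flag = new ShutdownFlag();
installShutdownHandlers($flag);

// A worker loop would check the flag between jobs, for example:
// while (!$flag->requested) { /* pop and process one job */ }
```

Setting a flag instead of exiting inside the handler lets the worker finish or release its current job before the process ends.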

Queue Processing

Queue processing involves priority-based job selection, delayed processing, and job reservation.

Priority-Based Selection

Jobs are selected based on priority and creation time:

  • Jobs sorted by priority (descending: HIGH, NORMAL, LOW)
  • Within same priority, sorted by creation time (oldest first)
  • HIGH priority jobs processed first
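
The ordering, with HIGH priority jobs first and the oldest job first within a priority, can be illustrated with a small in-memory sort. The job rows are made-up sample data; the priority and created_at field names follow the queue_jobs columns.

```php
<?php
// Hypothetical job rows; priority values follow the QueuePriority enum.
$jobs = [
    ['id' => 1, 'priority' => 1, 'created_at' => '2024-01-01 10:00:00'], // LOW
    ['id' => 2, 'priority' => 3, 'created_at' => '2024-01-01 10:05:00'], // HIGH
    ['id' => 3, 'priority' => 2, 'created_at' => '2024-01-01 10:01:00'], // NORMAL
    ['id' => 4, 'priority' => 3, 'created_at' => '2024-01-01 10:02:00'], // HIGH, older
];

usort($jobs, function (array $a, array $b): int {
    // Higher numeric priority first; on a tie, the older job first.
    return ($b['priority'] <=> $a['priority'])
        ?: strcmp($a['created_at'], $b['created_at']);
});

$order = array_column($jobs, 'id');
echo implode(',', $order), PHP_EOL; // 4,2,3,1
```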

Delayed Processing

Jobs with delay are scheduled for future processing:

  • process_at timestamp set based on delay
  • Jobs only processed when process_at has passed
  • Workers check delay before processing
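
As a sketch of that check, the helpers below compute a process_at time from a delay and decide whether a job is due. The function names are hypothetical, and the delay is expressed in whole seconds to keep the example short.

```php
<?php
// Hypothetical helpers; process_at is the column name from the schema,
// the function names are not part of ForgeEvents.
function computeProcessAt(DateTimeImmutable $now, int $delaySeconds): DateTimeImmutable
{
    return $now->add(new DateInterval("PT{$delaySeconds}S"));
}

// A job with no process_at is due immediately.
function isDue(?DateTimeImmutable $processAt, DateTimeImmutable $now): bool
{
    return $processAt === null || $processAt <= $now;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00');
$processAt = computeProcessAt($now, 60);

var_dump(isDue($processAt, $now));                                 // bool(false)
var_dump(isDue($processAt, $now->add(new DateInterval('PT60S')))); // bool(true)
```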

Job Reservation

Jobs are reserved when being processed (database driver):

  • reserved_at timestamp set when job is popped
  • Prevents multiple workers from processing same job
  • Transaction-based reservation for reliability
  • Job deleted after successful processing

Transaction Support

DatabaseQueue uses transactions for job reservation:

// Transaction ensures atomic job reservation
$now = date('Y-m-d H:i:s'); // reservation timestamp (format assumed)
$this->queryBuilder->beginTransaction();
try {
    // Find and reserve job
    $job = $this->queryBuilder
        ->where('queue', '=', $queue)
        ->whereNull('reserved_at')
        ->lockForUpdate()
        ->first();
    
    if ($job) {
        $this->queryBuilder
            ->where('id', '=', $job['id'])
            ->update(['reserved_at' => $now]);
    }
    
    $this->queryBuilder->commit();
} catch (\Throwable $e) {
    $this->queryBuilder->rollback();
    throw $e;
}

SQLite Optimizations

DatabaseQueue includes SQLite-specific optimizations:

  • Uses SQLite's RETURNING clause (available in SQLite 3.35.0 and later) for atomic updates
  • Single query for find and reserve
  • Improved performance on SQLite

Retry Mechanism

ForgeEvents automatically retries failed events with configurable attempts and delays.

Automatic Retry

When a listener throws an exception, the event is automatically retried:

  • Exception is caught during listener execution
  • Retry count is incremented
  • If retries remaining, job is re-queued with delay
  • If max retries exceeded, job is marked as failed

Retry Configuration

#[Event(
    maxRetries: 5,        // Maximum retry attempts
    retryDelay: 2000,     // Delay between retries (ms)
)]
class MyEvent
{
    // ...
}

Retry Process

When an event fails:
1. Exception caught
2. Attempts incremented
3. If attempts < maxRetries:
   - Calculate retry delay
   - Re-queue with LOW priority
   - Set process_at for delayed retry
4. If attempts >= maxRetries:
   - Mark job as failed (set failed_at)
   - Remove job from the active queue
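
The decision in steps 2 to 4 can be sketched as a small pure function. The name decideRetry and the shape of the returned array are hypothetical; the comparison follows the steps listed above.

```php
<?php
// Hypothetical sketch of the retry decision; not the ForgeEvents implementation.
function decideRetry(int $attempts, int $maxRetries, int $retryDelayMs): array
{
    $attempts++; // count the attempt that just failed

    if ($attempts < $maxRetries) {
        // Retries remain: re-queue and wait retryDelay before the next attempt.
        return ['action' => 'retry', 'attempts' => $attempts, 'delayMs' => $retryDelayMs];
    }

    // No retries left: the job is marked as failed.
    return ['action' => 'fail', 'attempts' => $attempts, 'delayMs' => null];
}

print_r(decideRetry(0, 3, 2000)); // action: retry, attempts: 1
print_r(decideRetry(2, 3, 2000)); // action: fail, attempts: 3
```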

Failed Jobs

Jobs that exceed max retries are marked as failed:

  • failed_at timestamp is set
  • Job is removed from active queue
  • Failed jobs can be inspected in database

Exponential Backoff

The retry delay is configured per event through retryDelay. Exponential backoff applies to worker polling when no jobs are available:

  • Initial backoff: 0.1 seconds
  • Backoff doubles on each empty poll
  • Maximum backoff: 5 seconds
  • Resets when job is processed
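
Using the values listed above, the polling backoff can be sketched like this. The constant and function names are hypothetical.

```php
<?php
// Sketch of the idle-polling backoff with the documented limits.
const INITIAL_BACKOFF = 0.1; // seconds
const MAX_BACKOFF = 5.0;     // seconds

// Returns the next sleep interval after a poll.
function nextBackoff(float $current, bool $jobProcessed): float
{
    if ($jobProcessed) {
        return INITIAL_BACKOFF; // reset once a job has been processed
    }

    return min($current * 2, MAX_BACKOFF); // double on an empty poll, capped
}

// Intervals for eight consecutive empty polls.
$backoff = INITIAL_BACKOFF;
$sequence = [];
for ($i = 0; $i < 8; $i++) {
    $sequence[] = $backoff;
    $backoff = nextBackoff($backoff, false);
}
echo implode(', ', $sequence), PHP_EOL;
```

With these limits an idle worker reaches the 5 second ceiling after six empty polls, so a new job waits at most about 5 seconds before an idle worker picks it up.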

Configuration

ForgeEvents is configured via environment variables.

Queue Driver

Set the queue driver via QUEUE_DRIVER:

# File-based queue (development)
QUEUE_DRIVER=file

# Database queue (production)
QUEUE_DRIVER=database

# In-memory queue (testing)
QUEUE_DRIVER=in-memory

Queue List

Define queues to process via QUEUE_LIST:

# Single queue
QUEUE_LIST=[default]

# Multiple queues
QUEUE_LIST=[emails,cache_refresh,page_visits]

Workers will start for each queue in the list.
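
For illustration, a bracketed list like the ones above could be parsed as follows. parseQueueList is a hypothetical helper, and the fallback to a single 'default' queue for an empty value is an assumption of this sketch.

```php
<?php
// Hypothetical parser for the bracketed QUEUE_LIST format.
function parseQueueList(string $raw): array
{
    // Strip surrounding whitespace and brackets, then split on commas.
    $inner = trim(trim($raw), '[]');
    $names = array_map('trim', explode(',', $inner));

    // Drop empty entries; fall back to the 'default' queue (assumed behavior).
    $names = array_values(array_filter($names, fn (string $name): bool => $name !== ''));

    return $names === [] ? ['default'] : $names;
}

print_r(parseQueueList('[emails,cache_refresh,page_visits]'));
```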

Complete Configuration Example

# Queue Configuration
QUEUE_DRIVER=database
QUEUE_LIST=[emails,cache_refresh,page_visits]

Database Schema

The database queue driver uses the queue_jobs table.

Table Structure

CREATE TABLE queue_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue VARCHAR(255) NOT NULL DEFAULT 'default',
    payload TEXT NOT NULL,
    attempts INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 1,
    priority INTEGER DEFAULT 100,
    process_at TIMESTAMP NULL,
    reserved_at TIMESTAMP NULL,
    failed_at TIMESTAMP NULL,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

Columns

  • id: Primary key
  • queue: Queue name
  • payload: Serialized event data
  • attempts: Current retry attempt count
  • max_retries: Maximum retry attempts
  • priority: Job priority (1=LOW, 2=NORMAL, 3=HIGH)
  • process_at: When to process (for delayed jobs)
  • reserved_at: When job was reserved by worker
  • failed_at: When job failed (exceeded max retries)
  • created_at: Job creation timestamp
  • updated_at: Last update timestamp

Indexes

-- Index for queue and process_at (job selection)
CREATE INDEX idx_queue_process_at ON queue_jobs(queue, process_at);

-- Index for attempts (retry tracking)
CREATE INDEX idx_attempts ON queue_jobs(attempts);

Migration

The migration is automatically included with ForgeEvents:

php forge.php db:migrate --module=ForgeEvents

Usage Examples

Comprehensive examples demonstrating ForgeEvents usage.

Event Definition

use App\Modules\ForgeEvents\Attributes\Event;
use App\Modules\ForgeEvents\Enums\QueuePriority;

#[Event(
    queue: "page_visits",
    maxRetries: 5,
    delay: "1m",
    priority: QueuePriority::HIGH,
)]
final readonly class TestPageVisitedEvent
{
    public function __construct(
        public int $userId,
        public string $visitedAt
    ) {}
}

Dispatching Events

use App\Modules\ForgeEvents\Services\EventDispatcher;
use App\Events\TestPageVisitedEvent;
use Forge\Core\DI\Attributes\Service;

#[Service]
final class TestController
{
    public function __construct(
        private readonly EventDispatcher $dispatcher
    ) {}

    public function index(): Response
    {
        $this->dispatcher->dispatch(
            new TestPageVisitedEvent(
                // Assumes the controller exposes a session (for example via a base class)
                userId: $this->session->get("user_id"),
                visitedAt: date("Y-m-d H:i:s")
            )
        );

        return $this->view("pages/test/index");
    }
}

Event Listeners

use App\Modules\ForgeEvents\Attributes\EventListener;
use App\Events\TestPageVisitedEvent;
use Forge\Core\DI\Attributes\Service;

#[Service]
class PageVisitLogger
{
    #[EventListener(TestPageVisitedEvent::class)]
    public function handlePageVisit(TestPageVisitedEvent $event): void
    {
        error_log("User {$event->userId} visited at {$event->visitedAt}");
    }
}

Starting Workers

# Single worker per queue
php forge.php queue:work

# Multiple workers per queue
php forge.php queue:work --workers=2

Worker Output

Process queued events

Worker for queue 'emails' started (PID 39099)
Worker for queue 'cache_refresh' started (PID 39100)
Worker for queue 'page_visits' started (PID 39101)
Handling event: App\Events\TestPageVisitedEvent
No listeners for event: App\Events\TestPageVisitedEvent
Queue page_visits processed job 1
Handling event: App\Events\TestPageVisitedEvent
No listeners for event: App\Events\TestPageVisitedEvent
Queue page_visits processed job 2

Multiple Workers Example

php forge.php queue:work --workers=2

Worker for queue 'cache_refresh' started (PID 39446)
Worker for queue 'emails' started (PID 39445)
Worker for queue 'page_visits' started (PID 39447)
Worker for queue 'emails' started (PID 39448)
Worker for queue 'cache_refresh' started (PID 39449)
Worker for queue 'page_visits' started (PID 39450)

Best Practices

Guidelines for using ForgeEvents effectively.

Queue Driver Selection

  • FileQueue: Use for development and small applications
  • DatabaseQueue: Use for production with high reliability requirements
  • InMemoryQueue: Use only for testing

Queue Organization

  • Organize queues by event type (emails, cache_refresh, page_visits)
  • Use separate queues for different priorities
  • Group related events in the same queue
  • Consider queue size and processing time when organizing

Worker Scaling

  • Start with 1 worker per queue and scale as needed
  • Monitor queue depth to determine worker count
  • Balance worker count with system resources
  • Use process managers (supervisor, systemd) for production

Error Handling

  • Always handle exceptions in listeners
  • Set appropriate maxRetries for different event types
  • Use retryDelay to prevent overwhelming systems
  • Monitor failed jobs and investigate failures

Monitoring and Logging

  • Log event processing for debugging
  • Monitor queue depth and processing times
  • Track failed jobs and retry rates
  • Set up alerts for queue backlogs

Performance Considerations

  • Use appropriate priorities (don't make everything HIGH)
  • Avoid long-running listeners (delegate to sub-processes if needed)
  • Batch similar operations when possible
  • Monitor database queue table size and clean up old jobs