Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ At the queue_manager.php config file you can configure:
| supervisor_bin | The supervisor bin path | /usr/bin/supervisorctl |
| supervisor_user | The supervisor user | docker |
| supervisor_update_timeout | The supervisor update timeout to gracefully stop the process when a configuration change | 600 |
| execute_as_api | Enable the queue as API mode | false |
| api_url | URL to run the queue as API mode | http://127.0.0.1/queue/process |

### Showing all available jobs
```bash
Expand Down Expand Up @@ -118,3 +120,15 @@ Every time you change the PHP code, it's necessary to restart the queues. Put th
```bash
$ php artisan queue:restart
```

## API Mode
### Introduction
To easily scale your jobs machine, you can run the queues in API mode. An API is much more easy to apply auto-scale.

### Configuration
In your route configuration file add:
```php
$api->post('queue/process', 'LaravelQueueManager\Http\Controllers\QueueController@process');
```

Edit in your "queue_manager.php" config file the execute_as_api and api_url options.
26 changes: 26 additions & 0 deletions src/Console/Commands/WorkCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

namespace LaravelQueueManager\Console\Commands;

use Illuminate\Queue\Console\WorkCommand as LaravelWorkCommand;


class WorkCommand extends LaravelWorkCommand
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue-manager:work
{connection? : The name of connection}
{--queue= : The queue to listen on}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--delay=0 : Amount of time to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';
}
89 changes: 89 additions & 0 deletions src/Core/Worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

namespace LaravelQueueManager\Core;

use Exception;
use Throwable;
use Illuminate\Queue\Worker as LaravelWorker;
use Illuminate\Queue\WorkerOptions;
use Symfony\Component\Debug\Exception\FatalThrowableError;

class Worker extends LaravelWorker
{
/**
* Process a given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
try {
$this->raiseBeforeJobEvent($connectionName, $job);

$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);

// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
if (!config('queue_manager.execute_as_api')) {
$job->fire();
} else {
$this->firePost($job);
}

$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}

/**
* Using curl to avoid conflict with Guzzle in project
* @param $job
* @throws Exception
*/
protected function firePost($job)
{
$data = [
'data' => $job->getRawBody(),
];

$url = config('queue_manager.api_url');

$ch = curl_init($url);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($data));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$curlResponse = curl_exec($ch);

$curlErrorCode = curl_errno($ch);

curl_close($ch);

if ($curlErrorCode) {
throw new \Exception(curl_strerror($curlErrorCode), $curlErrorCode);
}

$response = json_decode($curlResponse);

if ($response->status_code !== 200) {
throw new \Exception($response->error_description, $response->error_code);
}

if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}

}
47 changes: 47 additions & 0 deletions src/Http/Controllers/QueueController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace LaravelQueueManager\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Routing\Controller;

class QueueController extends Controller
{
/**
* Process a queue.
*
* @param \Illuminate\Http\Request $request
*/
public function process(Request $request)
{
try {
$data = json_decode($request->get('data'));
$service = unserialize($data->data->command);
$service->execute();

} catch (\Exception $e) {

$response = [
'status_code' => Response::HTTP_INTERNAL_SERVER_ERROR,
'error_code' => $e->getCode(),
'error_description' => $e->getMessage(),
];

if (config('app.debug', false)) {
$response['trace'] = $e->getTraceAsString();
$response['file'] = $e->getFile();
$response['line'] = $e->getLine();
$response['class'] = get_class($e);
}

return response()->json($response, Response::HTTP_INTERNAL_SERVER_ERROR);
}

$response = [
'status_code' => Response::HTTP_OK,
];

return response()->json($response, Response::HTTP_OK);
}
}
8 changes: 8 additions & 0 deletions src/Providers/LaravelQueueManagerServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
use LaravelQueueManager\Console\Commands\GenerateConfigCommand;
use LaravelQueueManager\Console\Commands\GenerateQueueCommand;
use LaravelQueueManager\Console\Commands\ShowJobsCommand;
use LaravelQueueManager\Console\Commands\WorkCommand;
use LaravelQueueManager\Core\Scheduler;
use LaravelQueueManager\Core\Worker;

class LaravelQueueManagerServiceProvider extends ServiceProvider
{
Expand All @@ -35,6 +37,7 @@ public function boot()
$this->commands('queue-manager.generate-config');
$this->commands('queue-manager.generate-queue');
$this->commands('queue-manager.show-jobs');
$this->commands('queue-manager.work');
}

public function schedule()
Expand Down Expand Up @@ -83,5 +86,10 @@ protected function registerCommand()
$this->app['queue-manager.show-jobs'] = $this->app->share(function () {
return new ShowJobsCommand();
});

$this->app['queue-manager.work'] = $this->app->share(function () {
return new WorkCommand(new Worker(resolve('Illuminate\Queue\QueueManager'), resolve('Illuminate\Contracts\Events\Dispatcher'), resolve('Illuminate\Contracts\Debug\ExceptionHandler')));
});

}
}
4 changes: 3 additions & 1 deletion src/config/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

return [
'artisan_path' => base_path('artisan'),
'log_path' => storage_path('logs/worker.log'),
'log_path' => storage_path('logs/worker.log'),
'execute_as_api' => false,
'api_url' => 'http://127.0.0.1/queue/process',
'supervisor_config_file' => '/etc/supervisor/conf.d/laravel-queue.conf',
'supervisor_bin' => '/usr/bin/supervisorctl',
'supervisor_user' => 'docker',
Expand Down
2 changes: 1 addition & 1 deletion src/resources/views/supervisor-generator.blade.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
@foreach($configs as $config)
[program:laravel-worker-{{$config->name}}]
process_name=%(program_name)s_%(process_num)02d
command=php {{config('queue_manager.artisan_path')}} queue:work --queue={{$config->name}} --sleep={{ (int)$config->delay }} --tries={{ $config->max_attempts ? $config->max_attempts : 5 }} --timeout={{ $config->timeout ? $config->timeout : 0 }} {{ $config->connection == 'default' ? '' : $config->connection }}
command=php {{config('queue_manager.artisan_path')}} queue-manager:work --queue={{$config->name}} --sleep={{ (int)$config->delay }} --tries={{ $config->max_attempts ? $config->max_attempts : 5 }} --timeout={{ $config->timeout ? $config->timeout : 0 }} {{ $config->connection == 'default' ? '' : $config->connection }}
autostart=true
autorestart=true
user={{config('queue_manager.supervisor_user')}}
Expand Down