Skip to content

Commit c12efe9

Browse files
committed
Add fallback connection
1 parent 837985f commit c12efe9

File tree

6 files changed

+103
-16
lines changed

6 files changed

+103
-16
lines changed

README.md

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,19 @@ $ php artisan queue-manager:generate-queue queue_name foo=test,bar=test
6969

7070
To the job work correctly, it is necessary generate a row in the queue_config table.
7171

72-
| Field | Description |
73-
| --------------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
74-
| name | It's the same of the return of the getName() method. |
75-
| class_name | The full path with namespace of your job class (\App\Jobs\TestJob) |
76-
| active | If the job is active or not |
77-
| schedulable | If the job is schedulable or not |
72+
| Field | Description |
73+
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
74+
| name | It's the same of the return of the getName() method. |
75+
| class_name | The full path with namespace of your job class (\App\Jobs\TestJob) |
76+
| active | If the job is active or not |
77+
| schedulable | If the job is schedulable or not |
7878
| schedule_config | A JSON config of the schedule. {"method" : "The schedule methods from laravel", "params": "The params to the schedule method (optional)", "props": [ { "my_job_prop": 1 }, { "my_job_prop": 2 } ]} |
79-
| max_attemps | The max attempts of the queue |
80-
| max_instances | The max parallel instances of the queue |
81-
| timeout | The timeout of the queue |
82-
| delay | The delay to the next execution (Not implemented yet) |
83-
| connection | The connection name of the queue provider. (If null = default) |
84-
| aggregator | The aggregator is used to group the queues in a report |
79+
| max_attemps | The max attempts of the queue |
80+
| max_instances | The max parallel instances of the queue |
81+
| timeout | The timeout of the queue |
82+
| delay | The delay to the next execution (Not implemented yet) |
83+
| connection | The connection name of the queue provider. (If null = default) |
84+
| aggregator | The aggregator is used to group the queues in a report |
8585

8686
### Config
8787

@@ -97,6 +97,7 @@ At the queue_manager.php config file you can configure:
9797
| supervisor_update_timeout | The supervisor update timeout to gracefully stop the process when a configuration change | 600 |
9898
| execute_as_api | Enable the queue as API mode | false |
9999
| api_url | URL to run the queue as API mode | http://127.0.0.1/queue/process |
100+
| fallback_connections | Array of fallback connections when first provider fails to dispatch | [] |
100101

101102
### Showing all available jobs
102103

@@ -106,11 +107,15 @@ $ php artisan queue-manager:show-jobs
106107

107108
### Getting error events
108109

109-
You need add to your AppServiceProvider and log as you like
110+
You need add to your AppServiceProvider and log as you like:
110111

111112
```php
112113
$this->app['events']->listen(\LaravelQueueManager\Events\ScheduleError::class, function(\LaravelQueueManager\Events\ScheduleError $scheduleError){
114+
// my code
115+
});
113116

117+
$this->app['events']->listen(\LaravelQueueManager\Events\ScheduleError::class, function(\LaravelQueueManager\Events\DispatchQueueError $scheduleError){
118+
// my code
114119
});
115120
```
116121

src/AbstractJob.php

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Illuminate\Queue\Jobs\SyncJob;
99
use Illuminate\Queue\SerializesModels;
1010
use LaravelQueueManager\Repository\QueueConfigRepository;
11+
use LaravelQueueManager\Events\DispatchQueueError;
1112

1213
abstract class AbstractJob implements ShouldQueue
1314
{
@@ -47,17 +48,45 @@ final public function handle()
4748
$this->execute();
4849
}
4950

50-
final public function dispatch()
51+
final public function dispatch($delaySeconds = 0)
5152
{
5253
$this->uid = uniqid();
5354

5455
$connectionName = $this->getConectionName();
5556

57+
$runningConnection = config('queue.default');
5658
if ($connectionName != 'default') {
59+
$runningConnection = $connectionName;
5760
$this->onConnection($connectionName);
5861
}
5962

60-
return dispatch($this->onQueue($this->getName()));
63+
try {
64+
$dispatcher = app(Dispatcher::class);
65+
66+
if ($delaySeconds) {
67+
$this->delay = now()->addSeconds($delaySeconds);
68+
}
69+
70+
return $dispatcher->dispatch($this);
71+
} catch (\Throwable $e) {
72+
73+
event(new DispatchQueueError($e->getMessage(), ['action' => 'queue_dispatch_error']));
74+
75+
$fallbackConnections = config('queue_manager.fallback_connections');
76+
foreach ($fallbackConnections as $fallbackConnection) {
77+
if ($fallbackConnection === $runningConnection) {
78+
continue;
79+
}
80+
81+
try {
82+
$this->onConnection($fallbackConnection);
83+
$dispatcher = app(Dispatcher::class);
84+
return $dispatcher->dispatch($this);
85+
} catch (\Throwable $e) {}
86+
}
87+
88+
throw $e;
89+
}
6190
}
6291

6392
final public function setProps($props)

src/Core/SupervisorGenerator.php

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,26 @@ public function __construct(QueueConfigRepository $queueConfigRepository)
2121

2222
public function generate()
2323
{
24-
$supervisorConfig = View::make('laravel_queue_manager::supervisor-generator', ['configs' => $this->queueConfigRepository->findAll()]);
24+
$configs = $this->queueConfigRepository->findActives();
25+
$fallbackConnections = array_filter(config('queue_manager.fallback_connections'), function($name) {
26+
return $name !== 'sync';
27+
});
28+
29+
foreach($configs as $config) {
30+
$connectionName = config('queue.default');
31+
if ($config->connection && $config->connection !== 'default') {
32+
$connectionName = $config->connection;
33+
}
34+
35+
$config->fallback_connections = array_filter($fallbackConnections, function($name) use($config) {
36+
return $name !== $config->connection;
37+
});
38+
}
39+
40+
$supervisorConfig = View::make('laravel_queue_manager::supervisor-generator', [
41+
'configs' => $configs,
42+
'fallbackConnections' => $fallbackConnections,
43+
]);
2544

2645
$supervisorConfigOld = '';
2746
if (file_exists($this->filename)) {

src/Events/DispatchQueueError.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace LaravelQueueManager\Events;
4+
5+
class DispatchQueueError
6+
{
7+
public $message;
8+
9+
public $context;
10+
11+
public function __construct($message, $context)
12+
{
13+
$this->message = $message;
14+
15+
$this->context = $context;
16+
}
17+
18+
}

src/config/config.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@
1010
'supervisor_bin' => '/usr/bin/supervisorctl',
1111
'supervisor_user' => 'docker',
1212
'supervisor_update_timeout' => 600,
13+
'fallback_connections' => [],
1314
];

src/resources/views/supervisor-generator.blade.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,19 @@
1111
stopwaitsecs={{ $config->timeout ? $config->timeout : 600 }}
1212
stopsignal=TERM
1313

14+
@foreach($config->fallback_connections as $fallbackConnection)
15+
[program:laravel-worker-{{$config->name}}-fallback-{{$fallbackConnection}}]
16+
process_name=%(program_name)s_%(process_num)02d
17+
command=php {{config('queue_manager.artisan_path')}} queue-manager:work --queue={{$config->name}} --sleep=60 --tries={{ $config->max_attempts ? $config->max_attempts : 5 }} --timeout={{ $config->timeout ? $config->timeout : 0 }} {{ $fallbackConnection }}
18+
autostart=true
19+
autorestart=true
20+
user={{config('queue_manager.supervisor_user')}}
21+
numprocs=1
22+
redirect_stderr=true
23+
stdout_logfile={{config('queue_manager.log_path')}}
24+
stopwaitsecs={{ $config->timeout ? $config->timeout : 600 }}
25+
stopsignal=TERM
26+
27+
@endforeach
28+
1429
@endforeach

0 commit comments

Comments
 (0)