Skip to content
This repository was archived by the owner on Nov 5, 2025. It is now read-only.

Commit 6805385

Browse files
committed
Use database for queue jobs
1 parent b1eee36 commit 6805385

File tree

3 files changed

+83
-56
lines changed

3 files changed

+83
-56
lines changed

src/AsyncQueue.php

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use Illuminate\Queue\Queue;
44
use Illuminate\Queue\QueueInterface;
55
use Symfony\Component\Process\Process;
6+
use Barryvdh\Queue\Models\Job;
67

78
class AsyncQueue extends Queue implements QueueInterface {
89

@@ -16,53 +17,54 @@ class AsyncQueue extends Queue implements QueueInterface {
1617
*/
1718
public function push($job, $data = '', $queue = null)
1819
{
19-
$payload = $this->createPayload($job, $data);
20-
21-
$process = $this->makeProcess($payload);
22-
$process->disableOutput();
23-
$process->run();
24-
25-
20+
$id = $this->storeJob($job, $data);
21+
$this->startProcess($id, 0);
2622
return 0;
2723
}
2824

2925
/**
30-
* Create a payload string from the given job and data.
31-
*
32-
* @param string $job
33-
* @param mixed $data
34-
* @param string $queue
35-
* @return string
26+
* Store the job in the database
27+
*
28+
* @param string $job
29+
* @param mixed $data
30+
* @param integer $delay
31+
* @return integer The id of the job
3632
*/
37-
protected function createPayload($job, $data = '', $queue = null)
38-
{
39-
$payload = parent::createPayload($job, $data, $queue);
33+
public function storeJob($job, $data, $delay = 0){
34+
35+
$payload = $this->createPayload($job, $data);
36+
37+
$job = new Job;
38+
$job->status = Job::STATUS_OPEN;
39+
$job->delay = $delay;
40+
$job->payload = $payload;
41+
$job->save();
4042

41-
return base64_encode($payload);
43+
return $job->id;
4244
}
4345

4446
/**
45-
* Make a Process for the Artisan command with the payload
47+
* Make a Process for the Artisan command for the job id
4648
*
47-
* @param $payload
48-
* @return \Symfony\Component\Process\Process
49+
* @param integer $jobId
4950
*/
50-
public function makeProcess($payload)
51+
public function startProcess($jobId)
5152
{
5253
$environment = $this->container->environment();
5354
$cwd = $this->container['path.base'];
55+
$string = 'php artisan queue:async %d --env=%s ';
5456

55-
$string = 'php artisan queue:async %s --env=%s';
56-
57-
if (defined('PHP_WINDOWS_VERSION_BUILD')){
58-
$string = 'start /B ' . $string;
59-
} else {
57+
if (defined('PHP_WINDOWS_VERSION_BUILD')){
58+
$string = 'start /B ' . $string;
59+
} else {
6060
$string = 'nohup ' . $string . ' &';
61-
}
61+
}
6262

63-
$command = sprintf($string, $payload, $environment);
63+
$command = sprintf($string, $jobId, $environment);
6464

65-
return new Process($command, $cwd);
65+
$process = new Process($command, $cwd);
66+
$process->disableOutput();
67+
$process->run();
6668
}
6769

6870
/**
@@ -76,7 +78,10 @@ public function makeProcess($payload)
7678
*/
7779
public function later($delay, $job, $data = '', $queue = null)
7880
{
79-
return $this->push($job, $data, $queue);
81+
$delay = $this->getSeconds($delay);
82+
$id = $this->storeJob($job, $data, $delay);
83+
$this->startProcess($id);
84+
return 0;
8085
}
8186

8287
/**

src/Console/AsyncCommand.php

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php namespace Barryvdh\Queue\Console;
22

3+
use Barryvdh\Queue\Models\Job;
34
use Illuminate\Console\Command;
4-
use Symfony\Component\Console\Input\InputOption;
55
use Symfony\Component\Console\Input\InputArgument;
66
use Barryvdh\Queue\Jobs\AsyncJob;
77

@@ -19,7 +19,7 @@ class AsyncCommand extends Command {
1919
*
2020
* @var string
2121
*/
22-
protected $description = 'Run a base64+json encode serialized queue';
22+
protected $description = 'Run a queue from the database';
2323

2424
/**
2525
* Create a new command instance.
@@ -37,9 +37,9 @@ public function __construct()
3737
*/
3838
public function fire()
3939
{
40-
$payload = $this->argument('payload');
40+
$item = Job::findOrFail($this->argument('job_id'));
4141

42-
$job = new AsyncJob($this->laravel, $payload);
42+
$job = new AsyncJob($this->laravel, $item);
4343

4444
$job->fire();
4545

@@ -53,18 +53,8 @@ public function fire()
5353
protected function getArguments()
5454
{
5555
return array(
56-
array('payload', InputArgument::REQUIRED, 'The Job Payload'),
56+
array('job_id', InputArgument::REQUIRED, 'The Job ID'),
5757
);
5858
}
5959

60-
/**
61-
* Get the console command options.
62-
*
63-
* @return array
64-
*/
65-
protected function getOptions()
66-
{
67-
return array();
68-
}
69-
70-
}
60+
}

src/Jobs/AsyncJob.php

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
<?php namespace Barryvdh\Queue\Jobs;
22

3+
use Barryvdh\Queue\Models\Job;
34
use Illuminate\Queue\Jobs\SyncJob;
45
use Illuminate\Container\Container;
56

67
class AsyncJob extends SyncJob {
78

89
/**
9-
* The payload, Base64 & JSON Encoded
10+
* The job model
1011
*
11-
* @var array
12+
* @var Job
1213
*/
13-
protected $payload;
14+
protected $job;
1415

1516
/**
1617
* Create a new job instance.
1718
*
18-
* @param \Illuminate\Container\Container $container
19-
* @param string $payload
19+
* @param \Illuminate\Container\Container $container
20+
* @param \Barryvdh\Queue\Models\Job $job
2021
*/
21-
public function __construct(Container $container, $payload)
22+
public function __construct(Container $container, Job $job)
2223
{
23-
$this->payload = $payload;
24+
$this->job = $job;
2425
$this->container = $container;
2526
}
2627

@@ -31,9 +32,40 @@ public function __construct(Container $container, $payload)
3132
*/
3233
public function fire()
3334
{
34-
$payload = $this->parsePayload($this->payload);
35+
// Get the payload from the job
36+
$payload = $this->parsePayload($this->job->payload);
3537

38+
// If we have to wait, sleep until our time has come
39+
if($this->job->delay){
40+
$this->job->status = Job::STATUS_WAITING;
41+
$this->job->save();
42+
sleep($this->job->delay);
43+
}
44+
45+
// Mark job as started
46+
$this->job->status = Job::STATUS_STARTED;
47+
$this->job->save();
48+
49+
// Fire the actual job
3650
$this->resolveAndFire($payload);
51+
52+
// If job is not deleted, mark as finished
53+
if(!$this->deleted){
54+
$this->job->status = Job::STATUS_FINISHED;
55+
$this->job->save();
56+
}
57+
58+
}
59+
60+
/**
61+
* Delete the job from the queue.
62+
*
63+
* @return void
64+
*/
65+
public function delete()
66+
{
67+
parent::delete();
68+
$this->job->delete();
3769
}
3870

3971

@@ -44,8 +76,8 @@ public function fire()
4476
* @return array|null
4577
*/
4678
protected function parsePayload($payload){
47-
return json_decode(base64_decode($payload), true);
79+
return json_decode($payload, true);
4880
}
4981

5082

51-
}
83+
}

0 commit comments

Comments
 (0)