diff --git a/src/CloudTasksQueue.php b/src/CloudTasksQueue.php index 7c892a4..73b7764 100644 --- a/src/CloudTasksQueue.php +++ b/src/CloudTasksQueue.php @@ -18,11 +18,13 @@ class CloudTasksQueue extends LaravelQueue implements QueueContract private $client; private $default; + private $config; public function __construct(array $config, CloudTasksClient $client) { $this->client = $client; $this->default = $config['queue']; + $this->config = $config; } public function size($queue = null) @@ -52,11 +54,11 @@ public function later($delay, $job, $data = '', $queue = null) protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) { $queue = $this->getQueue($queue); - $queueName = $this->client->queueName(Config::project(), Config::location(), $queue); + $queueName = $this->client->queueName($this->config['project'], $this->config['location'], $queue); $availableAt = $this->availableAt($delay); $httpRequest = $this->createHttpRequest(); - $httpRequest->setUrl(Config::handler()); + $httpRequest->setUrl($this->config['handler']); $httpRequest->setHttpMethod(HttpMethod::POST); $httpRequest->setBody($payload); @@ -64,7 +66,7 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0, $attempts = 0) $task->setHttpRequest($httpRequest); $token = new OidcToken; - $token->setServiceAccountEmail(Config::serviceAccountEmail()); + $token->setServiceAccountEmail($this->config['service_account_email']); $httpRequest->setOidcToken($token); if ($availableAt > time()) { diff --git a/src/Config.php b/src/Config.php index f0ea54d..ab82603 100644 --- a/src/Config.php +++ b/src/Config.php @@ -6,31 +6,6 @@ class Config { - public static function credentials() - { - return config('queue.connections.cloudtasks.credentials'); - } - - public static function project() - { - return config('queue.connections.cloudtasks.project'); - } - - public static function location() - { - return config('queue.connections.cloudtasks.location'); - } - - public static function handler() - { - return config('queue.connections.cloudtasks.handler'); - } - - public static function serviceAccountEmail() - { - return config('queue.connections.cloudtasks.service_account_email'); - } - public static function validate(array $config) { if (empty($config['project'])) { diff --git a/src/TaskHandler.php b/src/TaskHandler.php index 58be40c..a8b2b5f 100644 --- a/src/TaskHandler.php +++ b/src/TaskHandler.php @@ -12,6 +12,7 @@ class TaskHandler { private $request; private $publicKey; + private $config; public function __construct(CloudTasksClient $client, Request $request, OpenIdVerificator $publicKey) { @@ -26,15 +27,27 @@ public function __construct(CloudTasksClient $client, Request $request, OpenIdVe */ public function handle($task = null) { - $this->authorizeRequest(); - $task = $task ?: $this->captureTask(); + $this->loadQueueConnectionConfiguration($task); + + $this->authorizeRequest(); + $this->listenForEvents(); $this->handleTask($task); } + private function loadQueueConnectionConfiguration($task) + { + $command = unserialize($task['data']['command']); + $connection = $command->connection ?? config('queue.default'); + $this->config = array_merge( + config("queue.connections.{$connection}"), + ['connection' => $connection] + ); + } + /** * @throws CloudTasksException */ @@ -64,7 +77,7 @@ protected function validateToken($openIdToken) throw new CloudTasksException('The given OpenID token is not valid'); } - if ($openIdToken->aud != Config::handler()) { + if ($openIdToken->aud != $this->config['handler']) { throw new CloudTasksException('The given OpenID token is not valid'); } @@ -97,7 +110,7 @@ private function listenForEvents() { app('events')->listen(JobFailed::class, function ($event) { app('queue.failer')->log( - 'cloudtasks', $event->job->getQueue(), + $this->config['connection'], $event->job->getQueue(), $event->job->getRawBody(), $event->exception ); }); @@ -117,14 +130,14 @@ private function handleTask($task) $worker = $this->getQueueWorker(); - $worker->process('cloudtasks', $job, new WorkerOptions()); + $worker->process($this->config['connection'], $job, new WorkerOptions()); } private function getQueueMaxTries(CloudTasksJob $job) { $queueName = $this->client->queueName( - Config::project(), - Config::location(), + $this->config['project'], + $this->config['location'], $job->getQueue() ); diff --git a/tests/TaskHandlerTest.php b/tests/TaskHandlerTest.php index 24a5343..c0d2f3e 100644 --- a/tests/TaskHandlerTest.php +++ b/tests/TaskHandlerTest.php @@ -77,7 +77,7 @@ public function it_needs_an_authorization_header() $this->expectException(CloudTasksException::class); $this->expectExceptionMessage('Missing [Authorization] header'); - $this->handler->handle(); + $this->handler->handle($this->simpleJob()); } /** @test */ @@ -165,7 +165,7 @@ public function after_max_attempts_it_will_log_to_failed_table() } $this->assertDatabaseHas('failed_jobs', [ - 'connection' => 'cloudtasks', + 'connection' => 'my-cloudtasks-connection', 'queue' => 'my-queue', 'payload' => rtrim($this->failingJobPayload()), ]); diff --git a/tests/TestCase.php b/tests/TestCase.php index 71d043d..f0fdf78 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -59,8 +59,8 @@ protected function getEnvironmentSetUp($app) } $app['config']->set('cache.default', 'file'); - $app['config']->set('queue.default', 'cloudtasks'); - $app['config']->set('queue.connections.cloudtasks', [ + $app['config']->set('queue.default', 'my-cloudtasks-connection'); + $app['config']->set('queue.connections.my-cloudtasks-connection', [ 'driver' => 'cloudtasks', 'queue' => 'test-queue', 'project' => 'test-project', @@ -72,6 +72,6 @@ protected function getEnvironmentSetUp($app) protected function setConfigValue($key, $value) { - $this->app['config']->set('queue.connections.cloudtasks.' . $key, $value); + $this->app['config']->set('queue.connections.my-cloudtasks-connection.' . $key, $value); } }