Skip to content

Commit 74df025

Browse files
committed
Updated JobLogHandler to RxPHP 2.0
1 parent b9ca9e8 commit 74df025

File tree

1 file changed

+3
-8
lines changed

1 file changed

+3
-8
lines changed

src/CommandBus/Handler/JobLogHandler.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
use React\Promise\PromiseInterface;
1313
use Rx\Observable;
1414
use Rx\ObserverInterface;
15-
use Rx\SchedulerInterface;
16-
use function ApiClients\Tools\Rx\unwrapObservableFromPromise;
1715
use function React\Promise\resolve;
18-
use function WyriHaximus\React\futureFunctionPromise;
1916

2017
final class JobLogHandler
2118
{
@@ -52,8 +49,7 @@ public function handle(JobLogCommand $command): PromiseInterface
5249
ApiSettings::PUSHER_KEY
5350
)->then(function (PusherAsyncClient $pusher) use ($command) {
5451
return resolve(Observable::create(function (
55-
ObserverInterface $observer,
56-
SchedulerInterface $scheduler
52+
ObserverInterface $observer
5753
) use (
5854
$pusher,
5955
$command
@@ -62,7 +58,7 @@ public function handle(JobLogCommand $command): PromiseInterface
6258
return $event->getEvent() === 'job:log';
6359
})->map(function (Event $event) {
6460
return $this->hydrator->hydrate(LogLineInterface::HYDRATE_CLASS, $event->getData());
65-
})->subscribeCallback(
61+
})->subscribe(
6662
function (LogLineInterface $line) use ($observer, &$subscription) {
6763
$observer->onNext($line);
6864

@@ -75,8 +71,7 @@ function ($error) use ($observer) {
7571
},
7672
function () use ($observer) {
7773
$observer->onComplete();
78-
},
79-
$scheduler
74+
}
8075
);
8176
}));
8277
});

0 commit comments

Comments
 (0)