| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- <?php
- declare(strict_types=1);
- namespace Sabre\Event\Loop;
- /**
- * A simple eventloop implementation.
- *
- * This eventloop supports:
- * * nextTick
- * * setTimeout for delayed functions
- * * setInterval for repeating functions
- * * stream events using stream_select
- *
- * @copyright Copyright (C) fruux GmbH (https://fruux.com/)
- * @author Evert Pot (http://evertpot.com/)
- * @license http://sabre.io/license/ Modified BSD License
- */
class Loop
{
    /**
     * Is the main loop active.
     *
     * Set by run(), cleared by stop() and when run() exits.
     *
     * @var bool
     */
    protected $running = false;

    /**
     * A list of timers, added by setTimeout.
     *
     * Every entry is a [triggerTime, callback] pair. The list is kept sorted
     * in reverse order of trigger time, so the timer that is due first is
     * always the last element and can be taken with array_pop().
     *
     * @var array
     */
    protected $timers = [];

    /**
     * A list of 'nextTick' callbacks.
     *
     * @var callable[]
     */
    protected $nextTick = [];

    /**
     * List of readable streams for stream_select, indexed by stream id.
     *
     * @var resource[]
     */
    protected $readStreams = [];

    /**
     * List of writable streams for stream_select, indexed by stream id.
     *
     * @var resource[]
     */
    protected $writeStreams = [];

    /**
     * List of read callbacks, indexed by stream id.
     *
     * @var callable[]
     */
    protected $readCallbacks = [];

    /**
     * List of write callbacks, indexed by stream id.
     *
     * @var callable[]
     */
    protected $writeCallbacks = [];

    /**
     * Executes a function after x seconds.
     *
     * @param callable $cb      Function to call once the timeout expires.
     * @param float    $timeout Delay in seconds, relative to now.
     *
     * @return void
     */
    public function setTimeout(callable $cb, float $timeout)
    {
        $triggerTime = microtime(true) + $timeout;

        // The timers array must stay in reverse order of trigger time. Walk
        // from the end (soonest timer) towards the start until we find an
        // entry that fires later than the new timer, and insert right after
        // it. Timers with an identical trigger time therefore keep their
        // insertion order when they are popped.
        for ($index = count($this->timers) - 1; $index >= 0; --$index) {
            if ($triggerTime < $this->timers[$index][0]) {
                // The replacement is wrapped in an extra array so that the
                // [time, callback] pair is inserted as ONE element.
                array_splice(
                    $this->timers,
                    $index + 1,
                    0,
                    [[$triggerTime, $cb]]
                );

                return;
            }
        }

        // Either the list was empty or every existing timer fires sooner
        // than (or together with) the new one: it becomes the first element.
        array_unshift($this->timers, [$triggerTime, $cb]);
    }

    /**
     * Executes a function every x seconds.
     *
     * The value this function returns can be used to stop the interval with
     * clearInterval.
     *
     * @param callable $cb      Function to call on every interval.
     * @param float    $timeout Number of seconds between two calls.
     *
     * @return array Opaque interval id, to be passed to clearInterval().
     */
    public function setInterval(callable $cb, float $timeout): array
    {
        $keepGoing = true;
        $f = null;

        // $f re-schedules itself after every run, for as long as $keepGoing
        // is true. Both $f and $keepGoing are captured by reference: $f so
        // the closure can refer to itself, $keepGoing so that
        // clearInterval() can flip it from the outside.
        $f = function () use ($cb, &$f, $timeout, &$keepGoing) {
            if ($keepGoing) {
                $cb();
                $this->setTimeout($f, $timeout);
            }
        };
        $this->setTimeout($f, $timeout);

        // Really the only thing that matters is returning the $keepGoing
        // boolean value.
        //
        // We need to pack it in an array to allow returning by reference.
        // Because I'm worried people will be confused by using a boolean as a
        // sort of identifier, I added an extra string.
        return ['I\'m an implementation detail', &$keepGoing];
    }

    /**
     * Stops a running interval.
     *
     * @param array $intervalId The value returned by setInterval().
     *
     * @return void
     */
    public function clearInterval(array $intervalId)
    {
        // Element 1 is a reference to the interval's $keepGoing flag. The
        // reference survives the by-value copy of the array, so this write
        // reaches the closure created in setInterval().
        $intervalId[1] = false;
    }

    /**
     * Runs a function immediately at the next iteration of the loop.
     *
     * @return void
     */
    public function nextTick(callable $cb)
    {
        $this->nextTick[] = $cb;
    }

    /**
     * Adds a read stream.
     *
     * The callback will be called as soon as there is something to read from
     * the stream.
     *
     * You MUST call removeReadStream after you are done with the stream, to
     * prevent the eventloop from never stopping.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function addReadStream($stream, callable $cb)
    {
        // The integer value of a resource is used as the shared key of the
        // stream list and the callback list.
        $id = (int) $stream;
        $this->readStreams[$id] = $stream;
        $this->readCallbacks[$id] = $cb;
    }

    /**
     * Adds a write stream.
     *
     * The callback will be called as soon as the system reports it's ready to
     * receive writes on the stream.
     *
     * You MUST call removeWriteStream after you are done with the stream, to
     * prevent the eventloop from never stopping.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function addWriteStream($stream, callable $cb)
    {
        $id = (int) $stream;
        $this->writeStreams[$id] = $stream;
        $this->writeCallbacks[$id] = $cb;
    }

    /**
     * Stop watching a stream for reads.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function removeReadStream($stream)
    {
        $id = (int) $stream;
        unset($this->readStreams[$id], $this->readCallbacks[$id]);
    }

    /**
     * Stop watching a stream for writes.
     *
     * @param resource $stream
     *
     * @return void
     */
    public function removeWriteStream($stream)
    {
        $id = (int) $stream;
        unset($this->writeStreams[$id], $this->writeCallbacks[$id]);
    }

    /**
     * Runs the loop.
     *
     * This function will run continuously, until there's no more events to
     * handle, or until stop() is called.
     *
     * @return void
     */
    public function run()
    {
        $this->running = true;

        try {
            do {
                $hasEvents = $this->tick(true);
            } while ($this->running && $hasEvents);
        } finally {
            // Also reset the flag when a callback throws, so a later manual
            // tick() does not behave as if the loop were still running.
            $this->running = false;
        }
    }

    /**
     * Executes all pending events.
     *
     * If $block is turned true, this function will block until any event is
     * triggered.
     *
     * If there are no timeouts, nextTick callbacks or events in the loop at
     * all, this function will exit immediately.
     *
     * This function will return true if there are _any_ events left in the
     * loop after the tick.
     */
    public function tick(bool $block = false): bool
    {
        $this->runNextTicks();
        $nextTimeout = $this->runTimers();

        // Calculating how long runStreams should at most wait.
        if (!$block) {
            // Don't wait
            $streamWait = 0;
        } elseif ($this->nextTick) {
            // There's a pending 'nextTick'. Don't wait.
            $streamWait = 0;
        } elseif (is_numeric($nextTimeout)) {
            // Wait until the next Timeout should trigger.
            $streamWait = $nextTimeout;
        } else {
            // Wait indefinitely
            $streamWait = null;
        }

        $this->runStreams($streamWait);

        return $this->readStreams || $this->writeStreams || $this->nextTick || $this->timers;
    }

    /**
     * Stops a running eventloop.
     *
     * @return void
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Executes all 'nextTick' callbacks.
     *
     * @return void
     */
    protected function runNextTicks()
    {
        // Swap the queue out first: callbacks that register a new nextTick
        // while we iterate are queued for the following tick, not this one.
        $nextTick = $this->nextTick;
        $this->nextTick = [];

        foreach ($nextTick as $cb) {
            $cb();
        }
    }

    /**
     * Runs all pending timers.
     *
     * After running the timer callbacks, this function returns the number of
     * seconds until the next timer should be executed.
     *
     * If there's no more pending timers, this function returns null.
     *
     * @return float|int|null
     */
    protected function runTimers()
    {
        $now = microtime(true);

        // The soonest timer sits at the end of the list. Keep popping and
        // running timers for as long as they were due before $now.
        while (($timer = array_pop($this->timers)) && $timer[0] < $now) {
            $timer[1]();
        }

        // The loop stopped on a timer that is not due yet: add it back to the
        // array and report how long it still has to wait.
        if ($timer) {
            $this->timers[] = $timer;

            return max(0, $timer[0] - microtime(true));
        }

        return null;
    }

    /**
     * Runs all pending stream events.
     *
     * If $timeout is 0, it will return immediately. If $timeout is null, it
     * will wait indefinitely.
     *
     * @param float|int|null $timeout Maximum time to wait, in seconds.
     *
     * @return void
     */
    protected function runStreams($timeout)
    {
        if ($this->readStreams || $this->writeStreams) {
            // stream_select modifies the arrays it is given, so work on
            // copies of the registered stream lists.
            $read = $this->readStreams;
            $write = $this->writeStreams;
            $except = null;

            // stream_select changes behavior in 8.1 to forbid passing non-null microseconds when the seconds are null.
            // Older versions of php don't allow passing null to microseconds.
            if (null !== $timeout) {
                $ready = stream_select($read, $write, $except, 0, (int) ($timeout * 1000000));
            } else {
                $ready = stream_select($read, $write, $except, null);
            }

            if ($ready) {
                // The stream id is recomputed from the resource rather than
                // taken from the array key.
                // See PHP Bug https://bugs.php.net/bug.php?id=62452
                // Fixed in PHP7
                foreach ($read as $readStream) {
                    $id = (int) $readStream;
                    // A callback that ran earlier in this pass may have
                    // removed this stream; skip it instead of calling null.
                    if (isset($this->readCallbacks[$id])) {
                        $readCb = $this->readCallbacks[$id];
                        $readCb();
                    }
                }
                foreach ($write as $writeStream) {
                    $id = (int) $writeStream;
                    if (isset($this->writeCallbacks[$id])) {
                        $writeCb = $this->writeCallbacks[$id];
                        $writeCb();
                    }
                }
            }
        } elseif ($this->running && ($this->nextTick || $this->timers)) {
            // No streams to wait on: sleep until the next timer is due, or
            // for 200ms when no timeout was given.
            usleep(null !== $timeout ? intval($timeout * 1000000) : 200000);
        }
    }
}
|