Loop.php 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. <?php
  2. declare(strict_types=1);
  3. namespace Sabre\Event\Loop;
  4. /**
  5. * A simple eventloop implementation.
  6. *
  7. * This eventloop supports:
  8. * * nextTick
  9. * * setTimeout for delayed functions
  10. * * setInterval for repeating functions
  11. * * stream events using stream_select
  12. *
  13. * @copyright Copyright (C) fruux GmbH (https://fruux.com/)
  14. * @author Evert Pot (http://evertpot.com/)
  15. * @license http://sabre.io/license/ Modified BSD License
  16. */
  class Loop
  {
      /**
       * Executes a function after x seconds.
       *
       * Timers are stored sorted by descending trigger time, so the timer that
       * is due first is always the last element of the list. Timers that share
       * the same trigger time fire in the order in which they were scheduled.
       *
       * @param callable $cb      callback to run; it is invoked without arguments
       * @param float    $timeout delay in seconds, relative to the current time
       */
      public function setTimeout(callable $cb, float $timeout)
      {
          $triggerTime = microtime(true) + $timeout;

          // Walk backwards from the soonest timer (end of the list) until we
          // find an entry that triggers strictly later than the new timer. The
          // new timer goes right after that entry. If there is no such entry
          // (or the list is empty) it ends up at the front.
          $index = count($this->timers);
          while ($index > 0 && $triggerTime >= $this->timers[$index - 1][0]) {
              --$index;
          }

          array_splice($this->timers, $index, 0, [[$triggerTime, $cb]]);
      }

      /**
       * Executes a function every x seconds.
       *
       * The value this function returns can be used to stop the interval with
       * clearInterval.
       *
       * @param callable $cb      callback to run; it is invoked without arguments
       * @param float    $timeout time in seconds between two invocations
       *
       * @return array opaque interval id, only meant to be passed to clearInterval
       */
      public function setInterval(callable $cb, float $timeout): array
      {
          $keepGoing = true;
          $f = null;

          // $f re-schedules itself, which is why it captures itself by reference.
          $f = function () use ($cb, &$f, $timeout, &$keepGoing) {
              if (!$keepGoing) {
                  return;
              }

              $cb();

              // The callback may have stopped its own interval. Re-check the
              // flag so we do not leave a dead timer behind that would keep the
              // loop alive for one more interval.
              if ($keepGoing) {
                  $this->setTimeout($f, $timeout);
              }
          };
          $this->setTimeout($f, $timeout);

          // Really the only thing that matters is returning the $keepGoing
          // boolean value.
          //
          // It is packed in an array to allow returning it by reference. The
          // extra string is there so people are not confused by a bare boolean
          // being used as a sort of identifier.
          return ['I\'m an implementation detail', &$keepGoing];
      }

      /**
       * Stops a running interval.
       *
       * @param array $intervalId the value that was returned by setInterval
       */
      public function clearInterval(array $intervalId)
      {
          // Element 1 is a reference to the interval's "keep going" flag, so
          // this write reaches the closure even though the array itself was
          // passed by value.
          $intervalId[1] = false;
      }

      /**
       * Runs a function immediately at the next iteration of the loop.
       *
       * @param callable $cb callback to run; it is invoked without arguments
       */
      public function nextTick(callable $cb)
      {
          $this->nextTick[] = $cb;
      }

      /**
       * Adds a read stream.
       *
       * The callback will be called as soon as there is something to read from
       * the stream.
       *
       * You MUST call removeReadStream after you are done with the stream, to
       * prevent the eventloop from never stopping.
       *
       * @param resource $stream
       * @param callable $cb     callback to run; it is invoked without arguments
       */
      public function addReadStream($stream, callable $cb)
      {
          $streamId = (int) $stream;
          $this->readStreams[$streamId] = $stream;
          $this->readCallbacks[$streamId] = $cb;
      }

      /**
       * Adds a write stream.
       *
       * The callback will be called as soon as the system reports it's ready to
       * receive writes on the stream.
       *
       * You MUST call removeWriteStream after you are done with the stream, to
       * prevent the eventloop from never stopping.
       *
       * @param resource $stream
       * @param callable $cb     callback to run; it is invoked without arguments
       */
      public function addWriteStream($stream, callable $cb)
      {
          $streamId = (int) $stream;
          $this->writeStreams[$streamId] = $stream;
          $this->writeCallbacks[$streamId] = $cb;
      }

      /**
       * Stop watching a stream for reads.
       *
       * @param resource $stream
       */
      public function removeReadStream($stream)
      {
          $streamId = (int) $stream;
          unset(
              $this->readStreams[$streamId],
              $this->readCallbacks[$streamId]
          );
      }

      /**
       * Stop watching a stream for writes.
       *
       * @param resource $stream
       */
      public function removeWriteStream($stream)
      {
          $streamId = (int) $stream;
          unset(
              $this->writeStreams[$streamId],
              $this->writeCallbacks[$streamId]
          );
      }

      /**
       * Runs the loop.
       *
       * This function will run continuously, until there are no more events to
       * handle or until stop() is called.
       */
      public function run()
      {
          $this->running = true;

          do {
              $hasEvents = $this->tick(true);
          } while ($this->running && $hasEvents);

          $this->running = false;
      }

      /**
       * Executes all pending events.
       *
       * If $block is true, this function will block until any event is
       * triggered.
       *
       * If there are no timeouts, nextTick callbacks or events in the loop at
       * all, this function will exit immediately.
       *
       * This function will return true if there are _any_ events left in the
       * loop after the tick.
       */
      public function tick(bool $block = false): bool
      {
          $this->runNextTicks();
          $nextTimeout = $this->runTimers();

          // Calculating how long runStreams should at most wait.
          if (!$block || $this->nextTick) {
              // Either the caller does not want to block, or there is a pending
              // 'nextTick' callback. Don't wait.
              $streamWait = 0;
          } elseif (is_numeric($nextTimeout)) {
              // Wait until the next timeout should trigger.
              $streamWait = $nextTimeout;
          } else {
              // Wait indefinitely.
              $streamWait = null;
          }

          $this->runStreams($streamWait);

          return $this->readStreams || $this->writeStreams || $this->nextTick || $this->timers;
      }

      /**
       * Stops a running eventloop.
       */
      public function stop()
      {
          $this->running = false;
      }

      /**
       * Executes all 'nextTick' callbacks.
       *
       * Callbacks that are registered while this method runs are not executed
       * now; they stay queued for the next call.
       *
       * @return void
       */
      protected function runNextTicks()
      {
          // Swap the queue out first, so callbacks can safely call nextTick().
          $callbacks = $this->nextTick;
          $this->nextTick = [];

          foreach ($callbacks as $cb) {
              $cb();
          }
      }

      /**
       * Runs all pending timers.
       *
       * After running the timer callbacks, this function returns the number of
       * seconds until the next timer should be executed.
       *
       * If there are no more pending timers, this function returns null.
       *
       * @return float|null
       */
      protected function runTimers()
      {
          $now = microtime(true);

          // The soonest timer is always at the end of the list. Keep popping
          // until the list is empty or we hit a timer that is not due yet.
          while (($timer = array_pop($this->timers)) && $timer[0] < $now) {
              $timer[1]();
          }

          if (!$timer) {
              // The list ran empty.
              return null;
          }

          // The last timer we popped was not due yet; put it back.
          $this->timers[] = $timer;

          return max(0.0, $timer[0] - microtime(true));
      }

      /**
       * Runs all pending stream events.
       *
       * If $timeout is 0, it will return immediately. If $timeout is null, it
       * will wait indefinitely.
       *
       * @param float|null $timeout maximum time to wait, in seconds
       */
      protected function runStreams($timeout)
      {
          if ($this->readStreams || $this->writeStreams) {
              $read = $this->readStreams;
              $write = $this->writeStreams;
              $except = null;

              // stream_select changes behavior in 8.1 to forbid passing non-null
              // microseconds when the seconds are null. Older versions of PHP
              // don't allow passing null to microseconds.
              $ready = null !== $timeout
                  ? stream_select($read, $write, $except, 0, (int) ($timeout * 1000000))
                  : stream_select($read, $write, $except, null);

              if ($ready) {
                  // Callbacks are looked up by stream id rather than array key.
                  // See PHP Bug https://bugs.php.net/bug.php?id=62452
                  // (fixed in PHP 7).
                  //
                  // A callback may remove a stream that is also part of this
                  // batch, so a stream without a callback is simply skipped.
                  foreach ($read as $readStream) {
                      $readCb = $this->readCallbacks[(int) $readStream] ?? null;
                      if (null !== $readCb) {
                          $readCb();
                      }
                  }
                  foreach ($write as $writeStream) {
                      $writeCb = $this->writeCallbacks[(int) $writeStream] ?? null;
                      if (null !== $writeCb) {
                          $writeCb();
                      }
                  }
              }
          } elseif ($this->running && ($this->nextTick || $this->timers)) {
              // No streams to wait on: sleep instead, so run() does not spin.
              usleep(null !== $timeout ? (int) ($timeout * 1000000) : 200000);
          }
      }

      /**
       * Is the main loop active.
       *
       * @var bool
       */
      protected $running = false;

      /**
       * A list of timers, added by setTimeout.
       *
       * Every entry is a [triggerTime, callback] pair. The list is sorted by
       * descending trigger time.
       *
       * @var array
       */
      protected $timers = [];

      /**
       * A list of 'nextTick' callbacks.
       *
       * @var callable[]
       */
      protected $nextTick = [];

      /**
       * List of readable streams for stream_select, indexed by stream id.
       *
       * @var resource[]
       */
      protected $readStreams = [];

      /**
       * List of writable streams for stream_select, indexed by stream id.
       *
       * @var resource[]
       */
      protected $writeStreams = [];

      /**
       * List of read callbacks, indexed by stream id.
       *
       * @var callable[]
       */
      protected $readCallbacks = [];

      /**
       * List of write callbacks, indexed by stream id.
       *
       * @var callable[]
       */
      protected $writeCallbacks = [];
  }