mantis-matrix-integration – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | <?php |
2 | |||
3 | namespace GuzzleHttp; |
||
4 | |||
5 | use GuzzleHttp\Promise as P; |
||
6 | use GuzzleHttp\Promise\EachPromise; |
||
7 | use GuzzleHttp\Promise\PromiseInterface; |
||
8 | use GuzzleHttp\Promise\PromisorInterface; |
||
9 | use Psr\Http\Message\RequestInterface; |
||
10 | |||
11 | /** |
||
12 | * Sends an iterator of requests concurrently using a capped pool size. |
||
13 | * |
||
14 | * The pool will read from an iterator until it is cancelled or until the |
||
15 | * iterator is consumed. When a request is yielded, the request is sent after |
||
16 | * applying the "request_options" request options (if provided in the ctor). |
||
17 | * |
||
18 | * When a function is yielded by the iterator, the function is provided the |
||
19 | * "request_options" array that should be merged on top of any existing |
||
20 | * options, and the function MUST then return a wait-able promise. |
||
21 | * |
||
22 | * @final |
||
23 | */ |
||
class Pool implements PromisorInterface
{
    /**
     * Promise that drives iteration over the generated request promises.
     *
     * @var EachPromise
     */
    private $each;

    /**
     * Builds a pool that lazily turns each item of $requests into a promise.
     *
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests, or callables that accept the
     *                                  request options array and return a
     *                                  promise, to send concurrently.
     * @param array           $config   Associative array of options
     *                                  - concurrency: (int) Maximum number of requests to send concurrently
     *                                    (defaults to 25 when not set).
     *                                  - options: Array of request options to apply to each request.
     *                                  - fulfilled: (callable) Function to invoke when a request completes.
     *                                  - rejected: (callable) Function to invoke when a request is rejected.
     */
    public function __construct(ClientInterface $client, $requests, array $config = [])
    {
        if (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // "options" is consumed by the pool itself (handed to every request);
        // it is stripped so that only the remaining keys reach EachPromise.
        $opts = $config['options'] ?? [];
        unset($config['options']);

        $iterable = P\Create::iterFor($requests);

        // Generator: nothing is sent until EachPromise pulls the next item,
        // and the original keys are preserved for the callbacks.
        $requests = static function () use ($iterable, $client, $opts) {
            foreach ($iterable as $key => $rfn) {
                if ($rfn instanceof RequestInterface) {
                    yield $key => $client->sendAsync($rfn, $opts);
                } elseif (\is_callable($rfn)) {
                    // The callable receives the shared request options and
                    // is expected to hand back a promise.
                    yield $key => $rfn($opts);
                } else {
                    throw new \InvalidArgumentException('Each value yielded by the iterator must be a Psr\Http\Message\RequestInterface or a callable that returns a promise that fulfills with a Psr\Http\Message\ResponseInterface object.');
                }
            }
        };

        $this->each = new EachPromise($requests(), $config);
    }

    /**
     * Returns the promise of the underlying EachPromise.
     */
    public function promise(): PromiseInterface
    {
        return $this->each->promise();
    }

    /**
     * Sends multiple requests concurrently and returns an array of responses
     * and exceptions, keyed like the provided requests and sorted by key.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * Any "fulfilled" / "rejected" callbacks supplied in $options are still
     * invoked (with the value and its key) before the value is recorded.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send concurrently.
     * @param array           $options  Passes through the options available in
     *                                  {@see \GuzzleHttp\Pool::__construct}
     *
     * @return array Returns an array containing the response or an exception
     *               for each request, ordered by request key.
     *
     * @throws \InvalidArgumentException if an item of $requests is neither a
     *                                   RequestInterface nor a callable.
     */
    public static function batch(ClientInterface $client, $requests, array $options = []): array
    {
        $res = [];

        // Wrap both callbacks so that every outcome lands in $res under the
        // key of the request that produced it.
        self::cmpCallback($options, 'fulfilled', $res);
        self::cmpCallback($options, 'rejected', $res);

        $pool = new static($client, $requests, $options);
        $pool->promise()->wait();

        // Results are recorded in completion order; restore key order.
        \ksort($res);

        return $res;
    }

    /**
     * Installs a callback under $options[$name] that stores each value in
     * $results by key, chaining any callback that was already configured.
     *
     * @param array  $options Pool options; modified in place.
     * @param string $name    Option key to wrap ("fulfilled" or "rejected").
     * @param array  $results Collector that receives value-by-key entries.
     */
    private static function cmpCallback(array &$options, string $name, array &$results): void
    {
        if (!isset($options[$name])) {
            $options[$name] = static function ($v, $k) use (&$results) {
                $results[$k] = $v;
            };

            return;
        }

        // Preserve the caller's callback: run it first, then record the value.
        $currentFn = $options[$name];
        $options[$name] = static function ($v, $k) use (&$results, $currentFn) {
            $currentFn($v, $k);
            $results[$k] = $v;
        };
    }
}