scratch – Blame information for rev 87

Subversion Repositories:
Rev:
Rev Author Line No. Line
87 office 1 <?php
2  
3 namespace GuzzleHttp\Adapter\Curl;
4  
5 use GuzzleHttp\Adapter\AdapterInterface;
6 use GuzzleHttp\Adapter\ParallelAdapterInterface;
7 use GuzzleHttp\Adapter\TransactionInterface;
8 use GuzzleHttp\Event\RequestEvents;
9 use GuzzleHttp\Exception\AdapterException;
10 use GuzzleHttp\Exception\RequestException;
11 use GuzzleHttp\Message\MessageFactoryInterface;
12  
13 /**
14 * HTTP adapter that uses cURL multi as a transport layer
15 *
16 * When using the CurlAdapter, custom curl options can be specified as an
17 * associative array of curl option constants mapping to values in the
18 * **curl** key of a request's configuration options.
19 *
20 * In addition to being able to supply configuration options via the curl
21 * request config, you can also specify the select_timeout variable using the
22 * `GUZZLE_CURL_SELECT_TIMEOUT` environment variable.
23 */
24 class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
25 {
26 const ERROR_STR = 'See http://curl.haxx.se/libcurl/c/libcurl-errors.html for an explanation of cURL errors';
27 const ENV_SELECT_TIMEOUT = 'GUZZLE_CURL_SELECT_TIMEOUT';
28  
29 /** @var CurlFactory */
30 private $curlFactory;
31  
32 /** @var MessageFactoryInterface */
33 private $messageFactory;
34  
35 /** @var array Array of curl multi handles */
36 private $multiHandles = [];
37  
38 /** @var array Map of curl multi handle IDs to a bool that is true while the handle is checked out */
39 private $multiOwned = [];
40  
41 /** @var int Total number of idle handles to keep in cache */
42 private $maxHandles;
43  
44 /** @var double */
45 private $selectTimeout;
46  
/**
 * Builds the adapter from a message factory and an associative option array.
 *
 * Recognised options:
 *
 * - handle_factory: Optional callable factory used to create cURL handles.
 *   The callable is invoked with the following arguments:
 *   TransactionInterface, MessageFactoryInterface, and an optional cURL
 *   handle to modify. The factory method must then return a cURL resource.
 *   A new CurlFactory is used when the option is not set.
 * - select_timeout: Float in seconds to use as the curl_multi_select
 *   timeout. When not set, the GUZZLE_CURL_SELECT_TIMEOUT entry of $_SERVER
 *   is used if present; otherwise the timeout is 1.
 * - max_handles: Maximum number of idle handles (defaults to 3).
 *
 * @param MessageFactoryInterface $messageFactory
 * @param array                   $options Array of options to use with the adapter
 */
public function __construct(
    MessageFactoryInterface $messageFactory,
    array $options = []
) {
    $this->messageFactory = $messageFactory;

    if (isset($options['handle_factory'])) {
        $this->curlFactory = $options['handle_factory'];
    } else {
        $this->curlFactory = new CurlFactory();
    }

    // Precedence: explicit option, then the server/environment value, then 1.
    $timeout = 1;
    if (isset($options['select_timeout'])) {
        $timeout = $options['select_timeout'];
    } elseif (isset($_SERVER[self::ENV_SELECT_TIMEOUT])) {
        $timeout = $_SERVER[self::ENV_SELECT_TIMEOUT];
    }
    $this->selectTimeout = $timeout;

    $limit = 3;
    if (isset($options['max_handles'])) {
        $limit = $options['max_handles'];
    }
    $this->maxHandles = $limit;
}
82  
/**
 * Closes every cached curl multi handle that is still a live resource.
 */
public function __destruct()
{
    foreach ($this->multiHandles as $multi) {
        if (!is_resource($multi)) {
            // Already closed or otherwise invalid; nothing to clean up.
            continue;
        }

        curl_multi_close($multi);
    }
}
91  
/**
 * Raises an AdapterException describing a cURL multi error code.
 *
 * The description comes from curl_multi_strerror() when that function
 * exists; otherwise the generic ERROR_STR pointer to the libcurl docs is
 * used.
 *
 * @param int $code Curl response code
 * @throws AdapterException Always.
 */
public static function throwMultiError($code)
{
    if (function_exists('curl_multi_strerror')) {
        $description = curl_multi_strerror($code);
    } else {
        $description = self::ERROR_STR;
    }

    $message = sprintf('cURL error %s: %s', $code, $description);

    throw new AdapterException($message);
}
106  
/**
 * Sends a single transaction and returns its response.
 *
 * A multi handle is checked out, the transaction is queued on it and the
 * transfer loop runs until it finishes.
 *
 * @param TransactionInterface $transaction
 *
 * @return mixed Whatever $transaction->getResponse() yields after the transfer.
 */
public function send(TransactionInterface $transaction)
{
    $multi = $this->checkoutMultiHandle();
    // NOTE(review): the `true` flag presumably makes the context throw on
    // errors (see throwsExceptions() in throwException) - confirm in BatchContext.
    $batch = new BatchContext($multi, true);

    $this->addHandle($transaction, $batch);
    $this->perform($batch);

    return $transaction->getResponse();
}
115  
/**
 * Sends the transactions of an iterator, starting at most $parallel of
 * them up front.
 *
 * Further transactions are pulled from the batch context as earlier ones
 * complete (see processMessages).
 *
 * @param \Iterator $transactions Transactions to send
 * @param int       $parallel     Number of transactions to queue initially
 */
public function sendAll(\Iterator $transactions, $parallel)
{
    $multi = $this->checkoutMultiHandle();
    $batch = new BatchContext($multi, false, $transactions);

    // Prime the multi handle with the first $parallel transactions only.
    $initial = new \LimitIterator($transactions, 0, $parallel);
    foreach ($initial as $transaction) {
        $this->addHandle($transaction, $batch);
    }

    $this->perform($batch);
}
130  
/**
 * Runs the curl multi transfer loop for a batch until nothing is left
 * running, then returns the multi handle to the cache.
 *
 * @param BatchContext $context Batch whose multi handle is driven
 *
 * @throws AdapterException If curl_multi_exec reports a status other than CURLM_OK.
 */
private function perform(BatchContext $context)
{
    $multi = $context->getMultiHandle();
    // The first curl_multi_select often times out no matter what, but is
    // usually required for fast transfers.
    $running = false;

    do {
        // Re-run exec for as long as curl asks to be called again right away.
        do {
            $status = curl_multi_exec($multi, $running);
        } while ($status === CURLM_CALL_MULTI_PERFORM);

        if ($status != CURLM_OK) {
            self::throwMultiError($status);
        }

        $this->processMessages($context);

        if ($running) {
            $selected = curl_multi_select($multi, $this->selectTimeout);
            if ($selected === -1) {
                // Perform a usleep if a select returns -1.
                // See: https://bugs.php.net/bug.php?id=61141
                usleep(250);
            }
        }
    } while ($context->isActive() || $running);

    $this->releaseMultiHandle($multi, $this->maxHandles);
}
161  
/**
 * Drains the multi handle's message queue, handling each finished transfer
 * and queueing one pending transaction per finished one.
 *
 * @param BatchContext $context Batch being processed
 */
private function processMessages(BatchContext $context)
{
    $multi = $context->getMultiHandle();

    while (true) {
        $message = curl_multi_info_read($multi);
        if (!$message) {
            break;
        }

        $finished = $context->findTransaction($message['handle']);
        $this->processResponse($finished, $message, $context);

        // Add the next transaction if there are more in the queue
        $queued = $context->nextPending();
        if ($queued) {
            $this->addHandle($queued, $context);
        }
    }
}
175  
/**
 * Finalises one finished transfer: removes it from the batch, checks for a
 * curl error or a missing response, and otherwise rewinds the response
 * body and emits the complete event.
 *
 * Any exception raised along the way is routed through throwException().
 *
 * @param TransactionInterface $transaction Transaction that finished
 * @param array                $curl        Message from curl_multi_info_read
 * @param BatchContext         $context     Batch the transaction belongs to
 */
private function processResponse(
    TransactionInterface $transaction,
    array $curl,
    BatchContext $context
) {
    $info = $context->removeTransaction($transaction);

    try {
        if ($this->isCurlException($transaction, $curl, $context, $info)) {
            return;
        }

        if (!$this->validateResponseWasSet($transaction, $context)) {
            return;
        }

        $body = $transaction->getResponse()->getBody();
        if ($body) {
            $body->seek(0);
        }

        RequestEvents::emitComplete($transaction, $info);
    } catch (\Exception $e) {
        $this->throwException($e, $context);
    }
}
196  
/**
 * Emits the before event for a transaction and, unless a response was set
 * during that event, creates a cURL handle for it and adds it to the batch.
 *
 * A RequestException raised here is routed through throwException().
 *
 * @param TransactionInterface $transaction Transaction to queue
 * @param BatchContext         $context     Batch that receives the handle
 */
private function addHandle(
    TransactionInterface $transaction,
    BatchContext $context
) {
    try {
        RequestEvents::emitBefore($transaction);

        // Only transfer if the request was not intercepted
        if ($transaction->getResponse()) {
            return;
        }

        $createHandle = $this->curlFactory;
        $handle = $createHandle($transaction, $this->messageFactory);
        $context->addTransaction($transaction, $handle);
    } catch (RequestException $e) {
        $this->throwException($e, $context);
    }
}
215  
/**
 * Checks whether a finished transfer ended with a cURL error and, if so,
 * emits an error event for it.
 *
 * The "result" entry of a curl_multi_info_read() message is a per-transfer
 * CURLE_* code, so it is compared against CURLE_OK rather than the CURLM_*
 * multi-interface constants.
 *
 * @param TransactionInterface $transaction Transaction that finished
 * @param array                $curl        Message from curl_multi_info_read
 * @param BatchContext         $context     Batch the transaction belongs to
 * @param array                $info        Transfer info merged into the error stats
 *
 * @return bool False when the transfer succeeded; true when an error was
 *              detected (an error event has then been emitted).
 */
private function isCurlException(
    TransactionInterface $transaction,
    array $curl,
    BatchContext $context,
    array $info
) {
    if (CURLE_OK === $curl['result']) {
        return false;
    }

    $request = $transaction->getRequest();
    try {
        // Send curl stats along if they are available
        $stats = ['curl_result' => $curl['result']] + $info;
        RequestEvents::emitError(
            $transaction,
            new RequestException(
                sprintf(
                    '[curl] (#%s) %s [url] %s',
                    $curl['result'],
                    function_exists('curl_strerror')
                        ? curl_strerror($curl['result'])
                        : self::ERROR_STR,
                    $request->getUrl()
                ),
                $request
            ),
            $stats
        );
    } catch (\Exception $e) {
        // Rethrown only when the batch or the exception demands it.
        $this->throwException($e, $context);
    }

    return true;
}
253  
/**
 * Rethrows an exception when the batch throws exceptions or when the
 * exception is a RequestException flagged to be thrown immediately.
 *
 * Before rethrowing, all transactions are removed from the batch and its
 * multi handle is released with a limit of -1. Otherwise the exception is
 * dropped.
 *
 * @param \Exception   $e       Exception to consider
 * @param BatchContext $context Batch in which the exception occurred
 *
 * @throws \Exception The given exception, when it must propagate.
 */
private function throwException(\Exception $e, BatchContext $context)
{
    $mustThrow = $context->throwsExceptions()
        || ($e instanceof RequestException && $e->getThrowImmediately());

    if (!$mustThrow) {
        return;
    }

    $context->removeAll();
    $this->releaseMultiHandle($context->getMultiHandle(), -1);

    throw $e;
}
264  
/**
 * Hands out a curl_multi handle, reusing a cached one that is not checked
 * out or creating a new one when none is free.
 *
 * The returned handle is marked as owned until releaseMultiHandle() is
 * called for it.
 *
 * @return resource
 */
private function checkoutMultiHandle()
{
    // Find an unused handle in the cache
    foreach ($this->multiOwned as $id => $owned) {
        if ($owned === false) {
            $this->multiOwned[$id] = true;

            return $this->multiHandles[$id];
        }
    }

    // Add a new handle, keyed by its integer resource ID
    $multi = curl_multi_init();
    $id = (int) $multi;
    $this->multiHandles[$id] = $multi;
    $this->multiOwned[$id] = true;

    return $multi;
}
287  
/**
 * Releases a curl_multi handle back into the cache, or closes and forgets
 * it when the cache holds more handles than allowed.
 *
 * Handles that are not tracked by this adapter are ignored, so no ownership
 * flag is ever recorded without a matching cached handle.
 *
 * @param resource $handle     Curl multi handle to release
 * @param int      $maxHandles Maximum number of existing multiHandles to
 *                             allow before pruning. Passing -1 always
 *                             closes the handle, because the handle count
 *                             can never be that low.
 */
private function releaseMultiHandle($handle, $maxHandles)
{
    $id = (int) $handle;

    // Nothing to release for an unknown or already pruned handle.
    if (!isset($this->multiHandles[$id], $this->multiOwned[$id])) {
        return;
    }

    if (count($this->multiHandles) <= $maxHandles) {
        // Keep the handle cached and mark it as available again.
        $this->multiOwned[$id] = false;

        return;
    }

    // Prune excessive handles
    curl_multi_close($this->multiHandles[$id]);
    unset($this->multiHandles[$id], $this->multiOwned[$id]);
}
306  
/**
 * Verifies that a finished transaction actually has a response.
 *
 * A missing response typically means you are sending a payload, curl
 * encountered a "Connection died, retrying a fresh connect" error, tried to
 * rewind the stream, and then encountered a "necessary data rewind wasn't
 * possible" error, causing the request to be reported by
 * curl_multi_info_read() without an error status.
 *
 * When no response is set, the request is re-queued if its body can be
 * rewound. Otherwise an error event carrying a RequestException is emitted.
 *
 * @param TransactionInterface $transaction
 * @param BatchContext         $context
 *
 * @return bool Returns true if it's OK, and false if it failed.
 * @throws \GuzzleHttp\Exception\RequestException If it failed and cannot
 *                                                recover.
 */
private function validateResponseWasSet(
    TransactionInterface $transaction,
    BatchContext $context
) {
    if ($transaction->getResponse()) {
        return true;
    }

    $body = $transaction->getRequest()->getBody();

    // Work out why a retry is impossible; null means a retry can go ahead.
    $failure = null;
    if (!$body) {
        // This is weird and should probably never happen.
        $failure = 'No response was received for a request with no body. This'
            . ' could mean that you are saturating your network.';
    } elseif (!$body->isSeekable() || !$body->seek(0)) {
        // Nothing we can do with this. Sorry!
        $failure = 'The connection was unexpectedly closed. The request would'
            . ' have been retried, but attempting to rewind the'
            . ' request body failed. Consider wrapping your request'
            . ' body in a CachingStream decorator to work around this'
            . ' issue if necessary.';
    }

    if ($failure === null) {
        $this->retryFailedConnection($transaction, $context);
    } else {
        RequestEvents::emitError(
            $transaction,
            new RequestException($failure, $transaction->getRequest())
        );
    }

    return false;
}
362  
/**
 * Re-queues a transaction on the batch with a freshly created cURL handle
 * so that it is retried automatically.
 *
 * @param TransactionInterface $transaction Transaction to retry
 * @param BatchContext         $context     Batch that receives the new handle
 */
private function retryFailedConnection(
    TransactionInterface $transaction,
    BatchContext $context
) {
    $handle = call_user_func(
        $this->curlFactory,
        $transaction,
        $this->messageFactory
    );

    // Add the request back to the batch to retry automatically.
    $context->addTransaction($transaction, $handle);
}
377 }