/vendor/guzzlehttp/guzzle/src/Adapter/Curl/MultiAdapter.php |
@@ -0,0 +1,377 @@ |
<?php |
|
namespace GuzzleHttp\Adapter\Curl; |
|
use GuzzleHttp\Adapter\AdapterInterface; |
use GuzzleHttp\Adapter\ParallelAdapterInterface; |
use GuzzleHttp\Adapter\TransactionInterface; |
use GuzzleHttp\Event\RequestEvents; |
use GuzzleHttp\Exception\AdapterException; |
use GuzzleHttp\Exception\RequestException; |
use GuzzleHttp\Message\MessageFactoryInterface; |
|
/** |
* HTTP adapter that uses cURL multi as a transport layer |
* |
* When using the MultiAdapter, custom curl options can be specified as an |
* associative array of curl option constants mapping to values in the |
* **curl** key of a request's configuration options. |
* |
* In addition to being able to supply configuration options via the curl |
* request config, you can also specify the select_timeout variable using the |
* `GUZZLE_CURL_SELECT_TIMEOUT` environment variable. |
*/ |
class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
{
    const ERROR_STR = 'See http://curl.haxx.se/libcurl/c/libcurl-errors.html for an explanation of cURL errors';
    const ENV_SELECT_TIMEOUT = 'GUZZLE_CURL_SELECT_TIMEOUT';

    /** @var callable Factory invoked to create the cURL handle for a transaction */
    private $curlFactory;

    /** @var MessageFactoryInterface */
    private $messageFactory;

    /** @var array Pooled curl multi handles, keyed by their integer handle id */
    private $multiHandles = [];

    /** @var array Map of handle id => bool, true while the handle is checked out */
    private $multiOwned = [];

    /** @var int Pool size threshold used when releasing a multi handle */
    private $maxHandles;

    /** @var double Timeout, in seconds, passed to curl_multi_select() */
    private $selectTimeout;

    /**
     * Accepts an associative array of options:
     *
     * - handle_factory: Optional callable factory used to create cURL handles.
     *   The callable is invoked with the following arguments:
     *   TransactionInterface, MessageFactoryInterface, and an optional cURL
     *   handle to modify. The factory method must then return a cURL resource.
     * - select_timeout: Specify a float in seconds to use for a
     *   curl_multi_select timeout. When omitted, the value of the
     *   GUZZLE_CURL_SELECT_TIMEOUT entry of $_SERVER is used if present,
     *   otherwise 1.
     * - max_handles: Maximum number of idle handles (defaults to 3).
     *
     * @param MessageFactoryInterface $messageFactory
     * @param array $options Array of options to use with the adapter
     */
    public function __construct(
        MessageFactoryInterface $messageFactory,
        array $options = []
    ) {
        $this->messageFactory = $messageFactory;
        $this->curlFactory = isset($options['handle_factory'])
            ? $options['handle_factory']
            : new CurlFactory();

        if (isset($options['select_timeout'])) {
            $this->selectTimeout = $options['select_timeout'];
        } elseif (isset($_SERVER[self::ENV_SELECT_TIMEOUT])) {
            // Environment values arrive as strings; normalise to the float
            // that curl_multi_select() expects.
            $this->selectTimeout = (float) $_SERVER[self::ENV_SELECT_TIMEOUT];
        } else {
            $this->selectTimeout = 1;
        }

        $this->maxHandles = isset($options['max_handles'])
            ? $options['max_handles']
            : 3;
    }

    /**
     * Closes every pooled curl multi handle that is still a live resource.
     */
    public function __destruct()
    {
        foreach ($this->multiHandles as $handle) {
            if (is_resource($handle)) {
                curl_multi_close($handle);
            }
        }

        // Drop the bookkeeping so no closed handle can be handed out again.
        $this->multiHandles = [];
        $this->multiOwned = [];
    }

    /**
     * Throw an exception for a cURL multi response
     *
     * @param int $code Curl response code
     * @throws AdapterException
     */
    public static function throwMultiError($code)
    {
        // curl_multi_strerror() is not available on every PHP build; fall
        // back to a pointer to the libcurl error documentation.
        $buffer = function_exists('curl_multi_strerror')
            ? curl_multi_strerror($code)
            : self::ERROR_STR;

        throw new AdapterException(sprintf('cURL error %s: %s', $code, $buffer));
    }

    /**
     * Sends a single transaction and blocks until the batch has finished.
     *
     * @param TransactionInterface $transaction Transaction to transfer
     *
     * @return mixed Whatever $transaction->getResponse() returns afterwards
     */
    public function send(TransactionInterface $transaction)
    {
        // NOTE(review): the second BatchContext argument is presumably the
        // "throws exceptions" flag read back through throwsExceptions().
        $context = new BatchContext($this->checkoutMultiHandle(), true);
        $this->addHandle($transaction, $context);
        $this->perform($context);

        return $transaction->getResponse();
    }

    /**
     * Sends the transactions of an iterator, starting with at most $parallel
     * of them; further ones are added as earlier transfers complete.
     *
     * @param \Iterator $transactions Iterator yielding transactions
     * @param int       $parallel     Number of transactions added up front
     */
    public function sendAll(\Iterator $transactions, $parallel)
    {
        $context = new BatchContext(
            $this->checkoutMultiHandle(),
            false,
            $transactions
        );

        foreach (new \LimitIterator($transactions, 0, $parallel) as $trans) {
            $this->addHandle($trans, $context);
        }

        $this->perform($context);
    }

    /**
     * Drives the multi handle of a batch until neither curl nor the context
     * reports outstanding work, then releases the handle back to the pool.
     *
     * @param BatchContext $context Batch being transferred
     * @throws AdapterException When curl_multi_exec() reports an error
     */
    private function perform(BatchContext $context)
    {
        $active = false;
        $multi = $context->getMultiHandle();

        do {
            do {
                $mrc = curl_multi_exec($multi, $active);
            } while ($mrc === CURLM_CALL_MULTI_PERFORM);

            if ($mrc !== CURLM_OK) {
                // Same cleanup as throwException(): detach the transfers and
                // discard the multi handle so it is not left checked out of
                // the pool forever.
                $context->removeAll();
                $this->releaseMultiHandle($multi, -1);
                self::throwMultiError($mrc);
            }

            $this->processMessages($context);

            // The first curl_multi_select often times out no matter what, but
            // is usually required for fast transfers.
            if ($active &&
                curl_multi_select($multi, $this->selectTimeout) === -1
            ) {
                // Perform a usleep if a select returns -1.
                // See: https://bugs.php.net/bug.php?id=61141
                usleep(250);
            }
        } while ($context->isActive() || $active);

        $this->releaseMultiHandle($multi, $this->maxHandles);
    }

    /**
     * Drains the completed-transfer messages of the batch's multi handle.
     *
     * @param BatchContext $context Batch being transferred
     */
    private function processMessages(BatchContext $context)
    {
        $multi = $context->getMultiHandle();

        while ($done = curl_multi_info_read($multi)) {
            $transaction = $context->findTransaction($done['handle']);
            $this->processResponse($transaction, $done, $context);
            // Add the next transaction if there are more in the queue
            if ($next = $context->nextPending()) {
                $this->addHandle($next, $context);
            }
        }
    }

    /**
     * Finishes one completed transfer: removes it from the batch, handles a
     * curl failure or missing response, and otherwise rewinds the response
     * body and calls RequestEvents::emitComplete().
     *
     * @param TransactionInterface $transaction Transaction that finished
     * @param array                $curl        Entry from curl_multi_info_read()
     * @param BatchContext         $context     Batch being transferred
     */
    private function processResponse(
        TransactionInterface $transaction,
        array $curl,
        BatchContext $context
    ) {
        $info = $context->removeTransaction($transaction);

        try {
            if (!$this->isCurlException($transaction, $curl, $context, $info) &&
                $this->validateResponseWasSet($transaction, $context)
            ) {
                if ($body = $transaction->getResponse()->getBody()) {
                    $body->seek(0);
                }
                RequestEvents::emitComplete($transaction, $info);
            }
        } catch (\Exception $e) {
            $this->throwException($e, $context);
        }
    }

    /**
     * Calls RequestEvents::emitBefore() for a transaction and, unless a
     * response is already set afterwards, adds it to the batch.
     *
     * @param TransactionInterface $transaction Transaction to add
     * @param BatchContext         $context     Batch being transferred
     */
    private function addHandle(
        TransactionInterface $transaction,
        BatchContext $context
    ) {
        try {
            RequestEvents::emitBefore($transaction);
            // Only transfer if the request was not intercepted
            if (!$transaction->getResponse()) {
                $context->addTransaction(
                    $transaction,
                    $this->createCurlHandle($transaction)
                );
            }
        } catch (RequestException $e) {
            $this->throwException($e, $context);
        }
    }

    /**
     * Invokes the configured handle factory for a transaction.
     *
     * call_user_func() is used so that every callable form (closure,
     * invokable object, array or string callable) works.
     *
     * @param TransactionInterface $transaction Transaction needing a handle
     *
     * @return mixed Whatever the factory returns (documented as a cURL handle)
     */
    private function createCurlHandle(TransactionInterface $transaction)
    {
        return call_user_func(
            $this->curlFactory,
            $transaction,
            $this->messageFactory
        );
    }

    /**
     * Checks whether a completed transfer failed at the curl level and, if
     * so, calls RequestEvents::emitError() with a RequestException.
     *
     * @param TransactionInterface $transaction Transaction that finished
     * @param array                $curl        Entry from curl_multi_info_read()
     * @param BatchContext         $context     Batch being transferred
     * @param array                $info        Value returned by removeTransaction()
     *
     * @return bool True when the transfer failed, false otherwise
     */
    private function isCurlException(
        TransactionInterface $transaction,
        array $curl,
        BatchContext $context,
        array $info
    ) {
        // 'result' holds an easy-handle (CURLE_*) code. The comparison with
        // CURLM_CALL_MULTI_PERFORM is kept from the previous implementation.
        if (CURLE_OK === $curl['result'] ||
            CURLM_CALL_MULTI_PERFORM === $curl['result']
        ) {
            return false;
        }

        $request = $transaction->getRequest();
        try {
            // Send curl stats along if they are available
            $stats = ['curl_result' => $curl['result']] + $info;
            RequestEvents::emitError(
                $transaction,
                new RequestException(
                    sprintf(
                        '[curl] (#%s) %s [url] %s',
                        $curl['result'],
                        function_exists('curl_strerror')
                            ? curl_strerror($curl['result'])
                            : self::ERROR_STR,
                        $request->getUrl()
                    ),
                    $request
                ),
                $stats
            );
        } catch (\Exception $e) {
            $this->throwException($e, $context);
        }

        return true;
    }

    /**
     * Rethrows an exception when the batch throws exceptions or the exception
     * is a RequestException flagged to be thrown immediately; otherwise the
     * exception is dropped. Before rethrowing, all transactions are removed
     * from the batch and its multi handle is closed.
     *
     * @param \Exception   $e       Exception that was caught
     * @param BatchContext $context Batch being transferred
     * @throws \Exception
     */
    private function throwException(\Exception $e, BatchContext $context)
    {
        if ($context->throwsExceptions()
            || ($e instanceof RequestException && $e->getThrowImmediately())
        ) {
            $context->removeAll();
            // A threshold of -1 always closes the handle instead of pooling it
            $this->releaseMultiHandle($context->getMultiHandle(), -1);
            throw $e;
        }
    }

    /**
     * Returns a curl_multi handle from the cache or creates a new one
     *
     * @return resource
     * @throws AdapterException If a new multi handle cannot be created
     */
    private function checkoutMultiHandle()
    {
        // Find an unused handle in the cache
        $key = array_search(false, $this->multiOwned, true);
        if (false !== $key) {
            $this->multiOwned[$key] = true;
            return $this->multiHandles[$key];
        }

        // Add a new handle
        $handle = curl_multi_init();
        if (false === $handle) {
            // Never pool a failed handle; it would otherwise be stored as id 0
            throw new AdapterException('Unable to create a cURL multi handle');
        }

        $id = (int) $handle;
        $this->multiHandles[$id] = $handle;
        $this->multiOwned[$id] = true;

        return $handle;
    }

    /**
     * Releases a curl_multi handle back into the cache and removes excess cache
     *
     * Handles that are not tracked by this adapter are ignored.
     *
     * @param resource $handle     Curl multi handle to release
     * @param int      $maxHandles Maximum number of existing multiHandles to
     *                             allow before pruning; pass -1 to always
     *                             close the handle.
     */
    private function releaseMultiHandle($handle, $maxHandles)
    {
        $id = (int) $handle;

        // Nothing to do for a handle that is not (or no longer) pooled;
        // flagging it as idle would let a later checkout return a missing
        // handle.
        if (!isset($this->multiHandles[$id], $this->multiOwned[$id])) {
            return;
        }

        if (count($this->multiHandles) <= $maxHandles) {
            $this->multiOwned[$id] = false;
            return;
        }

        // Prune excessive handles
        curl_multi_close($this->multiHandles[$id]);
        unset($this->multiHandles[$id], $this->multiOwned[$id]);
    }

    /**
     * This function ensures that a response was set on a transaction. If one
     * was not set, then the request is retried if possible. This error
     * typically means you are sending a payload, curl encountered a
     * "Connection died, retrying a fresh connect" error, tried to rewind the
     * stream, and then encountered a "necessary data rewind wasn't possible"
     * error, causing the request to be sent through curl_multi_info_read()
     * without an error status.
     *
     * @param TransactionInterface $transaction
     * @param BatchContext         $context
     *
     * @return bool Returns true if it's OK, and false if it failed.
     * @throws \GuzzleHttp\Exception\RequestException If it failed and cannot
     *                                                recover.
     */
    private function validateResponseWasSet(
        TransactionInterface $transaction,
        BatchContext $context
    ) {
        if ($transaction->getResponse()) {
            return true;
        }

        $body = $transaction->getRequest()->getBody();

        if (!$body) {
            // This is weird and should probably never happen.
            RequestEvents::emitError(
                $transaction,
                new RequestException(
                    'No response was received for a request with no body. This'
                    . ' could mean that you are saturating your network.',
                    $transaction->getRequest()
                )
            );
        } elseif (!$body->isSeekable() || !$body->seek(0)) {
            // Nothing we can do with this. Sorry!
            RequestEvents::emitError(
                $transaction,
                new RequestException(
                    'The connection was unexpectedly closed. The request would'
                    . ' have been retried, but attempting to rewind the'
                    . ' request body failed. Consider wrapping your request'
                    . ' body in a CachingStream decorator to work around this'
                    . ' issue if necessary.',
                    $transaction->getRequest()
                )
            );
        } else {
            $this->retryFailedConnection($transaction, $context);
        }

        return false;
    }

    /**
     * Adds a transaction back to the batch with a freshly created handle so
     * that it is retried automatically.
     *
     * @param TransactionInterface $transaction Transaction to retry
     * @param BatchContext         $context     Batch being transferred
     */
    private function retryFailedConnection(
        TransactionInterface $transaction,
        BatchContext $context
    ) {
        $context->addTransaction(
            $transaction,
            $this->createCurlHandle($transaction)
        );
    }
}