curl_multi_exec() 函数用于执行cURL多句柄中的各个子句柄。这个函数以非阻塞方式处理多个并行的cURL请求,需要通过循环调用直到所有请求完成。
int curl_multi_exec ( resource $mh , int &$still_running )
| 参数 | 描述 | 类型 | 必需 |
|---|---|---|---|
| mh | 由 curl_multi_init() 返回的cURL多句柄 |
resource | 是 |
| still_running | 一个引用参数,表示是否还有正在运行的句柄数量 | int | 是 |
返回一个cURL多句柄错误代码:
CURLM_OK (0) - 操作成功CURLM_CALL_MULTI_PERFORM (-1) - 早期版本中表示需要再次调用@php
// 创建多句柄
$mh = curl_multi_init();
// 创建多个cURL句柄并添加到多句柄
$handles = [];
$urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/3'
];
foreach ($urls as $i => $url) {
$handles[$i] = curl_init($url);
curl_setopt($handles[$i], CURLOPT_RETURNTRANSFER, true);
curl_multi_add_handle($mh, $handles[$i]);
}
echo "开始执行 " . count($urls) . " 个并行请求...\n";
// 执行多句柄 - 核心循环
$active = null;
$startTime = microtime(true);
do {
// 执行多句柄
$status = curl_multi_exec($mh, $active);
// 检查执行状态
if ($status !== CURLM_OK) {
echo "多句柄执行错误,状态码: " . $status . "\n";
break;
}
// 等待活动(避免CPU占用过高)
if ($active > 0) {
curl_multi_select($mh, 1.0); // 等待最多1秒
}
// 输出当前进度
echo "正在运行: " . $active . " 个请求\n";
} while ($active > 0);
$totalTime = microtime(true) - $startTime;
echo "所有请求完成!总耗时: " . round($totalTime, 2) . " 秒\n";
// 处理结果
foreach ($handles as $i => $handle) {
$content = curl_multi_getcontent($handle);
$info = curl_getinfo($handle);
echo "请求 {$i}: HTTP " . $info['http_code'] . ", 耗时: " . round($info['total_time'], 2) . "秒\n";
curl_multi_remove_handle($mh, $handle);
curl_close($handle);
}
curl_multi_close($mh);
@endphp
@php
class AdvancedMultiExecutor {
private $multiHandle;
private $handles = [];
private $results = [];
private $startTime;
public function __construct() {
$this->multiHandle = curl_multi_init();
$this->startTime = microtime(true);
}
public function addRequest($url, $id = null) {
if ($id === null) {
$id = uniqid('req_');
}
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 30,
CURLOPT_NOPROGRESS => false,
CURLOPT_PROGRESSFUNCTION => function($resource, $download_size, $downloaded, $upload_size, $uploaded) use ($id) {
// 进度回调(可选)
return 0;
}
]);
$this->handles[$id] = $ch;
curl_multi_add_handle($this->multiHandle, $ch);
return $id;
}
public function execute() {
echo "开始执行 " . count($this->handles) . " 个并行请求...\n\n";
$active = null;
$lastProgressUpdate = 0;
// 主执行循环
do {
// 执行多句柄
$status = curl_multi_exec($this->multiHandle, $active);
if ($status !== CURLM_OK) {
$this->handleMultiError($status);
break;
}
// 处理已完成的请求
$this->processCompletedRequests();
// 显示进度(每秒更新一次)
$currentTime = microtime(true);
if ($currentTime - $lastProgressUpdate >= 1.0) {
$this->showProgress($active);
$lastProgressUpdate = $currentTime;
}
// 等待活动
if ($active > 0) {
$ready = curl_multi_select($this->multiHandle, 0.1);
if ($ready === -1) {
// 选择错误,短暂暂停
usleep(100000); // 100ms
}
}
} while ($active > 0);
// 最终处理可能剩余的任何请求
$this->processCompletedRequests(true);
$totalTime = microtime(true) - $this->startTime;
echo "\n所有请求执行完成!总耗时: " . round($totalTime, 2) . " 秒\n";
return $this->results;
}
private function processCompletedRequests($final = false) {
// 处理所有已完成的请求
while ($info = curl_multi_info_read($this->multiHandle)) {
$handle = $info['handle'];
$id = array_search($handle, $this->handles, true);
if ($id !== false) {
$this->handleCompletedRequest($id, $handle, $info);
unset($this->handles[$id]);
}
}
// 如果是最终处理且仍有活动句柄,强制处理
if ($final && !empty($this->handles)) {
foreach ($this->handles as $id => $handle) {
$this->handleCompletedRequest($id, $handle, ['handle' => $handle, 'result' => CURLE_OPERATION_TIMEDOUT]);
curl_multi_remove_handle($this->multiHandle, $handle);
}
$this->handles = [];
}
}
private function handleCompletedRequest($id, $handle, $info) {
$content = curl_multi_getcontent($handle);
$curlInfo = curl_getinfo($handle);
$error = curl_error($handle);
$errno = curl_errno($handle);
$this->results[$id] = [
'success' => ($info['result'] === CURLE_OK),
'http_code' => $curlInfo['http_code'],
'total_time' => $curlInfo['total_time'],
'content_length' => strlen($content),
'error' => $error,
'errno' => $errno,
'content' => $content
];
curl_multi_remove_handle($this->multiHandle, $handle);
curl_close($handle);
$status = $info['result'] === CURLE_OK ? '完成' : '失败';
echo "请求 {$id}: {$status} (" . round($curlInfo['total_time'], 2) . "秒)\n";
}
private function showProgress($active) {
$elapsed = microtime(true) - $this->startTime;
$completed = count($this->results);
$total = $completed + count($this->handles);
echo sprintf(
"进度: %d/%d 完成, %d 运行中, 已耗时: %.1fs\n",
$completed, $total, $active, $elapsed
);
}
private function handleMultiError($status) {
$errors = [
CURLM_BAD_HANDLE => '传递了无效的多句柄',
CURLM_BAD_EASY_HANDLE => '传递了无效的简单句柄',
CURLM_OUT_OF_MEMORY => '内存不足',
CURLM_INTERNAL_ERROR => '内部错误',
CURLM_BAD_SOCKET => '无效的socket',
CURLM_UNKNOWN_OPTION => '未知选项'
];
$errorMsg = $errors[$status] ?? "未知错误 (代码: {$status})";
echo "多句柄执行错误: {$errorMsg}\n";
}
public function close() {
foreach ($this->handles as $handle) {
if (is_resource($handle)) {
curl_multi_remove_handle($this->multiHandle, $handle);
curl_close($handle);
}
}
if (is_resource($this->multiHandle)) {
curl_multi_close($this->multiHandle);
}
$this->handles = [];
$this->results = [];
}
public function __destruct() {
$this->close();
}
}
// 使用高级执行器
$executor = new AdvancedMultiExecutor();
// 添加多个请求
$urls = [
'fast' => 'https://httpbin.org/delay/1',
'medium' => 'https://httpbin.org/delay/2',
'slow' => 'https://httpbin.org/delay/3',
'instant' => 'https://httpbin.org/get'
];
foreach ($urls as $id => $url) {
$executor->addRequest($url, $id);
}
// 执行所有请求
$results = $executor->execute();
// 显示统计信息
echo "\n执行结果统计:\n";
$successCount = count(array_filter($results, function($r) { return $r['success']; }));
$totalCount = count($results);
echo "成功: {$successCount}/{$totalCount}\n";
// 显示每个请求的详细信息
foreach ($results as $id => $result) {
$status = $result['success'] ? '✓' : '✗';
echo "{$status} {$id}: HTTP {$result['http_code']}, {$result['content_length']} 字节, {$result['total_time']}秒\n";
}
$executor->close();
@endphp
@php
class NonBlockingMultiExecutor {
private $multiHandle;
private $handles = [];
private $callbacks = [];
private $isRunning = false;
public function __construct() {
$this->multiHandle = curl_multi_init();
}
public function addRequest($url, $callback, $options = []) {
$id = uniqid('req_');
$ch = curl_init();
$defaultOptions = [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 30
];
$finalOptions = $options + $defaultOptions;
curl_setopt_array($ch, $finalOptions);
$this->handles[$id] = $ch;
$this->callbacks[$id] = $callback;
curl_multi_add_handle($this->multiHandle, $ch);
return $id;
}
public function start() {
$this->isRunning = true;
echo "开始非阻塞执行...\n";
}
public function process($maxTime = 0.1) {
if (!$this->isRunning || empty($this->handles)) {
return false;
}
$active = null;
$startTime = microtime(true);
// 执行多句柄(非阻塞)
do {
$status = curl_multi_exec($this->multiHandle, $active);
if ($status !== CURLM_OK) {
break;
}
// 处理已完成的请求
$this->processCompletedRequests();
// 检查时间限制
if (microtime(true) - $startTime >= $maxTime) {
break;
}
// 短暂等待
if ($active > 0) {
curl_multi_select($this->multiHandle, 0.01);
}
} while ($active > 0);
return !empty($this->handles); // 返回是否还有未完成的请求
}
private function processCompletedRequests() {
while ($info = curl_multi_info_read($this->multiHandle)) {
$handle = $info['handle'];
$id = array_search($handle, $this->handles, true);
if ($id !== false) {
$this->handleCompletedRequest($id, $handle, $info);
}
}
}
private function handleCompletedRequest($id, $handle, $info) {
$content = curl_multi_getcontent($handle);
$curlInfo = curl_getinfo($handle);
$error = curl_error($handle);
$result = [
'success' => ($info['result'] === CURLE_OK),
'content' => $content,
'info' => $curlInfo,
'error' => $error
];
// 调用回调函数
if (isset($this->callbacks[$id])) {
call_user_func($this->callbacks[$id], $result, $id);
}
// 清理资源
curl_multi_remove_handle($this->multiHandle, $handle);
curl_close($handle);
unset($this->handles[$id], $this->callbacks[$id]);
echo "请求 {$id} 完成并回调\n";
}
public function waitForCompletion($checkInterval = 0.1) {
while ($this->process($checkInterval)) {
// 可以在这里执行其他任务
echo ".";
usleep(50000); // 50ms
}
echo "\n所有请求完成!\n";
$this->isRunning = false;
}
public function close() {
$this->isRunning = false;
foreach ($this->handles as $handle) {
if (is_resource($handle)) {
curl_multi_remove_handle($this->multiHandle, $handle);
curl_close($handle);
}
}
if (is_resource($this->multiHandle)) {
curl_multi_close($this->multiHandle);
}
$this->handles = [];
$this->callbacks = [];
}
public function __destruct() {
$this->close();
}
}
// 使用非阻塞执行器
$executor = new NonBlockingMultiExecutor();
// 定义回调函数
$callback = function($result, $id) {
if ($result['success']) {
echo "[回调] {$id}: 成功, 长度: " . strlen($result['content']) . " 字节\n";
} else {
echo "[回调] {$id}: 失败, 错误: " . $result['error'] . "\n";
}
};
// 添加多个请求
$urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/3',
'https://httpbin.org/get'
];
foreach ($urls as $url) {
$executor->addRequest($url, $callback);
}
echo "开始非阻塞执行,可以同时处理其他任务...\n";
$executor->start();
// 模拟同时处理其他任务
$taskCount = 0;
while ($executor->process(0.05)) { // 每次处理最多50ms
$taskCount++;
echo "处理其他任务 #{$taskCount}\n";
usleep(100000); // 100ms
}
// 或者使用等待完成
// $executor->waitForCompletion();
echo "所有HTTP请求和其他任务都已完成!\n";
$executor->close();
@endphp
@php
class RobustMultiExecutor {
private $multiHandle;
private $requests = [];
private $maxRetries;
private $timeout;
public function __construct($maxRetries = 3, $timeout = 30) {
$this->multiHandle = curl_multi_init();
$this->maxRetries = $maxRetries;
$this->timeout = $timeout;
}
public function addRequest($url, $id = null) {
if ($id === null) {
$id = uniqid('req_');
}
$this->requests[$id] = [
'url' => $url,
'handle' => null,
'retries' => 0,
'completed' => false,
'result' => null
];
$this->createHandle($id);
return $id;
}
private function createHandle($id) {
$request = &$this->requests[$id];
if ($request['handle'] && is_resource($request['handle'])) {
curl_multi_remove_handle($this->multiHandle, $request['handle']);
curl_close($request['handle']);
}
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $request['url'],
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => $this->timeout,
CURLOPT_CONNECTTIMEOUT => 10
]);
$request['handle'] = $ch;
curl_multi_add_handle($this->multiHandle, $ch);
$request['retries']++;
}
public function execute() {
echo "开始执行(最大重试次数: {$this->maxRetries})...\n";
$startTime = microtime(true);
$active = null;
do {
// 执行多句柄
$status = curl_multi_exec($this->multiHandle, $active);
if ($status !== CURLM_OK) {
$this->handleExecutionError($status);
// 继续执行,不立即退出
}
// 处理完成的请求
$this->processCompletedRequests();
// 处理需要重试的请求
$this->processRetries();
// 显示进度
$this->showProgress();
// 等待活动
if ($active > 0) {
curl_multi_select($this->multiHandle, 0.1);
}
// 超时检查
if (microtime(true) - $startTime > $this->timeout * 2) {
echo "执行超时,强制结束\n";
break;
}
} while ($active > 0 || $this->hasPendingRetries());
$totalTime = microtime(true) - $startTime;
echo "\n执行完成!总耗时: " . round($totalTime, 2) . " 秒\n";
return $this->collectResults();
}
private function processCompletedRequests() {
while ($info = curl_multi_info_read($this->multiHandle)) {
$handle = $info['handle'];
$id = $this->findRequestId($handle);
if ($id !== null) {
$this->handleRequestCompletion($id, $handle, $info);
}
}
}
private function findRequestId($handle) {
foreach ($this->requests as $id => $request) {
if ($request['handle'] === $handle) {
return $id;
}
}
return null;
}
private function handleRequestCompletion($id, $handle, $info) {
$request = &$this->requests[$id];
$content = curl_multi_getcontent($handle);
$curlInfo = curl_getinfo($handle);
$error = curl_error($handle);
$result = [
'success' => ($info['result'] === CURLE_OK),
'http_code' => $curlInfo['http_code'],
'content' => $content,
'error' => $error,
'errno' => $info['result'],
'retries' => $request['retries'] - 1, // 减1因为当前尝试不算重试
'total_time' => $curlInfo['total_time']
];
// 检查是否需要重试
if (!$result['success'] && $request['retries'] <= $this->maxRetries) {
echo "请求 {$id} 失败,准备重试 ({$request['retries']}/{$this->maxRetries})...\n";
$this->createHandle($id); // 重新创建句柄进行重试
return;
}
// 请求完成(成功或达到最大重试次数)
$request['completed'] = true;
$request['result'] = $result;
curl_multi_remove_handle($this->multiHandle, $handle);
curl_close($handle);
$status = $result['success'] ? '成功' : '最终失败';
echo "请求 {$id}: {$status} (尝试: {$request['retries']}次)\n";
}
private function processRetries() {
// 可以在这里实现延迟重试逻辑
// 当前实现是立即重试
}
private function hasPendingRetries() {
foreach ($this->requests as $request) {
if (!$request['completed'] && $request['retries'] <= $this->maxRetries) {
return true;
}
}
return false;
}
private function showProgress() {
static $lastUpdate = 0;
$currentTime = microtime(true);
if ($currentTime - $lastUpdate < 1.0) {
return;
}
$completed = 0;
$total = count($this->requests);
foreach ($this->requests as $request) {
if ($request['completed']) {
$completed++;
}
}
echo "进度: {$completed}/{$total} 完成\n";
$lastUpdate = $currentTime;
}
private function handleExecutionError($status) {
$errors = [
CURLM_BAD_HANDLE => '无效的多句柄',
CURLM_OUT_OF_MEMORY => '内存不足',
CURLM_INTERNAL_ERROR => '内部错误'
];
$errorMsg = $errors[$status] ?? "错误代码: {$status}";
echo "多句柄执行错误: {$errorMsg}\n";
}
private function collectResults() {
$results = [];
foreach ($this->requests as $id => $request) {
if ($request['completed']) {
$results[$id] = $request['result'];
}
}
return $results;
}
public function close() {
foreach ($this->requests as $request) {
if ($request['handle'] && is_resource($request['handle'])) {
curl_multi_remove_handle($this->multiHandle, $request['handle']);
curl_close($request['handle']);
}
}
if (is_resource($this->multiHandle)) {
curl_multi_close($this->multiHandle);
}
$this->requests = [];
}
public function __destruct() {
$this->close();
}
}
// 使用健壮的执行器(包含错误处理和重试)
$executor = new RobustMultiExecutor(2, 10); // 最大重试2次,超时10秒
// 添加一些可能失败的请求进行测试
$urls = [
'good' => 'https://httpbin.org/get',
'slow' => 'https://httpbin.org/delay/2',
'timeout' => 'https://httpbin.org/delay/5', // 可能超时
'invalid' => 'https://invalid-url-12345.com' // 会失败
];
foreach ($urls as $id => $url) {
$executor->addRequest($url, $id);
}
$results = $executor->execute();
// 分析结果
echo "\n最终结果分析:\n";
$successCount = 0;
foreach ($results as $id => $result) {
if ($result['success']) {
$successCount++;
echo "✓ {$id}: 成功 (尝试: {$result['retries']}次, 耗时: " . round($result['total_time'], 2) . "秒)\n";
} else {
echo "✗ {$id}: 失败 (尝试: {$result['retries']}次, 错误: {$result['error']})\n";
}
}
echo "\n统计: {$successCount}/" . count($urls) . " 成功\n";
$executor->close();
@endphp
do {
curl_multi_exec($mh, $running);
if ($running) {
curl_multi_select($mh, 1.0);
}
} while ($running > 0);
最简单的执行循环,等待所有请求完成。
do {
curl_multi_exec($mh, $running);
// 处理已完成请求
while ($info = curl_multi_info_read($mh)) {
// 处理单个完成请求
}
if ($running) {
curl_multi_select($mh, 0.1);
}
} while ($running > 0);
实时处理完成请求,避免堆积。
// 单次执行,不等待完成
curl_multi_exec($mh, $running);
// 可以在这里执行其他任务
if ($running) {
curl_multi_select($mh, 0);
}
适合在游戏循环或事件驱动架构中使用。
$timeout = 30;
$start = time();
do {
curl_multi_exec($mh, $running);
if ($running && (time() - $start) < $timeout) {
curl_multi_select($mh, 1);
} else {
break; // 超时退出
}
} while ($running > 0);
防止请求无限期挂起。