PHP curl_multi_exec函数

定义和用法

curl_multi_exec() 函数用于执行cURL多句柄中的各个子句柄。这个函数以非阻塞方式处理多个并行的cURL请求,需要通过循环调用直到所有请求完成。

提示: curl_multi_exec() 是非阻塞的,它立即返回并允许程序在请求进行时执行其他任务,这是实现高效并行请求的关键。

语法

int curl_multi_exec ( resource $mh , int &$still_running )

参数

参数 描述 类型 必需
mh curl_multi_init() 返回的cURL多句柄 resource
still_running 一个引用参数,表示是否还有正在运行的句柄数量 int

返回值

返回一个cURL多句柄错误代码:

  • CURLM_OK (0) - 操作成功
  • CURLM_CALL_MULTI_PERFORM (-1) - 早期版本中表示需要再次调用
  • 其他错误代码表示执行过程中出现的问题

执行流程详解

1
初始化循环
2
调用 curl_multi_exec()
3
检查 still_running
> 0
= 0
4
调用 curl_multi_select()
5
处理完成请求

示例

示例 1:基本的并行执行循环

@php
// 创建多句柄
$mh = curl_multi_init();

// 创建多个cURL句柄并添加到多句柄
$handles = [];
$urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/3'
];

foreach ($urls as $i => $url) {
    $handles[$i] = curl_init($url);
    curl_setopt($handles[$i], CURLOPT_RETURNTRANSFER, true);
    curl_multi_add_handle($mh, $handles[$i]);
}

echo "开始执行 " . count($urls) . " 个并行请求...\n";

// 执行多句柄 - 核心循环
$active = null;
$startTime = microtime(true);

do {
    // 执行多句柄
    $status = curl_multi_exec($mh, $active);

    // 检查执行状态
    if ($status !== CURLM_OK) {
        echo "多句柄执行错误,状态码: " . $status . "\n";
        break;
    }

    // 等待活动(避免CPU占用过高)
    if ($active > 0) {
        curl_multi_select($mh, 1.0); // 等待最多1秒
    }

    // 输出当前进度
    echo "正在运行: " . $active . " 个请求\n";

} while ($active > 0);

$totalTime = microtime(true) - $startTime;
echo "所有请求完成!总耗时: " . round($totalTime, 2) . " 秒\n";

// 处理结果
foreach ($handles as $i => $handle) {
    $content = curl_multi_getcontent($handle);
    $info = curl_getinfo($handle);

    echo "请求 {$i}: HTTP " . $info['http_code'] . ", 耗时: " . round($info['total_time'], 2) . "秒\n";

    curl_multi_remove_handle($mh, $handle);
    curl_close($handle);
}

curl_multi_close($mh);
@endphp

示例 2:带进度监控和完成处理的执行循环

@php
class AdvancedMultiExecutor {
    private $multiHandle;
    private $handles = [];
    private $results = [];
    private $startTime;

    public function __construct() {
        $this->multiHandle = curl_multi_init();
        $this->startTime = microtime(true);
    }

    public function addRequest($url, $id = null) {
        if ($id === null) {
            $id = uniqid('req_');
        }

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30,
            CURLOPT_NOPROGRESS => false,
            CURLOPT_PROGRESSFUNCTION => function($resource, $download_size, $downloaded, $upload_size, $uploaded) use ($id) {
                // 进度回调(可选)
                return 0;
            }
        ]);

        $this->handles[$id] = $ch;
        curl_multi_add_handle($this->multiHandle, $ch);

        return $id;
    }

    public function execute() {
        echo "开始执行 " . count($this->handles) . " 个并行请求...\n\n";

        $active = null;
        $lastProgressUpdate = 0;

        // 主执行循环
        do {
            // 执行多句柄
            $status = curl_multi_exec($this->multiHandle, $active);

            if ($status !== CURLM_OK) {
                $this->handleMultiError($status);
                break;
            }

            // 处理已完成的请求
            $this->processCompletedRequests();

            // 显示进度(每秒更新一次)
            $currentTime = microtime(true);
            if ($currentTime - $lastProgressUpdate >= 1.0) {
                $this->showProgress($active);
                $lastProgressUpdate = $currentTime;
            }

            // 等待活动
            if ($active > 0) {
                $ready = curl_multi_select($this->multiHandle, 0.1);
                if ($ready === -1) {
                    // 选择错误,短暂暂停
                    usleep(100000); // 100ms
                }
            }

        } while ($active > 0);

        // 最终处理可能剩余的任何请求
        $this->processCompletedRequests(true);

        $totalTime = microtime(true) - $this->startTime;
        echo "\n所有请求执行完成!总耗时: " . round($totalTime, 2) . " 秒\n";

        return $this->results;
    }

    private function processCompletedRequests($final = false) {
        // 处理所有已完成的请求
        while ($info = curl_multi_info_read($this->multiHandle)) {
            $handle = $info['handle'];
            $id = array_search($handle, $this->handles, true);

            if ($id !== false) {
                $this->handleCompletedRequest($id, $handle, $info);
                unset($this->handles[$id]);
            }
        }

        // 如果是最终处理且仍有活动句柄,强制处理
        if ($final && !empty($this->handles)) {
            foreach ($this->handles as $id => $handle) {
                $this->handleCompletedRequest($id, $handle, ['handle' => $handle, 'result' => CURLE_OPERATION_TIMEDOUT]);
                curl_multi_remove_handle($this->multiHandle, $handle);
            }
            $this->handles = [];
        }
    }

    private function handleCompletedRequest($id, $handle, $info) {
        $content = curl_multi_getcontent($handle);
        $curlInfo = curl_getinfo($handle);
        $error = curl_error($handle);
        $errno = curl_errno($handle);

        $this->results[$id] = [
            'success' => ($info['result'] === CURLE_OK),
            'http_code' => $curlInfo['http_code'],
            'total_time' => $curlInfo['total_time'],
            'content_length' => strlen($content),
            'error' => $error,
            'errno' => $errno,
            'content' => $content
        ];

        curl_multi_remove_handle($this->multiHandle, $handle);
        curl_close($handle);

        $status = $info['result'] === CURLE_OK ? '完成' : '失败';
        echo "请求 {$id}: {$status} (" . round($curlInfo['total_time'], 2) . "秒)\n";
    }

    private function showProgress($active) {
        $elapsed = microtime(true) - $this->startTime;
        $completed = count($this->results);
        $total = $completed + count($this->handles);

        echo sprintf(
            "进度: %d/%d 完成, %d 运行中, 已耗时: %.1fs\n",
            $completed, $total, $active, $elapsed
        );
    }

    private function handleMultiError($status) {
        $errors = [
            CURLM_BAD_HANDLE => '传递了无效的多句柄',
            CURLM_BAD_EASY_HANDLE => '传递了无效的简单句柄',
            CURLM_OUT_OF_MEMORY => '内存不足',
            CURLM_INTERNAL_ERROR => '内部错误',
            CURLM_BAD_SOCKET => '无效的socket',
            CURLM_UNKNOWN_OPTION => '未知选项'
        ];

        $errorMsg = $errors[$status] ?? "未知错误 (代码: {$status})";
        echo "多句柄执行错误: {$errorMsg}\n";
    }

    public function close() {
        foreach ($this->handles as $handle) {
            if (is_resource($handle)) {
                curl_multi_remove_handle($this->multiHandle, $handle);
                curl_close($handle);
            }
        }

        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }

        $this->handles = [];
        $this->results = [];
    }

    public function __destruct() {
        $this->close();
    }
}

// 使用高级执行器
$executor = new AdvancedMultiExecutor();

// 添加多个请求
$urls = [
    'fast' => 'https://httpbin.org/delay/1',
    'medium' => 'https://httpbin.org/delay/2',
    'slow' => 'https://httpbin.org/delay/3',
    'instant' => 'https://httpbin.org/get'
];

foreach ($urls as $id => $url) {
    $executor->addRequest($url, $id);
}

// 执行所有请求
$results = $executor->execute();

// 显示统计信息
echo "\n执行结果统计:\n";
$successCount = count(array_filter($results, function($r) { return $r['success']; }));
$totalCount = count($results);
echo "成功: {$successCount}/{$totalCount}\n";

// 显示每个请求的详细信息
foreach ($results as $id => $result) {
    $status = $result['success'] ? '✓' : '✗';
    echo "{$status} {$id}: HTTP {$result['http_code']}, {$result['content_length']} 字节, {$result['total_time']}秒\n";
}

$executor->close();
@endphp

示例 3:非阻塞执行与任务并行处理

@php
class NonBlockingMultiExecutor {
    private $multiHandle;
    private $handles = [];
    private $callbacks = [];
    private $isRunning = false;

    public function __construct() {
        $this->multiHandle = curl_multi_init();
    }

    public function addRequest($url, $callback, $options = []) {
        $id = uniqid('req_');

        $ch = curl_init();

        $defaultOptions = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30
        ];

        $finalOptions = $options + $defaultOptions;
        curl_setopt_array($ch, $finalOptions);

        $this->handles[$id] = $ch;
        $this->callbacks[$id] = $callback;

        curl_multi_add_handle($this->multiHandle, $ch);

        return $id;
    }

    public function start() {
        $this->isRunning = true;
        echo "开始非阻塞执行...\n";
    }

    public function process($maxTime = 0.1) {
        if (!$this->isRunning || empty($this->handles)) {
            return false;
        }

        $active = null;
        $startTime = microtime(true);

        // 执行多句柄(非阻塞)
        do {
            $status = curl_multi_exec($this->multiHandle, $active);

            if ($status !== CURLM_OK) {
                break;
            }

            // 处理已完成的请求
            $this->processCompletedRequests();

            // 检查时间限制
            if (microtime(true) - $startTime >= $maxTime) {
                break;
            }

            // 短暂等待
            if ($active > 0) {
                curl_multi_select($this->multiHandle, 0.01);
            }

        } while ($active > 0);

        return !empty($this->handles); // 返回是否还有未完成的请求
    }

    private function processCompletedRequests() {
        while ($info = curl_multi_info_read($this->multiHandle)) {
            $handle = $info['handle'];
            $id = array_search($handle, $this->handles, true);

            if ($id !== false) {
                $this->handleCompletedRequest($id, $handle, $info);
            }
        }
    }

    private function handleCompletedRequest($id, $handle, $info) {
        $content = curl_multi_getcontent($handle);
        $curlInfo = curl_getinfo($handle);
        $error = curl_error($handle);

        $result = [
            'success' => ($info['result'] === CURLE_OK),
            'content' => $content,
            'info' => $curlInfo,
            'error' => $error
        ];

        // 调用回调函数
        if (isset($this->callbacks[$id])) {
            call_user_func($this->callbacks[$id], $result, $id);
        }

        // 清理资源
        curl_multi_remove_handle($this->multiHandle, $handle);
        curl_close($handle);

        unset($this->handles[$id], $this->callbacks[$id]);

        echo "请求 {$id} 完成并回调\n";
    }

    public function waitForCompletion($checkInterval = 0.1) {
        while ($this->process($checkInterval)) {
            // 可以在这里执行其他任务
            echo ".";
            usleep(50000); // 50ms
        }

        echo "\n所有请求完成!\n";
        $this->isRunning = false;
    }

    public function close() {
        $this->isRunning = false;

        foreach ($this->handles as $handle) {
            if (is_resource($handle)) {
                curl_multi_remove_handle($this->multiHandle, $handle);
                curl_close($handle);
            }
        }

        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }

        $this->handles = [];
        $this->callbacks = [];
    }

    public function __destruct() {
        $this->close();
    }
}

// 使用非阻塞执行器
$executor = new NonBlockingMultiExecutor();

// 定义回调函数
$callback = function($result, $id) {
    if ($result['success']) {
        echo "[回调] {$id}: 成功, 长度: " . strlen($result['content']) . " 字节\n";
    } else {
        echo "[回调] {$id}: 失败, 错误: " . $result['error'] . "\n";
    }
};

// 添加多个请求
$urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/3',
    'https://httpbin.org/get'
];

foreach ($urls as $url) {
    $executor->addRequest($url, $callback);
}

echo "开始非阻塞执行,可以同时处理其他任务...\n";
$executor->start();

// 模拟同时处理其他任务
$taskCount = 0;
while ($executor->process(0.05)) { // 每次处理最多50ms
    $taskCount++;
    echo "处理其他任务 #{$taskCount}\n";
    usleep(100000); // 100ms
}

// 或者使用等待完成
// $executor->waitForCompletion();

echo "所有HTTP请求和其他任务都已完成!\n";
$executor->close();
@endphp

示例 4:错误处理与重试机制

@php
class RobustMultiExecutor {
    private $multiHandle;
    private $requests = [];
    private $maxRetries;
    private $timeout;

    public function __construct($maxRetries = 3, $timeout = 30) {
        $this->multiHandle = curl_multi_init();
        $this->maxRetries = $maxRetries;
        $this->timeout = $timeout;
    }

    public function addRequest($url, $id = null) {
        if ($id === null) {
            $id = uniqid('req_');
        }

        $this->requests[$id] = [
            'url' => $url,
            'handle' => null,
            'retries' => 0,
            'completed' => false,
            'result' => null
        ];

        $this->createHandle($id);

        return $id;
    }

    private function createHandle($id) {
        $request = &$this->requests[$id];

        if ($request['handle'] && is_resource($request['handle'])) {
            curl_multi_remove_handle($this->multiHandle, $request['handle']);
            curl_close($request['handle']);
        }

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $request['url'],
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_CONNECTTIMEOUT => 10
        ]);

        $request['handle'] = $ch;
        curl_multi_add_handle($this->multiHandle, $ch);

        $request['retries']++;
    }

    public function execute() {
        echo "开始执行(最大重试次数: {$this->maxRetries})...\n";

        $startTime = microtime(true);
        $active = null;

        do {
            // 执行多句柄
            $status = curl_multi_exec($this->multiHandle, $active);

            if ($status !== CURLM_OK) {
                $this->handleExecutionError($status);
                // 继续执行,不立即退出
            }

            // 处理完成的请求
            $this->processCompletedRequests();

            // 处理需要重试的请求
            $this->processRetries();

            // 显示进度
            $this->showProgress();

            // 等待活动
            if ($active > 0) {
                curl_multi_select($this->multiHandle, 0.1);
            }

            // 超时检查
            if (microtime(true) - $startTime > $this->timeout * 2) {
                echo "执行超时,强制结束\n";
                break;
            }

        } while ($active > 0 || $this->hasPendingRetries());

        $totalTime = microtime(true) - $startTime;
        echo "\n执行完成!总耗时: " . round($totalTime, 2) . " 秒\n";

        return $this->collectResults();
    }

    private function processCompletedRequests() {
        while ($info = curl_multi_info_read($this->multiHandle)) {
            $handle = $info['handle'];
            $id = $this->findRequestId($handle);

            if ($id !== null) {
                $this->handleRequestCompletion($id, $handle, $info);
            }
        }
    }

    private function findRequestId($handle) {
        foreach ($this->requests as $id => $request) {
            if ($request['handle'] === $handle) {
                return $id;
            }
        }
        return null;
    }

    private function handleRequestCompletion($id, $handle, $info) {
        $request = &$this->requests[$id];

        $content = curl_multi_getcontent($handle);
        $curlInfo = curl_getinfo($handle);
        $error = curl_error($handle);

        $result = [
            'success' => ($info['result'] === CURLE_OK),
            'http_code' => $curlInfo['http_code'],
            'content' => $content,
            'error' => $error,
            'errno' => $info['result'],
            'retries' => $request['retries'] - 1, // 减1因为当前尝试不算重试
            'total_time' => $curlInfo['total_time']
        ];

        // 检查是否需要重试
        if (!$result['success'] && $request['retries'] <= $this->maxRetries) {
            echo "请求 {$id} 失败,准备重试 ({$request['retries']}/{$this->maxRetries})...\n";
            $this->createHandle($id); // 重新创建句柄进行重试
            return;
        }

        // 请求完成(成功或达到最大重试次数)
        $request['completed'] = true;
        $request['result'] = $result;

        curl_multi_remove_handle($this->multiHandle, $handle);
        curl_close($handle);

        $status = $result['success'] ? '成功' : '最终失败';
        echo "请求 {$id}: {$status} (尝试: {$request['retries']}次)\n";
    }

    private function processRetries() {
        // 可以在这里实现延迟重试逻辑
        // 当前实现是立即重试
    }

    private function hasPendingRetries() {
        foreach ($this->requests as $request) {
            if (!$request['completed'] && $request['retries'] <= $this->maxRetries) {
                return true;
            }
        }
        return false;
    }

    private function showProgress() {
        static $lastUpdate = 0;

        $currentTime = microtime(true);
        if ($currentTime - $lastUpdate < 1.0) {
            return;
        }

        $completed = 0;
        $total = count($this->requests);

        foreach ($this->requests as $request) {
            if ($request['completed']) {
                $completed++;
            }
        }

        echo "进度: {$completed}/{$total} 完成\n";
        $lastUpdate = $currentTime;
    }

    private function handleExecutionError($status) {
        $errors = [
            CURLM_BAD_HANDLE => '无效的多句柄',
            CURLM_OUT_OF_MEMORY => '内存不足',
            CURLM_INTERNAL_ERROR => '内部错误'
        ];

        $errorMsg = $errors[$status] ?? "错误代码: {$status}";
        echo "多句柄执行错误: {$errorMsg}\n";
    }

    private function collectResults() {
        $results = [];
        foreach ($this->requests as $id => $request) {
            if ($request['completed']) {
                $results[$id] = $request['result'];
            }
        }
        return $results;
    }

    public function close() {
        foreach ($this->requests as $request) {
            if ($request['handle'] && is_resource($request['handle'])) {
                curl_multi_remove_handle($this->multiHandle, $request['handle']);
                curl_close($request['handle']);
            }
        }

        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }

        $this->requests = [];
    }

    public function __destruct() {
        $this->close();
    }
}

// 使用健壮的执行器(包含错误处理和重试)
$executor = new RobustMultiExecutor(2, 10); // 最大重试2次,超时10秒

// 添加一些可能失败的请求进行测试
$urls = [
    'good' => 'https://httpbin.org/get',
    'slow' => 'https://httpbin.org/delay/2',
    'timeout' => 'https://httpbin.org/delay/5', // 可能超时
    'invalid' => 'https://invalid-url-12345.com' // 会失败
];

foreach ($urls as $id => $url) {
    $executor->addRequest($url, $id);
}

$results = $executor->execute();

// 分析结果
echo "\n最终结果分析:\n";
$successCount = 0;
foreach ($results as $id => $result) {
    if ($result['success']) {
        $successCount++;
        echo "✓ {$id}: 成功 (尝试: {$result['retries']}次, 耗时: " . round($result['total_time'], 2) . "秒)\n";
    } else {
        echo "✗ {$id}: 失败 (尝试: {$result['retries']}次, 错误: {$result['error']})\n";
    }
}

echo "\n统计: {$successCount}/" . count($urls) . " 成功\n";

$executor->close();
@endphp

常见执行模式

简单阻塞模式
do {
    curl_multi_exec($mh, $running);
    if ($running) {
        curl_multi_select($mh, 1.0);
    }
} while ($running > 0);

最简单的执行循环,等待所有请求完成。

带进度处理模式
do {
    curl_multi_exec($mh, $running);
    // 处理已完成请求
    while ($info = curl_multi_info_read($mh)) {
        // 处理单个完成请求
    }
    if ($running) {
        curl_multi_select($mh, 0.1);
    }
} while ($running > 0);

实时处理完成请求,避免堆积。

非阻塞模式
// 单次执行,不等待完成
curl_multi_exec($mh, $running);
// 可以在这里执行其他任务
if ($running) {
    curl_multi_select($mh, 0);
}

适合在游戏循环或事件驱动架构中使用。

超时控制模式
$timeout = 30;
$start = time();
do {
    curl_multi_exec($mh, $running);
    if ($running && (time() - $start) < $timeout) {
        curl_multi_select($mh, 1);
    } else {
        break; // 超时退出
    }
} while ($running > 0);

防止请求无限期挂起。

最佳实践

  • 总是检查返回值 - 检查curl_multi_exec()的返回值,处理可能的错误
  • 使用curl_multi_select() - 避免忙等待,减少CPU占用
  • 及时处理完成请求 - 在循环中调用curl_multi_info_read()处理已完成请求
  • 设置合理的超时 - 为单个请求和多句柄执行都设置超时时间
  • 监控资源使用 - 在处理大量请求时注意内存和文件描述符限制
  • 错误处理与重试 - 为失败请求实现适当的重试机制
  • 避免阻塞主线程 - 在Web应用中考虑使用非阻塞模式

注意事项

重要:
  • curl_multi_exec()是非阻塞的,需要通过循环调用直到所有请求完成
  • still_running参数表示当前正在运行的句柄数量,而不是剩余句柄总数
  • 在PHP 5.5+中,CURLM_CALL_MULTI_PERFORM已弃用,不需要特殊处理
  • curl_multi_select()在某些系统上可能立即返回,需要适当处理
  • 大量并发请求时,注意系统资源限制(文件描述符、内存等)
  • 在多线程环境中使用cURL多句柄需要特别注意线程安全
  • 及时清理已完成请求的句柄,防止资源泄漏