PHP curl_multi_init函数

定义和用法

curl_multi_init() 函数用于初始化一个新的cURL多句柄,允许并行处理多个cURL传输。这使得可以同时执行多个HTTP请求,显著提高请求效率,特别适用于需要从多个API端点获取数据的场景。

提示: cURL多句柄功能允许同时处理多个cURL句柄,相比顺序执行单个请求,可以大幅减少总等待时间。

语法

resource curl_multi_init ( void )

参数

此函数没有参数。

返回值

成功时返回一个cURL多句柄资源,失败时返回 FALSE

单句柄 vs 多句柄

顺序请求(单句柄)
  • 一次执行一个请求
  • 简单易用
  • 总时间 = 所有请求时间之和
  • 适合少量请求
请求1
请求2
请求3
请求4
并行请求(多句柄)
  • 同时执行多个请求
  • 复杂度较高
  • 总时间 ≈ 最慢的请求时间
  • 适合批量请求
请求1
请求2
请求3
请求4

cURL多句柄工作流程

1
curl_multi_init()
初始化多句柄
2
curl_init() +
curl_setopt()
创建单个句柄
3
curl_multi_add_handle()
添加句柄
4
curl_multi_exec()
执行请求
5
curl_multi_getcontent()
获取内容
6
curl_multi_remove_handle()
移除句柄
7
curl_multi_close()
关闭多句柄

示例

示例 1:基本的并行请求

@php
// 创建cURL多句柄
$mh = curl_multi_init();

// 要请求的URL列表
$urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/3',
    'https://httpbin.org/get'
];

$handles = [];

// 为每个URL创建cURL句柄
foreach ($urls as $i => $url) {
    $handles[$i] = curl_init();
    curl_setopt_array($handles[$i], [
        CURLOPT_URL => $url,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_TIMEOUT => 10
    ]);

    // 将句柄添加到多句柄
    curl_multi_add_handle($mh, $handles[$i]);
}

echo "开始并行执行 " . count($urls) . " 个请求...\n";

// 执行多句柄
$running = null;
do {
    $status = curl_multi_exec($mh, $running);

    // 检查是否有活动连接
    if ($running) {
        // 等待活动,避免CPU占用过高
        curl_multi_select($mh, 1.0);
    }
} while ($running > 0);

// 检查执行状态
if ($status !== CURLM_OK) {
    echo "cURL多句柄执行出错\n";
}

// 处理结果
$results = [];
foreach ($handles as $i => $handle) {
    // 获取内容
    $content = curl_multi_getcontent($handle);
    $info = curl_getinfo($handle);

    $results[$i] = [
        'url' => $urls[$i],
        'content' => $content,
        'http_code' => $info['http_code'],
        'total_time' => $info['total_time']
    ];

    // 从多句柄中移除并关闭单个句柄
    curl_multi_remove_handle($mh, $handle);
    curl_close($handle);
}

// 关闭多句柄
curl_multi_close($mh);

// 输出结果
echo "所有请求完成!\n";
foreach ($results as $i => $result) {
    echo "请求 {$i}: {$result['url']}\n";
    echo "  HTTP状态码: {$result['http_code']}\n";
    echo "  耗时: " . round($result['total_time'], 3) . " 秒\n";
    echo "  内容长度: " . strlen($result['content']) . " 字节\n\n";
}
@endphp

示例 2:带错误处理的并行请求类

@php
class ParallelRequest {
    private $multiHandle;
    private $handles = [];
    private $results = [];

    public function __construct() {
        $this->multiHandle = curl_multi_init();
    }

    public function addRequest($url, $options = [], $id = null) {
        if ($id === null) {
            $id = uniqid('req_');
        }

        $ch = curl_init();

        // 默认选项
        $defaultOptions = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30,
            CURLOPT_FOLLOWLOCATION => true
        ];

        // 合并选项
        $finalOptions = $options + $defaultOptions;
        curl_setopt_array($ch, $finalOptions);

        // 存储句柄
        $this->handles[$id] = $ch;

        // 添加到多句柄
        curl_multi_add_handle($this->multiHandle, $ch);

        return $id;
    }

    public function execute() {
        if (empty($this->handles)) {
            return [];
        }

        $startTime = microtime(true);

        // 执行所有请求
        $running = null;
        do {
            $status = curl_multi_exec($this->multiHandle, $running);

            if ($running) {
                // 等待活动
                curl_multi_select($this->multiHandle, 0.1);
            }

            // 检查是否有完成的请求
            while ($info = curl_multi_info_read($this->multiHandle)) {
                $handle = $info['handle'];
                $id = array_search($handle, $this->handles, true);

                if ($id !== false) {
                    $this->processCompletedRequest($id, $handle, $info);
                }
            }

        } while ($running > 0 && $status === CURLM_OK);

        $totalTime = microtime(true) - $startTime;

        // 清理资源
        $this->cleanup();

        return [
            'results' => $this->results,
            'total_time' => $totalTime,
            'request_count' => count($this->handles)
        ];
    }

    private function processCompletedRequest($id, $handle, $info) {
        $content = curl_multi_getcontent($handle);
        $curlInfo = curl_getinfo($handle);
        $error = curl_error($handle);
        $errno = curl_errno($handle);

        $this->results[$id] = [
            'success' => ($errno === 0 && $content !== false),
            'content' => $content,
            'info' => $curlInfo,
            'error' => $error,
            'errno' => $errno,
            'curl_result' => $info['result']
        ];

        // 从多句柄中移除
        curl_multi_remove_handle($this->multiHandle, $handle);
    }

    private function cleanup() {
        // 关闭所有单个句柄
        foreach ($this->handles as $handle) {
            if (is_resource($handle)) {
                curl_close($handle);
            }
        }

        // 关闭多句柄
        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }

        $this->handles = [];
        $this->multiHandle = null;
    }

    public function __destruct() {
        $this->cleanup();
    }
}

// 使用并行请求类
$parallel = new ParallelRequest();

// 添加多个请求
$urls = [
    'api1' => 'https://httpbin.org/json',
    'api2' => 'https://httpbin.org/xml',
    'api3' => 'https://httpbin.org/headers',
    'api4' => 'https://invalid-url-12345.com' // 这个会失败
];

foreach ($urls as $id => $url) {
    $parallel->addRequest($url, [], $id);
}

// 执行并行请求
$result = $parallel->execute();

echo "并行请求完成!\n";
echo "总耗时: " . round($result['total_time'], 3) . " 秒\n";
echo "请求数量: " . $result['request_count'] . "\n\n";

// 分析结果
$successCount = 0;
$errorCount = 0;

foreach ($result['results'] as $id => $requestResult) {
    if ($requestResult['success']) {
        $successCount++;
        echo "✓ {$id}: 成功 - HTTP {$requestResult['info']['http_code']}";
        echo " (" . round($requestResult['info']['total_time'], 3) . "秒)\n";
    } else {
        $errorCount++;
        echo "✗ {$id}: 失败 - {$requestResult['error']}\n";
    }
}

echo "\n统计: {$successCount} 成功, {$errorCount} 失败\n";
@endphp

示例 3:限制并发数量的并行请求

@php
class ThrottledParallelRequest {
    private $maxConcurrent;
    private $multiHandle;
    private $pendingUrls = [];
    private $activeHandles = [];
    private $results = [];

    public function __construct($maxConcurrent = 5) {
        $this->maxConcurrent = $maxConcurrent;
        $this->multiHandle = curl_multi_init();
    }

    public function addUrl($url, $options = []) {
        $this->pendingUrls[] = [
            'url' => $url,
            'options' => $options
        ];
    }

    public function executeAll() {
        $startTime = microtime(true);

        while (!empty($this->pendingUrls) || !empty($this->activeHandles)) {
            // 添加新的请求直到达到并发限制
            while (count($this->activeHandles) < $this->maxConcurrent && !empty($this->pendingUrls)) {
                $this->addNextRequest();
            }

            if (empty($this->activeHandles)) {
                break;
            }

            // 执行当前活动的请求
            $this->executeActiveRequests();

            // 处理完成的请求
            $this->processCompletedRequests();
        }

        $totalTime = microtime(true) - $startTime;

        $this->cleanup();

        return [
            'results' => $this->results,
            'total_time' => $totalTime,
            'total_requests' => count($this->results)
        ];
    }

    private function addNextRequest() {
        if (empty($this->pendingUrls)) {
            return;
        }

        $request = array_shift($this->pendingUrls);
        $id = uniqid('req_');

        $ch = curl_init();

        $defaultOptions = [
            CURLOPT_URL => $request['url'],
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30
        ];

        $finalOptions = $request['options'] + $defaultOptions;
        curl_setopt_array($ch, $finalOptions);

        $this->activeHandles[$id] = $ch;
        curl_multi_add_handle($this->multiHandle, $ch);

        echo "开始请求: {$request['url']}\n";
    }

    private function executeActiveRequests() {
        $running = null;
        do {
            $status = curl_multi_exec($this->multiHandle, $running);

            if ($running) {
                curl_multi_select($this->multiHandle, 0.1);
            }
        } while ($running > 0 && $status === CURLM_OK);
    }

    private function processCompletedRequests() {
        while ($info = curl_multi_info_read($this->multiHandle)) {
            $handle = $info['handle'];
            $id = array_search($handle, $this->activeHandles, true);

            if ($id !== false) {
                $this->processSingleResult($id, $handle, $info);
                unset($this->activeHandles[$id]);
            }
        }
    }

    private function processSingleResult($id, $handle, $info) {
        $content = curl_multi_getcontent($handle);
        $curlInfo = curl_getinfo($handle);
        $error = curl_error($handle);

        $this->results[$id] = [
            'url' => $curlInfo['url'],
            'success' => ($info['result'] === CURLE_OK),
            'http_code' => $curlInfo['http_code'],
            'total_time' => $curlInfo['total_time'],
            'content_length' => strlen($content),
            'error' => $error
        ];

        curl_multi_remove_handle($this->multiHandle, $handle);
        curl_close($handle);

        $status = $info['result'] === CURLE_OK ? '完成' : '失败';
        echo "请求完成: {$curlInfo['url']} [$status]\n";
    }

    private function cleanup() {
        // 关闭所有活动句柄
        foreach ($this->activeHandles as $handle) {
            if (is_resource($handle)) {
                curl_multi_remove_handle($this->multiHandle, $handle);
                curl_close($handle);
            }
        }

        if (is_resource($this->multiHandle)) {
            curl_multi_close($this->multiHandle);
        }

        $this->activeHandles = [];
    }
}

// 使用限流并行请求
$throttled = new ThrottledParallelRequest(3); // 最大并发3个

// 添加多个URL(模拟20个请求)
for ($i = 1; $i <= 20; $i++) {
    $throttled->addUrl("https://httpbin.org/delay/" . ($i % 3 + 1), [
        CURLOPT_TIMEOUT => 10
    ]);
}

echo "开始执行限流并行请求(最大并发: 3)...\n\n";

$result = $throttled->executeAll();

echo "\n所有请求完成!\n";
echo "总请求数: " . $result['total_requests'] . "\n";
echo "总耗时: " . round($result['total_time'], 2) . " 秒\n";

// 统计成功率
$successful = array_filter($result['results'], function($r) {
    return $r['success'];
});

echo "成功请求: " . count($successful) . "\n";
echo "失败请求: " . (count($result['results']) - count($successful)) . "\n";
@endphp

示例 4:性能对比 - 顺序 vs 并行

@php
// 性能对比测试
class PerformanceComparator {
    private $urls;

    public function __construct($urlCount = 10) {
        // 生成测试URL(每个URL延迟1-3秒)
        $this->urls = [];
        for ($i = 0; $i < $urlCount; $i++) {
            $delay = rand(1, 3);
            $this->urls[] = "https://httpbin.org/delay/{$delay}";
        }
    }

    public function testSequential() {
        echo "开始顺序请求测试...\n";
        $startTime = microtime(true);
        $results = [];

        foreach ($this->urls as $i => $url) {
            echo "执行请求 {$i}/" . count($this->urls) . "...";

            $ch = curl_init();
            curl_setopt_array($ch, [
                CURLOPT_URL => $url,
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_TIMEOUT => 10
            ]);

            $response = curl_exec($ch);
            $info = curl_getinfo($ch);

            $results[] = [
                'url' => $url,
                'time' => $info['total_time'],
                'success' => ($response !== false)
            ];

            curl_close($ch);
            echo " 完成 (" . round($info['total_time'], 2) . "秒)\n";
        }

        $totalTime = microtime(true) - $startTime;

        return [
            'type' => 'sequential',
            'total_time' => $totalTime,
            'results' => $results
        ];
    }

    public function testParallel() {
        echo "开始并行请求测试...\n";
        $startTime = microtime(true);

        $mh = curl_multi_init();
        $handles = [];
        $results = [];

        // 创建所有句柄
        foreach ($this->urls as $i => $url) {
            $handles[$i] = curl_init();
            curl_setopt_array($handles[$i], [
                CURLOPT_URL => $url,
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_TIMEOUT => 10
            ]);
            curl_multi_add_handle($mh, $handles[$i]);
        }

        echo "所有请求已就绪,开始并行执行...\n";

        // 执行并行请求
        $running = null;
        do {
            curl_multi_exec($mh, $running);
            if ($running) {
                curl_multi_select($mh, 0.1);
            }
        } while ($running > 0);

        // 收集结果
        foreach ($handles as $i => $handle) {
            $info = curl_getinfo($handle);
            $response = curl_multi_getcontent($handle);

            $results[] = [
                'url' => $this->urls[$i],
                'time' => $info['total_time'],
                'success' => ($response !== false)
            ];

            curl_multi_remove_handle($mh, $handle);
            curl_close($handle);
        }

        curl_multi_close($mh);

        $totalTime = microtime(true) - $startTime;

        return [
            'type' => 'parallel',
            'total_time' => $totalTime,
            'results' => $results
        ];
    }

    public function compare() {
        echo "性能对比测试 (" . count($this->urls) . " 个请求)\n";
        echo str_repeat("=", 50) . "\n";

        $sequential = $this->testSequential();
        echo "\n";
        $parallel = $this->testParallel();

        echo "\n" . str_repeat("=", 50) . "\n";
        echo "性能对比结果:\n";
        echo "顺序请求总耗时: " . round($sequential['total_time'], 2) . " 秒\n";
        echo "并行请求总耗时: " . round($parallel['total_time'], 2) . " 秒\n";

        $improvement = (($sequential['total_time'] - $parallel['total_time']) / $sequential['total_time']) * 100;
        echo "性能提升: " . round($improvement, 1) . "%\n";

        // 计算平均请求时间
        $seqAvg = array_sum(array_column($sequential['results'], 'time')) / count($sequential['results']);
        $parAvg = array_sum(array_column($parallel['results'], 'time')) / count($parallel['results']);

        echo "平均单个请求时间:\n";
        echo "  顺序: " . round($seqAvg, 2) . " 秒\n";
        echo "  并行: " . round($parAvg, 2) . " 秒\n";

        return [
            'sequential' => $sequential,
            'parallel' => $parallel,
            'improvement' => $improvement
        ];
    }
}

// 运行性能对比测试
$comparator = new PerformanceComparator(5); // 测试5个请求
$results = $comparator->compare();
@endphp

优势与应用场景

性能提升

并行执行多个请求,总时间接近最慢的单个请求时间。

资源优化

减少总体等待时间,提高服务器资源利用率。

复杂数据处理

适合从多个数据源并行获取数据并整合。

最佳实践

  • 合理设置并发数 - 根据服务器资源和目标服务限制调整并发数量
  • 使用curl_multi_select() - 避免CPU占用过高,让出控制权
  • 及时清理资源 - 完成后立即移除句柄并关闭连接
  • 错误处理 - 为每个请求单独处理错误,避免影响其他请求
  • 超时设置 - 为并行请求设置合理的超时时间
  • 内存管理 - 监控内存使用,避免大量并发导致内存溢出

注意事项

重要:
  • curl_multi_init()创建的多句柄必须使用curl_multi_close()关闭
  • 单个cURL句柄在添加到多句柄后不应单独使用curl_exec()
  • 使用curl_multi_getcontent()获取单个请求的内容,而不是curl_exec()
  • 并行请求可能对目标服务器造成较大压力,请合理使用
  • 某些服务器可能对并发连接数有限制
  • 在处理大量请求时,注意系统资源限制(文件描述符数量等)