NumPy 并行计算基础

随着数据量的增长,单核计算往往难以满足性能需求。并行计算利用多核CPU、GPU甚至多台机器同时处理任务,可以显著加速数值计算。本章将介绍并行计算的基本概念,并探讨如何在使用NumPy时利用并行技术提升效率。

1. 并行计算概述

并行计算是将一个大任务分解为多个小任务,并同时执行这些小任务的计算模式。主要分为:

  • 多线程:同一进程内多个线程共享内存,适合I/O密集型或轻量级并行任务。
  • 多进程:多个独立进程,内存独立,适合CPU密集型任务(Python由于GIL的限制,多线程在CPU密集型上效果有限,多进程更常用)。
  • 分布式计算:多台机器协同工作,处理超大规模数据。

NumPy本身的一些操作(如线性代数)底层通过BLAS库自动利用多线程,而更高层次的并行则需要借助其他工具。

2. NumPy的隐式并行:BLAS多线程

NumPy的许多线性代数运算(如点积、矩阵乘法、SVD等)调用的是经过高度优化的BLAS库(如OpenBLAS、Intel MKL、ATLAS)。这些库会自动使用多线程并行计算,无需用户干预。

import numpy as np
import time

a = np.random.rand(2000, 2000)
b = np.random.rand(2000, 2000)

start = time.time()
c = a @ b  # 矩阵乘法,底层BLAS多线程
print("矩阵乘法耗时:", time.time() - start)

可以通过环境变量控制BLAS使用的线程数,例如在运行前设置 OMP_NUM_THREADS=4MKL_NUM_THREADS=4

3. 使用 NumExpr 加速表达式计算

NumExpr是一个快速数值表达式计算器,它可以将复杂的数组表达式编译为高效的机器码,并自动利用多核并行。对于涉及多次算术运算的表达式,NumExpr通常比NumPy快数倍。

import numexpr as ne
import numpy as np

a = np.random.rand(1000000)
b = np.random.rand(1000000)
c = np.random.rand(1000000)

# NumPy 表达式
start = time.time()
result_np = (a + b) * c - a / b
print("NumPy耗时:", time.time() - start)

# NumExpr 并行计算
start = time.time()
result_ne = ne.evaluate('(a + b) * c - a / b')
print("NumExpr耗时:", time.time() - start)

NumExpr会自动检测CPU核心数并并行执行,也支持设置线程数:ne.set_num_threads(4)

4. 使用 Numba 并行编译

Numba是一个JIT编译器,可以将Python函数(尤其是涉及NumPy数组的循环)编译为高效的机器码,并支持显式并行。

4.1 向量化与 @vectorize

@vectorize 可以将一个标量函数转换为类似ufunc的并行函数。

from numba import vectorize, float64
import numpy as np

@vectorize([float64(float64, float64)], target='parallel')
def func_parallel(x, y):
    return x ** 2 + y ** 2

a = np.random.rand(1000000)
b = np.random.rand(1000000)
result = func_parallel(a, b)  # 并行执行

4.2 并行循环 @njit(parallel=True)

对于复杂的循环,可以使用 @njit(parallel=True),并用 prange 指定并行循环。

from numba import njit, prange

@njit(parallel=True)
def parallel_sum(arr):
    total = 0.0
    for i in prange(len(arr)):  # prange 表示并行循环
        total += arr[i]
    return total

arr = np.random.rand(1000000)
result = parallel_sum(arr)

Numba的并行循环会自动分配到多个线程,大幅加速CPU密集型循环。

5. 使用 Dask 进行分布式与并行计算

Dask是一个灵活的并行计算库,能够处理超出内存的数据集,并将计算任务分发到多核或多台机器。它提供了类似NumPy的数组接口(dask.array)和类似Pandas的DataFrame接口。

import dask.array as da

# 创建一个大的 Dask 数组(分块)
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# 执行计算(如求均值)
mean = x.mean().compute()  # compute() 触发实际计算,并利用多线程
print(mean)

Dask将数组划分为多个块,每个块可以独立计算,支持多线程、多进程或分布式调度。非常适合处理大规模数据。

6. 并行策略选择指南

  • 简单的逐元素运算或表达式:使用 NumExpr 最简单高效。
  • 自定义的复杂循环/算法:使用 Numba@njit(parallel=True)@vectorize
  • 线性代数密集的任务:确保 NumPy 链接了多线程 BLAS(如 OpenBLAS 或 MKL),通常已自动并行。
  • 超大数据集(超出内存):使用 Dask 进行分布式处理。
  • 多机分布式:Dask 或 Spark 等。

7. 注意事项

  • GIL问题:Python 的多线程受 GIL 限制,CPU 密集型任务应使用多进程或释放 GIL 的扩展(如 NumPy、Numba)。
  • 内存带宽:并行计算可能受内存带宽限制,增加线程不一定线性加速。
  • 调试难度:并行程序更难调试,建议先用小数据验证正确性。
  • 环境变量:控制 BLAS 线程数:OMP_NUM_THREADSMKL_NUM_THREADSOPENBLAS_NUM_THREADS 等。
提示: 在启用并行计算前,建议使用 top 或任务管理器观察 CPU 使用率,确认并行是否生效。对于小规模数据,并行可能因开销反而变慢,需权衡。

总结

并行计算是提升 NumPy 性能的重要手段。通过合理利用 BLAS 多线程、NumExpr、Numba 或 Dask,可以充分发挥现代多核硬件的潜力,加速数据处理和分析。掌握这些工具,能让你的代码在处理大规模数据时更加高效。