Pandas 读取数据库

数据库是数据存储的常见形式。Pandas 提供了 read_sql() 函数,可以从各种 SQL 数据库中直接读取数据到 DataFrame,实现数据分析和数据库的无缝连接。

📦 安装依赖库

读取数据库需要安装数据库驱动以及 SQLAlchemy(推荐用于连接管理):

# SQLAlchemy (核心)
pip install sqlalchemy

# 根据不同数据库安装对应驱动
# SQLite (Python内置,无需额外安装)
# MySQL / MariaDB
pip install pymysql
# PostgreSQL
pip install psycopg2-binary
# Microsoft SQL Server
pip install pyodbc
# Oracle
pip install cx_oracle

🔌 建立数据库连接

使用 read_sql() 需要提供一个数据库连接对象,通常使用 SQLAlchemy 的 create_engine() 创建引擎,也可以使用 DB-API 的连接(如 sqlite3 连接)。

SQLite (文件数据库)
from sqlalchemy import create_engine

# 连接到本地的 SQLite 文件
engine = create_engine('sqlite:///mydatabase.db')
MySQL
engine = create_engine(
    'mysql+pymysql://username:password@host:port/dbname'
)
PostgreSQL
engine = create_engine(
    'postgresql+psycopg2://username:password@host:port/dbname'
)
SQL Server
engine = create_engine(
    'mssql+pyodbc://username:password@host:port/dbname?driver=ODBC+Driver+17+for+SQL+Server'
)

📖 基本查询读取

pd.read_sql() 是最主要的函数,它接受 SQL 查询和连接对象,返回 DataFrame。

import pandas as pd
from sqlalchemy import create_engine

# 创建引擎
engine = create_engine('sqlite:///employees.db')

# 执行查询
df = pd.read_sql('SELECT * FROM employees', con=engine)
print(df.head())

# 也可以直接使用表名(读取整张表)
df = pd.read_sql_table('employees', con=engine)

# 更灵活的方式:read_sql_query 专门用于查询
df = pd.read_sql_query('SELECT name, age FROM employees WHERE age > 30', con=engine)

⚙️ 常用参数详解

sql

要执行的 SQL 查询字符串,或者表名(若使用 read_sql_table)。

con

数据库连接,可以是 SQLAlchemy 引擎,也可以是 DB-API 连接对象(如 sqlite3.Connection)。

params

为 SQL 查询提供参数,防止 SQL 注入。可以是列表、元组或字典。

df = pd.read_sql('SELECT * FROM employees WHERE age > :age',
                 con=engine, params={'age': 30})
parse_dates

指定哪些列解析为日期类型。可以是列名列表,或字典指定解析方式。

df = pd.read_sql('SELECT * FROM sales', con=engine,
                 parse_dates=['order_date'])
columns

当使用 read_sql_table 时,可以指定只读取某些列。

df = pd.read_sql_table('employees', con=engine,
                       columns=['name', 'age'])
chunksize

如果指定,返回一个迭代器,每次读取指定行数的块,适用于处理大表。

for chunk in pd.read_sql('SELECT * FROM large_table',
                          con=engine, chunksize=10000):
    process(chunk)
index_col

指定某列作为行索引。

df = pd.read_sql('SELECT * FROM employees',
                 con=engine, index_col='id')
coerce_float

尝试将非字符串、非数字对象转换为浮点数,默认为 True。

🧪 综合示例:SQLite 数据库操作

下面演示从创建数据库到查询的完整流程:

import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np

# 1. 创建内存中的 SQLite 数据库(便于演示)
engine = create_engine('sqlite:///:memory:')

# 2. 准备示例数据
data = {
    'id': [1, 2, 3, 4],
    'name': ['张三', '李四', '王五', '赵六'],
    'age': [28, 35, 42, 29],
    'salary': [8000, 12000, 15000, 9500],
    'hire_date': ['2020-01-15', '2018-06-01', '2015-11-23', '2021-03-10']
}
df_original = pd.DataFrame(data)

# 3. 将数据写入数据库表
df_original.to_sql('employees', con=engine, index=False, if_exists='replace')

# 4. 读取整张表
df_all = pd.read_sql('SELECT * FROM employees', con=engine)
print("所有员工:\n", df_all)

# 5. 带参数的条件查询,并解析日期
df_filtered = pd.read_sql(
    'SELECT * FROM employees WHERE age > :min_age AND salary < :max_salary',
    con=engine,
    params={'min_age': 30, 'max_salary': 14000},
    parse_dates=['hire_date']
)
print("\n年龄大于30且工资低于14000的员工:\n", df_filtered)

# 6. 使用 chunksize 分块读取(模拟大表)
for chunk in pd.read_sql('SELECT * FROM employees', con=engine, chunksize=2):
    print("\n--- 块 ---")
    print(chunk)

🔐 参数化查询(防止 SQL 注入)

永远不要使用字符串格式化拼接 SQL,应使用 params 参数传递变量,保障安全。

# 不安全的做法(避免)
user_input = "100; DROP TABLE users;"
df = pd.read_sql(f"SELECT * FROM products WHERE id = {user_input}", con=engine)

# 安全的做法
df = pd.read_sql("SELECT * FROM products WHERE id = :id",
                 con=engine, params={'id': user_input})

⚠️ 常见问题及解决方法

连接超时或拒绝连接:检查数据库地址、端口、用户名密码是否正确,以及防火墙设置。
驱动未安装:根据使用的数据库安装对应的驱动,如 pymysql、psycopg2 等。
日期解析错误:使用 parse_dates 参数,或在读取后使用 pd.to_datetime() 转换。
内存不足处理大表:使用 chunksize 分块读取,或只读取所需列/行。
SQLAlchemy 引擎和连接池:记得在使用后处理引擎,但通常引擎可以全局使用,不需要显式关闭。如果是用连接对象,用完后应关闭。
提示:如果只需要读取少量数据,可以先在数据库端进行聚合、筛选,减少数据传输量。另外,使用 SQLAlchemy 引擎可以更好地管理连接。