read_sql() 函数,可以从各种 SQL 数据库中直接读取数据到 DataFrame,实现数据分析和数据库的无缝连接。
读取数据库需要安装数据库驱动以及 SQLAlchemy(推荐用于连接管理):
# SQLAlchemy (核心)
pip install sqlalchemy
# 根据不同数据库安装对应驱动
# SQLite (Python内置,无需额外安装)
# MySQL / MariaDB
pip install pymysql
# PostgreSQL
pip install psycopg2-binary
# Microsoft SQL Server
pip install pyodbc
# Oracle
pip install cx_oracle
使用 read_sql() 需要提供一个数据库连接对象,通常使用 SQLAlchemy 的 create_engine() 创建引擎,也可以使用 DB-API 的连接(如 sqlite3 连接)。
from sqlalchemy import create_engine
# 连接到本地的 SQLite 文件
engine = create_engine('sqlite:///mydatabase.db')
engine = create_engine(
'mysql+pymysql://username:password@host:port/dbname'
)
engine = create_engine(
'postgresql+psycopg2://username:password@host:port/dbname'
)
engine = create_engine(
'mssql+pyodbc://username:password@host:port/dbname?driver=ODBC+Driver+17+for+SQL+Server'
)
pd.read_sql() 是最主要的函数,它接受 SQL 查询和连接对象,返回 DataFrame。
import pandas as pd
from sqlalchemy import create_engine
# 创建引擎
engine = create_engine('sqlite:///employees.db')
# 执行查询
df = pd.read_sql('SELECT * FROM employees', con=engine)
print(df.head())
# 也可以直接使用表名(读取整张表)
df = pd.read_sql_table('employees', con=engine)
# 更灵活的方式:read_sql_query 专门用于查询
df = pd.read_sql_query('SELECT name, age FROM employees WHERE age > 30', con=engine)
sql要执行的 SQL 查询字符串,或者表名(若使用 read_sql_table)。
con数据库连接,可以是 SQLAlchemy 引擎,也可以是 DB-API 连接对象(如 sqlite3.Connection)。
params为 SQL 查询提供参数,防止 SQL 注入。可以是列表、元组或字典。
df = pd.read_sql('SELECT * FROM employees WHERE age > :age',
con=engine, params={'age': 30})
parse_dates指定哪些列解析为日期类型。可以是列名列表,或字典指定解析方式。
df = pd.read_sql('SELECT * FROM sales', con=engine,
parse_dates=['order_date'])
columns当使用 read_sql_table 时,可以指定只读取某些列。
df = pd.read_sql_table('employees', con=engine,
columns=['name', 'age'])
chunksize如果指定,返回一个迭代器,每次读取指定行数的块,适用于处理大表。
for chunk in pd.read_sql('SELECT * FROM large_table',
con=engine, chunksize=10000):
process(chunk)
index_col指定某列作为行索引。
df = pd.read_sql('SELECT * FROM employees',
con=engine, index_col='id')
coerce_float尝试将非字符串、非数字对象转换为浮点数,默认为 True。
下面演示从创建数据库到查询的完整流程:
import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np
# 1. 创建内存中的 SQLite 数据库(便于演示)
engine = create_engine('sqlite:///:memory:')
# 2. 准备示例数据
data = {
'id': [1, 2, 3, 4],
'name': ['张三', '李四', '王五', '赵六'],
'age': [28, 35, 42, 29],
'salary': [8000, 12000, 15000, 9500],
'hire_date': ['2020-01-15', '2018-06-01', '2015-11-23', '2021-03-10']
}
df_original = pd.DataFrame(data)
# 3. 将数据写入数据库表
df_original.to_sql('employees', con=engine, index=False, if_exists='replace')
# 4. 读取整张表
df_all = pd.read_sql('SELECT * FROM employees', con=engine)
print("所有员工:\n", df_all)
# 5. 带参数的条件查询,并解析日期
df_filtered = pd.read_sql(
'SELECT * FROM employees WHERE age > :min_age AND salary < :max_salary',
con=engine,
params={'min_age': 30, 'max_salary': 14000},
parse_dates=['hire_date']
)
print("\n年龄大于30且工资低于14000的员工:\n", df_filtered)
# 6. 使用 chunksize 分块读取(模拟大表)
for chunk in pd.read_sql('SELECT * FROM employees', con=engine, chunksize=2):
print("\n--- 块 ---")
print(chunk)
永远不要使用字符串格式化拼接 SQL,应使用 params 参数传递变量,保障安全。
# 不安全的做法(避免)
user_input = "100; DROP TABLE users;"
df = pd.read_sql(f"SELECT * FROM products WHERE id = {user_input}", con=engine)
# 安全的做法
df = pd.read_sql("SELECT * FROM products WHERE id = :id",
con=engine, params={'id': user_input})
parse_dates 参数,或在读取后使用 pd.to_datetime() 转换。
chunksize 分块读取,或只读取所需列/行。