# 安装 Requests 和 BeautifulSoup4
pip install requests beautifulsoup4
# 安装可选的解析器(推荐)
pip install lxml # 高性能解析器
pip install html5lib # HTML5解析器
# 验证安装
python -c "import requests; import bs4; print('安装成功')"
# 或者使用 requirements.txt
# requests==2.28.1
# beautifulsoup4==4.11.1
# lxml==4.9.1
import requests
from bs4 import BeautifulSoup
import time
import re
import json
import csv
import os
# 配置请求头,模拟浏览器
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
}
# 配置BeautifulSoup解析器
PARSER = 'lxml' # 也可以使用 'html.parser' 或 'html5lib'
# 创建会话
session = requests.Session()
session.headers.update(HEADERS)
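作为补充,下面是一个极简示意,展示解析器名称如何作为第二个参数传给 BeautifulSoup(假设已安装 lxml;html.parser 为标准库自带,无需额外依赖):
html = "<p>示例</p>"
soup_lxml = BeautifulSoup(html, "lxml")            # 速度快,需要先安装lxml
soup_builtin = BeautifulSoup(html, "html.parser")  # 标准库自带,无额外依赖
print(soup_lxml.p.text, soup_builtin.p.text)       # 对这段简单HTML,两种解析器结果一致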
下面演示先用 Requests 获取网页、再用 BeautifulSoup 解析并提取数据的基本流程。
import requests
from bs4 import BeautifulSoup
def fetch_and_parse(url):
"""获取网页并解析的基本函数"""
try:
# 1. 使用Requests获取网页
response = requests.get(url, timeout=10)
response.raise_for_status() # 检查HTTP错误
# 2. 设置正确的编码(如果需要)
response.encoding = response.apparent_encoding
# 3. 使用BeautifulSoup解析HTML
soup = BeautifulSoup(response.text, 'lxml')
# 4. 提取数据
# 获取页面标题
page_title = soup.title.string if soup.title else "无标题"
print(f"页面标题: {page_title}")
# 获取所有链接
links = soup.find_all('a')
print(f"找到 {len(links)} 个链接")
# 获取特定元素
main_content = soup.find('div', class_='content')
if main_content:
print(f"主要内容: {main_content.get_text(strip=True)[:100]}...")
return soup
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
return None
except Exception as e:
print(f"解析错误: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 测试URL
test_url = "https://httpbin.org/html"
soup = fetch_and_parse(test_url)
if soup:
print("解析成功!")
# 可以继续处理soup对象...
<div class="product">
<h3>Python编程</h3>
<p class="price">¥59.99</p>
<a href="/buy/python">购买</a>
</div>
{
"title": "Python编程",
"price": "¥59.99",
"link": "/buy/python"
}
product = soup.find('div', class_='product')
title = product.h3.text
price = product.find('p', class_='price').text
link = product.a['href']
find() 查找第一个匹配元素;.text 获取元素文本内容;['href'] 获取元素属性。
通过标签名选择元素
# 查找所有div标签
soup.find_all('div')
# 查找第一个p标签
soup.find('p')
通过CSS类选择元素
# 查找class为content的元素
soup.find_all(class_='content')
# 查找class包含active的元素
soup.find_all(class_=re.compile('active'))
通过ID选择元素
# 查找id为main的元素
soup.find(id='main')
# 通过属性查找
soup.find(attrs={'id': 'main'})
通过属性选择元素
# 查找有href属性的a标签
soup.find_all('a', href=True)
# 查找href以http开头的a标签
soup.find_all('a', href=re.compile('^http'))
使用CSS选择器语法
# 选择所有class为item的div
soup.select('div.item')
# 选择id为content下的所有p标签
soup.select('#content p')
# 选择第一个匹配的元素
soup.select_one('.item')
通过层级关系选择元素
# 选择所有div下的直接子p标签
soup.select('div > p')
# 选择所有div后的兄弟p标签
soup.select('div ~ p')
# 选择紧接在div后的p标签
soup.select('div + p')
| 选择方法 | 速度 | 灵活性 | 易用性 | 适用场景 |
|---|---|---|---|---|
| find() / find_all() | 快 | 中等 | 简单 | 简单选择,已知元素结构 |
| select() / select_one() | 中等 | 高 | 中等 | 复杂CSS选择器,熟悉CSS语法 |
| 属性选择器 | 快 | 中等 | 简单 | 基于属性筛选 |
| 正则表达式 | 慢 | 高 | 复杂 | 模糊匹配,复杂模式 |
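下面用一小段假设的HTML,演示表中几种选择方式的等价或近似写法(仅为示意):
from bs4 import BeautifulSoup
import re

html = '<div id="main"><p class="item active">A</p><p class="item">B</p></div>'
soup = BeautifulSoup(html, 'html.parser')

by_find = soup.find_all('p', class_='item')            # find_all:按标签和class筛选
by_select = soup.select('p.item')                      # select:等价的CSS选择器写法
by_attr = soup.find(attrs={'id': 'main'})              # 属性选择器:按id精确定位
by_regex = soup.find_all(class_=re.compile('active'))  # 正则:匹配class中含active的元素
print(len(by_find), len(by_select), by_attr.name, len(by_regex))  # 输出: 2 2 div 1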
import re
from bs4 import BeautifulSoup
# 假设我们有以下HTML
html_doc = """
<div class="container">
<article id="post-123" class="post">
<h2 class="title">Python爬虫教程</h2>
<div class="meta">
<span class="author">张三</span>
<span class="date">2023-10-01</span>
</div>
<div class="content">
<p>这是一篇关于Python爬虫的教程。</p>
<a href="https://example.com/more" class="read-more">阅读更多</a>
</div>
<div class="tags">
<a href="/tag/python" class="tag">Python</a>
<a href="/tag/web" class="tag">Web</a>
</div>
</article>
</div>
"""
soup = BeautifulSoup(html_doc, 'lxml')
# 1. 使用find()和find_all()
article = soup.find('article', class_='post')
title = article.find('h2', class_='title').text
author = article.find('span', class_='author').text
# 2. 使用CSS选择器
content = soup.select_one('.content p').text
read_more_link = soup.select_one('.read-more')['href']
# 3. 使用属性选择器
tags = soup.find_all('a', class_='tag', href=re.compile(r'^/tag/'))
# 4. 使用正则表达式
# 查找所有包含"Python"的文本
python_text = soup.find_all(string=re.compile('Python'))  # 新版bs4中text参数已更名为string
# 5. 组合使用
# 查找article下的所有直接子div
article_divs = article.find_all('div', recursive=False)
# 提取数据
data = {
'title': title,
'author': author,
'content': content,
'read_more': read_more_link,
'tags': [tag.text for tag in tags],
'article_div_count': len(article_divs)
}
print("提取的数据:")
for key, value in data.items():
print(f"{key}: {value}")
# 获取元素所有文本
text = element.get_text()
# 获取文本并去除空白
clean_text = element.get_text(strip=True)
# 获取特定子元素的文本
title = element.find('h1').text
# 获取单个属性
href = element['href']
# 获取所有属性
attrs = element.attrs
# 安全获取属性(避免KeyError)
href = element.get('href', '')
# 提取所有链接
links = [a['href'] for a in soup.find_all('a', href=True)]
# 提取所有图片
images = [img['src'] for img in soup.find_all('img', src=True)]
# 处理相对链接
from urllib.parse import urljoin
full_url = urljoin(base_url, relative_url)
# 提取表格数据
table = soup.find('table')
rows = table.find_all('tr')
for row in rows:
cols = row.find_all('td')
row_data = [col.text.strip() for col in cols]
# 处理row_data...
import re
from datetime import datetime
class DataCleaner:
"""数据清洗工具类"""
@staticmethod
def clean_text(text):
"""清理文本:去除多余空白、换行符等"""
if not text:
return ""
# 替换多个空白字符为单个空格
text = re.sub(r'\s+', ' ', text)
# 去除首尾空白
text = text.strip()
# 去除不可见字符
text = ''.join(char for char in text if char.isprintable())
return text
@staticmethod
def extract_price(text):
"""从文本中提取价格"""
if not text:
return None
# 匹配数字,包括小数点和千分位分隔符
match = re.search(r'[\d,]+\.?\d*', text)
if match:
# 去除千分位分隔符
price_str = match.group().replace(',', '')
try:
return float(price_str)
except ValueError:
return None
return None
@staticmethod
def extract_date(text, date_formats=None):
"""从文本中提取日期"""
if not text:
return None
if date_formats is None:
date_formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%d-%m-%Y',
'%d/%m/%Y',
'%Y年%m月%d日',
'%m月%d日, %Y',
]
for date_format in date_formats:
try:
return datetime.strptime(text.strip(), date_format)
except ValueError:
continue
        # 尝试使用正则表达式匹配(注意两个模式中年、月、日的分组顺序不同)
        date_patterns = [
            (r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})日?', ('year', 'month', 'day')),
            (r'(\d{1,2})[-/月](\d{1,2})[-/日,]?\s*(\d{4})', ('month', 'day', 'year')),
        ]
        for pattern, order in date_patterns:
            match = re.search(pattern, text)
            if match:
                parts = dict(zip(order, map(int, match.groups())))
                try:
                    return datetime(parts['year'], parts['month'], parts['day'])
                except ValueError:
                    continue
        return None
@staticmethod
def normalize_url(url, base_url):
"""规范化URL,处理相对链接"""
from urllib.parse import urljoin, urlparse
if not url:
return None
# 处理JavaScript链接
if url.startswith(('javascript:', 'mailto:', 'tel:')):
return None
# 处理相对链接
full_url = urljoin(base_url, url)
# 解析URL以确保格式正确
parsed = urlparse(full_url)
# 重建URL(去除片段标识符等)
normalized = parsed._replace(fragment='', params='', query='')
return normalized.geturl()
@staticmethod
def extract_emails(text):
"""从文本中提取邮箱地址"""
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
return re.findall(email_pattern, text)
@staticmethod
def extract_phone_numbers(text):
"""从文本中提取电话号码"""
phone_patterns = [
r'\+?[\d\s-]{10,}', # 国际格式
r'\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}', # 美国格式
r'1[3-9]\d{9}', # 中国手机号
]
phones = []
for pattern in phone_patterns:
phones.extend(re.findall(pattern, text))
return phones
# 使用示例
if __name__ == "__main__":
cleaner = DataCleaner()
# 测试文本清洗
dirty_text = " Python \n 爬虫 \t教程 "
clean_text = cleaner.clean_text(dirty_text)
print(f"文本清洗: '{dirty_text}' -> '{clean_text}'")
# 测试价格提取
price_text = "价格: ¥1,299.99"
price = cleaner.extract_price(price_text)
print(f"价格提取: '{price_text}' -> {price}")
# 测试日期提取
date_text = "发布日期: 2023-10-01"
date = cleaner.extract_date(date_text)
print(f"日期提取: '{date_text}' -> {date}")
import json
from bs4 import BeautifulSoup
def extract_nested_data(soup):
"""从复杂HTML结构中提取嵌套数据"""
results = []
    # 假设HTML结构如下(与下方使用的选择器对应):
    # <div class="product-item" data-id="...">
    #     <h3>产品名称</h3>
    #     <span class="price">价格</span>
    #     <span class="rating">评分</span>
    #     <ul class="features">
    #         <li>特征1</li>
    #         <li>特征2</li>
    #     </ul>
    #     <a href="...">链接</a>
    #     <img src="..." alt="...">
    # </div>
# 查找所有产品项
product_items = soup.select('.product-item')
for item in product_items:
product_data = {}
# 提取基本信息
name_elem = item.select_one('h3')
if name_elem:
product_data['name'] = name_elem.get_text(strip=True)
# 提取价格和评分
price_elem = item.select_one('.price')
if price_elem:
product_data['price'] = price_elem.get_text(strip=True)
rating_elem = item.select_one('.rating')
if rating_elem:
product_data['rating'] = rating_elem.get_text(strip=True)
# 提取特征列表
features = []
feature_items = item.select('.features li')
for feature in feature_items:
features.append(feature.get_text(strip=True))
product_data['features'] = features
# 提取所有链接
links = []
for a in item.find_all('a', href=True):
link_data = {
'text': a.get_text(strip=True),
'href': a['href']
}
links.append(link_data)
product_data['links'] = links
# 提取所有图片
images = []
for img in item.find_all('img', src=True):
img_data = {
'src': img['src'],
'alt': img.get('alt', '')
}
images.append(img_data)
product_data['images'] = images
# 提取自定义数据属性
# 假设有data-*属性
data_attrs = {}
for attr_name, attr_value in item.attrs.items():
if attr_name.startswith('data-'):
data_attrs[attr_name[5:]] = attr_value
product_data['data_attributes'] = data_attrs
results.append(product_data)
return results
def extract_table_data(soup):
"""提取表格数据并转换为结构化格式"""
tables = soup.find_all('table')
table_data = []
for table in tables:
table_info = {
'headers': [],
'rows': []
}
# 提取表头
headers = table.find_all('th')
if headers:
table_info['headers'] = [th.get_text(strip=True) for th in headers]
# 提取表格行
rows = table.find_all('tr')
for row in rows:
# 跳过表头行(如果已经单独处理了th)
if row.find('th'):
continue
cols = row.find_all('td')
if cols:
row_data = {
'cells': [td.get_text(strip=True) for td in cols],
'raw_html': str(row)
}
# 如果有表头,创建字典格式的行数据
if table_info['headers'] and len(cols) == len(table_info['headers']):
row_dict = {}
for i, header in enumerate(table_info['headers']):
row_dict[header] = cols[i].get_text(strip=True)
row_data['dict'] = row_dict
table_info['rows'].append(row_data)
table_data.append(table_info)
return table_data
# 使用示例
if __name__ == "__main__":
    # 模拟HTML(这里只包含一个表格,用于演示表格数据提取)
    html = """
    <table>
        <tr><th>型号</th><th>价格</th><th>库存</th></tr>
        <tr><td>iPhone 14</td><td>¥6999</td><td>50</td></tr>
        <tr><td>Samsung S23</td><td>¥5999</td><td>30</td></tr>
    </table>
    """
soup = BeautifulSoup(html, 'lxml')
# 提取嵌套数据
products = extract_nested_data(soup)
print("产品数据:")
print(json.dumps(products, indent=2, ensure_ascii=False))
# 提取表格数据
tables = extract_table_data(soup)
print("\n表格数据:")
print(json.dumps(tables, indent=2, ensure_ascii=False))
确定需要提取的数据:标题、发布时间、作者、内容、分类等。
import requests
from bs4 import BeautifulSoup
import re
from datetime import datetime
import json
class NewsScraper:
"""新闻网站爬虫"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def fetch_article_links(self, category_url):
"""获取文章列表页的所有文章链接"""
try:
response = self.session.get(category_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
# 根据实际网站结构调整选择器
article_links = []
# 方法1: 通过CSS选择器
articles = soup.select('.article-list .article-item a')
for article in articles:
if article.get('href'):
link = article['href']
if not link.startswith('http'):
link = self.base_url + link
article_links.append(link)
# 方法2: 通过正则表达式
# 查找所有包含/article/的链接
all_links = soup.find_all('a', href=re.compile(r'/article/\d+'))
for link in all_links:
href = link['href']
if not href.startswith('http'):
href = self.base_url + href
article_links.append(href)
# 去重
article_links = list(set(article_links))
return article_links
except Exception as e:
print(f"获取文章链接失败: {e}")
return []
def parse_article(self, article_url):
"""解析单篇文章"""
try:
response = self.session.get(article_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
article_data = {
'url': article_url,
'title': '',
'author': '',
'publish_time': '',
'content': '',
'category': '',
'tags': []
}
# 提取标题(根据实际网站调整选择器)
title_elem = soup.find('h1') or soup.find(class_='title') or soup.find(id='title')
if title_elem:
article_data['title'] = title_elem.get_text(strip=True)
# 提取作者
author_elem = soup.find(class_='author') or soup.find(attrs={'itemprop': 'author'})
if author_elem:
article_data['author'] = author_elem.get_text(strip=True)
# 提取发布时间
time_elem = soup.find(class_='publish-time') or soup.find('time')
if time_elem:
time_text = time_elem.get_text(strip=True)
article_data['publish_time'] = self._parse_datetime(time_text)
# 提取内容
content_elem = soup.find(class_='content') or soup.find(attrs={'itemprop': 'articleBody'})
if content_elem:
# 清理内容,移除脚本、样式等
for script in content_elem.find_all(['script', 'style']):
script.decompose()
article_data['content'] = content_elem.get_text(strip=True)
# 提取分类
category_elem = soup.find(class_='category') or soup.find(attrs={'rel': 'category'})
if category_elem:
article_data['category'] = category_elem.get_text(strip=True)
# 提取标签
tags_container = soup.find(class_='tags') or soup.find(class_='keywords')
if tags_container:
tags = tags_container.find_all('a')
article_data['tags'] = [tag.get_text(strip=True) for tag in tags]
return article_data
except Exception as e:
print(f"解析文章失败 {article_url}: {e}")
return None
def _parse_datetime(self, time_text):
"""解析日期时间字符串"""
date_patterns = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M',
'%Y年%m月%d日 %H:%M',
'%Y/%m/%d %H:%M',
]
for pattern in date_patterns:
try:
return datetime.strptime(time_text, pattern).isoformat()
except ValueError:
continue
# 如果无法解析,返回原始文本
return time_text
def scrape_category(self, category_url, max_articles=10):
"""爬取整个分类的文章"""
print(f"开始爬取分类: {category_url}")
# 获取文章链接
article_links = self.fetch_article_links(category_url)
print(f"找到 {len(article_links)} 篇文章")
# 限制爬取数量
article_links = article_links[:max_articles]
# 爬取每篇文章
articles = []
for i, link in enumerate(article_links, 1):
print(f"正在爬取第 {i}/{len(article_links)} 篇文章: {link}")
article_data = self.parse_article(link)
if article_data:
articles.append(article_data)
# 避免请求过快
import time
time.sleep(1)
return articles
# 使用示例
if __name__ == "__main__":
# 创建爬虫实例
scraper = NewsScraper('https://news.example.com')
# 爬取新闻分类
news_articles = scraper.scrape_category(
'https://news.example.com/tech',
max_articles=5
)
print(f"\n爬取完成,共获取 {len(news_articles)} 篇文章")
# 保存数据
if news_articles:
with open('news_articles.json', 'w', encoding='utf-8') as f:
json.dump(news_articles, f, indent=2, ensure_ascii=False)
print("数据已保存到 news_articles.json")
# 显示部分数据
print("\n第一篇文章摘要:")
first_article = news_articles[0]
for key in ['title', 'author', 'publish_time']:
print(f"{key}: {first_article.get(key, 'N/A')}")
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
class PriceMonitor:
"""电商网站价格监控"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept-Language': 'zh-CN,zh;q=0.9',
})
self.price_history = {}
def monitor_product(self, product_url, selector_config):
"""监控单个商品价格"""
try:
response = self.session.get(product_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
# 提取商品信息
product_info = {}
# 提取商品名称
if 'name_selector' in selector_config:
name_elem = soup.select_one(selector_config['name_selector'])
if name_elem:
product_info['name'] = name_elem.get_text(strip=True)
# 提取商品价格
if 'price_selector' in selector_config:
price_elem = soup.select_one(selector_config['price_selector'])
if price_elem:
price_text = price_elem.get_text(strip=True)
product_info['price'] = self._extract_price_number(price_text)
product_info['price_text'] = price_text
# 提取库存状态
if 'stock_selector' in selector_config:
stock_elem = soup.select_one(selector_config['stock_selector'])
if stock_elem:
product_info['stock'] = stock_elem.get_text(strip=True)
# 提取评分
if 'rating_selector' in selector_config:
rating_elem = soup.select_one(selector_config['rating_selector'])
if rating_elem:
product_info['rating'] = rating_elem.get_text(strip=True)
# 记录时间戳
product_info['check_time'] = datetime.now().isoformat()
product_info['url'] = product_url
return product_info
except Exception as e:
print(f"监控商品失败 {product_url}: {e}")
return None
def _extract_price_number(self, price_text):
"""从价格文本中提取数字"""
# 移除货币符号和千分位分隔符
clean_text = re.sub(r'[^\d.,]', '', price_text)
clean_text = clean_text.replace(',', '')
try:
# 尝试转换为浮点数
return float(clean_text)
except ValueError:
return None
def check_price_drop(self, product_url, current_price, threshold=0.9):
"""检查价格是否下降"""
if product_url not in self.price_history:
# 第一次检查,记录价格
self.price_history[product_url] = {
'last_price': current_price,
'lowest_price': current_price,
'price_changes': []
}
return False
history = self.price_history[product_url]
last_price = history['last_price']
# 记录价格变化
price_change = {
'time': datetime.now().isoformat(),
'price': current_price,
'change': current_price - last_price if last_price else 0,
'change_percent': ((current_price - last_price) / last_price * 100) if last_price else 0
}
history['price_changes'].append(price_change)
history['last_price'] = current_price
# 更新最低价格
if current_price < history['lowest_price']:
history['lowest_price'] = current_price
# 检查价格下降是否超过阈值
if last_price and current_price < last_price * threshold:
return True
return False
def save_to_csv(self, product_info, filename='price_history.csv'):
"""保存价格历史到CSV文件"""
fieldnames = ['check_time', 'name', 'price', 'price_text', 'stock', 'rating', 'url']
# 如果文件不存在,创建并写入表头
try:
with open(filename, 'r', encoding='utf-8') as f:
# 文件已存在
pass
except FileNotFoundError:
with open(filename, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
# 追加数据
with open(filename, 'a', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writerow(product_info)
def send_price_alert(self, product_info, old_price, new_price, recipient_email):
"""发送价格下降提醒邮件"""
subject = f"价格提醒: {product_info['name']} 降价了!"
message = f"""
商品名称: {product_info['name']}
商品链接: {product_info['url']}
价格变化:
原价: ¥{old_price}
现价: ¥{new_price}
降价: ¥{old_price - new_price} ({(old_price - new_price) / old_price * 100:.1f}%)
检查时间: {product_info['check_time']}
"""
# 发送邮件(需要配置SMTP)
try:
msg = MIMEText(message, 'plain', 'utf-8')
msg['Subject'] = subject
msg['From'] = 'price-monitor@example.com'
msg['To'] = recipient_email
# 使用SMTP发送邮件
# with smtplib.SMTP('smtp.example.com', 587) as server:
# server.starttls()
# server.login('username', 'password')
# server.send_message(msg)
print(f"价格提醒邮件已发送到 {recipient_email}")
except Exception as e:
print(f"发送邮件失败: {e}")
# 使用示例
if __name__ == "__main__":
monitor = PriceMonitor()
# 配置不同网站的商品选择器
products_to_monitor = [
{
'url': 'https://www.example.com/product/123',
'name': 'Python编程书',
'selectors': {
'name_selector': '.product-title',
'price_selector': '.price-special',
'stock_selector': '.stock-status',
'rating_selector': '.rating-value'
}
},
# 可以添加更多商品
]
print("开始价格监控...")
for product in products_to_monitor:
print(f"监控商品: {product['name']}")
# 获取当前价格
product_info = monitor.monitor_product(
product['url'],
product['selectors']
)
if product_info and 'price' in product_info:
print(f"当前价格: ¥{product_info['price']}")
# 检查价格是否下降
price_dropped = monitor.check_price_drop(
product['url'],
product_info['price'],
threshold=0.95 # 价格下降5%时提醒
)
if price_dropped:
print(f"价格下降! 发送提醒...")
# 发送提醒邮件
# monitor.send_price_alert(
# product_info,
# monitor.price_history[product['url']]['last_price'],
# product_info['price'],
# 'user@example.com'
# )
# 保存到CSV
monitor.save_to_csv(product_info)
# 避免请求过快
time.sleep(2)
print("监控完成")
许多现代网站使用JavaScript动态加载内容,Requests + BeautifulSoup无法直接获取这些动态生成的内容。
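一个快速的判断办法(示意,URL与选择器均为假设):先用 requests 抓取页面,再检查返回的静态HTML中能否找到目标内容;如果浏览器里能看到而这里找不到,内容很可能是JavaScript动态渲染的。
import requests
from bs4 import BeautifulSoup

resp = requests.get('https://example-spa.com/products', timeout=10)  # 假设的单页应用地址
soup = BeautifulSoup(resp.text, 'lxml')
if not soup.select('.product-item'):
    print('静态HTML中没有目标数据,内容很可能由JavaScript动态加载')
确认之后,可以在以下两种方案中选择: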
方案一:使用浏览器开发者工具分析页面加载的API请求,直接请求API获取数据。
方案二:使用Selenium等工具控制真实浏览器,等待JavaScript执行完成后再获取页面内容。常用的工具及安装方式如下:
Selenium:pip install selenium
Playwright:微软开发的浏览器自动化工具,比Selenium更现代。pip install playwright
Requests-HTML:Kenneth Reitz开发的库,内置JavaScript执行引擎。pip install requests-html
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
import time
class JSPageScraper:
"""处理JavaScript页面的爬虫"""
def __init__(self, headless=True):
"""初始化Selenium驱动"""
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless') # 无头模式
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1080')
# 可以添加更多选项
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
def load_page(self, url, wait_for=None):
"""加载页面并等待元素出现"""
print(f"加载页面: {url}")
self.driver.get(url)
if wait_for:
# 等待特定元素出现
if isinstance(wait_for, str):
# CSS选择器
self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, wait_for))
)
elif isinstance(wait_for, tuple):
# (By, selector) 元组
self.wait.until(
EC.presence_of_element_located(wait_for)
)
# 等待页面完全加载
time.sleep(2)
def get_page_source(self):
"""获取页面源代码(包含JavaScript渲染后的内容)"""
return self.driver.page_source
def parse_with_bs4(self):
"""使用BeautifulSoup解析当前页面"""
html = self.get_page_source()
return BeautifulSoup(html, 'lxml')
def click_and_wait(self, selector, wait_selector=None):
"""点击元素并等待"""
element = self.driver.find_element(By.CSS_SELECTOR, selector)
element.click()
if wait_selector:
self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector))
)
time.sleep(1)
def scroll_to_bottom(self):
"""滚动到页面底部(用于加载更多内容)"""
# 获取当前页面高度
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
# 滚动到底部
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 等待加载
time.sleep(2)
# 计算新的页面高度
new_height = self.driver.execute_script("return document.body.scrollHeight")
# 如果高度没有变化,停止滚动
if new_height == last_height:
break
last_height = new_height
def execute_js(self, script):
"""执行JavaScript代码"""
return self.driver.execute_script(script)
def extract_dynamic_content(self, url, content_selector):
"""提取动态加载的内容"""
# 加载页面
self.load_page(url, wait_for=content_selector)
# 如果需要,滚动加载更多内容
self.scroll_to_bottom()
# 使用BeautifulSoup解析
soup = self.parse_with_bs4()
# 提取内容
content_elements = soup.select(content_selector)
results = []
for element in content_elements:
# 提取所需数据
item_data = {
'text': element.get_text(strip=True),
'html': str(element),
'attributes': element.attrs
}
results.append(item_data)
return results
def close(self):
"""关闭浏览器驱动"""
self.driver.quit()
# 使用示例
if __name__ == "__main__":
scraper = JSPageScraper(headless=True)
try:
# 加载一个使用JavaScript渲染的页面
url = "https://example-spa.com/products" # 假设这是一个单页面应用
# 提取动态内容
products = scraper.extract_dynamic_content(
url,
content_selector='.product-item'
)
print(f"提取到 {len(products)} 个产品")
# 显示前几个产品
for i, product in enumerate(products[:3], 1):
print(f"\n产品 {i}:")
print(f"文本: {product['text'][:100]}...")
if 'class' in product['attributes']:
print(f"类名: {product['attributes']['class']}")
# 执行自定义JavaScript
# 例如:获取页面标题
page_title = scraper.execute_js("return document.title;")
print(f"\n页面标题: {page_title}")
# 获取页面中的链接数量
link_count = scraper.execute_js("return document.querySelectorAll('a').length;")
print(f"页面链接数量: {link_count}")
finally:
# 确保关闭浏览器
scraper.close()
import requests
import json
import re
from bs4 import BeautifulSoup  # find_api_endpoints 中解析HTML时需要用到
class APIScraper:
"""通过分析API请求获取数据"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json',
})
def find_api_endpoints(self, url):
"""分析页面,查找API端点"""
try:
# 首先获取页面HTML
response = self.session.get(url)
soup = BeautifulSoup(response.text, 'lxml')
# 方法1: 查找包含API端点的script标签
api_endpoints = []
# 查找所有script标签
for script in soup.find_all('script'):
if script.string:
# 在JavaScript代码中查找API URL
# 常见的API URL模式
patterns = [
r'https?://[^"\']+?/api/[^"\']+',
r'https?://[^"\']+?/v\d+/[^"\']+',
r'fetch\(["\']([^"\']+)["\']\)',
r'axios\.(get|post|put|delete)\(["\']([^"\']+)["\']',
r'\.ajax\([^)]*url:\s*["\']([^"\']+)["\']',
]
for pattern in patterns:
matches = re.findall(pattern, script.string, re.IGNORECASE)
for match in matches:
if isinstance(match, tuple):
# 对于有多个分组的正则表达式
api_url = match[1] if len(match) > 1 else match[0]
else:
api_url = match
if api_url not in api_endpoints:
api_endpoints.append(api_url)
# 方法2: 查找data属性中的API信息
data_attrs = soup.find_all(attrs={'data-api': True})
for elem in data_attrs:
api_url = elem['data-api']
if api_url not in api_endpoints:
api_endpoints.append(api_url)
return api_endpoints
except Exception as e:
print(f"查找API端点失败: {e}")
return []
def call_api(self, api_url, method='GET', params=None, data=None):
"""调用API接口"""
try:
if method.upper() == 'GET':
response = self.session.get(api_url, params=params, timeout=10)
elif method.upper() == 'POST':
response = self.session.post(api_url, json=data, timeout=10)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
response.raise_for_status()
# 尝试解析JSON响应
try:
return response.json()
except ValueError:
# 如果不是JSON,返回文本
return response.text
except Exception as e:
print(f"调用API失败 {api_url}: {e}")
return None
def scrape_via_api(self, page_url, api_pattern=None):
"""通过API获取数据"""
print(f"分析页面: {page_url}")
# 查找API端点
endpoints = self.find_api_endpoints(page_url)
print(f"找到 {len(endpoints)} 个API端点")
if api_pattern:
# 筛选匹配特定模式的API
endpoints = [ep for ep in endpoints if re.search(api_pattern, ep)]
print(f"匹配模式的API端点: {len(endpoints)}")
results = {}
for endpoint in endpoints[:5]: # 限制测试前5个
print(f"\n测试API: {endpoint}")
# 调用API
api_data = self.call_api(endpoint)
if api_data:
results[endpoint] = api_data
# 显示API响应摘要
if isinstance(api_data, dict):
print(f"响应类型: JSON对象 (键: {list(api_data.keys())})")
elif isinstance(api_data, list):
print(f"响应类型: JSON数组 (长度: {len(api_data)})")
else:
print(f"响应类型: {type(api_data).__name__}")
return results
# 使用示例
if __name__ == "__main__":
scraper = APIScraper()
# 示例:分析一个使用API加载数据的页面
page_url = "https://jsonplaceholder.typicode.com/" # 示例API网站
# 通过API获取数据
api_results = scraper.scrape_via_api(
page_url,
api_pattern=r'/posts' # 只获取posts相关的API
)
if api_results:
print(f"\n获取到 {len(api_results)} 个API的数据")
# 保存第一个API的数据
first_api = list(api_results.keys())[0]
data = api_results[first_api]
if isinstance(data, list) and len(data) > 0:
print(f"\n第一个API的示例数据:")
print(f"API端点: {first_api}")
print(f"数据条数: {len(data)}")
# 显示第一条数据
first_item = data[0]
print(f"\n第一条数据:")
for key, value in list(first_item.items())[:5]: # 只显示前5个字段
print(f" {key}: {value}")
# 保存到文件
with open('api_data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"\n数据已保存到 api_data.json")
遵守网站的 robots.txt 爬虫协议,避免爬取被禁止的内容。
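可以用标准库的 urllib.robotparser 做一个简单的检查(示意,域名与User-Agent均为假设):
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://example.com/robots.txt')  # 目标站点的robots.txt
rp.read()
if rp.can_fetch('MyScraper/1.0', 'https://example.com/some-page'):
    print('robots.txt 允许爬取该页面')
else:
    print('robots.txt 禁止爬取,应当跳过')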
在请求之间添加延迟,避免给服务器造成过大压力。
import time
time.sleep(1) # 延迟1秒
设置合理的User-Agent,模拟真实浏览器。
headers = {
'User-Agent': '合理的UA'
}
完善的错误处理机制,处理网络异常和解析错误。
try:
response = requests.get(url)
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
及时保存爬取的数据,防止数据丢失。
# 保存到JSON
import json
with open('data.json', 'w') as f:
json.dump(data, f)
使用Session对象复用连接,提高性能。
session = requests.Session()
session.get(url1)
session.get(url2) # 复用连接
import requests
from bs4 import BeautifulSoup
import time
import json
import logging
from urllib.parse import urljoin, urlparse
from datetime import datetime
import random
class ProductionScraper:
"""生产环境级别的爬虫框架"""
def __init__(self, config):
self.config = config
self.setup_logging()
self.session = self.create_session()
self.visited_urls = set()
self.data_buffer = []
def setup_logging(self):
"""配置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('scraper.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def create_session(self):
"""创建配置好的Session"""
session = requests.Session()
# 配置请求头
headers = {
'User-Agent': self.config.get('user_agent',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
session.headers.update(headers)
# 配置代理(如果需要)
if 'proxy' in self.config:
session.proxies.update(self.config['proxy'])
return session
def get_with_retry(self, url, max_retries=3):
"""带重试机制的GET请求"""
for attempt in range(max_retries):
try:
# 随机延迟,避免模式化请求
delay = random.uniform(self.config.get('min_delay', 1),
self.config.get('max_delay', 3))
time.sleep(delay)
response = self.session.get(
url,
timeout=self.config.get('timeout', 30),
allow_redirects=True
)
response.raise_for_status()
# 检查响应内容类型
content_type = response.headers.get('content-type', '')
if 'html' not in content_type.lower() and 'text' not in content_type.lower():
self.logger.warning(f"非HTML响应: {content_type}")
return response
except requests.exceptions.RequestException as e:
self.logger.warning(f"请求失败 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt == max_retries - 1:
self.logger.error(f"所有重试都失败: {url}")
raise
# 指数退避
time.sleep(2 ** attempt)
def parse_page(self, response):
"""解析页面内容"""
try:
# 设置编码
response.encoding = response.apparent_encoding
# 使用BeautifulSoup解析
soup = BeautifulSoup(response.text, self.config.get('parser', 'lxml'))
# 提取数据
data = self.extract_data(soup, response.url)
# 提取链接(用于进一步爬取)
links = self.extract_links(soup, response.url)
return {
'data': data,
'links': links,
'url': response.url,
'status_code': response.status_code,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"解析页面失败 {response.url}: {e}")
return None
def extract_data(self, soup, url):
"""提取数据(需要子类实现)"""
# 这是一个示例实现,实际使用时需要根据具体网站重写
data = {
'title': soup.title.string if soup.title else '',
'url': url,
'text_length': len(soup.get_text()),
'links_count': len(soup.find_all('a')),
}
return data
def extract_links(self, soup, base_url):
"""提取页面中的链接"""
links = []
for a in soup.find_all('a', href=True):
href = a['href']
# 跳过非HTTP链接
if href.startswith(('javascript:', 'mailto:', 'tel:', '#')):
continue
# 处理相对链接
full_url = urljoin(base_url, href)
# 规范化URL
parsed = urlparse(full_url)
normalized = parsed._replace(fragment='', params='')
# 只保留特定域名的链接
if self.config.get('allowed_domains'):
if parsed.netloc not in self.config['allowed_domains']:
continue
links.append(normalized.geturl())
return links
def save_data(self, data, filename=None):
"""保存数据"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'data_{timestamp}.json'
try:
with open(filename, 'a', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
f.write('\n')
self.logger.info(f"数据已保存到 {filename}")
except Exception as e:
self.logger.error(f"保存数据失败: {e}")
def run(self, start_urls):
"""运行爬虫"""
self.logger.info(f"开始爬取,初始URL: {len(start_urls)} 个")
urls_to_visit = list(start_urls)
max_pages = self.config.get('max_pages', 100)
pages_crawled = 0
while urls_to_visit and pages_crawled < max_pages:
url = urls_to_visit.pop(0)
# 检查是否已经访问过
if url in self.visited_urls:
continue
self.logger.info(f"爬取 ({pages_crawled+1}/{max_pages}): {url}")
try:
# 获取页面
response = self.get_with_retry(url)
# 解析页面
result = self.parse_page(response)
if result:
# 保存数据
self.save_data(result['data'])
# 添加到已访问列表
self.visited_urls.add(url)
# 添加新链接到待访问列表
for link in result['links']:
if link not in self.visited_urls and link not in urls_to_visit:
urls_to_visit.append(link)
pages_crawled += 1
except Exception as e:
self.logger.error(f"处理URL失败 {url}: {e}")
self.logger.info(f"爬取完成,共爬取 {pages_crawled} 个页面")
# 配置示例
config = {
'user_agent': 'MyScraper/1.0 (+https://example.com/bot)',
'parser': 'lxml',
'timeout': 30,
'min_delay': 1,
'max_delay': 3,
'max_pages': 50,
'allowed_domains': ['example.com', 'www.example.com'],
}
# 使用示例
if __name__ == "__main__":
scraper = ProductionScraper(config)
# 开始爬取
scraper.run(['https://example.com'])
爬取电影网站的信息,包括电影名称、评分、导演、演员、简介等,并保存到数据库。
import requests
from bs4 import BeautifulSoup
import sqlite3
import json
import time
import re
from datetime import datetime
from urllib.parse import urljoin, urlparse
import logging
class MovieScraper:
"""电影信息爬虫"""
def __init__(self, db_path='movies.db'):
self.base_url = 'https://movie.example.com' # 示例网站
self.session = requests.Session()
self.setup_session()
self.setup_database(db_path)
self.setup_logging()
# 爬取统计
self.stats = {
'total_crawled': 0,
'successful': 0,
'failed': 0,
'start_time': datetime.now()
}
def setup_logging(self):
"""配置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('movie_scraper.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def setup_session(self):
"""配置Session"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Referer': 'https://www.google.com/',
}
self.session.headers.update(headers)
        # 为所有请求设置默认超时(连接10秒、读取30秒)
        # 注意:必须先保存原始的 request 方法再包装,否则 lambda 会递归调用自身
        _original_request = self.session.request
        self.session.request = lambda method, url, **kwargs: _original_request(
            method, url, timeout=kwargs.pop('timeout', (10, 30)), **kwargs
        )
def setup_database(self, db_path):
"""设置数据库"""
self.conn = sqlite3.connect(db_path)
self.cursor = self.conn.cursor()
# 创建电影表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS movies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
original_title TEXT,
year INTEGER,
rating REAL,
rating_count INTEGER,
director TEXT,
actors TEXT, -- JSON数组
genres TEXT, -- JSON数组
duration TEXT,
release_date TEXT,
country TEXT,
language TEXT,
summary TEXT,
poster_url TEXT,
detail_url TEXT UNIQUE,
crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建爬取日志表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS crawl_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
status_code INTEGER,
success BOOLEAN,
error_message TEXT,
crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def fetch_movie_list(self, page=1):
"""获取电影列表页"""
list_url = f'{self.base_url}/list?page={page}'
try:
response = self.session.get(list_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
# 提取电影链接
movie_links = []
# 根据实际网站结构调整选择器
movie_items = soup.select('.movie-item')
for item in movie_items:
link_elem = item.select_one('a.movie-link')
if link_elem and link_elem.get('href'):
movie_url = urljoin(self.base_url, link_elem['href'])
movie_links.append(movie_url)
# 如果没有找到特定选择器的元素,尝试其他方法
if not movie_links:
# 查找所有包含/movie/的链接
all_links = soup.find_all('a', href=re.compile(r'/movie/\d+'))
for link in all_links:
movie_url = urljoin(self.base_url, link['href'])
movie_links.append(movie_url)
return movie_links
except Exception as e:
self.logger.error(f"获取电影列表失败 (第{page}页): {e}")
return []
def parse_movie_detail(self, movie_url):
"""解析电影详情页"""
try:
response = self.session.get(movie_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'lxml')
movie_data = {
'detail_url': movie_url,
'title': '',
'original_title': '',
'year': None,
'rating': None,
'rating_count': None,
'director': '',
'actors': [],
'genres': [],
'duration': '',
'release_date': '',
'country': '',
'language': '',
'summary': '',
'poster_url': ''
}
# 提取电影标题
title_elem = soup.select_one('h1.movie-title')
if title_elem:
movie_data['title'] = title_elem.get_text(strip=True)
# 提取原始标题
original_elem = soup.select_one('.original-title')
if original_elem:
movie_data['original_title'] = original_elem.get_text(strip=True)
# 提取年份
year_match = re.search(r'(\d{4})', movie_data.get('title', ''))
if year_match:
movie_data['year'] = int(year_match.group(1))
# 提取评分
rating_elem = soup.select_one('.rating-value')
if rating_elem:
try:
movie_data['rating'] = float(rating_elem.get_text(strip=True))
except ValueError:
pass
# 提取评分人数
rating_count_elem = soup.select_one('.rating-count')
if rating_count_elem:
count_text = rating_count_elem.get_text(strip=True)
count_match = re.search(r'(\d+)', count_text.replace(',', ''))
if count_match:
movie_data['rating_count'] = int(count_match.group(1))
# 提取导演
director_elem = soup.select_one('.director')
if director_elem:
movie_data['director'] = director_elem.get_text(strip=True)
# 提取演员
actor_elems = soup.select('.actor-list .actor')
for actor_elem in actor_elems:
actor_name = actor_elem.get_text(strip=True)
if actor_name:
movie_data['actors'].append(actor_name)
# 提取类型
genre_elems = soup.select('.genre-tag')
for genre_elem in genre_elems:
genre = genre_elem.get_text(strip=True)
if genre:
movie_data['genres'].append(genre)
# 提取时长
duration_elem = soup.select_one('.duration')
if duration_elem:
movie_data['duration'] = duration_elem.get_text(strip=True)
# 提取上映日期
release_elem = soup.select_one('.release-date')
if release_elem:
movie_data['release_date'] = release_elem.get_text(strip=True)
# 提取国家/地区
country_elem = soup.select_one('.country')
if country_elem:
movie_data['country'] = country_elem.get_text(strip=True)
# 提取语言
language_elem = soup.select_one('.language')
if language_elem:
movie_data['language'] = language_elem.get_text(strip=True)
# 提取简介
summary_elem = soup.select_one('.summary')
if summary_elem:
movie_data['summary'] = summary_elem.get_text(strip=True)
# 提取海报URL
poster_elem = soup.select_one('.movie-poster img')
if poster_elem and poster_elem.get('src'):
poster_url = poster_elem['src']
if not poster_url.startswith('http'):
poster_url = urljoin(self.base_url, poster_url)
movie_data['poster_url'] = poster_url
return movie_data
except Exception as e:
self.logger.error(f"解析电影详情失败 {movie_url}: {e}")
# 记录错误到数据库
self.log_crawl_result(movie_url, None, False, str(e))
return None
def save_movie_to_db(self, movie_data):
"""保存电影数据到数据库"""
try:
# 转换列表为JSON字符串
actors_json = json.dumps(movie_data.get('actors', []), ensure_ascii=False)
genres_json = json.dumps(movie_data.get('genres', []), ensure_ascii=False)
# 检查电影是否已存在
self.cursor.execute(
'SELECT id FROM movies WHERE detail_url = ?',
(movie_data['detail_url'],)
)
if self.cursor.fetchone():
# 更新现有记录
self.cursor.execute('''
UPDATE movies SET
title = ?, original_title = ?, year = ?, rating = ?,
rating_count = ?, director = ?, actors = ?, genres = ?,
duration = ?, release_date = ?, country = ?,
language = ?, summary = ?, poster_url = ?
WHERE detail_url = ?
''', (
movie_data.get('title'),
movie_data.get('original_title'),
movie_data.get('year'),
movie_data.get('rating'),
movie_data.get('rating_count'),
movie_data.get('director'),
actors_json,
genres_json,
movie_data.get('duration'),
movie_data.get('release_date'),
movie_data.get('country'),
movie_data.get('language'),
movie_data.get('summary'),
movie_data.get('poster_url'),
movie_data['detail_url']
))
else:
# 插入新记录
self.cursor.execute('''
INSERT INTO movies (
title, original_title, year, rating, rating_count,
director, actors, genres, duration, release_date,
country, language, summary, poster_url, detail_url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
movie_data.get('title'),
movie_data.get('original_title'),
movie_data.get('year'),
movie_data.get('rating'),
movie_data.get('rating_count'),
movie_data.get('director'),
actors_json,
genres_json,
movie_data.get('duration'),
movie_data.get('release_date'),
movie_data.get('country'),
movie_data.get('language'),
movie_data.get('summary'),
movie_data.get('poster_url'),
movie_data['detail_url']
))
self.conn.commit()
# 记录成功的爬取
self.log_crawl_result(
movie_data['detail_url'],
200,
True,
None
)
self.stats['successful'] += 1
return True
except Exception as e:
self.logger.error(f"保存电影数据失败: {e}")
# 记录失败的爬取
self.log_crawl_result(
movie_data.get('detail_url', 'unknown'),
None,
False,
str(e)
)
self.stats['failed'] += 1
return False
def log_crawl_result(self, url, status_code, success, error_message):
"""记录爬取结果到数据库"""
try:
self.cursor.execute('''
INSERT INTO crawl_log (url, status_code, success, error_message)
VALUES (?, ?, ?, ?)
''', (url, status_code, success, error_message))
self.conn.commit()
except Exception as e:
self.logger.error(f"记录爬取日志失败: {e}")
def crawl_movies(self, start_page=1, max_pages=10, delay=2):
"""爬取电影数据"""
self.logger.info(f"开始爬取电影数据,从第{start_page}页开始,最多{max_pages}页")
for page in range(start_page, start_page + max_pages):
self.logger.info(f"正在爬取第{page}页...")
# 获取电影列表
movie_links = self.fetch_movie_list(page)
if not movie_links:
self.logger.warning(f"第{page}页没有找到电影链接")
break
self.logger.info(f"第{page}页找到 {len(movie_links)} 部电影")
# 爬取每部电影的详情
for i, movie_url in enumerate(movie_links, 1):
self.stats['total_crawled'] += 1
self.logger.info(f"正在爬取电影 {i}/{len(movie_links)}: {movie_url}")
# 解析电影详情
movie_data = self.parse_movie_detail(movie_url)
if movie_data:
# 保存到数据库
success = self.save_movie_to_db(movie_data)
if success:
self.logger.info(f"成功保存: {movie_data.get('title', '未知标题')}")
else:
self.logger.error(f"保存失败: {movie_url}")
else:
self.logger.error(f"解析失败: {movie_url}")
self.stats['failed'] += 1
# 延迟,避免请求过快
time.sleep(delay)
# 页面间延迟
time.sleep(delay * 2)
# 打印统计信息
self.print_stats()
def print_stats(self):
"""打印爬取统计信息"""
elapsed = datetime.now() - self.stats['start_time']
print("\n" + "="*50)
print("爬取统计")
print("="*50)
print(f"开始时间: {self.stats['start_time']}")
print(f"总耗时: {elapsed}")
print(f"尝试爬取总数: {self.stats['total_crawled']}")
print(f"成功数: {self.stats['successful']}")
print(f"失败数: {self.stats['failed']}")
if self.stats['total_crawled'] > 0:
success_rate = self.stats['successful'] / self.stats['total_crawled'] * 100
print(f"成功率: {success_rate:.1f}%")
# 从数据库获取更多统计
self.cursor.execute('SELECT COUNT(*) FROM movies')
total_movies = self.cursor.fetchone()[0]
print(f"数据库中的电影总数: {total_movies}")
self.cursor.execute('SELECT COUNT(DISTINCT url) FROM crawl_log WHERE success = 1')
successful_crawls = self.cursor.fetchone()[0]
print(f"成功爬取的唯一URL数: {successful_crawls}")
def export_to_json(self, filename='movies_export.json'):
"""导出数据到JSON文件"""
try:
self.cursor.execute('SELECT * FROM movies')
movies = self.cursor.fetchall()
# 获取列名
column_names = [description[0] for description in self.cursor.description]
# 转换为字典列表
movies_dict = []
for movie in movies:
movie_dict = dict(zip(column_names, movie))
# 解析JSON字段
if movie_dict.get('actors'):
movie_dict['actors'] = json.loads(movie_dict['actors'])
if movie_dict.get('genres'):
movie_dict['genres'] = json.loads(movie_dict['genres'])
movies_dict.append(movie_dict)
# 保存到文件
with open(filename, 'w', encoding='utf-8') as f:
json.dump(movies_dict, f, ensure_ascii=False, indent=2)
self.logger.info(f"已导出 {len(movies_dict)} 部电影到 {filename}")
except Exception as e:
self.logger.error(f"导出数据失败: {e}")
def close(self):
"""关闭数据库连接"""
self.conn.close()
self.logger.info("数据库连接已关闭")
# 使用示例
if __name__ == "__main__":
# 创建爬虫实例
scraper = MovieScraper(db_path='movies_data.db')
try:
# 开始爬取
scraper.crawl_movies(
start_page=1,
max_pages=3, # 爬取前3页
delay=1 # 1秒延迟
)
# 导出数据
scraper.export_to_json('movies.json')
# 显示示例数据
scraper.cursor.execute('SELECT title, rating, director FROM movies LIMIT 5')
sample_movies = scraper.cursor.fetchall()
print("\n示例数据:")
for movie in sample_movies:
print(f"标题: {movie[0]}, 评分: {movie[1]}, 导演: {movie[2]}")
finally:
# 确保关闭连接
scraper.close()
Requests 结合 BeautifulSoup 是 Python 网络爬虫和数据抓取的黄金组合。关键要点:
- 用 Requests 获取页面(配置请求头、Session、超时与重试),用 BeautifulSoup 搭配 lxml 等解析器解析HTML;
- 熟练运用 find()/find_all()、CSS选择器、属性与正则表达式等多种元素选择方式;
- 对提取结果做好清洗(文本、价格、日期、URL规范化),并结构化保存为JSON、CSV或数据库;
- 对JavaScript渲染的页面,改用API分析或Selenium等浏览器自动化方案;
- 遵守robots.txt、控制请求频率、完善错误处理与日志,做负责任的爬虫。
掌握这个组合,你将能够从互联网上获取和处理各种数据,为数据分析、机器学习、市场研究等应用提供数据支持。