当处理大量网页数据时,性能成为关键因素。本章将详细介绍BeautifulSoup的性能优化技巧,帮助你提升网页解析速度、降低内存占用,构建高效的爬虫程序。
在优化之前,先了解如何测试和衡量性能:
import time
import tracemalloc
from bs4 import BeautifulSoup
def performance_benchmark(html_content, parser='lxml', iterations=10):
"""BeautifulSoup性能基准测试"""
print(f"性能测试 - 解析器: {parser}, 迭代次数: {iterations}")
print("=" * 50)
# 时间性能测试
start_time = time.time()
for i in range(iterations):
soup = BeautifulSoup(html_content, parser)
# 模拟常见操作
soup.find_all('div')
soup.find_all('a')
soup.find_all('p')
end_time = time.time()
avg_time = (end_time - start_time) / iterations
print(f"平均解析时间: {avg_time:.4f} 秒")
# 内存性能测试
tracemalloc.start()
soups = []
for i in range(10):
soup = BeautifulSoup(html_content, parser)
soups.append(soup)
current, peak = tracemalloc.get_traced_memory()
print(f"当前内存使用: {current / 10**6:.2f} MB")
print(f"峰值内存使用: {peak / 10**6:.2f} MB")
tracemalloc.stop()
# 清理
del soups
return avg_time, peak
# 生成测试HTML
test_html = "<html><body>" + "<div><p>测试内容</p><a href='#'>链接</a></div>" * 1000 + "</body></html>"
# 测试不同解析器
parsers = ['lxml', 'html.parser', 'html5lib']
results = {}
for parser in parsers:
try:
avg_time, peak_memory = performance_benchmark(test_html, parser, iterations=5)
results[parser] = {'time': avg_time, 'memory': peak_memory}
except Exception as e:
print(f"解析器 {parser} 测试失败: {e}")
print("\n=== 性能对比 ===")
for parser, metrics in results.items():
print(f"{parser}: {metrics['time']:.4f}秒, {metrics['memory']/10**6:.2f}MB")
| 解析器 | 速度 | 内存使用 | 容错性 | 适用场景 |
|---|---|---|---|---|
| lxml | 非常快 | 低 | 中等 | 大型文档、性能敏感项目 |
| html.parser | 中等 | 中等 | 中等 | 简单项目、无外部依赖 |
| html5lib | 慢 | 高 | 非常好 | 不规范HTML、浏览器兼容性要求高 |
def smart_parser_selection(html_content, prefer_speed=True):
"""根据需求智能选择解析器"""
if prefer_speed:
# 优先选择速度快的解析器
try:
from lxml import etree
return 'lxml'
except ImportError:
return 'html.parser'
else:
# 优先选择容错性好的解析器
try:
import html5lib
return 'html5lib'
except ImportError:
try:
from lxml import etree
return 'lxml'
except ImportError:
return 'html.parser'
def adaptive_parsing(html_content):
"""自适应解析策略"""
# 根据HTML大小选择解析器
html_size = len(html_content)
if html_size < 10 * 1024: # 小于10KB
# 小文件,使用html.parser即可
parser = 'html.parser'
elif html_size < 100 * 1024: # 小于100KB
# 中等文件,优先使用lxml;未安装lxml时退回html.parser
try:
import lxml # 仅用于检测lxml是否可用
parser = 'lxml'
except ImportError:
parser = 'html.parser'
else:
# 大文件,使用lxml以保证解析速度
parser = 'lxml'
print(f"HTML大小: {html_size/1024:.1f}KB, 选择解析器: {parser}")
return BeautifulSoup(html_content, parser)
# 使用示例
html_small = "<html><body><p>小页面</p></body></html>"
html_large = "<html><body>" + "<div>内容</div>" * 5000 + "</body></html>"
soup_small = adaptive_parsing(html_small)
soup_large = adaptive_parsing(html_large)
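在解析器之外,还可以在解析阶段就限制需要构建的节点:bs4 提供的 SoupStrainer 配合 parse_only 参数,只把匹配的标签放进解析树,对大页面能同时降低耗时和内存。下面是一个简单示意(HTML为演示数据;注意 html5lib 解析器不支持 parse_only):
from bs4 import BeautifulSoup, SoupStrainer
html = "<html><body>" + "<div><p>内容</p><a href='/x'>链接</a></div>" * 1000 + "</body></html>"
# 只把<a>标签放进解析树,其余节点在解析阶段就被丢弃
only_links = SoupStrainer('a')
soup = BeautifulSoup(html, 'lxml', parse_only=only_links)
print(f"解析出的链接数: {len(soup.find_all('a'))}")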
import gc
from bs4 import BeautifulSoup
class MemoryOptimizedScraper:
"""内存优化的爬虫类"""
def __init__(self):
self.parsed_count = 0 # 已解析文档计数,用于控制垃圾回收频率
def parse_with_memory_control(self, html_content):
"""控制内存使用的解析方法"""
# 解析HTML
soup = BeautifulSoup(html_content, 'lxml')
# 提取需要的数据
data = self.extract_data(soup)
# 立即删除soup对象,释放解析树
del soup
self.parsed_count += 1
# 每解析100个文档强制垃圾回收一次(谨慎使用)
if self.parsed_count % 100 == 0:
gc.collect()
return data
def extract_data(self, soup):
"""提取数据,避免保持对soup的引用"""
# 提取为原始数据类型,而不是保持BeautifulSoup对象
data = {
'title': soup.title.string if soup.title else None,
'links': [
{
'text': a.text,
'href': a.get('href', '')
}
for a in soup.find_all('a', href=True)[:10] # 限制数量
],
'text_sample': soup.get_text()[:200] # 只保留文本样本
}
return data
def batch_processing(self, html_list):
"""批量处理HTML,控制内存使用"""
results = []
for i, html in enumerate(html_list):
if i % 50 == 0:
print(f"处理第 {i} 个文件,当前内存使用: {self.get_memory_usage():.2f}MB")
data = self.parse_with_memory_control(html)
results.append(data)
# 每处理100个文件,清理一次内存
if i % 100 == 0:
self.cleanup_memory()
return results
def get_memory_usage(self):
"""获取内存使用情况"""
import psutil
import os
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024 # 返回MB
def cleanup_memory(self):
"""清理内存"""
# 删除不再需要的对象
if hasattr(self, 'temp_soups'):
del self.temp_soups
# 调用垃圾回收
gc.collect()
def __del__(self):
"""析构函数,清理资源"""
self.cleanup_memory()
# 使用示例
scraper = MemoryOptimizedScraper()
# 模拟处理多个HTML文件
html_files = [f"<html><body><p>页面 {i}</p></body></html>" for i in range(500)]
results = scraper.batch_processing(html_files[:10]) # 只处理前10个作为示例
print(f"处理完成,得到 {len(results)} 个结果")
from bs4 import BeautifulSoup
class StreamParser:
"""流式解析器,适用于大文件"""
def parse_large_html(self, file_path, chunk_size=1024*1024): # 1MB chunks
"""分块解析大型HTML文件"""
with open(file_path, 'r', encoding='utf-8') as f:
buffer = ""
while True:
chunk = f.read(chunk_size)
if not chunk:
break
buffer += chunk
# 尝试找到完整的HTML边界
# 这里简化处理,实际应用中需要更复杂的边界检测
if "</html>" in buffer:
# 解析完整的HTML
soup = BeautifulSoup(buffer, 'lxml')
yield soup
buffer = ""
# 或者按标签边界分块处理
elif "</div>" in buffer:
# 处理包含完整div的块
parts = buffer.split("</div>")
for i in range(len(parts)-1):
html_part = parts[i] + "</div>"
if html_part.strip():
soup = BeautifulSoup(html_part, 'lxml')
yield soup
buffer = parts[-1] # 保留未处理的部分
def extract_data_stream(self, file_path):
"""流式提取数据"""
for soup in self.parse_large_html(file_path):
# 提取当前块的数据
data = self.extract_from_chunk(soup)
if data:
yield data
def extract_from_chunk(self, soup):
"""从HTML块中提取数据"""
# 这里实现具体的数据提取逻辑
return {
'titles': [h.text for h in soup.find_all(['h1', 'h2', 'h3'])],
'links': [a.get('href', '') for a in soup.find_all('a', href=True)]
}
# 使用生成器处理数据
def process_html_generator(html_list):
"""使用生成器处理HTML列表"""
for html in html_list:
soup = BeautifulSoup(html, 'lxml')
# 使用生成器表达式而不是列表推导式
links_generator = (a.get('href', '') for a in soup.find_all('a', href=True))
# 逐个处理链接,不保存完整列表
for link in links_generator:
if link: # 处理非空链接
yield {
'html': html[:50] + '...', # 只保留部分HTML
'link': link
}
# 使用示例
html_list = [
f"<html><body><a href='page{i}.html'>链接{i}</a></body></html>"
for i in range(1000)
]
# 流式处理,内存友好
for result in process_html_generator(html_list[:10]):
print(f"处理: {result['link']}")
from bs4 import BeautifulSoup
import time
class OptimizedFinder:
"""优化查找操作的工具类"""
def __init__(self, soup):
self.soup = soup
self.cache = {} # 简单的查询缓存
def find_with_cache(self, selector, force_refresh=False):
"""带缓存的查找"""
if not force_refresh and selector in self.cache:
return self.cache[selector]
result = self.soup.select(selector)
self.cache[selector] = result
return result
def benchmark_find_methods(self, html):
"""对比不同查找方法的性能"""
soup = BeautifulSoup(html, 'lxml')
# 测试数据
test_cases = [
("find_all('div')", lambda: soup.find_all('div')),
("select('div')", lambda: soup.select('div')),
("find_all(class_='content')", lambda: soup.find_all(class_='content')),
("select('.content')", lambda: soup.select('.content')),
("find_all(attrs={'id': 'main'})", lambda: soup.find_all(attrs={'id': 'main'})),
("select('#main')", lambda: soup.select('#main')),
]
results = {}
for name, func in test_cases:
start_time = time.time()
for _ in range(100): # 重复多次获得稳定结果
func()
elapsed = time.time() - start_time
results[name] = elapsed
print(f"{name}: {elapsed:.4f}秒")
return results
def optimized_find_chain(self, *selectors):
"""优化链式查找"""
# 错误的链式查找:每次都从头开始
# result = soup.find('div').find('ul').find_all('li')
# 优化的链式查找:缓存中间结果
current = self.soup
for selector in selectors:
if isinstance(selector, str):
if selector.startswith(('.', '#')):
# CSS选择器
current = current.select_one(selector)
else:
# 标签名
current = current.find(selector)
elif isinstance(selector, dict):
# 属性查找
current = current.find(attrs=selector)
else:
# 其他类型的选择器
current = current.find(selector)
if current is None:
return None
return current
def batch_find_elements(self, elements, selector):
"""在一组父元素内批量查找,避免每次都从整个文档重新遍历"""
# 只在传入的父元素范围内查找,而不是反复对整个soup调用find_all
all_results = []
for element in elements:
all_results.extend(element.find_all(selector))
return all_results
# 性能对比示例
html_content = "<div>" + "<p class='content'>段落</p>" * 100 + "</div>"
finder = OptimizedFinder(BeautifulSoup(html_content, 'lxml'))
print("=== 查找方法性能对比 ===")
finder.benchmark_find_methods(html_content)
def optimize_css_selectors(soup):
"""CSS选择器优化技巧(需传入已解析的soup对象)"""
# 1. 使用更具体的选择器
# 慢:过于宽泛
slow = soup.select('div')
# 快:更具体
fast = soup.select('div.content > p.intro')
# 2. 使用ID选择器(最快)
fastest = soup.select_one('#main-content')
# 3. 避免过度嵌套
# 慢:过度嵌套
slow_nested = soup.select('body > div > main > section > article > div > p')
# 快:简化选择器
fast_simple = soup.select('article p')
# 4. 使用属性选择器优化
# 慢:使用子串匹配的属性选择器
slow_attr = soup.select('[href*="example"]')
# 快:使用精确匹配
fast_attr = soup.select('[href="https://example.com"]')
# 5. 限制查找范围
# 慢:在整个文档中查找
slow_global = soup.select('a.external')
# 快:在特定区域内查找
content_div = soup.select_one('#content')
if content_div:
fast_scoped = content_div.select('a.external')
return {
'tip': 'CSS选择器优化',
'rules': [
'优先使用ID选择器',
'避免过度嵌套',
'使用更具体的选择器',
'限制查找范围'
]
}
# CSS选择器性能测试函数
def css_selector_performance_test(soup):
"""测试不同CSS选择器的性能"""
test_selectors = [
('#id-selector', '#main'),
('.class-selector', '.content'),
('tag-selector', 'div'),
('nested-selector', 'div.container > div.row > div.col'),
('attribute-selector', '[data-id]'),
('complex-selector', 'div.content p.intro a.external[target="_blank"]'),
]
results = {}
for name, selector in test_selectors:
start_time = time.time()
for _ in range(100):
soup.select(selector)
elapsed = time.time() - start_time
results[name] = elapsed
print(f"{name}: {elapsed:.4f}秒")
return results
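BeautifulSoup 的 select 底层由 soupsieve 实现。当同一个选择器要在大量文档上反复使用时,可以预编译后复用;soupsieve 内部本身带有缓存,实际收益取决于场景,下面仅作为一种写法示意(选择器和HTML为演示数据):
import soupsieve as sv
from bs4 import BeautifulSoup
# 预编译选择器,在多份文档之间复用编译结果
content_links = sv.compile("div.content a[href]")
htmls = [f"<div class='content'><a href='/a{i}'>A</a></div>" for i in range(100)]
total = 0
for html in htmls:
    soup = BeautifulSoup(html, 'lxml')
    total += len(content_links.select(soup))
print(f"共匹配 {total} 个链接")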
import concurrent.futures
from bs4 import BeautifulSoup
import time
from queue import Queue
import threading
class ConcurrentParser:
"""并发解析器"""
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
def parse_multiple_htmls(self, html_list):
"""并发解析多个HTML"""
print(f"开始并发解析 {len(html_list)} 个HTML文件")
print(f"使用线程数: {self.max_workers}")
start_time = time.time()
# 使用线程池提交任务
future_to_html = {
self.executor.submit(self.parse_single_html, html): html
for html in html_list
}
results = []
completed = 0
# 收集结果
for future in concurrent.futures.as_completed(future_to_html):
html = future_to_html[future]
try:
result = future.result()
results.append(result)
completed += 1
# 显示进度
if completed % 10 == 0:
print(f"已完成 {completed}/{len(html_list)}")
except Exception as e:
print(f"解析HTML出错: {e}")
end_time = time.time()
print(f"并发解析完成,耗时: {end_time - start_time:.2f}秒")
print(f"解析成功: {len(results)}/{len(html_list)}")
return results
def parse_single_html(self, html):
"""解析单个HTML"""
soup = BeautifulSoup(html, 'lxml')
# 提取数据
data = {
'title': soup.title.string if soup.title else None,
'h1_count': len(soup.find_all('h1')),
'link_count': len(soup.find_all('a')),
'text_length': len(soup.get_text())
}
# 模拟处理时间
time.sleep(0.01)
return data
def __del__(self):
"""清理资源"""
self.executor.shutdown(wait=True)
# 生产者-消费者模式
class ProducerConsumerParser:
"""生产者-消费者模式解析器"""
def __init__(self, num_consumers=4):
self.queue = Queue(maxsize=100)
self.num_consumers = num_consumers
self.consumers = []
self.results = []
self.lock = threading.Lock()
def produce(self, html_list):
"""生产任务"""
for i, html in enumerate(html_list):
self.queue.put((i, html))
# 添加结束标记
for _ in range(self.num_consumers):
self.queue.put(None)
def consume(self):
"""消费任务"""
while True:
item = self.queue.get()
if item is None:
# 收到结束标记:先调用task_done,否则queue.join()无法返回
self.queue.task_done()
break
idx, html = item
try:
soup = BeautifulSoup(html, 'lxml')
result = {
'id': idx,
'title': soup.title.string if soup.title else None,
'success': True
}
except Exception as e:
result = {
'id': idx,
'error': str(e),
'success': False
}
with self.lock:
self.results.append(result)
self.queue.task_done()
def parse_with_producer_consumer(self, html_list):
"""使用生产者-消费者模式解析"""
print(f"开始生产者-消费者模式解析,HTML数量: {len(html_list)}")
start_time = time.time()
# 启动消费者线程
for i in range(self.num_consumers):
consumer = threading.Thread(target=self.consume)
consumer.start()
self.consumers.append(consumer)
# 启动生产者线程
producer = threading.Thread(target=self.produce, args=(html_list,))
producer.start()
# 等待所有任务完成
self.queue.join()
# 等待所有线程结束
producer.join()
for consumer in self.consumers:
consumer.join()
end_time = time.time()
print(f"解析完成,耗时: {end_time - start_time:.2f}秒")
# 按ID排序结果
self.results.sort(key=lambda x: x['id'])
return self.results
# 使用示例
html_list = [f"<html><head><title>页面{i}</title></head><body><h1>标题{i}</h1></body></html>" for i in range(100)]
# 使用线程池
parser = ConcurrentParser(max_workers=8)
results = parser.parse_multiple_htmls(html_list[:20]) # 只测试20个
print(f"解析完成 {len(results)} 个页面")
# 使用生产者-消费者模式
pc_parser = ProducerConsumerParser(num_consumers=4)
pc_results = pc_parser.parse_with_producer_consumer(html_list[:20])
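需要说明的是,HTML解析属于CPU密集型操作,受GIL影响,线程池对纯解析任务的加速通常有限(对包含网络I/O的抓取效果更明显)。如果解析本身成为瓶颈,可以考虑进程池,下面是一个简化示意(被提交的函数需定义在模块顶层,以便跨进程序列化):
import concurrent.futures
from bs4 import BeautifulSoup
def parse_one(html):
    """在子进程中解析,只返回可序列化的普通数据"""
    soup = BeautifulSoup(html, 'lxml')
    return {
        'title': str(soup.title.string) if soup.title and soup.title.string else None,
        'link_count': len(soup.find_all('a')),
    }
if __name__ == '__main__':
    html_list = [f"<html><head><title>页面{i}</title></head><body><a href='/x'>x</a></body></html>" for i in range(100)]
    # 进程池绕开GIL,适合解析耗时明显的场景;但进程间传输数据本身也有开销
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(parse_one, html_list))
    print(f"解析完成 {len(results)} 个页面")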
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
class AsyncParser:
"""异步解析器"""
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_html(self, session, url):
"""异步获取HTML"""
async with self.semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
return await response.text()
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None
async def parse_html(self, html):
"""解析HTML(CPU密集型,考虑使用线程池)"""
# 由于BeautifulSoup是CPU密集型的,使用线程池避免阻塞事件循环
loop = asyncio.get_running_loop()
soup = await loop.run_in_executor(
None, # 使用默认的线程池执行器
BeautifulSoup, html, 'lxml'
)
# 提取数据
data = {
'title': soup.title.string if soup.title else None,
'links': len(soup.find_all('a')),
'timestamp': time.time()
}
return data
async def process_url(self, session, url):
"""处理单个URL"""
html = await self.fetch_html(session, url)
if html:
data = await self.parse_html(html)
return {'url': url, 'data': data}
return {'url': url, 'error': '获取失败'}
async def process_urls(self, urls):
"""批量处理URLs"""
print(f"开始异步处理 {len(urls)} 个URL")
start_time = time.time()
async with aiohttp.ClientSession() as session:
tasks = [self.process_url(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
print(f"异步处理完成,耗时: {end_time - start_time:.2f}秒")
# 过滤异常结果
valid_results = []
for result in results:
if not isinstance(result, Exception):
valid_results.append(result)
return valid_results
# 使用示例
async def run_async_example():
"""运行异步示例"""
urls = [
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
] * 3 # 重复3次以增加任务数量
parser = AsyncParser(max_concurrent=5)
results = await parser.process_urls(urls)
print(f"成功处理 {len(results)} 个URL")
for result in results[:3]: # 显示前3个结果
print(f"URL: {result.get('url')}, 标题: {result.get('data', {}).get('title', '无')}")
return results
# 注意:在Jupyter或IPython中可以直接await
# 在普通Python脚本中需要:
# results = asyncio.run(run_async_example())
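在 Python 3.9 及以上版本,也可以用 asyncio.to_thread 代替手动调用 run_in_executor,写法更简洁。下面是一个可独立运行的入口示意(HTML为演示数据):
import asyncio
from bs4 import BeautifulSoup
async def parse_in_thread(html):
    # Python 3.9+:把CPU密集的解析交给默认线程池,避免阻塞事件循环
    soup = await asyncio.to_thread(BeautifulSoup, html, 'lxml')
    return soup.title.string if soup.title else None
async def main():
    htmls = [f"<html><head><title>页面{i}</title></head></html>" for i in range(5)]
    titles = await asyncio.gather(*(parse_in_thread(h) for h in htmls))
    print(titles)
if __name__ == '__main__':
    asyncio.run(main())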
import hashlib
import pickle
import os
import time
from functools import lru_cache
from bs4 import BeautifulSoup
class HtmlCache:
"""HTML缓存管理器"""
def __init__(self, cache_dir='.html_cache'):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def get_cache_key(self, url, params=None):
"""生成缓存键"""
key_string = url
if params:
key_string += str(sorted(params.items()))
return hashlib.md5(key_string.encode()).hexdigest()
def get_cache_path(self, cache_key):
"""获取缓存文件路径"""
return os.path.join(self.cache_dir, f"{cache_key}.pickle")
def save_to_cache(self, url, html, params=None):
"""保存到缓存"""
cache_key = self.get_cache_key(url, params)
cache_path = self.get_cache_path(cache_key)
cache_data = {
'url': url,
'html': html,
'timestamp': time.time(),
'params': params
}
try:
with open(cache_path, 'wb') as f:
pickle.dump(cache_data, f)
return True
except Exception as e:
print(f"保存缓存失败: {e}")
return False
def load_from_cache(self, url, params=None, max_age=3600):
"""从缓存加载"""
cache_key = self.get_cache_key(url, params)
cache_path = self.get_cache_path(cache_key)
if not os.path.exists(cache_path):
return None
try:
with open(cache_path, 'rb') as f:
cache_data = pickle.load(f)
# 检查缓存是否过期
if time.time() - cache_data['timestamp'] > max_age:
print(f"缓存过期: {url}")
return None
return cache_data['html']
except Exception as e:
print(f"加载缓存失败: {e}")
return None
def clear_cache(self, older_than=None):
"""清理缓存"""
deleted = 0
for filename in os.listdir(self.cache_dir):
filepath = os.path.join(self.cache_dir, filename)
if older_than:
file_age = time.time() - os.path.getmtime(filepath)
if file_age > older_than:
os.remove(filepath)
deleted += 1
else:
os.remove(filepath)
deleted += 1
print(f"清理了 {deleted} 个缓存文件")
return deleted
# 使用lru_cache装饰器
class CachedParser:
"""使用LRU缓存的解析器"""
def __init__(self, maxsize=128):
self.parse_html_cached = lru_cache(maxsize=maxsize)(self._parse_html)
def _parse_html(self, html_content):
"""实际解析HTML的方法"""
return BeautifulSoup(html_content, 'lxml')
def parse_with_cache(self, html_content):
"""使用缓存的解析"""
return self.parse_html_cached(html_content)
def clear_cache(self):
"""清理缓存"""
self.parse_html_cached.cache_clear()
def cache_info(self):
"""获取缓存信息"""
return self.parse_html_cached.cache_info()
# 使用示例
cache_manager = HtmlCache()
# 模拟获取HTML
def fetch_html_with_cache(url):
"""带缓存的HTML获取"""
# 尝试从缓存加载
cached_html = cache_manager.load_from_cache(url, max_age=300) # 5分钟缓存
if cached_html:
print(f"从缓存加载: {url}")
return cached_html
else:
print(f"重新获取: {url}")
# 模拟网络请求
import requests
response = requests.get(url)
html = response.text
# 保存到缓存
cache_manager.save_to_cache(url, html)
return html
# 测试缓存效果
urls = ['https://httpbin.org/html', 'https://httpbin.org/xml']
print("第一次访问(未缓存):")
for url in urls:
html = fetch_html_with_cache(url)
print("\n第二次访问(使用缓存):")
for url in urls:
html = fetch_html_with_cache(url)
# 使用LRU缓存
parser = CachedParser(maxsize=100)
# 多次解析相同的HTML
html_content = "<html><body><p>测试内容</p></body></html>"
for i in range(5):
soup = parser.parse_with_cache(html_content)
print(f"解析 {i+1}: {parser.cache_info()}")
from bs4 import BeautifulSoup
import concurrent.futures
import time
import hashlib
import json
from functools import lru_cache
class OptimizedWebScraper:
"""全面优化的网页抓取器"""
def __init__(self, config=None):
# 默认配置
self.config = {
'parser': 'lxml', # 默认使用lxml
'max_workers': 4, # 线程池大小
'cache_enabled': True,
'cache_max_age': 300, # 缓存5分钟
'request_timeout': 10,
'batch_size': 100,
'memory_limit': 1024 * 1024 * 100, # 100MB内存限制
}
if config:
self.config.update(config)
# 初始化组件
self.session = None # 懒加载
self.executor = None
self.cache = {}
# 性能统计
self.stats = {
'total_requests': 0,
'cached_requests': 0,
'parse_time': 0,
'network_time': 0,
'start_time': time.time()
}
def get_session(self):
"""懒加载session"""
if self.session is None:
import requests
self.session = requests.Session()
# 配置session
adapter = requests.adapters.HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=3
)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
return self.session
def get_executor(self):
"""懒加载线程池"""
if self.executor is None:
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.config['max_workers']
)
return self.executor
def get_html(self, url, use_cache=True):
"""获取HTML,支持缓存"""
cache_key = self.get_cache_key(url)
# 检查缓存
if use_cache and self.config['cache_enabled']:
if cache_key in self.cache:
cache_data = self.cache[cache_key]
# 检查缓存是否过期
if time.time() - cache_data['timestamp'] <= self.config['cache_max_age']:
self.stats['cached_requests'] += 1
return cache_data['html']
# 网络请求
start_time = time.time()
try:
session = self.get_session()
response = session.get(
url,
timeout=self.config['request_timeout'],
headers={'User-Agent': 'Mozilla/5.0'}
)
response.raise_for_status()
html = response.text
self.stats['network_time'] += time.time() - start_time
self.stats['total_requests'] += 1
# 更新缓存
if use_cache and self.config['cache_enabled']:
self.cache[cache_key] = {
'html': html,
'timestamp': time.time(),
'url': url
}
return html
except Exception as e:
print(f"获取 {url} 失败: {e}")
return None
@lru_cache(maxsize=1000)
def parse_html_cached(self, html_content):
"""带缓存的HTML解析"""
if not html_content:
return None
return BeautifulSoup(html_content, self.config['parser'])
def parse_html(self, html_content):
"""解析HTML,使用缓存"""
return self.parse_html_cached(html_content)
def extract_data_optimized(self, soup, selectors):
"""优化数据提取"""
data = {}
for key, selector in selectors.items():
if isinstance(selector, str):
# CSS选择器
if selector.startswith(('.', '#', '[')):
elements = soup.select(selector)
else:
elements = soup.find_all(selector)
# 根据元素数量决定返回什么
if len(elements) == 0:
data[key] = None
elif len(elements) == 1:
data[key] = elements[0].get_text(strip=True)
else:
data[key] = [elem.get_text(strip=True) for elem in elements]
elif isinstance(selector, dict):
# 复杂选择器
if 'method' in selector:
method = selector['method']
args = selector.get('args', [])
kwargs = selector.get('kwargs', {})
if hasattr(soup, method):
func = getattr(soup, method)
result = func(*args, **kwargs)
data[key] = result
else:
data[key] = None
else:
# 属性选择器
elements = soup.find_all(attrs=selector)
data[key] = [elem.get_text(strip=True) for elem in elements]
return data
def scrape_url(self, url, selectors):
"""抓取单个URL"""
# 获取HTML
html = self.get_html(url)
if not html:
return {'url': url, 'error': '获取HTML失败'}
# 解析HTML
parse_start = time.time()
soup = self.parse_html(html)
self.stats['parse_time'] += time.time() - parse_start
# 提取数据
data = self.extract_data_optimized(soup, selectors)
return {
'url': url,
'data': data,
'success': True
}
def scrape_batch(self, urls, selectors, batch_size=None):
"""批量抓取"""
if batch_size is None:
batch_size = self.config['batch_size']
print(f"开始批量抓取,URL数量: {len(urls)},批次大小: {batch_size}")
all_results = []
# 分批处理
for i in range(0, len(urls), batch_size):
batch = urls[i:i+batch_size]
print(f"处理批次 {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")
# 使用线程池并发处理
executor = self.get_executor()
futures = []
for url in batch:
future = executor.submit(self.scrape_url, url, selectors)
futures.append(future)
# 收集结果
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
all_results.append(result)
except Exception as e:
print(f"处理URL出错: {e}")
# 显示进度
print(f" 已完成: {len(all_results)}/{len(urls)}")
# 检查内存使用
if self.check_memory_limit():
print("内存使用接近限制,暂停处理...")
self.cleanup_memory()
return all_results
def check_memory_limit(self):
"""检查内存使用是否超过限制"""
import psutil
import os
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss
return memory_usage > self.config['memory_limit']
def cleanup_memory(self):
"""清理内存"""
# 清理缓存
old_cache_size = len(self.cache)
self.cache = {k: v for k, v in self.cache.items()
if time.time() - v['timestamp'] <= self.config['cache_max_age']}
# 清理解析缓存
self.parse_html_cached.cache_clear()
# 强制垃圾回收
import gc
gc.collect()
print(f"内存清理完成,缓存从 {old_cache_size} 减少到 {len(self.cache)}")
def get_cache_key(self, url):
"""生成缓存键"""
return hashlib.md5(url.encode()).hexdigest()
def get_stats(self):
"""获取统计信息"""
total_time = time.time() - self.stats['start_time']
stats = {
'total_requests': self.stats['total_requests'],
'cached_requests': self.stats['cached_requests'],
'cache_hit_rate': self.stats['cached_requests'] / max(self.stats['cached_requests'] + self.stats['total_requests'], 1),
'parse_time': self.stats['parse_time'],
'network_time': self.stats['network_time'],
'total_time': total_time,
'efficiency': (self.stats['parse_time'] + self.stats['network_time']) / total_time,
'cache_size': len(self.cache)
}
return stats
def __del__(self):
"""清理资源"""
if self.executor:
self.executor.shutdown(wait=False)
if self.session:
self.session.close()
# 使用示例
def run_optimized_example():
"""运行优化示例"""
# 配置选择器
selectors = {
'title': 'title',
'h1': 'h1',
'links': 'a',
'paragraphs': 'p',
'images': 'img'
}
# 创建优化抓取器
scraper = OptimizedWebScraper({
'max_workers': 8,
'batch_size': 20,
'cache_enabled': True,
'cache_max_age': 300
})
# 创建测试URLs
test_urls = [
'https://httpbin.org/html',
'https://httpbin.org/xml',
'https://httpbin.org/robots.txt',
] * 10 # 重复10次增加任务量
print(f"测试 {len(test_urls)} 个URL")
# 执行抓取
start_time = time.time()
results = scraper.scrape_batch(test_urls, selectors)
end_time = time.time()
print(f"\n抓取完成,耗时: {end_time - start_time:.2f}秒")
print(f"成功抓取: {len([r for r in results if r.get('success')])}/{len(test_urls)}")
# 显示统计信息
stats = scraper.get_stats()
print("\n=== 性能统计 ===")
for key, value in stats.items():
if isinstance(value, float):
print(f"{key}: {value:.4f}")
else:
print(f"{key}: {value}")
# 显示部分结果
print("\n=== 示例结果 ===")
for result in results[:3]:
if result.get('success'):
print(f"URL: {result['url']}")
print(f"标题: {result['data'].get('title', '无')[:50]}...")
print(f"链接数: {len(result['data'].get('links', []))}")
print()
return results
# 运行示例
# results = run_optimized_example()
| 优化领域 | 具体措施 | 预期效果 |
|---|---|---|
| 解析器选择 | 优先使用lxml,按文档大小与容错需求降级 | 速度提升 3-10倍 |
| 内存管理 | 及时释放soup对象、流式/生成器处理、控制批次大小 | 内存降低 50-80% |
| 查找优化 | 使用ID和具体选择器、限制查找范围、缓存查询结果 | 速度提升 2-5倍 |
| 并发处理 | 线程池、生产者-消费者模式、asyncio异步抓取 | 速度提升 5-20倍 |
| 缓存策略 | HTML磁盘缓存、LRU解析缓存、设置过期时间 | 重复请求快 10-100倍 |
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.metrics = {
'start_time': time.time(),
'parse_count': 0,
'cache_hits': 0,
'cache_misses': 0,
'memory_samples': []
}
def start_operation(self, operation_name):
"""开始操作计时"""
return {
'name': operation_name,
'start_time': time.time(),
'memory_before': self.get_memory_usage()
}
def end_operation(self, operation_context):
"""结束操作计时"""
operation_context['end_time'] = time.time()
operation_context['duration'] = operation_context['end_time'] - operation_context['start_time']
operation_context['memory_after'] = self.get_memory_usage()
operation_context['memory_delta'] = operation_context['memory_after'] - operation_context['memory_before']
return operation_context
def record_parse(self, html_size):
"""记录解析操作"""
self.metrics['parse_count'] += 1
def record_cache_hit(self):
"""记录缓存命中"""
self.metrics['cache_hits'] += 1
def record_cache_miss(self):
"""记录缓存未命中"""
self.metrics['cache_misses'] += 1
def get_memory_usage(self):
"""获取内存使用"""
import psutil
import os
process = psutil.Process(os.getpid())
return process.memory_info().rss / 1024 / 1024 # MB
def sample_memory(self):
"""采样内存使用"""
self.metrics['memory_samples'].append({
'time': time.time() - self.metrics['start_time'],
'memory': self.get_memory_usage()
})
def get_report(self):
"""获取性能报告"""
total_time = time.time() - self.metrics['start_time']
report = {
'total_time_seconds': total_time,
'parse_count': self.metrics['parse_count'],
'parse_rate': self.metrics['parse_count'] / total_time if total_time > 0 else 0,
'cache_hits': self.metrics['cache_hits'],
'cache_misses': self.metrics['cache_misses'],
'cache_hit_rate': self.metrics['cache_hits'] / max(self.metrics['cache_hits'] + self.metrics['cache_misses'], 1),
'current_memory_mb': self.get_memory_usage(),
'memory_samples': len(self.metrics['memory_samples'])
}
return report
# 使用示例
monitor = PerformanceMonitor()
# 在关键操作处添加监控
op_context = monitor.start_operation('parse_html')
# ... 执行解析操作 ...
op_result = monitor.end_operation(op_context)
print(f"操作 {op_result['name']} 耗时: {op_result['duration']:.4f}秒")
print(f"内存变化: {op_result['memory_delta']:.2f}MB")