BeautifulSoup 负责解析 HTML,而 requests 库负责获取网页内容。两者结合是 Python 网页爬虫的标准配置。本章将详细介绍如何配合使用这两个强大的库,构建完整的网页抓取和解析流程。
在开始配合使用前,先了解 requests 库的基本用法。
pip install requests
import requests
# 发送GET请求
response = requests.get('https://httpbin.org/get')
# 检查状态码
print(f"状态码: {response.status_code}")
print(f"响应头: {response.headers['content-type']}")
print(f"编码: {response.encoding}")
print(f"内容长度: {len(response.content)} bytes")
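补充一点:response.content 是原始字节串,response.text 是按 response.encoding 解码后的字符串;交给 BeautifulSoup 时两者都可以用,传字节串时由解析器自行探测编码。接上面的示例,简单对比如下:
# 接上例:content 是 bytes,text 是解码后的 str
print(type(response.content))   # <class 'bytes'>
print(type(response.text))      # <class 'str'>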
import requests
from bs4 import BeautifulSoup
def get_page_title(url):
    """获取网页标题"""
    try:
        # 发送请求
        response = requests.get(url)
        response.raise_for_status()  # 检查请求是否成功
        # 解析HTML
        soup = BeautifulSoup(response.content, 'lxml')
        # 提取标题
        title = soup.title.string if soup.title else "无标题"
        return title
    except requests.exceptions.RequestException as e:
        return f"请求错误: {e}"
    except Exception as e:
        return f"解析错误: {e}"
# 使用示例
url = 'https://httpbin.org/html'
title = get_page_title(url)
print(f"页面标题: {title}")
import requests
from bs4 import BeautifulSoup
def fetch_and_parse(url):
    """获取并解析网页"""
    try:
        # 发送请求
        print(f"正在获取: {url}")
        response = requests.get(url)
        # 检查响应
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return None
        # 自动检测编码
        response.encoding = response.apparent_encoding
        # 使用BeautifulSoup解析
        soup = BeautifulSoup(response.text, 'lxml')
        # 提取基本信息
        info = {
            'url': url,
            'status_code': response.status_code,
            'encoding': response.encoding,
            'title': soup.title.string if soup.title else None,
            'meta_description': '',
            'h1_count': len(soup.find_all('h1')),
            'link_count': len(soup.find_all('a')),
            'image_count': len(soup.find_all('img'))
        }
        # 提取meta描述
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc:
            info['meta_description'] = meta_desc.get('content', '')
        return info
    except Exception as e:
        print(f"错误: {e}")
        return None
# 使用示例
info = fetch_and_parse('https://httpbin.org/html')
if info:
    for key, value in info.items():
        print(f"{key}: {value}")
import requests
from bs4 import BeautifulSoup
# 方法1:手动构造URL
base_url = 'https://httpbin.org/get'
params = {'key1': 'value1', 'key2': 'value2'}
url_with_params = f"{base_url}?key1=value1&key2=value2"
# 方法2:使用params参数(推荐)
response = requests.get('https://httpbin.org/get', params=params)
# 解析响应(httpbin.org/get 以JSON形式返回请求信息,其中args字段就是查询参数)
soup = BeautifulSoup(response.content, 'html.parser')
print(f"响应URL: {response.url}")
print(f"响应内容: {soup.prettify()[:500]}...")
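如果确实需要手动拼接查询字符串,可以借助标准库的 urllib.parse.urlencode 来处理URL编码,效果与 params 参数一致。接上面的 base_url 和 params,示意如下:
from urllib.parse import urlencode

# urlencode 会自动完成URL编码
query_string = urlencode(params)            # 'key1=value1&key2=value2'
manual_url = f"{base_url}?{query_string}"
print(manual_url)                           # https://httpbin.org/get?key1=value1&key2=value2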
# 实际搜索示例
def google_search(query):
    """模拟Google搜索"""
    url = 'https://www.google.com/search'
    params = {
        'q': query,
        'hl': 'zh-CN',
        'num': 10  # 结果数量
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    response = requests.get(url, params=params, headers=headers)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'lxml')
        # 提取搜索结果标题
        titles = []
        for h3 in soup.find_all('h3'):
            if h3.parent and h3.parent.get('href'):
                titles.append(h3.text)
        return titles
    return []
# 注意:实际使用Google搜索需要处理反爬虫机制
import requests
from bs4 import BeautifulSoup
import json
# 示例1:发送表单数据
def submit_form():
    """提交表单数据"""
    url = 'https://httpbin.org/post'
    # 表单数据
    data = {
        'username': 'testuser',
        'password': 'testpass',
        'email': 'test@example.com'
    }
    # 发送POST请求
    response = requests.post(url, data=data)
    # 解析响应
    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        result = json.loads(soup.text)
        print(f"表单数据: {result['form']}")
        return result
    return None
# 示例2:发送JSON数据
def send_json_data():
    """发送JSON数据"""
    url = 'https://httpbin.org/post'
    # JSON数据
    json_data = {
        'name': '张三',
        'age': 25,
        'skills': ['Python', 'JavaScript', 'HTML/CSS']
    }
    headers = {'Content-Type': 'application/json'}
    # 发送POST请求
    response = requests.post(url, json=json_data, headers=headers)
    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        result = json.loads(soup.text)
        print(f"JSON数据: {result['json']}")
        return result
    return None
# 运行示例
print("=== 表单提交 ===")
submit_form()
print("\n=== JSON提交 ===")
send_json_data()
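顺带一提,httpbin 的 /post 返回的是 JSON 而不是 HTML,其实不必绕道 BeautifulSoup,requests 自带的 response.json() 就能直接解析成字典,示意如下:
response = requests.post('https://httpbin.org/post', data={'username': 'testuser'})
if response.status_code == 200:
    result = response.json()      # 直接把JSON响应解析成dict
    print(result['form'])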
import requests
from bs4 import BeautifulSoup
def get_with_headers(url):
    """使用自定义请求头获取网页"""
    # 常见请求头
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Cache-Control': 'max-age=0'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        # 解析页面
        soup = BeautifulSoup(response.content, 'lxml')
        # 提取信息
        info = {
            'title': soup.title.string if soup.title else '无标题',
            'final_url': response.url,
            'headers_sent': dict(response.request.headers),
            'headers_received': dict(response.headers)
        }
        return info
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return None
# 使用示例
info = get_with_headers('https://httpbin.org/headers')
if info:
    print("请求成功")
    print(f"页面标题: {info['title']}")
    print(f"最终URL: {info['final_url']}")
    print("\n发送的请求头:")
    for key, value in info['headers_sent'].items():
        print(f" {key}: {value}")
import requests
from bs4 import BeautifulSoup
import random
def get_random_user_agent():
    """生成随机User-Agent"""
    user_agents = [
        # Chrome
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
        # Firefox
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) Gecko/20100101 Firefox/120.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:120.0) Gecko/20100101 Firefox/120.0',
        # Safari
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15',
        # Edge
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0'
    ]
    return random.choice(user_agents)
def fetch_with_random_ua(url):
    """使用随机User-Agent获取页面"""
    headers = {
        'User-Agent': get_random_user_agent(),
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'lxml')
            # 记录使用的User-Agent
            print(f"使用的User-Agent: {headers['User-Agent'][:50]}...")
            print(f"页面标题: {soup.title.string if soup.title else '无标题'}")
            return soup
        else:
            print(f"请求失败,状态码: {response.status_code}")
            return None
    except Exception as e:
        print(f"错误: {e}")
        return None
# 使用示例
soup = fetch_with_random_ua('https://httpbin.org/user-agent')
if soup:
    # 提取用户代理信息
    # 注意:httpbin.org/user-agent 返回的是JSON文本(浏览器才会把它包进<pre>),
    # 用lxml解析后直接取全部文本即可
    ua_text = soup.get_text(strip=True)
    if ua_text:
        print(f"服务器收到的User-Agent: {ua_text[:100]}...")
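如果不想自己维护User-Agent列表,也可以考虑第三方库 fake-useragent(pip install fake-useragent)。下面是一个假设性的用法示意,具体接口以该库文档为准:
from fake_useragent import UserAgent

ua = UserAgent()                      # 初始化(会加载内置的UA数据)
headers = {'User-Agent': ua.random}   # 每次取一个随机User-Agent
print(headers['User-Agent'])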
import requests
from bs4 import BeautifulSoup
def simulate_login():
    """模拟登录过程"""
    # 创建Session对象
    session = requests.Session()
    # 第一次请求:获取登录页面
    login_url = 'https://httpbin.org/cookies/set/sessionid/123456'
    print("1. 访问登录页面...")
    response = session.get(login_url)
    # 解析登录页面(示例)
    soup = BeautifulSoup(response.text, 'lxml')
    print(f" 页面标题: {soup.title.string if soup.title else '登录页面'}")
    # 检查Cookies
    print(f" Cookies: {session.cookies.get_dict()}")
    # 第二次请求:访问需要登录的页面
    protected_url = 'https://httpbin.org/cookies'
    print("\n2. 访问需要登录的页面...")
    response2 = session.get(protected_url)
    # 解析响应(httpbin返回JSON文本,lxml解析后直接取文本即可)
    soup2 = BeautifulSoup(response2.text, 'lxml')
    cookies_text = soup2.get_text(strip=True)
    if cookies_text:
        print(f" 返回的Cookies信息: {cookies_text}")
    # 第三次请求:登出
    logout_url = 'https://httpbin.org/cookies/delete?sessionid'
    print("\n3. 登出...")
    response3 = session.get(logout_url)
    soup3 = BeautifulSoup(response3.text, 'lxml')
    logout_text = soup3.get_text(strip=True)
    if logout_text:
        print(f" 登出后的Cookies: {logout_text}")
    return session
# 运行示例
session = simulate_login()
print(f"\n最终的Cookies: {session.cookies.get_dict()}")
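上面用 httpbin 演示了会话保持;针对真实网站,典型流程是先用 session.post 提交登录表单,再用同一个 session 访问受保护页面。下面是一个假设性示意(example.com 的地址和表单字段均为虚构,需按目标站点的实际结构调整):
import requests
from bs4 import BeautifulSoup

def login_and_fetch(username, password):
    """示意:用Session提交登录表单后访问受保护页面(URL和字段均为假设)"""
    session = requests.Session()
    # 1. 提交登录表单(字段名以目标网站实际表单为准)
    login_resp = session.post(
        'https://example.com/login',
        data={'username': username, 'password': password},
        timeout=10
    )
    login_resp.raise_for_status()
    # 2. 登录成功后,session会自动携带Cookies访问受保护页面
    page = session.get('https://example.com/dashboard', timeout=10)
    soup = BeautifulSoup(page.text, 'lxml')
    return soup.title.string if soup.title else None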
import requests
from bs4 import BeautifulSoup
def manage_cookies_manually():
    """手动管理Cookies"""
    # 第一次请求:设置Cookies
    print("1. 设置Cookies...")
    # 不跟随重定向,否则response1.cookies里只剩最终响应设置的Cookies
    response1 = requests.get(
        'https://httpbin.org/cookies/set/name/value',
        allow_redirects=False
    )
    # 获取服务器设置的Cookies
    cookies_from_server = response1.cookies
    print(f" 服务器设置的Cookies: {dict(cookies_from_server)}")
    # 第二次请求:发送特定的Cookies
    print("\n2. 发送自定义Cookies...")
    cookies_to_send = {
        'session_id': 'abc123',
        'user_id': '456',
        'preferences': 'dark_mode'
    }
    response2 = requests.get(
        'https://httpbin.org/cookies',
        cookies=cookies_to_send
    )
    # 解析响应(httpbin返回JSON文本,lxml解析后直接取文本即可)
    soup = BeautifulSoup(response2.text, 'lxml')
    print(f" 服务器收到的Cookies: {soup.get_text(strip=True)}")
    # 第三次请求:结合Session和手动Cookies
    print("\n3. 使用Session并更新Cookies...")
    session = requests.Session()
    # 设置初始Cookies
    session.cookies.update(cookies_to_send)
    # 发送请求
    response3 = session.get('https://httpbin.org/cookies')
    soup3 = BeautifulSoup(response3.text, 'lxml')
    print(f" Session Cookies: {soup3.get_text(strip=True)}")
    # 更新Cookies
    print("\n4. 更新Cookies...")
    new_cookies = {'new_cookie': 'new_value'}
    session.cookies.update(new_cookies)
    response4 = session.get('https://httpbin.org/cookies')
    soup4 = BeautifulSoup(response4.text, 'lxml')
    print(f" 更新后的Cookies: {soup4.get_text(strip=True)}")
    return session
# 运行示例
session = manage_cookies_manually()
import requests
from bs4 import BeautifulSoup
import time
def fetch_with_retry(url, max_retries=3, delay=2):
    """带重试机制的网页获取"""
    for attempt in range(max_retries):
        try:
            print(f"第{attempt+1}次尝试获取: {url}")
            response = requests.get(
                url,
                timeout=10,
                headers={'User-Agent': 'Mozilla/5.0'}
            )
            # 检查状态码
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'lxml')
                print("请求成功")
                return soup
            elif response.status_code == 404:
                print("页面不存在")
                return None
            elif response.status_code == 403:
                print("访问被拒绝")
                time.sleep(delay * 2)  # 403错误等待更长时间
                continue
            elif response.status_code == 429:
                print("请求过于频繁,等待后重试")
                time.sleep(delay * 3)
                continue
            elif response.status_code >= 500:
                print(f"服务器错误: {response.status_code}")
                time.sleep(delay)
                continue
            else:
                print(f"未知状态码: {response.status_code}")
                time.sleep(delay)
                continue
        except requests.exceptions.Timeout:
            print("请求超时")
            time.sleep(delay)
        except requests.exceptions.ConnectionError:
            print("连接错误")
            time.sleep(delay)
        except Exception as e:
            print(f"其他错误: {e}")
            time.sleep(delay)
    print(f"经过{max_retries}次尝试后仍失败")
    return None
# 使用示例
soup = fetch_with_retry('https://httpbin.org/status/200')
if soup:
    print(f"页面标题: {soup.title.string if soup.title else '无标题'}")
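除了手写重试循环,也可以利用 requests 底层 urllib3 自带的 Retry 搭配 HTTPAdapter 实现自动重试(含指数退避),下面是一个简要示意:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry_session = requests.Session()
retry = Retry(
    total=3,                                    # 最多重试3次
    backoff_factor=1,                           # 重试之间按指数退避等待
    status_forcelist=[429, 500, 502, 503, 504]  # 这些状态码触发重试
)
retry_session.mount('https://', HTTPAdapter(max_retries=retry))
retry_session.mount('http://', HTTPAdapter(max_retries=retry))

response = retry_session.get('https://httpbin.org/status/200', timeout=10)
print(response.status_code)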
import requests
from bs4 import BeautifulSoup
import json
def handle_different_responses(url):
    """处理不同类型的响应"""
    try:
        response = requests.get(url, timeout=10)
        # 获取内容类型
        content_type = response.headers.get('content-type', '').lower()
        print(f"URL: {url}")
        print(f"状态码: {response.status_code}")
        print(f"内容类型: {content_type}")
        # 根据内容类型处理
        if 'text/html' in content_type:
            # HTML响应
            soup = BeautifulSoup(response.content, 'lxml')
            title = soup.title.string if soup.title else '无标题'
            print(f"HTML页面标题: {title}")
            # 提取有用信息
            links = []
            for a in soup.find_all('a', href=True):
                links.append({
                    'text': a.text.strip()[:50],
                    'href': a['href']
                })
            return {
                'type': 'html',
                'title': title,
                'links': links[:5]  # 只返回前5个链接
            }
        elif 'application/json' in content_type:
            # JSON响应
            try:
                data = response.json()
                print(f"JSON数据示例: {json.dumps(data, ensure_ascii=False)[:200]}...")
                return {'type': 'json', 'data': data}
            except ValueError:
                # JSON解析失败,退回按纯文本处理
                return {'type': 'text', 'content': response.text}
        elif 'text/plain' in content_type:
            # 纯文本响应
            text = response.text[:200] + '...' if len(response.text) > 200 else response.text
            print(f"纯文本内容: {text}")
            return {'type': 'text', 'content': response.text}
        elif 'image/' in content_type:
            # 图片响应
            print(f"图片大小: {len(response.content)} bytes")
            return {'type': 'image', 'size': len(response.content)}
        else:
            # 其他类型
            print(f"二进制数据,大小: {len(response.content)} bytes")
            return {'type': 'binary', 'size': len(response.content)}
    except Exception as e:
        print(f"错误: {e}")
        return {'type': 'error', 'message': str(e)}
# 测试不同类型的响应
test_urls = [
    'https://httpbin.org/html',        # HTML
    'https://httpbin.org/json',        # JSON
    'https://httpbin.org/robots.txt',  # 纯文本
    'https://httpbin.org/image/png',   # 图片
]
for url in test_urls:
    print("\n" + "="*50)
    result = handle_different_responses(url)
    print(f"处理结果类型: {result['type']}")
import requests
from bs4 import BeautifulSoup
import time
def scrape_paginated_data(base_url, max_pages=5):
    """抓取分页数据"""
    all_data = []
    for page in range(1, max_pages + 1):
        print(f"正在抓取第 {page} 页...")
        # 构建分页URL
        if '?' in base_url:
            url = f"{base_url}&page={page}"
        else:
            url = f"{base_url}?page={page}"
        try:
            # 发送请求
            response = requests.get(
                url,
                headers={'User-Agent': 'Mozilla/5.0'},
                timeout=10
            )
            if response.status_code != 200:
                print(f"第 {page} 页请求失败,状态码: {response.status_code}")
                break
            # 解析页面
            soup = BeautifulSoup(response.content, 'lxml')
            # 示例:提取文章列表(根据实际页面结构调整)
            articles = soup.find_all('article') or soup.find_all('div', class_='item')
            if not articles:
                print(f"第 {page} 页没有找到数据")
                break
            page_data = []
            for article in articles:
                # 提取文章信息
                title_elem = article.find('h2') or article.find('h3')
                title = title_elem.text.strip() if title_elem else '无标题'
                link_elem = article.find('a')
                link = link_elem['href'] if link_elem and link_elem.has_attr('href') else ''
                content_elem = article.find('p')
                content = content_elem.text.strip() if content_elem else ''
                item = {
                    'title': title,
                    'link': link,
                    'content': content[:100] + '...' if len(content) > 100 else content,
                    'page': page
                }
                page_data.append(item)
            all_data.extend(page_data)
            print(f"第 {page} 页找到 {len(page_data)} 条数据")
            # 检查是否还有下一页
            next_button = soup.find('a', string=lambda x: x and '下一页' in x)
            if not next_button:
                next_button = soup.find('a', class_='next')
            if not next_button:
                print("没有找到下一页按钮,停止抓取")
                break
            # 避免请求过快
            time.sleep(1)
        except Exception as e:
            print(f"第 {page} 页抓取出错: {e}")
            break
    print(f"\n总共抓取 {len(all_data)} 条数据")
    return all_data
# 使用示例(注意:这是一个通用示例,实际URL需要替换)
# data = scrape_paginated_data('https://example.com/articles', max_pages=3)
# for item in data[:3]: # 显示前3条
# print(f"标题: {item['title']}, 页码: {item['page']}")
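按页码拼URL适合分页规律固定的站点;另一种常见做法是直接跟随页面上的"下一页"链接,用 urljoin 把相对地址转成绝对地址。下面是一个假设性示意(a.next 这个选择器是假设的,需按实际页面调整):
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import time

def scrape_by_next_link(start_url, max_pages=5):
    """示意:沿着"下一页"链接翻页(a.next选择器为假设)"""
    url = start_url
    pages = []
    for _ in range(max_pages):
        response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10)
        if response.status_code != 200:
            break
        soup = BeautifulSoup(response.content, 'lxml')
        pages.append(soup)
        next_link = soup.find('a', class_='next')    # 假设下一页按钮带有next类
        if not next_link or not next_link.get('href'):
            break
        url = urljoin(url, next_link['href'])        # 相对链接转为绝对链接
        time.sleep(1)                                # 控制抓取频率
    return pages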
import requests
from bs4 import BeautifulSoup
import concurrent.futures
import time
def fetch_single_page(url):
    """获取单个页面"""
    try:
        response = requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
        if response.status_code == 200:
            return response.content
        else:
            print(f"请求失败: {url}, 状态码: {response.status_code}")
            return None
    except Exception as e:
        print(f"请求错误: {url}, 错误: {e}")
        return None
def parse_page_content(html_content, url):
    """解析页面内容"""
    if not html_content:
        return None
    try:
        soup = BeautifulSoup(html_content, 'lxml')
        info = {
            'url': url,
            'title': soup.title.string if soup.title else '无标题',
            'h1_count': len(soup.find_all('h1')),
            'paragraph_count': len(soup.find_all('p')),
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
        return info
    except Exception as e:
        print(f"解析错误: {url}, 错误: {e}")
        return None
def fetch_and_parse_concurrently(urls, max_workers=5):
    """并发获取和解析多个页面"""
    results = []
    # 使用线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交获取任务
        future_to_url = {
            executor.submit(fetch_single_page, url): url
            for url in urls
        }
        # 处理完成的任务
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                html_content = future.result()
                if html_content:
                    # 解析内容
                    parsed_info = parse_page_content(html_content, url)
                    if parsed_info:
                        results.append(parsed_info)
                        print(f"完成: {url}")
            except Exception as e:
                print(f"处理异常: {url}, 错误: {e}")
    return results
# 使用示例
test_urls = [
    'https://httpbin.org/html',
    'https://httpbin.org/xml',
    'https://httpbin.org/robots.txt',
    'https://httpbin.org/json',
]
print("开始并发获取页面...")
start_time = time.time()
results = fetch_and_parse_concurrently(test_urls, max_workers=3)
end_time = time.time()
print(f"\n总共获取 {len(results)} 个页面")
print(f"总耗时: {end_time - start_time:.2f} 秒")
# 显示结果
for result in results:
    print(f"\nURL: {result['url']}")
    print(f"标题: {result['title']}")
    print(f"段落数: {result['paragraph_count']}")
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
class SimpleWebCrawler:
    """简单的网络爬虫"""

    def __init__(self, start_url, max_pages=10, delay=1):
        self.start_url = start_url
        self.max_pages = max_pages
        self.delay = delay
        self.visited = set()
        self.to_visit = set([start_url])
        self.results = []

    def is_valid_url(self, url):
        """检查URL是否有效"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)

    def get_all_links(self, soup, base_url):
        """获取页面所有链接"""
        links = set()
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # 转换为绝对URL
            absolute_url = urljoin(base_url, href)
            # 过滤无效URL
            if self.is_valid_url(absolute_url):
                links.add(absolute_url)
        return links

    def crawl_page(self, url):
        """抓取单个页面"""
        try:
            print(f"抓取: {url}")
            # 发送请求
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f" 状态码: {response.status_code}")
                return []
            # 解析页面
            soup = BeautifulSoup(response.content, 'lxml')
            # 提取信息
            page_info = {
                'url': url,
                'title': soup.title.string if soup.title else '无标题',
                'meta_description': '',
                'h1_count': len(soup.find_all('h1')),
                'p_count': len(soup.find_all('p')),
                'img_count': len(soup.find_all('img')),
                'links_count': len(soup.find_all('a'))
            }
            # 提取meta描述
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                page_info['meta_description'] = meta_desc.get('content', '')
            self.results.append(page_info)
            # 提取链接
            new_links = self.get_all_links(soup, url)
            # 避免请求过快
            time.sleep(self.delay)
            return new_links
        except Exception as e:
            print(f" 错误: {e}")
            return []

    def crawl(self):
        """开始爬取"""
        print(f"开始爬取,起始URL: {self.start_url}")
        print(f"最大页面数: {self.max_pages}")
        while self.to_visit and len(self.visited) < self.max_pages:
            # 获取下一个URL
            current_url = self.to_visit.pop()
            # 跳过已访问的URL
            if current_url in self.visited:
                continue
            # 标记为已访问
            self.visited.add(current_url)
            # 抓取页面
            new_links = self.crawl_page(current_url)
            # 添加新链接到待访问列表
            for link in new_links:
                if link not in self.visited:
                    self.to_visit.add(link)
            # 显示进度
            print(f"进度: 已访问 {len(self.visited)}/{self.max_pages} 个页面")
        print(f"\n爬取完成!总共访问 {len(self.visited)} 个页面")
        return self.results

    def save_results(self, filename='crawl_results.json'):
        """保存结果"""
        import json
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"结果已保存到: {filename}")
# 使用示例(注意:实际使用请遵守robots.txt和网站政策)
# crawler = SimpleWebCrawler('https://httpbin.org', max_pages=3, delay=2)
# results = crawler.crawl()
# crawler.save_results()
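实际爬取时通常还要把链接限制在起始站点的域名内,避免顺着外链爬到站外。一个简单的思路是比较 urlparse 得到的 netloc,示意如下(假设沿用上面的 SimpleWebCrawler):
from urllib.parse import urlparse

def is_same_domain(url, start_url):
    """判断url是否与起始URL同域(示意)"""
    return urlparse(url).netloc == urlparse(start_url).netloc

# 在crawl()里把新链接加入待访问集合前先过滤,例如:
# for link in new_links:
#     if link not in self.visited and is_same_domain(link, self.start_url):
#         self.to_visit.add(link)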
import requests
from urllib.parse import urljoin
def check_robots_txt(url):
    """检查robots.txt"""
    try:
        # 获取robots.txt
        robots_url = urljoin(url, '/robots.txt')
        response = requests.get(robots_url, timeout=5)
        if response.status_code == 200:
            print(f"找到robots.txt: {robots_url}")
            print("内容:")
            print(response.text[:500] + "..." if len(response.text) > 500 else response.text)
            return response.text
        else:
            print(f"没有找到robots.txt或无法访问: {robots_url}")
            return None
    except Exception as e:
        print(f"检查robots.txt时出错: {e}")
        return None
# 使用示例
robots_content = check_robots_txt('https://www.google.com')
if robots_content:
    # 解析robots.txt内容,判断是否允许爬取(见下面的robotparser示例)
    pass
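这一步的判断可以直接交给标准库的 urllib.robotparser,下面是一个简要示意:
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url('https://www.google.com/robots.txt')
rp.read()  # 下载并解析robots.txt
# 判断指定User-Agent是否允许抓取某个URL
print(rp.can_fetch('*', 'https://www.google.com/search'))
print(rp.can_fetch('*', 'https://www.google.com/maps'))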
import time
import random
import requests
class PoliteRequester:
    """有礼貌的请求器,避免对服务器造成压力"""

    def __init__(self, min_delay=1, max_delay=3):
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.last_request_time = 0

    def wait_if_needed(self):
        """如果需要,等待一段时间"""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.min_delay:
            # 计算需要等待的时间
            wait_time = self.min_delay - time_since_last
            # 添加随机抖动
            wait_time += random.uniform(0, self.max_delay - self.min_delay)
            print(f"等待 {wait_time:.2f} 秒...")
            time.sleep(wait_time)
        self.last_request_time = time.time()

    def get(self, url, **kwargs):
        """发送GET请求"""
        self.wait_if_needed()
        return requests.get(url, **kwargs)

    def post(self, url, **kwargs):
        """发送POST请求"""
        self.wait_if_needed()
        return requests.post(url, **kwargs)
# 使用示例
requester = PoliteRequester(min_delay=2, max_delay=5)
# 连续请求多个页面
urls = ['https://httpbin.org/html', 'https://httpbin.org/json', 'https://httpbin.org/xml']
for url in urls:
    print(f"\n请求: {url}")
    response = requester.get(url)
    print(f"状态码: {response.status_code}")
import requests
# 创建配置良好的Session
session = requests.Session()
# 配置适配器
adapter = requests.adapters.HTTPAdapter(
    pool_connections=10,  # 连接池大小
    pool_maxsize=10,      # 最大连接数
    max_retries=3,        # 最大重试次数
)
session.mount('http://', adapter)
session.mount('https://', adapter)
# 配置请求头
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Connection': 'keep-alive',
})
# 配置全局默认超时(猴子补丁写法;调用时显式传入的timeout会覆盖默认值)
session.request = lambda method, url, **kwargs: requests.Session.request(
    session, method, url, **{'timeout': 10, **kwargs}
)
# 使用配置好的Session
response = session.get('https://httpbin.org/get')
print(f"状态码: {response.status_code}")
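相比上面给 session.request 打猴子补丁,更清晰的做法是继承 Session 并覆写 request 方法来提供默认超时,下面是一个简单示意:
import requests

class TimeoutSession(requests.Session):
    """带默认超时的Session(示意)"""
    def __init__(self, timeout=10):
        super().__init__()
        self.timeout = timeout

    def request(self, method, url, **kwargs):
        # 未显式指定timeout时使用默认值
        kwargs.setdefault('timeout', self.timeout)
        return super().request(method, url, **kwargs)

session = TimeoutSession(timeout=10)
response = session.get('https://httpbin.org/get')
print(response.status_code)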