Django 信号是一种观察者模式的实现,允许特定的发送者在应用程序中的某些动作发生时通知一组接收者。信号使得解耦应用程序成为可能,当某些事件发生时,可以执行额外的操作而不需要修改原始代码。
| 组件 | 描述 | 示例 |
|---|---|---|
| 发送者 | 触发信号的对象 | 模型实例、请求对象 |
| 信号 | 事件的通知机制 | post_save, pre_delete |
| 接收者 | 响应信号的函数 | 自定义处理函数 |
| 信号参数 | 传递给接收者的数据 | instance, created |
Django 信号允许在框架的特定位置插入自定义代码,而不需要修改框架本身的代码。
创建信号实例或使用内置信号
将处理函数连接到信号
在适当的时候发送信号
所有连接的接收者被执行
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth.models import User
# 定义接收者函数
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""
当用户被创建时,自动创建用户配置
"""
if created:
Profile.objects.create(user=instance)
print(f"为用户 {instance.username} 创建了配置")
# 或者使用 connect 方法手动连接
def log_user_save(sender, instance, **kwargs):
print(f"用户 {instance.username} 被保存")
post_save.connect(log_user_save, sender=User)
# 信号发送的基本语法
signal.send(sender, **kwargs)
# 常见信号参数:
# - sender: 发送信号的对象类
# - instance: 相关的模型实例
# - created: 是否是新建对象 (post_save)
# - raw: 是否是原始数据库加载
# - using: 使用的数据库别名
# - update_fields: 更新的字段列表
# - **kwargs: 其他参数
# 接收者函数签名
def my_receiver(sender, **kwargs):
# sender: 发送者的类
# kwargs: 其他信号参数
pass
装饰器方式 - 使用 @receiver 装饰器
手动连接 - 使用 signal.connect() 方法
断开连接 - 使用 signal.disconnect() 方法
Django 提供了多个内置信号,用于在框架的关键位置插入自定义逻辑。
| 信号 | 触发时机 | 常见参数 | 使用场景 |
|---|---|---|---|
pre_init |
模型 __init__() 开始时 |
instance, args, kwargs |
初始化前验证或修改参数 |
post_init |
模型 __init__() 完成后 |
instance |
初始化后设置额外属性 |
pre_save |
save() 方法开始时 |
instance, raw, using, update_fields |
保存前数据验证或修改 |
post_save |
save() 方法完成后 |
instance, created, raw, using, update_fields |
保存后执行相关操作 |
pre_delete |
delete() 方法开始时 |
instance, using |
删除前检查或备份 |
post_delete |
delete() 方法完成后 |
instance, using |
删除后清理相关数据 |
m2m_changed |
ManyToMany 字段变更时 | instance, action, reverse, model, pk_set |
处理多对多关系变化 |
| 信号 | 触发时机 | 常见参数 | 使用场景 |
|---|---|---|---|
request_started |
HTTP 请求开始时 | environ |
请求日志记录 |
request_finished |
HTTP 请求结束时 | 无 | 资源清理、性能监控 |
got_request_exception |
请求处理出现异常时 | request |
异常处理、错误报告 |
from django.db.backends.signals import (
connection_created,
connection_created
)
@receiver(connection_created)
def setup_connection(sender, connection, **kwargs):
"""
数据库连接创建时的设置
"""
if connection.vendor == 'sqlite':
# 为 SQLite 启用外键约束
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys=ON;')
# 其他数据库信号:
# - connection_created: 数据库连接创建时
# - connection_created: 连接创建时(同上)
from django.core.management.signals import (
pre_migrate,
post_migrate
)
@receiver(pre_migrate)
def before_migration(sender, app_config, verbosity, interactive,
using, plan, apps, **kwargs):
"""
数据库迁移前的操作
"""
print(f"开始迁移应用: {app_config.name}")
@receiver(post_migrate)
def after_migration(sender, app_config, verbosity, interactive,
using, plan, apps, **kwargs):
"""
数据库迁移后的操作
"""
print(f"完成迁移应用: {app_config.name}")
# 其他管理信号:
# - pre_migrate: 迁移开始前
# - post_migrate: 迁移完成后
# - pre_syncdb: 同步数据库前 (已弃用)
# - post_syncdb: 同步数据库后 (已弃用)
Django 提供了多种方式来连接信号和接收者,每种方式都有其适用场景。
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from myapp.models import MyModel
# 基本用法
@receiver(post_save, sender=MyModel)
def my_handler(sender, instance, **kwargs):
print(f"{instance} 被保存了")
# 连接多个信号
@receiver([post_save, pre_delete], sender=MyModel)
def multi_signal_handler(sender, **kwargs):
if sender == post_save:
print("保存操作")
elif sender == pre_delete:
print("删除操作")
# 带条件的信号处理
@receiver(post_save, sender=MyModel)
def conditional_handler(sender, instance, created, **kwargs):
if created:
print("新对象被创建")
else:
print("现有对象被更新")
# 使用 dispatch_uid 避免重复连接
@receiver(post_save, sender=MyModel, dispatch_uid="my_unique_handler")
def unique_handler(sender, instance, **kwargs):
print("这个处理程序只会被连接一次")
from django.db.models.signals import post_save
from myapp.models import MyModel
def manual_handler(sender, instance, **kwargs):
print(f"手动处理: {instance}")
# 手动连接信号
post_save.connect(manual_handler, sender=MyModel)
# 使用 weak=False 避免垃圾回收
post_save.connect(manual_handler, sender=MyModel, weak=False)
# 使用 dispatch_uid
post_save.connect(
manual_handler,
sender=MyModel,
dispatch_uid="manual_handler_unique"
)
# 断开信号连接
post_save.disconnect(manual_handler, sender=MyModel)
# 或者使用 dispatch_uid 断开
post_save.disconnect(dispatch_uid="manual_handler_unique")
# 临时连接(使用后自动断开)
from django.dispatch import Signal
def temp_handler(sender, **kwargs):
print("临时处理程序")
# 执行后自动断开
some_signal.disconnect(temp_handler)
some_signal.connect(temp_handler)
装饰器方式 - 简洁、易于阅读、自动连接
手动连接 - 更灵活、可以动态连接/断开
dispatch_uid - 避免重复连接、精确控制
# myapp/apps.py
from django.apps import AppConfig
class MyAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'myapp'
def ready(self):
# 导入信号处理程序
import myapp.signals
# myapp/__init__.py
default_app_config = 'myapp.apps.MyAppConfig'
# myapp/signals.py
from django.db.models.signals import post_save, pre_delete
from django.dispatch import receiver
from .models import MyModel
@receiver(post_save, sender=MyModel)
def handle_post_save(sender, instance, **kwargs):
# 处理保存后逻辑
pass
@receiver(pre_delete, sender=MyModel)
def handle_pre_delete(sender, instance, **kwargs):
# 处理删除前逻辑
pass
ready() 方法中被导入,以避免重复连接或连接失败。
除了使用内置信号,你还可以创建自定义信号来处理应用程序特定的事件。
# myapp/signals.py
from django.dispatch import Signal
# 创建自定义信号
# 无参数信号
user_logged_in = Signal()
# 带参数的信号
order_created = Signal()
# 提供参数名称以便文档化和类型检查
order_created = Signal(["order", "user", "timestamp"])
# 复杂的自定义信号
profile_updated = Signal(["profile", "fields_changed", "old_values"])
# 在需要的地方发送信号
def some_view(request):
# ... 处理逻辑
# 发送自定义信号
user_logged_in.send(sender=request.user.__class__, user=request.user)
# 发送带参数的信号
order = Order.objects.create(...)
order_created.send(
sender=Order,
order=order,
user=request.user,
timestamp=timezone.now()
)
# 在其他地方连接自定义信号
from django.dispatch import receiver
from myapp.signals import user_logged_in, order_created
@receiver(user_logged_in)
def handle_user_login(sender, user, **kwargs):
"""
处理用户登录事件
"""
# 记录登录时间
user.last_login = timezone.now()
user.save(update_fields=['last_login'])
# 发送欢迎消息
messages.success(
user,
f"欢迎回来,{user.username}!"
)
@receiver(order_created)
def handle_new_order(sender, order, user, timestamp, **kwargs):
"""
处理新订单创建事件
"""
# 发送确认邮件
send_order_confirmation_email(order, user)
# 更新库存
update_inventory(order)
# 记录审计日志
AuditLog.objects.create(
user=user,
action='order_created',
target_object=order,
timestamp=timestamp
)
# 手动连接自定义信号
def another_order_handler(sender, order, **kwargs):
print(f"新订单: {order.id}")
order_created.connect(another_order_handler)
from django.dispatch import Signal
class ValidatedSignal(Signal):
"""
带参数验证的自定义信号
"""
def __init__(self, providing_args=None):
super().__init__(providing_args)
self._validators = []
def add_validator(self, validator_func):
"""添加参数验证器"""
self._validators.append(validator_func)
def send(self, sender, **named):
"""发送信号前执行验证"""
for validator in self._validators:
validator(sender, **named)
return super().send(sender, **named)
# 使用带验证的信号
registration_signal = ValidatedSignal(["user", "profile_data"])
def validate_registration(sender, user, profile_data, **kwargs):
"""验证注册数据"""
if not user.email:
raise ValueError("用户必须提供邮箱")
if len(profile_data.get('bio', '')) > 500:
raise ValueError("个人简介不能超过500字符")
registration_signal.add_validator(validate_registration)
# 发送信号(会自动验证)
try:
registration_signal.send(
sender=User,
user=new_user,
profile_data=profile_data
)
except ValueError as e:
print(f"验证失败: {e}")
import asyncio
from django.dispatch import Signal
from asgiref.sync import sync_to_async
class AsyncSignal(Signal):
"""
支持异步处理程序的信号
"""
async def send_async(self, sender, **named):
"""异步发送信号"""
responses = []
for receiver in self._live_receivers(sender):
if asyncio.iscoroutinefunction(receiver):
# 异步接收者
response = await receiver(signal=self, sender=sender, **named)
else:
# 同步接收者,在线程中运行
response = await sync_to_async(receiver)(
signal=self, sender=sender, **named
)
responses.append((receiver, response))
return responses
# 创建异步信号
async_signal = AsyncSignal()
# 异步接收者
@receiver(async_signal)
async def async_handler(sender, **kwargs):
"""异步信号处理程序"""
await asyncio.sleep(1) # 模拟异步操作
print("异步处理完成")
# 同步接收者(仍然可用)
@receiver(async_signal)
def sync_handler(sender, **kwargs):
"""同步信号处理程序"""
print("同步处理完成")
# 发送异步信号
async def my_async_view(request):
# ... 业务逻辑
await async_signal.send_async(sender=request.user.__class__)
信号在多种场景下都非常有用,以下是一些常见的应用示例。
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
from .models import AuditLog
@receiver(post_save, sender=User)
def log_user_change(sender, instance, created, **kwargs):
"""
记录用户模型的变更
"""
action = 'create' if created else 'update'
AuditLog.objects.create(
model_name='User',
object_id=instance.id,
action=action,
changes={
'username': instance.username,
'email': instance.email,
'is_active': instance.is_active
},
user=get_current_user() # 需要实现这个函数
)
@receiver(post_delete, sender=User)
def log_user_delete(sender, instance, **kwargs):
"""
记录用户删除
"""
AuditLog.objects.create(
model_name='User',
object_id=instance.id,
action='delete',
changes={
'username': instance.username,
'email': instance.email
},
user=get_current_user()
)
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.core.cache import cache
from .models import Product
@receiver([post_save, post_delete], sender=Product)
def clear_product_cache(sender, **kwargs):
"""
产品数据变更时清理缓存
"""
cache_keys = [
'featured_products',
'product_categories',
'product_stats'
]
for key in cache_keys:
cache.delete(key)
print("产品缓存已清理")
from django.db.models.signals import post_save, m2m_changed
from django.dispatch import receiver
from .models import Order, OrderItem, Inventory
@receiver(post_save, sender=OrderItem)
def update_order_total(sender, instance, **kwargs):
"""
更新订单项时重新计算订单总额
"""
order = instance.order
order.total = sum(item.subtotal for item in order.items.all())
order.save(update_fields=['total'])
@receiver(m2m_changed, sender=Order.items.through)
def update_inventory(sender, instance, action, **kwargs):
"""
订单项变更时更新库存
"""
if action in ['post_add', 'post_remove', 'post_clear']:
for item in instance.items.all():
inventory = Inventory.objects.get(product=item.product)
if action == 'post_add':
inventory.quantity -= item.quantity
elif action == 'post_remove':
inventory.quantity += item.quantity
inventory.save()
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.core.mail import send_mail
from .models import Comment, Notification
@receiver(post_save, sender=Comment)
def notify_comment(sender, instance, created, **kwargs):
"""
新评论时发送通知
"""
if created:
# 创建站内通知
Notification.objects.create(
user=instance.post.author,
title="新评论",
message=f"{instance.author} 评论了您的文章",
target_url=instance.post.get_absolute_url()
)
# 发送邮件通知(如果是重要评论)
if instance.post.author.email_notifications:
send_mail(
subject=f"您收到了新评论 - {instance.post.title}",
message=f"{instance.author} 在您的文章中评论: {instance.content}",
from_email='noreply@example.com',
recipient_list=[instance.post.author.email],
fail_silently=True
)
# 使用 Celery 处理耗时操作
@receiver(post_save, sender=Order)
def process_order_async(sender, instance, **kwargs):
from .tasks import process_order_task
process_order_task.delay(instance.id)
@receiver(post_save, sender=User)
def safe_handler(sender, instance, **kwargs):
try:
# 信号处理逻辑
except Exception as e:
logger.error(f"信号处理错误: {e}")
# 继续执行,不中断主流程
signals.py 文件中dispatch_uid 避免重复连接
# 良好的组织方式
# myapp/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import MyModel
@receiver(post_save, sender=MyModel)
def handle_my_model_save(sender, instance, **kwargs):
"""处理 MyModel 保存后的逻辑"""
pass
# 测试信号
from django.test import TestCase
from myapp import signals
class SignalTests(TestCase):
def test_post_save_signal(self):
obj = MyModel.objects.create(...)
# 验证信号处理程序的效果
self.assertEqual(...)
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 信号不执行 | 信号未正确连接或导入 | 检查 ready() 方法和导入路径 |
| 重复执行 | 信号被多次连接 | 使用 dispatch_uid |
| 性能问题 | 信号处理程序耗时过长 | 优化代码或使用异步任务 |
| 循环导入 | 模块间相互导入 | 重构代码或使用字符串导入 |
# 信号调试装饰器
def debug_signal(handler):
@wraps(handler)
def wrapper(*args, **kwargs):
print(f"执行信号处理: {handler.__name__}")
start = time.time()
result = handler(*args, **kwargs)
duration = time.time() - start
print(f"完成信号处理: {duration:.2f}s")
return result
return wrapper
@receiver(post_save)
@debug_signal
def my_handler(sender, **kwargs):
pass