使用 Actix-web 和 SSE 实现 Rust 后端实时事件推送
在现代 Web 应用中,实时通信变得越来越重要。Server-Sent Events (SSE) 提供了一种简单有效的服务器到客户端单向通信机制。本文将介绍如何在 Rust Actix-web 框架中实现 SSE 功能,实现实时事件推送。
概述
SSE 允许服务器通过 HTTP 连接向客户端推送更新,相比 WebSockets,它更简单且自带重连机制。我们的实现将包含以下核心组件:
- 全局事件广播器 (
SseNotifier) - SSE 流端点
- 业务逻辑中的事件触发机制
核心实现
1. 事件广播器
rustuse tokio::sync::broadcast; #[derive(Clone)] pub struct SseNotifier { tx: broadcast::Sender<String>, } impl SseNotifier { pub fn new() -> Self { let (tx, _) = broadcast::channel(100); SseNotifier { tx } } // 创建新的事件通道 pub fn create_channel(&self) -> broadcast::Receiver<String> { self.tx.subscribe() } // 发送消息 pub fn notify(&self, msg: &str) { let _ = self.tx.send(msg.to_string()); } }
这个结构体使用 Tokio 的广播通道,允许多个消费者订阅同一事件流。
2. SSE 流端点
rustuse crate::utils::sse::SseNotifier; use actix_web::{Responder, web}; use actix_web_lab::sse::{self, Event, Sse}; use futures_util::stream::StreamExt; use std::{convert::Infallible, time::Duration}; use tokio_stream::wrappers::BroadcastStream; pub async fn sse_stream(notifier: web::Data<SseNotifier>) -> impl Responder { // 订阅全局事件通道 let rx = notifier.create_channel(); // 创建兼容的事件流 let sse_stream = BroadcastStream::new(rx) .filter_map(|msg| async move { match msg { Ok(data) => Some(sse::Event::Data(sse::Data::new(data))), Err(_) => None, } }) // 将错误转换为 Infallible .map(|event| Ok::<Event, Infallible>(event)); // 创建SSE响应 Sse::from_stream(sse_stream).with_keep_alive(Duration::from_secs(5)) }
这个端点将广播流转换为 SSE 兼容格式,并设置了 5 秒的心跳间隔以保持连接活跃。
3. 在主函数中集成
rustasync fn main() -> Result<()> { let db = create_db_pool() .await .context("Failed to connect to database")?; // 将db添加到应用数据中 let db_pool = web::Data::new(db); // 添加SSE通知器 let notifier = web::Data::new(SseNotifier::new()); let _ = HttpServer::new(move || { let cors = Cors::default() .allowed_origin("http://127.0.0.1:5502") .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]) .allowed_headers(vec!["Content-Type", "Authorization", "ACCEPT"]) .supports_credentials() .max_age(3600); App::new() .app_data(db_pool.clone()) .app_data(notifier.clone()) // 注册SSE通知器 .configure(config_routes) .wrap(cors) }) .bind(("0.0.0.0", 2345))? .run() .await; Ok(()) }
4.路由
tsuse actix_web::web; use crate::routes::user::{delete_demo, get_demo, get_demo_uuid}; use crate::routes::{echo, post_demo, sse}; pub fn config_routes(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/api") .service( web::scope("/v1") .route("/", web::get().to(get_demo)) .route("/{uuid:.*}", web::get().to(get_demo_uuid)) .route("/", web::post().to(post_demo)) // .route("/{uuid:.*}", web::put().to(put_demo)) .route("/{uuid:.*}", web::delete().to(delete_demo)), ) .service(web::scope("/ws").route("", web::get().to(echo))) .service(web::scope("/sse").route("/stream", web::get().to(sse::sse_stream))), ); }
5. 在业务逻辑中触发事件
rustpub async fn post_demo( db_pool: web::Data<DatabaseConnection>, user_data: web::Json<RegisterResponse>, notifier: web::Data<SseNotifier>, // 注入通知器 ) -> HttpResult { // ... 数据验证和业务逻辑 match new_user.insert(db_pool.as_ref()).await { Ok(user) => { println!("User created successfully: {:?}", user); // 创建通知消息 let notification = serde_json::json!({ "event": "user_updated", "data": { "user_id": user.id, "updated_fields": { "username": &user.user_name, } } }); // 发送通知给所有连接的客户端 notifier.notify(¬ification.to_string()); } Err(e) => println!("Error creating user: {}", e), } Ok(ApiResponse::success("添加成功", "添加成功").to_http_response()) }
实现要点
- 广播模式:使用 Tokio 的广播通道实现一对多的消息分发
- 错误处理:过滤掉广播中的错误,确保流稳定性
- 连接保持:设置心跳机制防止连接超时
- 结构化数据:使用 JSON 格式传递事件数据,便于客户端解析
- 依赖注入:通过 Actix-web 的数据共享机制在整个应用中共享通知器
客户端使用示例
javascript// 前端JavaScript代码示例 const eventSource = new EventSource('http://localhost:2345/sse-stream'); eventSource.onmessage = function(event) { const data = JSON.parse(event.data); console.log('Received event:', data.event, data.data); }; eventSource.onerror = function(error) { console.error('EventSource failed:', error); };
总结
本文展示了如何在 Rust Actix-web 框架中实现 SSE 功能,包括:
- 创建全局事件广播器
- 设置 SSE 流端点
- 在业务逻辑中触发事件
- 处理连接保持和错误情况
这种实现方式简单高效,适用于需要实时通知的场景,如实时更新、消息推送、状态同步等。通过使用广播通道,我们可以轻松地向所有连接的客户端发送事件,而无需维护单独的连接列表。
SSE 是构建实时应用的轻量级解决方案,特别适合服务器向客户端推送更新的场景,相比 WebSockets 更简单且具有自动重连等内置优势。
