在 Actix-web 中使用 Server-Sent Events (SSE) 实现实时通知
介绍
Server-Sent Events (SSE) 是一种服务器向客户端推送实时更新的技术,适用于服务器到客户端的单向通信。以下是如何在 Actix-web 中实现 SSE 的示例,并在更新用户信息时主动向客户端发送通知。
服务器端实现
1. 定义 SseNotifier 结构体
rustuse tokio::sync::broadcast; #[derive(Clone)] pub struct SseNotifier { tx: broadcast::Sender<String>, } impl SseNotifier { pub fn new() -> Self { let (tx, _) = broadcast::channel(100); SseNotifier { tx } } pub fn create_channel(&self) -> broadcast::Receiver<String> { self.tx.subscribe() } pub fn notify(&self, message: &str) { let _ = self.tx.send(message.to_string()); } }
2. 实现 SSE 流处理器
rustuse actix_web::{web, Responder}; use actix_web_lab::sse::{self, Event, Sse}; use futures_util::stream::StreamExt; use std::convert::Infallible; use tokio_stream::wrappers::BroadcastStream; pub async fn sse_stream(notifier: web::Data<SseNotifier>) -> impl Responder { let rx = notifier.create_channel(); let sse_stream = BroadcastStream::new(rx) .filter_map(|msg| async move { match msg { Ok(data) => Some(sse::Event::Data(sse::Data::new(data))), Err(_) => None, } }) .map(|event| Ok::<Event, Infallible>(event)); Sse::from_stream(sse_stream).with_keep_alive(Duration::from_secs(5)) }
3. 配置 Actix-web 应用
rustuse actix_web::{web, App, HttpServer}; pub fn config_routes(config: &mut web::ServiceConfig) { config.service(web::scope("/sse").route("/stream", web::get().to(sse_stream))); } #[actix_web::main] async fn main() -> std::io::Result<()> { // 初始化日志记录器和数据库连接池(假设已有相关函数) init_logger(); let db_pool = create_db_pool().await.unwrap(); let app_data = web::Data::new(db_pool); let notifier = web::Data::new(SseNotifier::new()); let host = "127.0.0.1"; let port = 8080; let server_addr = format!("{}:{}", host, port); log::info!("当前服务成功启动,监听地址为 http://{}", server_addr); HttpServer::new(move || { let cors = actix_cors::Cors::default() .allowed_origin("http://127.0.0.1:5502") .allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"]) .allowed_headers(vec!["Content-Type", "Authorization", "ACCEPT"]) .supports_credentials() .max_age(3600); App::new() .wrap(actix_web::middleware::Logger::default()) .app_data(notifier.clone()) .app_data(app_data.clone()) .configure(config_routes) .wrap(cors) }) .bind(&server_addr)? .run() .await }
4. 在更新用户时发送 SSE 通知
rustuse serde_json::json; use chrono::Local; pub async fn update_user( db: web::Data<DatabaseConnection>, notifier: web::Data<SseNotifier>, user_id: web::Path<i32>, user_info: web::Json<UpdateUserRequest>, ) -> Result<HttpResponse, AppError> { // 更新用户的逻辑(假设有一个 `User` 模型和 `update` 方法) let updated_user = User::update(db.get_ref(), user_id.into_inner(), user_info.into_inner()) .await .map_err(|e| AppError::from(e))?; // 创建事件数据 let event_data = json!({ "event": "user_updated", "user_id": updated_user.id, "timestamp": Local::now().to_rfc3339() }); // 通过 SSE 通知客户端 notifier.notify(&event_data.to_string()); // 返回响应 Ok(HttpResponse::Ok().json(updated_user)) }
客户端实现
html<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Actix-Web SSE Example</title> </head> <body> <h1>SSE Example</h1> <ul id="messages"></ul> <script> const eventSource = new EventSource('http://127.0.0.1:8080/sse/stream'); const messagesList = document.getElementById('messages'); eventSource.onmessage = function(event) { const newElement = document.createElement('li'); newElement.textContent = event.data; messagesList.appendChild(newElement); }; eventSource.onerror = function(error) { console.error('EventSource failed:', error); }; </script> </body> </html>
运行和测试
- 启动 Actix-web 服务。
- 打开浏览器并访问客户端 HTML 页面。
- 当更新用户信息时,服务器会通过 SSE 向客户端发送通知,客户端会实时显示更新的消息。
这个示例展示了如何在 Actix-web 中实现 SSE 以进行服务器到客户端的实时通信,并在更新用户信息时主动通知客户端。通过这种方式,你可以实现诸如实时通知、状态更新等功能。
