default·
使用 Actix-web 和 SSE 实现 Rust 后端实时事件推送
本文介绍了如何在Rust的Actix-web框架中实现Server-Sent Events(SSE)实时事件推送功能。通过使用Tokio的广播通道构建全局事件广播器(SseNotifier),创建SSE流端点将广播流转换为SSE兼容格式,并设置5秒心跳保持连接。文章详细展示了核心实现代码,包括事件广播器创建、SSE端点配置、主程序集成以及在业务逻辑中触发事件的方法。该方案采用广播模式实现一对多消息分发,支持JSON格式结构化数据,适用于需要实时通知的应用场景,相比WebSockets更简单且具有自动重连优势
前端rust开发语言
使用 Actix-web 和 SSE 实现 Rust 后端实时事件推送
在现代 Web 应用中,实时通信变得越来越重要。Server-Sent Events (SSE) 提供了一种简单有效的服务器到客户端单向通信机制。本文将介绍如何在 Rust Actix-web 框架中实现 SSE 功能,实现实时事件推送。
概述
SSE 允许服务器通过 HTTP 连接向客户端推送更新,相比 WebSockets,它更简单且自带重连机制。我们的实现将包含以下核心组件:
- 全局事件广播器 (
SseNotifier) - SSE 流端点
- 业务逻辑中的事件触发机制
核心实现
1. 事件广播器
use tokio::sync::broadcast;
#[derive(Clone)]
pub struct SseNotifier {
tx: broadcast::Sender<String>,
}
impl SseNotifier {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(100);
SseNotifier { tx }
}
// 创建新的事件通道
pub fn create_channel(&self) -> broadcast::Receiver<String> {
self.tx.subscribe()
}
// 发送消息
pub fn notify(&self, msg: &str) {
let _ = self.tx.send(msg.to_string());
}
}
这个结构体使用 Tokio 的广播通道,允许多个消费者订阅同一事件流。
2. SSE 流端点
use crate::utils::sse::SseNotifier;
use actix_web::{Responder, web};
use actix_web_lab::sse::{self, Event, Sse};
use futures_util::stream::StreamExt;
use std::{convert::Infallible, time::Duration};
use tokio_stream::wrappers::BroadcastStream;
pub async fn sse_stream(notifier: web::Data<SseNotifier>) -> impl Responder {
// 订阅全局事件通道
let rx = notifier.create_channel();
// 创建兼容的事件流
let sse_stream = BroadcastStream::new(rx)
.filter_map(|msg| async move {
match msg {
Ok(data) => Some(sse::Event::Data(sse::Data::new(data))),
Err(_) => None,
}
})
// 将错误转换为 Infallible
.map(|event| Ok::<Event, Infallible>(event));
// 创建SSE响应
Sse::from_stream(sse_stream).with_keep_alive(Duration::from_secs(5))
}
这个端点将广播流转换为 SSE 兼容格式,并设置了 5 秒的心跳间隔以保持连接活跃。
3. 在主函数中集成
async fn main() -> Result<()> {
let db = create_db_pool()
.await
.context("Failed to connect to database")?;
// 将db添加到应用数据中
let db_pool = web::Data::new(db);
// 添加SSE通知器
let notifier = web::Data::new(SseNotifier::new());
let _ = HttpServer::new(move || {
let cors = Cors::default()
.allowed_origin("http://127.0.0.1:5502")
.allowed_methods(vec!["GET", "POST", "PUT", "DELETE", "OPTIONS"])
.allowed_headers(vec!["Content-Type", "Authorization", "ACCEPT"])
.supports_credentials()
.max_age(3600);
App::new()
.app_data(db_pool.clone())
.app_data(notifier.clone()) // 注册SSE通知器
.configure(config_routes)
.wrap(cors)
})
.bind(("0.0.0.0", 2345))?
.run()
.await;
Ok(())
}
4.路由
use actix_web::web;
use crate::routes::user::{delete_demo, get_demo, get_demo_uuid};
use crate::routes::{echo, post_demo, sse};
pub fn config_routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/api")
.service(
web::scope("/v1")
.route("/", web::get().to(get_demo))
.route("/{uuid:.*}", web::get().to(get_demo_uuid))
.route("/", web::post().to(post_demo))
// .route("/{uuid:.*}", web::put().to(put_demo))
.route("/{uuid:.*}", web::delete().to(delete_demo)),
)
.service(web::scope("/ws").route("", web::get().to(echo)))
.service(web::scope("/sse").route("/stream", web::get().to(sse::sse_stream))),
);
}
5. 在业务逻辑中触发事件
pub async fn post_demo(
db_pool: web::Data<DatabaseConnection>,
user_data: web::Json<RegisterResponse>,
notifier: web::Data<SseNotifier>, // 注入通知器
) -> HttpResult {
// ... 数据验证和业务逻辑
match new_user.insert(db_pool.as_ref()).await {
Ok(user) => {
println!("User created successfully: {:?}", user);
// 创建通知消息
let notification = serde_json::json!({
"event": "user_updated",
"data": {
"user_id": user.id,
"updated_fields": {
"username": &user.user_name,
}
}
});
// 发送通知给所有连接的客户端
notifier.notify(¬ification.to_string());
}
Err(e) => println!("Error creating user: {}", e),
}
Ok(ApiResponse::success("添加成功", "添加成功").to_http_response())
}
实现要点
- 广播模式:使用 Tokio 的广播通道实现一对多的消息分发
- 错误处理:过滤掉广播中的错误,确保流稳定性
- 连接保持:设置心跳机制防止连接超时
- 结构化数据:使用 JSON 格式传递事件数据,便于客户端解析
- 依赖注入:通过 Actix-web 的数据共享机制在整个应用中共享通知器
客户端使用示例
// 前端JavaScript代码示例
const eventSource = new EventSource('http://localhost:2345/sse-stream');
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received event:', data.event, data.data);
};
eventSource.onerror = function(error) {
console.error('EventSource failed:', error);
};
总结
本文展示了如何在 Rust Actix-web 框架中实现 SSE 功能,包括:
- 创建全局事件广播器
- 设置 SSE 流端点
- 在业务逻辑中触发事件
- 处理连接保持和错误情况
这种实现方式简单高效,适用于需要实时通知的场景,如实时更新、消息推送、状态同步等。通过使用广播通道,我们可以轻松地向所有连接的客户端发送事件,而无需维护单独的连接列表。
SSE 是构建实时应用的轻量级解决方案,特别适合服务器向客户端推送更新的场景,相比 WebSockets 更简单且具有自动重连等内置优势。