Actix Actor 模型:Rust 并发编程思想
在现代软件开发中,高并发处理是一个核心挑战。传统的并发模型往往依赖共享可变状态,这导致复杂的锁管理和潜在的死锁问题。Actix 框架通过 Actor 模型为 Rust 提供了一种优雅的解决方案,将消息传递作为并发通信的核心机制,从而实现内存安全且高效的系统设计。
Actor 模型概述
什么是 Actor 模型?
Actor 模型是一种并发计算模型,其中每个 Actor 是独立的计算单元:
- 隔离性:每个 Actor 拥有自己的私有状态
- 消息驱动:Actor 之间通过异步消息传递进行通信
- 并发处理:每个 Actor 可以独立处理消息,无需锁机制
为什么选择 Actor 模型?
传统的共享状态并发模型存在以下问题:
- 数据竞争:多个线程同时修改共享数据
- 死锁:不当的锁顺序导致系统僵死
- 复杂性:锁管理增加了代码复杂度
Actor 模型通过以下方式解决这些问题:
- 无共享可变状态:Actor 之间不共享数据
- 消息传递通信:通过不可变消息进行交互
- 清晰的边界:每个 Actor 有明确的职责范围
Actix Actor 核心概念
Actor 生命周期
use actix::{Actor, Context, System};
struct MyActor;
impl Actor for MyActor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("Actor 已启动");
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("Actor 正在停止");
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("Actor 已停止");
}
}
fn main() {
let system = System::new();
system.block_on(async {
MyActor.start();
});
system.run().unwrap();
}Actor 状态
use actix::{Actor, Context};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
struct CounterActor {
count: Arc<AtomicUsize>,
}
impl CounterActor {
fn new() -> Self {
CounterActor {
count: Arc::new(AtomicUsize::new(0)),
}
}
fn get_count(&self) -> usize {
self.count.load(Ordering::SeqCst)
}
fn increment(&self) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
impl Actor for CounterActor {
type Context = Context<Self>;
}消息传递
定义消息
在 Actix 中,任何类型都可以作为消息:
use actix::prelude::*;
// 简单的无响应消息
#[derive(Message)]
#[rtype(result = "()")]
struct Increment;
#[derive(Message)]
#[rtype(result = "usize")]
struct GetCount;
// 带数据的请求消息
#[derive(Message)]
#[rtype(result = "String")]
struct Greet(String);实现消息处理器
use actix::{Actor, Context, Handler, System, Running};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
struct Counter {
count: Arc<AtomicUsize>,
}
impl Counter {
fn new() -> Self {
Counter {
count: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Actor for Counter {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("Counter Actor 已启动,初始值: {}", self.count.load(Ordering::SeqCst));
}
}
impl Handler<Increment> for Counter {
type Result = ();
fn handle(&mut self, _msg: Increment, _ctx: &mut Self::Context) {
self.count.fetch_add(1, Ordering::SeqCst);
println!("计数器增加,当前值: {}", self.count.load(Ordering::SeqCst));
}
}
impl Handler<GetCount> for Counter {
type Result = usize;
fn handle(&mut self, _msg: GetCount, _ctx: &mut Self::Context) -> usize {
self.count.load(Ordering::SeqCst)
}
}发送消息
#[actix::main]
async fn main() {
let counter = Counter::new().start();
// 发送不需要响应的消息
counter.do_send(Increment);
// 发送需要响应的消息并等待结果
let count: usize = counter.send(GetCount).await.unwrap();
println!("当前计数: {}", count);
// 多次增加并验证
for _ in 0..5 {
counter.do_send(Increment);
}
let final_count: usize = counter.send(GetCount).await.unwrap();
println!("最终计数: {}", final_count);
}异步消息处理
处理长时间运行的任务
use actix::{Actor, AsyncContext, Context, Handler, Message};
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "()")]
struct ProcessData {
data: String,
}
struct DataProcessor;
impl Actor for DataProcessor {
type Context = Context<Self>;
}
impl Handler<ProcessData> for DataProcessor {
type Result = ();
fn handle(&mut self, msg: ProcessData, ctx: &mut Self::Context) {
let addr = ctx.address();
// 异步处理,不阻塞 Actor
actix::spawn(async move {
println!("开始处理: {}", msg.data);
// 模拟长时间运行的任务
tokio::time::sleep(Duration::from_secs(2)).await;
println!("处理完成: {}", msg.data);
});
}
}定时消息
use actix::{Actor, AsyncContext, Context, Handler};
use std::time::Duration;
struct TimerActor;
impl Actor for TimerActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("定时器 Actor 启动");
// 设置定时器,每 5 秒执行一次
ctx.run_interval(Duration::from_secs(5), |_act, ctx| {
println!("定时器触发!");
// 可以在这里停止定时器
// ctx.stop();
});
}
}Actor 间通信
双向通信
use actix::{Actor, Context, Handler, Message, System};
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "String")]
struct Calculate {
a: i32,
b: i32,
}
#[derive(Message)]
#[rtype(result = "String")]
struct GetStatus;
struct Calculator;
impl Calculator {
fn add(a: i32, b: i32) -> String {
format!("{} + {} = {}", a, b, a + b)
}
fn multiply(a: i32, b: i32) -> String {
format!("{} * {} = {}", a, b, a * b)
}
}
impl Actor for Calculator {
type Context = Context<Self>;
}
impl Handler<Calculate> for Calculator {
type Result = String;
fn handle(&mut self, msg: Calculate, _ctx: &mut Self::Context) -> String {
Calculator::add(msg.a, msg.b)
}
}
impl Handler<GetStatus> for Calculator {
type Result = String;
fn handle(&mut self, _msg: GetStatus, _ctx: &mut Self::Context) -> String {
"Calculator is ready".to_string()
}
}
#[actix::main]
async fn main() {
let calc = Calculator.start();
// 发送计算请求
let result: String = calc.send(Calculate { a: 10, b: 20 }).await.unwrap();
println!("计算结果: {}", result);
// 查询状态
let status: String = calc.send(GetStatus).await.unwrap();
println!("状态: {}", status);
}Actor 协调器
use actix::{Actor, Context, Handler, Message, Recipient, System};
use std::collections::HashMap;
#[derive(Message)]
#[rtype(result = "()")]
struct RegisterWorker {
id: String,
addr: Recipient<WorkerMessage>,
}
#[derive(Message)]
#[rtype(result = "()")]
struct UnregisterWorker {
id: String,
}
#[derive(Message)]
#[rtype(result = "()")]
struct DistributeWork {
work: String,
}
#[derive(Message)]
#[rtype(result = "()")]
struct WorkerMessage {
task: String,
}
struct Coordinator {
workers: HashMap<String, Recipient<WorkerMessage>>,
round_robin: usize,
}
impl Coordinator {
fn new() -> Self {
Coordinator {
workers: HashMap::new(),
round_robin: 0,
}
}
}
impl Actor for Coordinator {
type Context = Context<Self>;
}
impl Handler<RegisterWorker> for Coordinator {
type Result = ();
fn handle(&mut self, msg: RegisterWorker, _ctx: &mut Self::Context) {
println!("注册 worker: {}", msg.id);
self.workers.insert(msg.id, msg.addr);
}
}
impl Handler<UnregisterWorker> for Coordinator {
type Result = ();
fn handle(&mut self, msg: UnregisterWorker, _ctx: &mut Self::Context) {
println!("注销 worker: {}", msg.id);
self.workers.remove(&msg.id);
}
}
impl Handler<DistributeWork> for Coordinator {
type Result = ();
fn handle(&mut self, msg: DistributeWork, _ctx: &mut Self::Context) {
if self.workers.is_empty() {
println!("没有可用的 worker");
return;
}
let worker_ids: Vec<_> = self.workers.keys().collect();
let target_id = worker_ids[self.round_robin % worker_ids.len()];
if let Some(worker) = self.workers.get(target_id) {
let _ = worker.try_send(WorkerMessage { task: msg.work });
println!("工作分配给 worker: {}", target_id);
}
self.round_robin += 1;
}
}Actor 监督策略
错误处理和重启
use actix::{Actor, Context, Failed, System};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Message)]
#[rtype(result = "Result<usize, String>")]
struct RiskyOperation;
struct ResilientActor {
restart_count: Arc<AtomicUsize>,
}
impl ResilientActor {
fn new() -> Self {
ResilientActor {
restart_count: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Actor for ResilientActor {
type Context = Context<Self>;
fn restarting(&mut self, _ctx: &mut Self::Context) {
let count = self.restart_count.fetch_add(1, Ordering::SeqCst) + 1;
println!("Actor 重启次数: {}", count);
}
}
impl Handler<RiskyOperation> for ResilientActor {
type Result = Result<usize, String>;
fn handle(&mut self, _msg: RiskyOperation, _ctx: &mut Self::Context) -> Self::Result {
// 模拟可能失败的操作
let should_fail = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() % 3 == 0;
if should_fail {
Err("Operation failed".to_string())
} else {
Ok(42)
}
}
}层级监督
use actix::{Actor, ActorContext, Context, Handler, Supervised, System};
use std::sync::Arc;
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "()")]
struct ProcessJob(String);
struct Worker {
id: u32,
supervisor: actix::Addr<Supervisor>,
}
impl Worker {
fn new(id: u32, supervisor: actix::Addr<Supervisor>) -> Self {
Worker { id, supervisor }
}
}
impl Actor for Worker {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("Worker {} 已启动", self.id);
}
}
impl Supervised for Worker {
fn restarting(&mut self, _ctx: &mut Self::Context) {
println!("Worker {} 正在重启", self.id);
// 可以在这里尝试恢复状态
}
}
impl Handler<ProcessJob> for Worker {
type Result = ();
fn handle(&mut self, msg: ProcessJob, ctx: &mut Self::Context) {
println!("Worker {} 处理任务: {}", self.id, msg.0);
// 模拟失败
if msg.0.contains("fail") {
println!("Worker {} 失败了", self.id);
ctx.stop();
}
}
}
struct Supervisor;
impl Supervisor {
fn spawn_worker(&self, id: u32, ctx: &mut Context<Supervisor>) -> actix::Addr<Worker> {
let addr = ctx.address();
Worker::new(id, addr).start()
}
}
impl Actor for Supervisor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("Supervisor 已启动");
// 启动一些 worker
for i in 1..=3 {
self.spawn_worker(i, ctx);
}
}
}实用模式
模式一:共享状态 Actor
use actix::{Actor, Context, Handler, Message};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
#[derive(Message)]
#[rtype(result = "Option<String>")]
struct Get(String);
#[derive(Message)]
#[rtype(result = "()")]
struct Put(String, String);
struct Cache {
data: Arc<RwLock<HashMap<String, String>>>,
}
impl Cache {
fn new() -> Self {
Cache {
data: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl Actor for Cache {
type Context = Context<Self>;
}
impl Handler<Get> for Cache {
type Result = Option<String>;
fn handle(&mut self, msg: Get, _ctx: &mut Self::Context) -> Self::Result {
self.data.read().unwrap().get(&msg.0).cloned()
}
}
impl Handler<Put> for Cache {
type Result = ();
fn handle(&mut self, msg: Put, _ctx: &mut Self::Context) -> Self::Result {
self.data.write().unwrap().insert(msg.0, msg.1);
}
}模式二:事件总线
use actix::{Actor, Context, Handler, Message, Recipient};
use std::collections::HashMap;
#[derive(Message)]
#[rtype(result = "()")]
struct Subscribe {
event_type: String,
recipient: Recipient<Event>,
}
#[derive(Message)]
#[rtype(result = "()")]
struct Unsubscribe {
event_type: String,
recipient: Recipient<Event>,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
struct Event {
event_type: String,
payload: String,
}
struct EventBus {
subscribers: HashMap<String, Vec<Recipient<Event>>>,
}
impl EventBus {
fn new() -> Self {
EventBus {
subscribers: HashMap::new(),
}
}
}
impl Actor for EventBus {
type Context = Context<Self>;
}
impl Handler<Subscribe> for EventBus {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) {
self.subscribers
.entry(msg.event_type.clone())
.or_insert_with(Vec::new)
.push(msg.recipient);
}
}
impl Handler<Unsubscribe> for EventBus {
type Result = ();
fn handle(&mut self, msg: Unsubscribe, _ctx: &mut Self::Context) {
if let Some(subscribers) = self.subscribers.get_mut(&msg.event_type) {
subscribers.retain(|r| r != &msg.recipient);
}
}
}
impl Handler<Event> for EventBus {
type Result = ();
fn handle(&mut self, msg: Event, _ctx: &mut Self::Context) {
if let Some(subscribers) = self.subscribers.get(&msg.event_type) {
for subscriber in subscribers {
let _ = subscriber.try_send(msg.clone());
}
}
}
}模式三:请求-响应 Actor
use actix::{Actor, Context, Handler, Message};
use std::time::{Duration, Instant};
#[derive(Message)]
#[rtype(result = "Response")]
struct Request {
id: u64,
data: String,
}
#[derive(Message, Clone)]
#[rtype(result = "()")]
struct Response {
request_id: u64,
result: String,
latency_ms: u64,
}
struct ApiGateway {
request_counter: u64,
}
impl ApiGateway {
fn new() -> Self {
ApiGateway { request_counter: 0 }
}
}
impl Actor for ApiGateway {
type Context = Context<Self>;
}
impl Handler<Request> for ApiGateway {
type Result = Response;
fn handle(&mut self, msg: Request, _ctx: &mut Self::Context) -> Self::Result {
let start = Instant::now();
self.request_counter += 1;
// 模拟处理
let result = format!("Processed: {}", msg.data);
Response {
request_id: msg.id,
result,
latency_ms: start.elapsed().as_millis() as u64,
}
}
}与 Web 开发结合
在 Actix-web 中使用 Actor
use actix::{Actor, System};
use actix_web::{web, App, HttpResponse, HttpServer};
struct AppState {
counter: actix::Addr<Counter>,
}
// 创建一个可以被 web 使用的 Actor
struct Counter;
impl Actor for Counter {
type Context = actix::Context<Self>;
}
impl Handler<Increment> for Counter {
type Result = usize;
fn handle(&mut self, _: Increment, _: &mut Self::Context) -> usize {
static COUNT: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
}
#[derive(actix::Message)]
#[rtype(result = "usize")]
struct Increment;
async fn get_count(state: web::Data<AppState>) -> HttpResponse {
let count = state.counter.send(Increment).await.unwrap();
HttpResponse::Ok().json(serde_json::json!({ "count": count }))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let counter = Counter.start();
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(AppState {
counter: counter.clone(),
}))
.route("/count", web::get().to(get_count))
})
.bind("127.0.0.1:8080")?
.run()
.await
}WebSocket 与 Actor
use actix::{Actor, ActorContext, StreamHandler};
use actix_web::{web, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use std::time::{Duration, Instant};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(30);
struct MyWs {
hb: Instant,
}
impl MyWs {
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
println!("WebSocket 心跳超时,断开连接");
ctx.stop();
return;
}
ctx.ping(b"");
});
}
}
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
println!("WebSocket 连接已建立");
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
println!("收到消息: {}", text);
ctx.text(format!("Echo: {}", text));
}
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => ctx.close(reason),
_ => ctx.stop(),
}
}
}
async fn ws_route(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let ws = MyWs { hb: Instant::now() };
ws::start(ws, &req, stream)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
actix_web::App::new().route("/ws", web::get().to(ws_route))
})
.bind("127.0.0.1:8080")?
.run()
.await
}最佳实践
1. 消息设计原则
// 好:清晰、单一职责的消息
#[derive(Message)]
#[rtype(result = "Result<User, UserError>")]
struct GetUser { id: Uuid }
#[derive(Message)]
#[rtype(result = "()")]
struct CreateUser { user: NewUser }
// 不好:消息承担太多职责
#[derive(Message)]
#[rtype(result = "Result<ManyThings, ManyErrors>")]
struct DoEverything {
// 太多不同的操作
}2. 避免阻塞操作
// 好:在 Actor 内异步处理
impl Handler<LongTask> for MyActor {
type Result = ();
fn handle(&mut self, msg: LongTask, ctx: &mut Self::Context) {
let addr = ctx.address();
actix::spawn(async move {
// 异步处理
process_long_task(msg.data).await;
});
}
}
// 不好:在 Actor 内阻塞
impl Handler<LongTask> for MyActor {
type Result = ();
fn handle(&mut self, msg: LongTask, _ctx: &mut Self::Context) {
// 这会阻塞 Actor
std::thread::sleep(Duration::from_secs(10));
}
}3. 合理使用消息类型
// 用于需要响应的操作
#[derive(Message)]
#[rtype(result = "Data")]
struct GetData;
// 用于 fire-and-forget 场景
#[derive(Message)]
#[rtype(result = "()")]
struct LogEvent;
// 使用 do_send 发送不需要等待响应的消息
actor.do_send(LogEvent { /* ... */ });
// 使用 send 发送需要等待响应的消息
let result: Result<Data, MailboxError> = actor.send(GetData).await;4. Actor 数量控制
// 避免创建过多 Actor
struct ActorPool {
workers: Vec<actix::Addr<Worker>>,
}
impl ActorPool {
fn new(size: usize) -> Self {
let workers: Vec<_> = (0..size)
.map(|_| Worker.start())
.collect();
ActorPool { workers }
}
fn dispatch(&self, job: Job) -> actix::Addr<Worker> {
// 简单的负载均衡
let idx = rand::random::<usize>() % self.workers.len();
self.workers[idx].clone()
}
}总结
Actix 的 Actor 模型为 Rust 带来了优雅的并发编程范式:
核心优势:
- 内存安全:通过消息传递避免数据竞争
- 清晰的边界:每个 Actor 有独立的生命周期和状态
- 可组合性:Actor 可以构建复杂的系统
- 容错性:支持监督和重启策略
适用场景:
- 聊天服务器和实时通信系统
- 后台任务处理和消息队列
- 游戏服务器状态管理
- 分布式系统协调服务
关键设计原则:
- 优先使用消息而非共享状态
- 保持 Actor 职责单一
- 避免在 Actor 内执行阻塞操作
- 合理设计消息类型和监督策略
通过掌握 Actor 模型,你可以构建出既安全又高效的并发系统,充分利用 Rust 的类型系统和内存安全特性。
快乐编程,大家来 Rust! 🦀