Actix Web 框架完全指南
Actix 是 Rust 生态中最受欢迎的高性能 Web 框架之一,由 Actor 模型提供支持,以其出色的性能和优雅的 API 设计著称。本文将详细介绍 Actix 生态系统的两个核心库:Actix(Actor 框架)和 Actix-web(Web 框架)。
Actix 生态系统概述
Actix 生态系统包含多个 crate:
- actix:Actor 框架核心库
- actix-web:基于 Actix 的 Web 框架
- actix-rt:Actix 运行时
- actix-derive:派生宏支持
Actix Actor 框架基础
Actix 基于 Actor 模型,每个 Actor 是一个独立的计算单元,通过消息传递进行通信。
基本概念
Actor 的核心概念包括:
- Actor:独立的计算单元
- Message:Actor 之间传递的消息
- Handler:处理消息的逻辑
- Context:Actor 的执行上下文
第一个 Actor
use actix::{Actor, Context, System};
struct MyActor;
impl Actor for MyActor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("I am alive!");
System::current().stop(); // 停止系统
}
}
fn main() {
let system = System::new();
let _addr = system.block_on(async { MyActor.start() });
system.run().unwrap();
}定义和处理消息
use actix::{Actor, Context, Handler, System};
use actix_derive::{Message, MessageResponse};
// 定义消息及响应类型
#[derive(MessageResponse)]
struct Added(usize);
#[derive(Message)]
#[rtype(Added)]
struct Sum(usize, usize);
// 定义 Actor
#[derive(Default)]
struct Adder;
impl Actor for Adder {
type Context = Context<Self>;
}
// 实现消息处理器
impl Handler<Sum> for Adder {
type Result = <Sum as actix::Message>::Result;
fn handle(&mut self, msg: Sum, _: &mut Self::Context) -> Added {
Added(msg.0 + msg.1)
}
}
#[actix::main] // 简化异步 main 函数
async fn main() {
let addr = Adder.start();
let res = addr.send(Sum(10, 5)).await;
match res {
Ok(result) => println!("SUM: {}", result.0),
_ => println!("通信失败"),
}
}Actor 状态管理
use actix::prelude::*;
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
pub id: usize,
}
// 带状态的 Actor
struct Game {
counter: usize,
name: String,
recipient: Recipient<Ping>,
}
impl Actor for Game {
type Context = Context<Game>;
}
// 消息处理器
impl Handler<Ping> for Game {
type Result = ();
fn handle(&mut self, msg: Ping, ctx: &mut Context<Self>) {
self.counter += 1;
if self.counter > 10 {
System::current().stop();
} else {
println!("[{}] 收到 Ping {}", self.name, msg.id);
// 延迟发送消息
ctx.run_later(Duration::new(0, 100), move |act, _| {
act.recipient.do_send(Ping { id: msg.id + 1 });
});
}
}
}Actix-web Web 框架
Actix-web 是基于 Actix 的高性能 Web 框架,支持 HTTP/1.x 和 HTTP/2、WebSocket 等功能。
快速入门
use actix_web::{get, web, App, HttpServer, Responder};
#[get("/hello/{name}")]
async fn greet(name: web::Path<String>) -> impl Responder {
format!("Hello {}!", name)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.service(greet)
})
.bind(("127.0.0.1", 8080))?
.run()
.await
}路由配置
Actix-web 提供了灵活的路由系统:
use actix_web::{web, App, HttpResponse, HttpServer};
use serde::Deserialize;
// 定义查询参数结构
#[derive(Deserialize)]
struct QueryInfo {
name: Option<String>,
age: Option<u32>,
}
// 定义 JSON 请求体
#[derive(serde::Deserialize)]
struct CreateUser {
name: String,
email: String,
}
// GET 路由 - 路径参数
#[get("/users/{id}")]
async fn get_user(path: web::Path<u64>) -> HttpResponse {
let user_id = path.into_inner();
HttpResponse::Ok().json(serde_json::json!({
"id": user_id,
"name": "张三",
"email": "zhangsan@example.com"
}))
}
// GET 路由 - 查询参数
async fn search_users(query: web::Query<QueryInfo>) -> HttpResponse {
let name = query.name.as_deref().unwrap_or("all");
HttpResponse::Ok().json(serde_json::json!({
"query": name,
"age": query.age,
"results": []
}))
}
// POST 路由 - JSON 请求体
async fn create_user(body: web::Json<CreateUser>) -> HttpResponse {
HttpResponse::Created().json(serde_json::json!({
"id": 1,
"name": body.name,
"email": body.email
}))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.route("/search", web::get().to(search_users))
.service(get_user)
.service(
web::resource("/users")
.route(web::post().to(create_user))
)
})
.bind("127.0.0.1:8080")?
.run()
.await
}使用 Scope 组织路由
use actix_web::{web, App, HttpResponse, HttpServer};
async fn index() -> HttpResponse {
HttpResponse::Ok().body("Welcome to API")
}
async fn health() -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({"status": "healthy"}))
}
async fn list_users() -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!([
{"id": 1, "name": "张三"},
{"id": 2, "name": "李四"}
]))
}
async fn get_user(path: web::Path<u64>) -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({
"id": *path
}))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.route("/", web::get().to(index))
// API v1 版本
.service(
web::scope("/api/v1")
.route("/health", web::get().to(health))
.service(
web::scope("/users")
.route("", web::get().to(list_users))
.route("/{id}", web::get().to(get_user))
)
)
})
.bind("127.0.0.1:8080")?
.run()
.await
}状态管理
use actix_web::{web, App, HttpResponse, HttpServer};
use std::sync::Mutex;
struct AppState {
db: Mutex<Vec<String>>,
}
async fn get_items(data: web::Data<AppState>) -> HttpResponse {
let items = data.db.lock().unwrap();
HttpResponse::Ok().json(items.clone())
}
async fn add_item(
item: web::Json<String>,
data: web::Data<AppState>,
) -> HttpResponse {
let mut items = data.db.lock().unwrap();
items.push(item.into_inner());
HttpResponse::Created().json(serde_json::json!({"status": "added"}))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let app_state = web::Data::new(AppState {
db: Mutex::new(vec![]),
});
HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/items", web::get().to(get_items))
.route("/items", web::post().to(add_item))
})
.bind("127.0.0.1:8080")?
.run()
.await
}中间件
Actix-web 支持丰富的中间件功能。
Logger 中间件
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_web::middleware::Logger;
use std::env;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// 设置日志格式
env::set_var("RUST_LOG", "info");
env_logger::init();
HttpServer::new(|| {
App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.route("/", web::get().to(HttpResponse::Ok))
})
.bind("127.0.0.1:8080")?
.run()
.await
}Compress 中间件
use actix_web::{web, App, HttpServer, HttpResponse};
use actix_web::middleware::Compress;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.wrap(Compress::default())
.route("/", web::get().to(|| async {
HttpResponse::Ok().body("Large response body...")
}))
})
.bind("127.0.0.1:8080")?
.run()
.await
}错误处理中间件
use actix_web::{
dev::ServiceResponse,
http::{header, StatusCode},
middleware::{ErrorHandlerResponse, ErrorHandlers},
web, App, HttpResponse, HttpServer, Result,
};
fn add_error_header<B>(
mut res: ServiceResponse<B>
) -> Result<ErrorHandlerResponse<B>> {
res.response_mut().headers_mut().insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
);
Ok(ErrorHandlerResponse::Response(
res.map_into_left_body()
))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.wrap(
ErrorHandlers::new()
.handler(StatusCode::NOT_FOUND, add_error_header)
.handler(StatusCode::INTERNAL_SERVER_ERROR, add_error_header)
)
.route("/", web::get().to(HttpResponse::Ok))
})
.bind("127.0.0.1:8080")?
.run()
.await
}自定义中间件
use actix_web::{
dev::{Service, ServiceRequest, ServiceResponse, Transform},
Error, HttpResponse,
};
use futures_util::future::LocalBoxFuture;
use std::future::{ready, Ready};
use std::rc::Rc;
pub struct SayHi;
impl<S, B> Transform<S, ServiceRequest> for SayHi
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = SayHiMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(SayHiMiddleware { service }))
}
}
pub struct SayHiMiddleware<S> {
service: S,
}
impl<S, B> Service<ServiceRequest> for SayHiMiddleware<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&self,
cx: &mut std::task::Context<'_>
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
println!("Hi from start. You requested: {}", req.path());
let fut = self.service.call(req);
Box::pin(async move {
let res = fut.await?;
println!("Hi from response");
Ok(res)
})
}
}错误处理
自定义错误类型
use actix_web::{
error, http::StatusCode, App, HttpResponse, HttpServer, ResponseError,
};
use serde::Serialize;
use std::fmt;
#[derive(Debug)]
enum MyError {
NotFound(String),
BadRequest(String),
InternalError(String),
}
#[derive(Serialize)]
struct ErrorResponse {
error: String,
message: String,
}
impl fmt::Display for MyError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MyError::NotFound(msg) => write!(f, "Not Found: {}", msg),
MyError::BadRequest(msg) => write!(f, "Bad Request: {}", msg),
MyError::InternalError(msg) => write!(f, "Internal Error: {}", msg),
}
}
}
impl ResponseError for MyError {
fn status_code(&self) -> StatusCode {
match self {
MyError::NotFound(_) => StatusCode::NOT_FOUND,
MyError::BadRequest(_) => StatusCode::BAD_REQUEST,
MyError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
fn error_response(&self) -> HttpResponse {
let status = self.status_code();
HttpResponse::build(status).json(ErrorResponse {
error: status.to_string(),
message: self.to_string(),
})
}
}
async fn find_user(id: web::Path<u64>) -> Result<HttpResponse, MyError> {
if *id == 0 {
Err(MyError::NotFound(format!("User {} not found", id)))
} else if *id > 1000 {
Err(MyError::BadRequest("Invalid user ID range".to_string()))
} else {
Ok(HttpResponse::Ok().json(serde_json::json!({
"id": *id,
"name": "张三"
})))
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new().route("/users/{id}", web::get().to(find_user))
})
.bind("127.0.0.1:8080")?
.run()
.await
}使用 anyhow 进行错误处理
use actix_web::{web, App, HttpResponse, HttpServer, Result};
use anyhow::Context;
fn read_config() -> anyhow::Result<String> {
std::fs::read_to_string("config.json")
.context("无法读取配置文件")
}
async fn handler() -> Result<HttpResponse> {
let config = read_config()
.map_err(|e| actix_web::error::InternalError::from_response(
e,
HttpResponse::InternalServerError().body(e.to_string()),
))?;
Ok(HttpResponse::Ok().json(serde_json::json!({
"config": config
})))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new().route("/", web::get().to(handler))
})
.bind("127.0.0.1:8080")?
.run()
.await
}数据库集成
使用 sqlx
use actix_web::{web, App, HttpResponse, HttpServer};
use sqlx::{mysql::MySqlPoolOptions, MySql, Pool};
use serde::{Deserialize, Serialize};
#[derive(Serialize, sqlx::FromRow)]
struct User {
id: i64,
name: String,
email: String,
}
#[derive(Deserialize)]
struct CreateUser {
name: String,
email: String,
}
struct AppState {
db: Pool<MySql>,
}
async fn get_users(data: web::Data<AppState>) -> Result<HttpResponse, sqlx::Error> {
let users: Vec<User> = sqlx::query_as("SELECT id, name, email FROM users")
.fetch_all(&data.db)
.await?;
Ok(HttpResponse::Ok().json(users))
}
async fn create_user(
body: web::Json<CreateUser>,
data: web::Data<AppState>,
) -> Result<HttpResponse, sqlx::Error> {
let result = sqlx::query(
"INSERT INTO users (name, email) VALUES (?, ?)"
)
.bind(&body.name)
.bind(&body.email)
.execute(&data.db)
.await?;
Ok(HttpResponse::Created().json(serde_json::json!({
"id": result.last_insert_id()
})))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let pool = MySqlPoolOptions::new()
.max_connections(5)
.connect("mysql://user:password@localhost/database")
.await
.expect("无法连接数据库");
let app_state = web::Data::new(AppState { db: pool });
HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/users", web::get().to(get_users))
.route("/users", web::post().to(create_user))
})
.bind("127.0.0.1:8080")?
.run()
.await
}WebSocket 支持
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
const HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
const CLIENT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
struct MyWs;
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.hb(ctx);
}
}
impl MyWs {
fn hb(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
ctx.ping(b"ping");
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
println!("WebSocket Client heartbeat failed, disconnecting!");
ctx.stop();
}
});
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
println!("Received: {}", text);
ctx.text(format!("Echo: {}", text))
}
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => ctx.close(reason),
_ => ctx.stop(),
}
}
}
async fn ws_route(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let ws = MyWs { hb: Instant::now() };
ws::start(ws, &req, stream)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new().route("/ws", web::get().to(ws_route))
})
.bind("127.0.0.1:8080")?
.run()
.await
}测试
单元测试
use actix_web::{test, web, App};
#[cfg(test)]
mod tests {
use super::*;
#[actix_web::test]
async fn test_index() {
let app = test::init_service(
App::new().route("/", web::get().to(|| async {
actix_web::HttpResponse::Ok().body("Hello")
}))
).await;
let req = test::TestRequest::get().uri("/").to_request();
let resp = test::call_service(&app, req).await;
assert!(resp.status().is_success());
}
#[actix_web::test]
async fn test_json_response() {
let app = test::init_service(
App::new().route("/user", web::get().to(|| async {
actix_web::web::Json(serde_json::json!({
"id": 1,
"name": "张三"
}))
}))
).await;
let req = test::TestRequest::get().uri("/user").to_request();
let resp = test::call_service(&app, req).await;
assert!(resp.status().is_success());
}
}集成测试
#[cfg(test)]
mod tests {
use actix_web::{test, web, App, HttpResponse};
async fn health() -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({
"status": "healthy"
}))
}
#[actix_web::test]
async fn test_health_endpoint() {
let app = test::init_service(
App::new().route("/health", web::get().to(health))
).await;
let req = test::TestRequest::get()
.uri("/health")
.to_request();
let resp = test::call_service(&app, req).await;
assert!(resp.status().is_success());
let body: serde_json::Value = test::read_body_json(resp).await;
assert_eq!(body["status"], "healthy");
}
}最佳实践
1. 使用 #[get]、#[post] 等属性宏
use actix_web::{get, post, web, HttpResponse, Responder};
// 推荐使用宏定义路由
#[get("/users/{id}")]
async fn get_user(path: web::Path<u64>) -> impl Responder {
HttpResponse::Ok().json(serde_json::json!({
"id": *path
}))
}
#[post("/users")]
async fn create_user(body: web::Json<serde_json::Value>) -> impl Responder {
HttpResponse::Created().json(serde_json::json!({
"status": "created"
}))
}2. 合理使用状态
use std::sync::Arc;
use tokio::sync::Mutex;
// 推荐使用 Arc<Mutex<T>> 或 Arc<RwLock<T>>
struct AppState {
counter: Arc<Mutex<i32>>,
}
async fn increment(data: web::Data<Arc<Mutex<i32>>>) -> HttpResponse {
let mut counter = data.lock().await;
*counter += 1;
HttpResponse::Ok().json(serde_json::json!({
"counter": *counter
}))
}3. 配置管理
use serde::Deserialize;
#[derive(Deserialize, Clone)]
struct Config {
host: String,
port: u16,
database_url: String,
}
impl Config {
fn from_env() -> Result<Self, env_logger::Err> {
Ok(Config {
host: std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
port: std::env::var("PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse()
.expect("PORT must be a number"),
database_url: std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set"),
})
}
}总结
Actix 生态系统为 Rust 开发者提供了强大的工具集:
- Actix:Actor 模型实现,适合构建高并发系统
- Actix-web:高性能 Web 框架,支持完整的 HTTP 功能
- 丰富的中间件:日志、压缩、认证、CORS 等
- 优秀的错误处理:自定义错误类型和错误中间件
- WebSocket 支持:实时双向通信
- 完善的测试支持:单元测试和集成测试
Actix 以其极致的性能和优雅的 API 设计,成为 Rust Web 开发的首选框架之一。无论是构建微服务、API 后端还是实时应用,Actix 都能提供出色的支持。
快乐编程,大家来 Rust! 🦀