Tokio 是 Rust 生态系统中最重要的异步运行时,它为 Rust 提供了高效、可靠的异步编程能力。几乎所有现代 Rust Web 框架(Axum、Actix-web、Rocket)都建立在 Tokio 之上。理解 Tokio 是掌握现代 Rust 异步编程的关键。

Tokio 概述

什么是 Tokio?

Tokio 是一个用于构建可靠、无锁、高性能异步应用程序的框架。它提供了:

  • 异步运行时:多线程和单线程运行时
  • 异步 I/O:TCP/UDP/Unix Socket
  • 任务调度:futures 和 async/await
  • 同步原语:Mutex、RwLock、Semaphore 等
  • 时间管理:定时器和延迟

Tokio 的优势

[dependencies]
tokio = { version = "1", features = ["full"] }
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    println!("Tokio: 快速、可靠、内存安全");
    sleep(Duration::from_millis(100)).await;
    println!("完成!");
}

为什么选择 Tokio:

  • 速度快:基于 Rust,性能卓越
  • 可靠:经过生产环境验证
  • 零成本抽象:async/await 不会增加额外开销
  • 活跃生态:庞大的社区和库支持

核心概念:Future 和 async/await

Future 基础

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Future 是一个可以在未来某个时刻产生值的计算
struct MyFuture {
    ready: bool,
}

impl Future for MyFuture {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.ready {
            Poll::Ready("Future completed!".to_string())
        } else {
            self.ready = true;
            // 模拟异步操作
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async/await 语法

#[tokio::main]
async fn main() {
    // async fn 返回一个 Future
    let result = fetch_data().await;
    println!("Received: {}", result);
}

async fn fetch_data() -> String {
    // 模拟异步操作
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    "Data fetched!".to_string()
}

Future 执行流程

#[tokio::main]
async fn main() {
    println!("1. 开始执行");
    
    // 创建 Future(此时不执行)
    let future = async {
        println!("3. Future 内部开始");
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        println!("5. Future 完成");
        42
    };

    println!("2. 等待 Future");
    let result = future.await;
    println!("4. 结果: {}", result);
}

运行时配置

单线程运行时

use tokio::runtime::Runtime;

fn main() {
    // 创建单线程运行时
    let rt = Runtime::new().unwrap();
    
    rt.block_on(async {
        println!("单线程运行时中执行");
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    });
}

多线程运行时

use tokio::runtime::Runtime;

fn main() {
    // 创建多线程运行时(默认)
    let rt = tokio::runtime::Runtime::new().unwrap();
    
    rt.block_on(async {
        // 可以使用 tokio::spawn 在多线程中调度任务
        let handle = tokio::spawn(async {
            "在 worker 线程中执行"
        });
        
        let result = handle.await.unwrap();
        println!("{}", result);
    });
}

自定义运行时

use tokio::runtime::{Builder, Runtime};

fn create_runtime() -> Runtime {
    Builder::new_multi_thread()
        .worker_threads(8)                    // 8 个工作线程
        .max_blocking_threads(256)             // 最多 256 个阻塞线程
        .thread_name("my-worker")              // 线程名称前缀
        .thread_stack_size(3 * 1024 * 1024)   // 3MB 栈大小
        .enable_all()                          // 启用所有功能
        .build()
        .unwrap()
}

运行时特征

use tokio::runtime::Builder;

fn main() {
    // 单线程运行时 - 适合需要严格顺序执行的场景
    let single = Builder::new_current_thread()
        .build()
        .unwrap();

    // 多线程运行时 - 适合大多数场景
    let multi = Builder::new_multi_thread()
        .worker_threads(4)
        .build()
        .unwrap();

    // 自动选择(单线程如果没有启用 rt-multi-thread 特性)
    let auto = Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
}

任务 (Tasks)

tokio::spawn

#[tokio::main]
async fn main() {
    // Spawn 一个独立的任务
    let handle = tokio::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        "任务完成"
    });

    // 在主任务中继续执行
    println!("主任务继续执行...");

    // 等待并获取结果
    match handle.await {
        Ok(result) => println!("结果: {}", result),
        Err(e) => println!("任务 panic: {}", e),
    }
}

spawn多个任务

#[tokio::main]
async fn main() {
    let handles: Vec<_> = (0..10)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(tokio::time::Duration::from_millis(i * 100)).await;
                i
            })
        })
        .collect();

    // 等待所有任务完成
    let results: Vec<_> = futures::future::join_all(handles)
        .await
        .into_iter()
        .filter_map(|r| r.ok())
        .collect();

    println!("结果: {:?}", results);
}

join! 和 try_join!

#[tokio::main]
async fn main() {
    // 并行执行多个 Future
    let (result1, result2, result3) = tokio::join!(
        async { 1 },
        async { 2 },
        async { 3 }
    );

    println!("{} {} {}", result1, result2, result3);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // try_join! 会在任何一个 Future 返回 Err 时立即返回
    let (result1, result2) = tokio::try_join!(
        async { Ok::<_, String>(1) },
        async { Ok::<_, String>(2) },
    )?;

    println!("{} {}", result1, result2);
    Ok(())
}

select!

#[tokio::main]
async fn main() {
    let start = tokio::time::Instant::now();
    
    tokio::select! {
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(5)) => {
            println!("5秒超时");
        }
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
            println!("1秒触发!");
        }
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(3)) => {
            println!("3秒触发");
        }
    }

    println!("select! 完成,耗时: {:?}", start.elapsed());
}

abort 任务

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        loop {
            println!("任务运行中...");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    });

    // 让任务运行一会儿
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

    // 取消任务
    handle.abort();
    
    // 检查是否已取消
    match handle.await {
        Ok(_) => println!("任务正常完成"),
        Err(e) if e.is_cancelled() => println!("任务被取消"),
        Err(e) => println!("任务 panic: {}", e),
    }
}

通道 (Channels)

mpsc 通道 (多生产者单消费者)

#[tokio::main]
async fn main() {
    // 创建容量为 32 的通道
    let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(32);

    // Spawn 生产者任务
    let producer = tokio::spawn(async move {
        for i in 0..5 {
            let msg = format!("Message {}", i);
            if tx.send(msg).await.is_err() {
                println!("Receiver 已关闭");
                return;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    });

    // 在主任务中接收消息
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }

    producer.await.unwrap();
}

多个生产者

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<i32>(100);

    // Spawn 多个生产者
    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..5 {
                tx.send(i * 10 + j).await.unwrap();
            }
        });
    }

    // 主任务接收所有消息
    drop(tx);  // 显式丢弃主发送者
    let mut total = Vec::new();
    while let Some(msg) = rx.recv().await {
        total.push(msg);
    }

    total.sort();
    println!("收到 {} 条消息: {:?}", total.len(), total);
}

oneshot 通道 (单次通信)

#[tokio::main]
async fn main() {
    // 创建 oneshot 通道
    let (tx, rx) = tokio::sync::oneshot::channel::<i32>();

    // Spawn 发送者任务
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        tx.send(42).unwrap();
    });

    // 等待接收
    match rx.await {
        Ok(value) => println!("收到值: {}", value),
        Err(_) => println!("发送者已关闭"),
    }
}

broadcast 通道 (广播)

#[tokio::main]
async fn main() {
    let (tx, _rx) = tokio::sync::broadcast::channel::<String>(16);

    // 创建多个订阅者
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();

    // Spawn 接收者
    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("Receiver 1: {}", msg);
        }
    });

    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("Receiver 2: {}", msg);
        }
    });

    // 广播消息
    tx.send("Hello".to_string()).unwrap();
    tx.send("World".to_string()).unwrap();

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

watch 通道 (观察者模式)

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::watch::channel::<String>("initial".to_string());

    // Spawn 观察者
    tokio::spawn(async move {
        let mut prev = rx;
        while let Ok(msg) = prev.changed().await {
            println!("收到更新: {}", *msg);
        }
    });

    // 发送更新
    tx.send("update 1".to_string()).unwrap();
    tx.send("update 2".to_string()).unwrap();

    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

同步原语

Mutex

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            let mut num = counter.lock().await;
            *num += 1;
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Result: {}", *counter.lock().await);
}

RwLock

use tokio::sync::RwLock;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // 多个读取锁
    let read_handles: Vec<_> = (0..3)
        .map(|_| {
            let data = Arc::clone(&data);
            tokio::spawn(async move {
                let read = data.read().await;
                format!("Read: {:?}", &*read)
            })
        })
        .collect();

    // 一个写入锁
    let write_handle = {
        let data = Arc::clone(&data);
        tokio::spawn(async move {
            let mut write = data.write().await;
            write.push(4);
            format!("Wrote: {:?}", &*write)
        })
    };

    for handle in read_handles {
        println!("{}", handle.await.unwrap());
    }
    println!("{}", write_handle.await.unwrap());
}

Semaphore

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(3));  // 最多 3 个并发
    let mut handles = vec![];

    for i in 0..10 {
        let sem = Arc::clone(&sem);
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire().await.unwrap();
            println!("Task {} 开始", i);
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            println!("Task {} 结束", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

Barrier

#[tokio::main]
async fn main() {
    let barrier = Arc::new(tokio::sync::Barrier::new(5));
    let mut handles = vec![];

    for i in 0..5 {
        let barrier = Arc::clone(&barrier);
        handles.push(tokio::spawn(async move {
            println!("Task {} 准备中...", i);
            barrier.wait().await;
            println!("Task {} 继续执行!", i);
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

Condvar (条件变量)

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let flag = Arc::new(Mutex::new(false));
    let flag_clone = Arc::clone(&flag);

    // 等待者
    let waiter = tokio::spawn(async move {
        let mut flag = flag_clone.lock().await;
        while !*flag {
            flag = Arc::new(Mutex::new(false)).lock().await;
        }
        println!("Flag is now true!");
    });

    // 设置者
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        let mut flag = flag.lock().await;
        *flag = true;
    });

    waiter.await.unwrap();
}

时间管理

sleep

#[tokio::main]
async fn main() {
    println!("开始睡眠...");
    
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    
    println!("2秒后醒来");
}

interval

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
    
    for i in 0..5 {
        interval.tick().await;
        println!("Tick {}", i);
    }
}

timeout

use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let result = timeout(
        tokio::time::Duration::from_millis(100),
        tokio::time::sleep(tokio::time::Duration::from_secs(1))
    ).await;

    match result {
        Ok(_) => println!("操作完成"),
        Err(_) => println!("操作超时!"),
    }
}

Instant

use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let start = Instant::now();
    
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    
    println!("耗时: {:?}", start.elapsed());
}

延迟初始化

    let _timer = tokio::time::sleep(std::time::Duration::from_secs(10));
    tokio::time::timeout(Duration::from_secs(10), long_operation()).await
});

I/O 操作

TCP

use tokio::net::TcpListener;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // TCP 服务器
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("服务器监听在 127.0.0.1:8080");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("新连接: {}", addr);

        tokio::spawn(async move {
            let mut buf = vec![0u8; 1024];
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => {
                        println!("客户端断开: {}", addr);
                        return;
                    }
                    Ok(n) => {
                        println!("收到 {} 字节", n);
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}

TCP 客户端

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    
    let msg = b"Hello, Server!";
    stream.write_all(msg).await?;
    
    let mut buf = vec![0u8; 1024];
    let n = stream.read(&mut buf).await?;
    
    println!("收到: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

UDP

use tokio::net::UdpSocket;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0").await?;
    println!("UDP 绑定到 {}", socket.local_addr()?);

    socket.connect("127.0.0.1:8080").await?;

    socket.send(b"Hello UDP").await?;
    
    let mut buf = vec![0u8; 1024];
    let (len, _) = socket.recv_from(&mut buf).await?;
    
    println!("收到: {}", String::from_utf8_lossy(&buf[..len]));
    
    Ok(())
}

Unix Socket

use tokio::net::UnixStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = UnixStream::connect("/tmp/example.sock").await?;
    
    stream.write_all(b"Hello").await?;
    
    let mut buf = vec![0u8; 1024];
    let n = stream.read(&mut buf).await?;
    
    println!("收到: {}", String::from_utf8_lossy(&buf[..n]));
    
    Ok(())
}

文件操作

use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 异步读取文件
    let contents = fs::read_to_string("example.txt").await?;
    println!("文件内容: {}", contents);
    
    // 异步写入文件
    let mut file = File::create("output.txt").await?;
    file.write_all(b"Hello, Tokio!").await?;
    
    // 逐行读取
    let contents = fs::read_to_string("large_file.txt").await?;
    for line in contents.lines().take(10) {
        println!("{}", line);
    }
    
    Ok(())
}

进程管理

Spawn 子进程

use tokio::process::Command;

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    // 运行外部命令
    let output = Command::new("ls")
        .arg("-la")
        .output()
        .await?;

    println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
    println!("stderr: {}", String::from_utf8_lossy(&output.stderr));
    
    Ok(())
}

管道

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("ps")
        .arg("aux")
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()?;

    let stdout = child.stdout.take().unwrap();
    let mut lines = tokio::io::BufReader::new(stdout).lines();
    
    while let Some(line) = lines.next_line().await? {
        if line.contains("tokio") {
            println!("{}", line);
        }
    }

    let status = child.wait().await?;
    println!("进程退出码: {:?}", status);
    
    Ok(())
}

错误处理

Result 和 ?

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data = fetch_data().await?;
    println!("数据: {}", data);
    Ok(())
}

async fn fetch_data() -> Result<String, Box<dyn std::error::Error>> {
    let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8080").await?;
    let mut buf = vec![0u8; 1024];
    let n = stream.read(&mut buf).await?;
    Ok(String::from_utf8_lossy(&buf[..n]).to_string())
}

Anyhow 简化错误

[dependencies]
anyhow = "1"
use anyhow::{Context, Result};

#[tokio::main]
async fn main() -> Result<()> {
    let config = load_config()
        .context("无法加载配置文件")?;
    
    println!("配置: {:?}", config);
    Ok(())
}

async fn load_config() -> Result<serde_json::Value> {
    let contents = tokio::fs::read_to_string("config.json")
        .await
        .context("读取配置文件失败")?;
    
    serde_json::from_str(&contents)
        .context("解析配置文件失败")
}

thiserror 自定义错误

[dependencies]
thiserror = "1"
use thiserror::Error;

#[derive(Error, Debug)]
pub enum AppError {
    #[error("连接失败: {0}")]
    ConnectionError(String),
    
    #[error("资源未找到: {0}")]
    NotFound(String),
    
    #[error("超时")]
    Timeout,
    
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let result = connect().await?;
    Ok(())
}

async fn connect() -> Result<String, AppError> {
    let stream = tokio::net::TcpStream::connect("127.0.0.1:9999")
        .await
        .map_err(|e| AppError::ConnectionError(e.to_string()))?;
    
    Ok("Connected".to_string())
}

最佳实践

1. 避免阻塞主线程

// 不好:在 async 上下文中使用阻塞调用
#[tokio::main]
async fn bad_example() {
    let data = std::fs::read_to_string("file.txt").unwrap(); // 阻塞!
}

// 好:使用 tokio::task::spawn_blocking
#[tokio::main]
async fn good_example() {
    let data = tokio::task::spawn_blocking(|| {
        std::fs::read_to_string("file.txt").unwrap()
    }).await.unwrap();
}

2. 正确使用 Mutex

// 好:尽快释放锁
async fn good_lock(data: Arc<tokio::Mutex<Vec<i32>>>) {
    {
        let mut vec = data.lock().await;
        vec.push(1);
    } // 锁在这里释放
    // 继续其他工作
}

// 不好:持有锁期间进行异步操作
async fn bad_lock(data: Arc<tokio::Mutex<Vec<i32>>>) {
    let mut vec = data.lock().await;
    vec.push(1);
    some_async_operation().await; // 不要这样做!
}

3. 使用 join! 并行执行

// 不好:串行执行
#[tokio::main]
async fn bad_sequential() {
    let a = fetch_a().await;
    let b = fetch_b().await;
}

// 好:并行执行
#[tokio::main]
async fn good_parallel() {
    let (a, b) = tokio::join!(
        fetch_a(),
        fetch_b()
    );
}

4. 优雅关闭

use tokio::signal;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    
    println!("服务器运行中,按 Ctrl+C 关闭");
    
    match signal::ctrl_c().await {
        Ok(()) => println!("\n收到关闭信号,正在关闭..."),
        Err(e) => eprintln!("Error listening for shutdown signal: {}", e),
    }
    
    Ok(())
}

5. 资源清理

#[tokio::main]
async fn main() {
    let guard = SomeResource::new();
    
    // 使用 guard 进行工作
    if let Some(resource) = guard {
        // ...
    }
    
    // 显式清理
    drop(guard);
}

生态工具

tracing (结构化日志)

[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
use tracing::{info, error, instrument};

#[instrument(skip(data))]
async fn process_data(data: &[u8]) -> Result<usize, std::io::Error> {
    info!("开始处理数据,长度: {}", data.len());
    
    let result = do_processing(data).await?;
    
    info!("处理完成,结果: {}", result);
    Ok(result)
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_target(false)
        .init();
    
    process_data(b"hello").await.unwrap();
}

tokio-console (调试工具)

cargo install tokio-console
[dependencies]
console-subscriber = "0.2"
fn main() {
    console_subscriber::init();
    // 你的应用代码
}

metrics (指标)

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(100);
    
    // 生产指标
    tokio::spawn(async move {
        for i in 0.. {
            tx.send(i).await.unwrap();
        }
    });
    
    // 消费指标
    while let Some(metric) = rx.recv().await {
        println!("Metric: {}", metric);
    }
}

总结

Tokio 是 Rust 异步编程的核心运行时:

核心组件:

  • Future 和 async/await:零成本抽象
  • 任务调度:tokio::spawn、join!、select!
  • 通道:mpsc、oneshot、broadcast、watch
  • 同步原语:Mutex、RwLock、Semaphore、Barrier
  • 时间管理:sleep、interval、timeout
  • I/O:TCP/UDP/文件/进程

关键特性:

  • 多线程运行时:充分利用多核
  • 工作窃取:高效的任务调度
  • 异步 I/O:高性能网络编程
  • 结构化并发:安全地管理大量并发任务

最佳实践:

  • 避免在 async 上下文中阻塞
  • 使用 join! 并行执行独立任务
  • 正确使用锁和通道
  • 实现优雅关闭
  • 使用 tracing 进行结构化日志

掌握 Tokio,你将能够构建高性能、可靠的 Rust 异步应用程序!

快乐编程,大家来 Rust! 🦀