Rust简明教程第九章-多线程&并发-Channel 管道

  • 使用消息传递跨线程传递数据

  • Channel:用通信共享内存,包含发送端、接收端

  • 如果发送端、接收端任意一端被丢弃,那么Channel就关闭了

recv:阻塞当前线程,有消息返回消息,发送端关闭返回RecvErr

use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hello");
        tx.send(val).unwrap(); //发送消息
    });
    //recv阻止当前线程执行
    let received = rx.recv().unwrap(); //接收消息,有消息返回消息,发送端关闭返回RecvErr
    println!("接收:{}", received); //接收:hello
}

try_recv:不阻塞当前线程,无消息返回Empty错误,Channel关闭返回Disconnected

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // 创建一个通道,用于在不同线程之间传递消息
    let (sender, receiver) = mpsc::channel();
    // 启动一个新线程,发送消息到通道
    thread::spawn(move || {
        let messages = vec!["hello", "world", "from", "another", "thread"];
        for msg in messages {
            sender.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // 在主线程中接收通道中的消息
    loop {
        match receiver.try_recv() {
            Ok(msg) => println!("收到消息: {}", msg),
            Err(mpsc::TryRecvError::Empty) => {
                println!("没有接收到消息, 等待...");
                thread::sleep(Duration::from_secs(2));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("出错!, 关闭...");
                break;
            }
        }
    }
}

Rc<T>

引用计数类型,其数据可以有多个所有者

  • 通过不可变引用,使程序不同部分共享只读数据
  • 实现了clone trait,通过Rc::clone(&rc)增加引用计数
  • 共享不可变数据的所有权,避免大型数据结构的开销
  • 单线程场景使用,性能开销低
#[derive(Debug)]
struct SharedData {
    message: String,
}
fn main() {
    use std::rc::Rc;
    // 创建一个包含共享数据的 Rc 智能指针
    let shared_data = Rc::new(SharedData {
        message: String::from("Hello, Rc!"),
    });
    // 克隆 Rc 智能指针,增加引用计数
    let shared_data1 = Rc::clone(&shared_data);
    let shared_data2 = Rc::clone(&shared_data);

    // 输出引用计数
    println!("引用计数:{}", Rc::strong_count(&shared_data)); //引用计数:3
    // 访问共享数据
    println!("共享数据1:{:?}", shared_data1); //共享数据1:SharedData { message: "Hello, Rc!" }
    println!("共享数据2:{:?}", shared_data2); //共享数据2:SharedData { message: "Hello, Rc!" }
}

RefCell<T>

  • 运行时检查内部可变性
  • 允许在不可变引用(&T)存在的情况下获取可变引用(&mut T),也就是运行时动态修改数据
  • 单线程场景使用,性能开销低
use std::cell::RefCell;
fn main() {
    // 创建一个包含可变数据的 RefCell
    let data = RefCell::new(vec![1, 2, 3]);

    // 在不可变引用存在的情况下,获取可变引用并修改数据
    {
        let mut borrow = data.borrow_mut();
        borrow.push(4);
    }
    // 获取不可变引用并访问数据
    let borrow = data.borrow();
    println!("Data: {:?}", *borrow); //Data: [1, 2, 3, 4]
}

Mutex<T>

共享状态并发

mutal exclusion互斥锁,允许多线程访问数据,但同一时刻只允许一个线程来访问某些数据

  • 访问数据前需要获得lock互斥锁
  • 使用完数据必须解锁,例如在块内使用自动调用drop
  • 多线程场景使用,性能开销高
use std::sync::Mutex;
fn main() {
    let m = Mutex::new(5);
    {
        let mut n = m.lock().unwrap();
        *n = 6;// move所有权并修改值
    } //drop解锁
    println!("m:{:?}", m); //m:Mutex { data: 6, poisoned: false, .. }
}

Arc<T>:Atomic Reference Counted

  • Arc允许多个线程之间共享数据不可变所有权,并且在引用计数管理上是线程安全的
  • Arc 本身不提供内部可变性,因此不能直接用于多个线程修改同一数据,通常与 MutexRwLock 结合使用
  • 多线程场景使用,性能开销高
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    // 创建一个共享的可变数据结构
    let counter = Arc::new(Mutex::new(0));
    // 创建多个线程来增加计数器的值
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter); // 每个线程都需要共享 Arc 的引用
        let handle = thread::spawn(move || {
            // 获取锁,并在作用域结束时自动释放锁
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    // 输出最终计数器的值
    println!("计数最终值: {:?}", counter.lock().unwrap()); //计数最终值: 10
}
  • 用户线程:默认情况下创建的线程或线程池都是用户线程
  • 守护线程:后台线程或服务线程
use std::thread;
use std::time::Duration;
fn main() {
    // 创建一个用户线程
    let user_thread = thread::spawn(|| {
        user_task();
    });
    // 创建一个守护线程
    let _daemon_thread = thread::spawn(|| {
        daemon_task();
    });
    // 主线程休眠3秒
    thread::sleep(Duration::from_secs(3));
    println!("主线程退出");
    // 等待用户线程结束
    user_thread.join().unwrap();
}
// 用户线程任务
fn user_task() {
    println!("用户线程开始执行");
    thread::sleep(Duration::from_secs(2));
    println!("用户线程执行完毕");
}
// 守护线程任务
fn daemon_task() {
    println!("守护线程开始执行");
    loop {
        thread::sleep(Duration::from_millis(500));
        println!("守护线程执行中");
    }
}

线程同步

取款案例,不加锁会造成数据竞争

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct BankAccount {
    balance: f64,
}
impl BankAccount {
    fn new(initial_balance: f64) -> Self {
        BankAccount {
            balance: initial_balance,
        }
    }
    // 使用 Mutex 确保线程安全
    fn deposit(&mut self, amount: f64) {
        if amount > 0.0 {
            self.balance += amount;
            println!("{} 存款 {},当前余额: {}", thread::current().name().unwrap(), amount, self.get_balance());
        }
    }
    fn get_balance(&self) -> f64 {
        self.balance
    }
}
fn main() {
        //Arc原子引用计数类型,允许多个线程安全地共享所有权
    let account = Arc::new(Mutex::new(BankAccount::new(1000.0)));
    let account1 = Arc::clone(&account);
    let account2 = Arc::clone(&account);
    // 创建两个线程同时进行存款操作
    let t1 = thread::Builder::new()
        .name("线程1".to_string())
        .spawn(move || {
            for _ in 0..10 {
                let mut account = account1.lock().unwrap();
                //存款100
                account.deposit(100.0);
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap();

    let t2 = thread::Builder::new()
        .name("线程2".to_string())
        .spawn(move || {
            for _ in 0..10 {
                let mut account = account2.lock().unwrap();
                //存款200
                account.deposit(200.0);
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap();

    t1.join().unwrap();
    t2.join().unwrap();
}

死锁

多个进程在运行过程中因争夺资源而造成的一种僵局

产生死锁的条件:

  • 互斥条件:一个资源每次只能被一个进程使用
  • 请求与保持条件:一个进程因请求资源而阻塞时,不能强行剥夺
  • 不剥夺条件:进程已获得的资源在未使用完之前不能强行剥夺
  • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系

Task1 获取到 lock1 并等待 lock2,而 Task2 获取到 lock2 并等待 lock1 时,两个线程互相等待对方释放锁,导致死锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 定义两个对象锁
    let lock1 = Arc::new(Mutex::new(()));
    let lock2 = Arc::new(Mutex::new(()));

    let lock1_task1 = Arc::clone(&lock1);
    let lock2_task1 = Arc::clone(&lock2);

    // 创建线程1,执行 Task1
    let thread1 = thread::spawn(move || {
        let _lock1 = lock1_task1.lock().unwrap();
        println!("任务1获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task1.lock().unwrap();
        println!("任务1获得锁2");
    });

    let lock1_task2 = Arc::clone(&lock1);
    let lock2_task2 = Arc::clone(&lock2);

    // 创建线程2,执行 Task2
    let thread2 = thread::spawn(move || {
        let _lock2 = lock2_task2.lock().unwrap();
        println!("任务2获得锁2");
        thread::sleep(Duration::from_millis(1000));
        let _lock1 = lock1_task2.lock().unwrap();
        println!("任务2获得锁1");
    });

    // 等待两个线程完成
    thread1.join().unwrap();
    thread2.join().unwrap();
}

通过使用获得锁的相同顺序避免死锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 定义两个对象锁
    let lock1 = Arc::new(Mutex::new(()));
    let lock2 = Arc::new(Mutex::new(()));

    // 创建线程1,执行 Task1
    let lock1_task1 = Arc::clone(&lock1);
    let lock2_task1 = Arc::clone(&lock2);
    let thread1 = thread::spawn(move || {
        let _lock1 = lock1_task1.lock().unwrap();
        println!("任务1获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task1.lock().unwrap();
        println!("任务1获得锁2");
    });

    // 创建线程2,执行 Task2
    let lock1_task2 = Arc::clone(&lock1);
    let lock2_task2 = Arc::clone(&lock2);
    let thread2 = thread::spawn(move || {
        let _lock1 = lock1_task2.lock().unwrap();
        println!("任务2获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task2.lock().unwrap();
        println!("任务2获得锁2");
    });

    // 等待两个线程完成
    thread1.join().unwrap();
    thread2.join().unwrap();
}

线程通信:生产者和消费者

用通信共享内存

管程法:使用缓冲区

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
const MAX_SIZE: usize = 5;
struct Monitor {
    buffer: Mutex<VecDeque<i32>>,
    condvar: Condvar,
}
impl Monitor {
    fn new() -> Self {
        Monitor {
            //mutex互斥
            buffer: Mutex::new(VecDeque::new()),
            //条件变量
            condvar: Condvar::new(),
        }
    }
    fn produce(&self, value: i32) {
        let mut buffer = self.buffer.lock().unwrap();
        //比5小就生产
        while buffer.len() >= MAX_SIZE {
            buffer = self.condvar.wait(buffer).unwrap();
        }
        buffer.push_back(value);
        println!("生产者: {}", value);
        //唤醒所有等待队列中阻塞的线程
        self.condvar.notify_all();
    }
    fn consume(&self) -> i32 {
        let mut buffer = self.buffer.lock().unwrap();
        while buffer.is_empty() {
            //未空则等待
            buffer = self.condvar.wait(buffer).unwrap();
        }
        let value = buffer.pop_front().unwrap();
        println!("消费者: {}", value);
         //唤醒所有等待队列中阻塞的线程
        self.condvar.notify_all();
        value
    }
}
fn main() {
    let monitor = Arc::new(Monitor::new());
    let monitor_producer = Arc::clone(&monitor);
    let monitor_consumer = Arc::clone(&monitor);

    let producer_thread = thread::spawn(move || {
        let mut value = 0;
        loop {
            monitor_producer.produce(value);
            value += 1;
            //模拟生产者
            thread::sleep(Duration::from_secs(1));
        }
    });
    let consumer_thread = thread::spawn(move || loop {
        monitor_consumer.consume();
        //模拟消费者
        thread::sleep(Duration::from_secs(2));
    });
    // 等待线程结束
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

信号灯法:标志位

信号量

  • emptyfull 信号量用于跟踪缓冲区中的可用空间和已填充空间。
  • 生产者在生产一个项目之前,会尝试获取 empty 信号量许可。如果缓冲区已满,则生产者会等待,直到有空间可用
  • 消费者在消费一个项目之前,会尝试获取 full 信号量许可。如果缓冲区为空,则消费者会等待,直到有项目可用

互斥锁

  • mutex 信号量确保生产者和消费者不会同时访问缓冲区。它充当一个二进制信号量(互斥锁),以确保对共享资源的互斥访问
  • 生产者或消费者在访问缓冲区时,会获取 mutex 信号量许可,并在完成访问后,释放许可(通过增加许可数)

Cargo.toml里添加,cargo run一下

[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::Semaphore;

const MAX_SIZE: usize = 5;

struct SemaphoreBuffer {
    buffer: Mutex<VecDeque<i32>>,
    mutex: Arc<Semaphore>,
    empty: Arc<Semaphore>,
    full: Arc<Semaphore>,
}
impl SemaphoreBuffer {
    fn new() -> Self {
        SemaphoreBuffer {
            //缓冲区
            buffer: Mutex::new(VecDeque::new()),
            //三个信号
            mutex: Arc::new(Semaphore::new(1)),//互斥
            empty: Arc::new(Semaphore::new(MAX_SIZE)),//空信号
            full: Arc::new(Semaphore::new(0)),//满信号
        }
    }

    async fn produce(&self, value: i32) {
        self.empty.acquire().await.unwrap().forget();
        self.mutex.acquire().await.unwrap().forget();
        let mut buffer = self.buffer.lock().unwrap();
        //生产
        buffer.push_back(value);
        println!("生产者: {}", value);
        drop(buffer);
        //增加信号许可证
        self.mutex.add_permits(1);
        self.full.add_permits(1);
    }

    async fn consume(&self) -> i32 {
        //获得full和mutex许可证
        self.full.acquire().await.unwrap().forget();
        self.mutex.acquire().await.unwrap().forget();
        let mut buffer = self.buffer.lock().unwrap();
        //消费
        let value = buffer.pop_front().unwrap();
        println!("消费者: {}", value);
        drop(buffer);
        //增加信号许可证
        self.mutex.add_permits(1);
        self.empty.add_permits(1);
        value
    }
}

#[tokio::main]
async fn main() {
    let semaphore_buffer = Arc::new(SemaphoreBuffer::new());
    let producer_buffer = Arc::clone(&semaphore_buffer);
    let consumer_buffer = Arc::clone(&semaphore_buffer);
    let producer_thread = tokio::spawn(async move {
        let mut value = 0;
        loop {
            producer_buffer.produce(value).await;
            value += 1;
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    let consumer_thread = tokio::spawn(async move {
        loop {
            consumer_buffer.consume().await;
            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    });
    //等待异步任务完成再结束
    let _ = tokio::try_join!(producer_thread, consumer_thread);
}

线程池

经常创建和销毁线程对性能影响大,提前创建好线程放入线程池,使用时直接获取,使用结束放回线程池,避免频繁创建销毁线程

use tokio::time::{sleep, Duration};
use tokio::task::JoinHandle;
struct WorkerTask {
    task_name: String,
}
impl WorkerTask {
    fn new(task_name: String) -> Self {
        WorkerTask { task_name }
    }
    async fn execute(&self) {
        println!("执行任务: {}", self.task_name);
        // 模拟任务执行
        sleep(Duration::from_secs(1)).await;
        println!("任务完成啦: {}", self.task_name);
    }
}
#[tokio::main]
async fn main() {
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    // 创建并行任务数量为5的线程池
    let max_threads = 5;
    let mut current_threads = 0;

    // 模拟20个任务
    for i in 0..20 {
        let task_name = format!("任务 {}", i + 1);
        let worker_task = WorkerTask::new(task_name);
        if current_threads >= max_threads {
            // 如果当前并行任务数达到最大值,等待第一个任务完成
            let handle = handles.remove(0);
            handle.await.unwrap();
            current_threads -= 1;
        }
        // 提交任务给线程池执行
        let handle = tokio::spawn(async move {
            worker_task.execute().await;
        });
        handles.push(handle);
        current_threads += 1;
    }
    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }
}
上一篇:Java后端开发(十三)-- Java8 stream的 orElse(null) 和 orElseGet(null)


下一篇:原型模式的实现