-
使用消息传递跨线程传递数据
-
Channel
:用通信共享内存,包含发送端、接收端 -
如果发送端、接收端任意一端被丢弃,那么
Channel
就关闭了
recv
:阻塞当前线程,有消息返回消息,发送端关闭返回RecvErr
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hello");
tx.send(val).unwrap(); //发送消息
});
//recv阻止当前线程执行
let received = rx.recv().unwrap(); //接收消息,有消息返回消息,发送端关闭返回RecvErr
println!("接收:{}", received); //接收:hello
}
try_recv
:不阻塞当前线程,无消息返回Empty
错误,Channel
关闭返回Disconnected
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个通道,用于在不同线程之间传递消息
let (sender, receiver) = mpsc::channel();
// 启动一个新线程,发送消息到通道
thread::spawn(move || {
let messages = vec!["hello", "world", "from", "another", "thread"];
for msg in messages {
sender.send(msg.to_string()).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 在主线程中接收通道中的消息
loop {
match receiver.try_recv() {
Ok(msg) => println!("收到消息: {}", msg),
Err(mpsc::TryRecvError::Empty) => {
println!("没有接收到消息, 等待...");
thread::sleep(Duration::from_secs(2));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("出错!, 关闭...");
break;
}
}
}
}
Rc<T>
引用计数类型,其数据可以有多个所有者
- 通过不可变引用,使程序不同部分共享只读数据
- 实现了
clone trait
,通过Rc::clone(&rc)
增加引用计数 - 共享不可变数据的所有权,避免大型数据结构的开销
- 单线程场景使用,性能开销低
#[derive(Debug)]
struct SharedData {
message: String,
}
fn main() {
use std::rc::Rc;
// 创建一个包含共享数据的 Rc 智能指针
let shared_data = Rc::new(SharedData {
message: String::from("Hello, Rc!"),
});
// 克隆 Rc 智能指针,增加引用计数
let shared_data1 = Rc::clone(&shared_data);
let shared_data2 = Rc::clone(&shared_data);
// 输出引用计数
println!("引用计数:{}", Rc::strong_count(&shared_data)); //引用计数:3
// 访问共享数据
println!("共享数据1:{:?}", shared_data1); //共享数据1:SharedData { message: "Hello, Rc!" }
println!("共享数据2:{:?}", shared_data2); //共享数据2:SharedData { message: "Hello, Rc!" }
}
RefCell<T>
- 运行时检查内部可变性
- 允许在不可变引用(
&T
)存在的情况下获取可变引用(&mut T
),也就是运行时动态修改数据 - 单线程场景使用,性能开销低
use std::cell::RefCell;
fn main() {
// 创建一个包含可变数据的 RefCell
let data = RefCell::new(vec![1, 2, 3]);
// 在不可变引用存在的情况下,获取可变引用并修改数据
{
let mut borrow = data.borrow_mut();
borrow.push(4);
}
// 获取不可变引用并访问数据
let borrow = data.borrow();
println!("Data: {:?}", *borrow); //Data: [1, 2, 3, 4]
}
Mutex<T>
共享状态并发
mutal exclusion
互斥锁,允许多线程访问数据,但同一时刻只允许一个线程来访问某些数据
- 访问数据前需要获得
lock
互斥锁 - 使用完数据必须解锁,例如在块内使用自动调用
drop
- 多线程场景使用,性能开销高
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut n = m.lock().unwrap();
*n = 6;// move所有权并修改值
} //drop解锁
println!("m:{:?}", m); //m:Mutex { data: 6, poisoned: false, .. }
}
Arc<T>
:Atomic Reference Counted
-
Arc
允许多个线程之间共享数据不可变所有权,并且在引用计数管理上是线程安全的 -
Arc
本身不提供内部可变性,因此不能直接用于多个线程修改同一数据,通常与Mutex
或RwLock
结合使用 - 多线程场景使用,性能开销高
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 创建一个共享的可变数据结构
let counter = Arc::new(Mutex::new(0));
// 创建多个线程来增加计数器的值
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter); // 每个线程都需要共享 Arc 的引用
let handle = thread::spawn(move || {
// 获取锁,并在作用域结束时自动释放锁
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 输出最终计数器的值
println!("计数最终值: {:?}", counter.lock().unwrap()); //计数最终值: 10
}
- 用户线程:默认情况下创建的线程或线程池都是用户线程
- 守护线程:后台线程或服务线程
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个用户线程
let user_thread = thread::spawn(|| {
user_task();
});
// 创建一个守护线程
let _daemon_thread = thread::spawn(|| {
daemon_task();
});
// 主线程休眠3秒
thread::sleep(Duration::from_secs(3));
println!("主线程退出");
// 等待用户线程结束
user_thread.join().unwrap();
}
// 用户线程任务
fn user_task() {
println!("用户线程开始执行");
thread::sleep(Duration::from_secs(2));
println!("用户线程执行完毕");
}
// 守护线程任务
fn daemon_task() {
println!("守护线程开始执行");
loop {
thread::sleep(Duration::from_millis(500));
println!("守护线程执行中");
}
}
线程同步
取款案例,不加锁会造成数据竞争
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct BankAccount {
balance: f64,
}
impl BankAccount {
fn new(initial_balance: f64) -> Self {
BankAccount {
balance: initial_balance,
}
}
// 使用 Mutex 确保线程安全
fn deposit(&mut self, amount: f64) {
if amount > 0.0 {
self.balance += amount;
println!("{} 存款 {},当前余额: {}", thread::current().name().unwrap(), amount, self.get_balance());
}
}
fn get_balance(&self) -> f64 {
self.balance
}
}
fn main() {
//Arc原子引用计数类型,允许多个线程安全地共享所有权
let account = Arc::new(Mutex::new(BankAccount::new(1000.0)));
let account1 = Arc::clone(&account);
let account2 = Arc::clone(&account);
// 创建两个线程同时进行存款操作
let t1 = thread::Builder::new()
.name("线程1".to_string())
.spawn(move || {
for _ in 0..10 {
let mut account = account1.lock().unwrap();
//存款100
account.deposit(100.0);
thread::sleep(Duration::from_millis(100));
}
})
.unwrap();
let t2 = thread::Builder::new()
.name("线程2".to_string())
.spawn(move || {
for _ in 0..10 {
let mut account = account2.lock().unwrap();
//存款200
account.deposit(200.0);
thread::sleep(Duration::from_millis(100));
}
})
.unwrap();
t1.join().unwrap();
t2.join().unwrap();
}
死锁
多个进程在运行过程中因争夺资源而造成的一种僵局
产生死锁的条件:
- 互斥条件:一个资源每次只能被一个进程使用
- 请求与保持条件:一个进程因请求资源而阻塞时,不能强行剥夺
- 不剥夺条件:进程已获得的资源在未使用完之前不能强行剥夺
- 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系
当 Task1
获取到 lock1
并等待 lock2
,而 Task2
获取到 lock2
并等待 lock1
时,两个线程互相等待对方释放锁,导致死锁
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
// 定义两个对象锁
let lock1 = Arc::new(Mutex::new(()));
let lock2 = Arc::new(Mutex::new(()));
let lock1_task1 = Arc::clone(&lock1);
let lock2_task1 = Arc::clone(&lock2);
// 创建线程1,执行 Task1
let thread1 = thread::spawn(move || {
let _lock1 = lock1_task1.lock().unwrap();
println!("任务1获得锁1");
thread::sleep(Duration::from_millis(1000));
let _lock2 = lock2_task1.lock().unwrap();
println!("任务1获得锁2");
});
let lock1_task2 = Arc::clone(&lock1);
let lock2_task2 = Arc::clone(&lock2);
// 创建线程2,执行 Task2
let thread2 = thread::spawn(move || {
let _lock2 = lock2_task2.lock().unwrap();
println!("任务2获得锁2");
thread::sleep(Duration::from_millis(1000));
let _lock1 = lock1_task2.lock().unwrap();
println!("任务2获得锁1");
});
// 等待两个线程完成
thread1.join().unwrap();
thread2.join().unwrap();
}
通过使用获得锁的相同顺序避免死锁
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
// 定义两个对象锁
let lock1 = Arc::new(Mutex::new(()));
let lock2 = Arc::new(Mutex::new(()));
// 创建线程1,执行 Task1
let lock1_task1 = Arc::clone(&lock1);
let lock2_task1 = Arc::clone(&lock2);
let thread1 = thread::spawn(move || {
let _lock1 = lock1_task1.lock().unwrap();
println!("任务1获得锁1");
thread::sleep(Duration::from_millis(1000));
let _lock2 = lock2_task1.lock().unwrap();
println!("任务1获得锁2");
});
// 创建线程2,执行 Task2
let lock1_task2 = Arc::clone(&lock1);
let lock2_task2 = Arc::clone(&lock2);
let thread2 = thread::spawn(move || {
let _lock1 = lock1_task2.lock().unwrap();
println!("任务2获得锁1");
thread::sleep(Duration::from_millis(1000));
let _lock2 = lock2_task2.lock().unwrap();
println!("任务2获得锁2");
});
// 等待两个线程完成
thread1.join().unwrap();
thread2.join().unwrap();
}
线程通信:生产者和消费者
用通信共享内存
管程法:使用缓冲区
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
const MAX_SIZE: usize = 5;
struct Monitor {
buffer: Mutex<VecDeque<i32>>,
condvar: Condvar,
}
impl Monitor {
fn new() -> Self {
Monitor {
//mutex互斥
buffer: Mutex::new(VecDeque::new()),
//条件变量
condvar: Condvar::new(),
}
}
fn produce(&self, value: i32) {
let mut buffer = self.buffer.lock().unwrap();
//比5小就生产
while buffer.len() >= MAX_SIZE {
buffer = self.condvar.wait(buffer).unwrap();
}
buffer.push_back(value);
println!("生产者: {}", value);
//唤醒所有等待队列中阻塞的线程
self.condvar.notify_all();
}
fn consume(&self) -> i32 {
let mut buffer = self.buffer.lock().unwrap();
while buffer.is_empty() {
//未空则等待
buffer = self.condvar.wait(buffer).unwrap();
}
let value = buffer.pop_front().unwrap();
println!("消费者: {}", value);
//唤醒所有等待队列中阻塞的线程
self.condvar.notify_all();
value
}
}
fn main() {
let monitor = Arc::new(Monitor::new());
let monitor_producer = Arc::clone(&monitor);
let monitor_consumer = Arc::clone(&monitor);
let producer_thread = thread::spawn(move || {
let mut value = 0;
loop {
monitor_producer.produce(value);
value += 1;
//模拟生产者
thread::sleep(Duration::from_secs(1));
}
});
let consumer_thread = thread::spawn(move || loop {
monitor_consumer.consume();
//模拟消费者
thread::sleep(Duration::from_secs(2));
});
// 等待线程结束
producer_thread.join().unwrap();
consumer_thread.join().unwrap();
}
信号灯法:标志位
信号量:
-
empty
和full
信号量用于跟踪缓冲区中的可用空间和已填充空间。 - 生产者在生产一个项目之前,会尝试获取
empty
信号量许可。如果缓冲区已满,则生产者会等待,直到有空间可用 - 消费者在消费一个项目之前,会尝试获取
full
信号量许可。如果缓冲区为空,则消费者会等待,直到有项目可用
互斥锁:
-
mutex
信号量确保生产者和消费者不会同时访问缓冲区。它充当一个二进制信号量(互斥锁),以确保对共享资源的互斥访问 - 生产者或消费者在访问缓冲区时,会获取
mutex
信号量许可,并在完成访问后,释放许可(通过增加许可数)
在Cargo.toml
里添加,cargo run
一下
[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::Semaphore;
const MAX_SIZE: usize = 5;
struct SemaphoreBuffer {
buffer: Mutex<VecDeque<i32>>,
mutex: Arc<Semaphore>,
empty: Arc<Semaphore>,
full: Arc<Semaphore>,
}
impl SemaphoreBuffer {
fn new() -> Self {
SemaphoreBuffer {
//缓冲区
buffer: Mutex::new(VecDeque::new()),
//三个信号
mutex: Arc::new(Semaphore::new(1)),//互斥
empty: Arc::new(Semaphore::new(MAX_SIZE)),//空信号
full: Arc::new(Semaphore::new(0)),//满信号
}
}
async fn produce(&self, value: i32) {
self.empty.acquire().await.unwrap().forget();
self.mutex.acquire().await.unwrap().forget();
let mut buffer = self.buffer.lock().unwrap();
//生产
buffer.push_back(value);
println!("生产者: {}", value);
drop(buffer);
//增加信号许可证
self.mutex.add_permits(1);
self.full.add_permits(1);
}
async fn consume(&self) -> i32 {
//获得full和mutex许可证
self.full.acquire().await.unwrap().forget();
self.mutex.acquire().await.unwrap().forget();
let mut buffer = self.buffer.lock().unwrap();
//消费
let value = buffer.pop_front().unwrap();
println!("消费者: {}", value);
drop(buffer);
//增加信号许可证
self.mutex.add_permits(1);
self.empty.add_permits(1);
value
}
}
#[tokio::main]
async fn main() {
let semaphore_buffer = Arc::new(SemaphoreBuffer::new());
let producer_buffer = Arc::clone(&semaphore_buffer);
let consumer_buffer = Arc::clone(&semaphore_buffer);
let producer_thread = tokio::spawn(async move {
let mut value = 0;
loop {
producer_buffer.produce(value).await;
value += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let consumer_thread = tokio::spawn(async move {
loop {
consumer_buffer.consume().await;
tokio::time::sleep(Duration::from_secs(2)).await;
}
});
//等待异步任务完成再结束
let _ = tokio::try_join!(producer_thread, consumer_thread);
}
线程池
经常创建和销毁线程对性能影响大,提前创建好线程放入线程池,使用时直接获取,使用结束放回线程池,避免频繁创建销毁线程
use tokio::time::{sleep, Duration};
use tokio::task::JoinHandle;
struct WorkerTask {
task_name: String,
}
impl WorkerTask {
fn new(task_name: String) -> Self {
WorkerTask { task_name }
}
async fn execute(&self) {
println!("执行任务: {}", self.task_name);
// 模拟任务执行
sleep(Duration::from_secs(1)).await;
println!("任务完成啦: {}", self.task_name);
}
}
#[tokio::main]
async fn main() {
let mut handles: Vec<JoinHandle<()>> = Vec::new();
// 创建并行任务数量为5的线程池
let max_threads = 5;
let mut current_threads = 0;
// 模拟20个任务
for i in 0..20 {
let task_name = format!("任务 {}", i + 1);
let worker_task = WorkerTask::new(task_name);
if current_threads >= max_threads {
// 如果当前并行任务数达到最大值,等待第一个任务完成
let handle = handles.remove(0);
handle.await.unwrap();
current_threads -= 1;
}
// 提交任务给线程池执行
let handle = tokio::spawn(async move {
worker_task.execute().await;
});
handles.push(handle);
current_threads += 1;
}
// 等待所有任务完成
for handle in handles {
handle.await.unwrap();
}
}