• 消息传递
    • 初试通道(channel)
    • 消息类型
    • 异步通道(Channel)
    • 同步通道

    消息传递

    稍加考虑,上一节的练习题其实是不完整的,它只是评分系统中的一环,一个评分系统是需要先把信息从数据库或文件中读取出来,然后才是评分,最后还需要把评分结果再保存到数据库或文件中去。如果一步一步串行地做这三个步骤,是完全没有问题的。那么我们是否可以用三个线程来分别做这三个步骤呢?上一节练习题我们已经用了一个线程来实现评分,那么我们是否也可以再用一个线程来读取成绩,再用另个线程来实现保存呢? 如果能这样的话,那么我们就可以利用上多核多cpu的优势,加快整个评分的效率。既然在此提出这个问题,答案就很明显了。问题在于我们要怎么在Rust中来实现,关键在于三个线程怎么交换信息,以达到串行的逻辑处理顺序?

    为了解决这个问题,下面将介绍一种Rust在标准库中支持的消息传递技术。消息传递是并发模型里面大家比较推崇的模式,不仅仅是因为使用起来比较简单,关键还在于它可以减少数据竞争,提高并发效率,为此值得深入学习。Rust是通过一个叫做通道(channel)的东西来实现这种模式的,下面直接进入主题。

    初试通道(channel)

    Rust的通道(channel)可以把一个线程的消息(数据)传递到另一个线程,从而让信息在不同的线程中流动,从而实现协作。详情请参见std::sync::mpsc。通道的两端分别是发送者(Sender)和接收者(Receiver),发送者负责从一个线程发送消息,接收者则在另一个线程中接收该消息。下面我们来看一个简单的例子:

    1. use std::sync::mpsc;
    2. use std::thread;
    3. fn main() {
    4. // 创建一个通道
    5. let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) =
    6. mpsc::channel();
    7. // 创建线程用于发送消息
    8. thread::spawn(move || {
    9. // 发送一个消息,此处是数字id
    10. tx.send(1).unwrap();
    11. });
    12. // 在主线程中接收子线程发送的消息并输出
    13. println!("receive {}", rx.recv().unwrap());
    14. }

    程序说明参见代码中的注释,程序执行结果为:

    1. receive 1

    结果表明main所在的主线程接收到了新建线程发送的消息,用Rust在线程间传递消息就是这么简单!

    虽然简单,但使用过其他语言就会知道,通道有多种使用方式,且比较灵活,为此我们需要进一步考虑关于RustChannel的几个问题:

    1. 通道能保证消息的顺序吗?是否先发送的消息,先接收?
    2. 通道能缓存消息吗?如果能的话能缓存多少?
    3. 通道的发送者和接收者支持N:1,1:N,N:M模式吗?
    4. 通道能发送任何数据吗?
    5. 发送后的数据,在线程中继续使用没有问题吗?

    让我们带着这些问题和思考进入下一个小节,那里有相关的答案。

    消息类型

    上面的例子中,我们传递的消息类型为i32,除了这种类型之外,是否还可以传递更多的原始类型,或者更复杂的类型,和自定义类型?下面我们尝试发送一个更复杂的Rc类型的消息:

    1. use std::fmt;
    2. use std::sync::mpsc;
    3. use std::thread;
    4. use std::rc::Rc;
    5. pub struct Student {
    6. id: u32
    7. }
    8. impl fmt::Display for Student {
    9. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    10. write!(f, "student {}", self.id)
    11. }
    12. }
    13. fn main() {
    14. // 创建一个通道
    15. let (tx, rx): (mpsc::Sender<Rc<Student>>, mpsc::Receiver<Rc<Student>>) =
    16. mpsc::channel();
    17. // 创建线程用于发送消息
    18. thread::spawn(move || {
    19. // 发送一个消息,此处是数字id
    20. tx.send(Rc::new(Student{
    21. id: 1,
    22. })).unwrap();
    23. });
    24. // 在主线程中接收子线程发送的消息并输出
    25. println!("receive {}", rx.recv().unwrap());
    26. }

    编译代码,奇迹没有出现,编译时错误,错误提示:

    1. error: the trait `core::marker::Send` is not
    2. implemented for the type `alloc::rc::Rc<Student>` [E0277]
    3. note: `alloc::rc::Rc<Student>` cannot be sent between threads safely

    看来并不是所有类型的消息都可以通过通道发送,消息类型必须实现marker trait Send。Rust之所以这样强制要求,主要是为了解决并发安全的问题,再一次强调,安全是Rust考虑的重中之重。如果一个类型是Send,则表明它可以在线程间安全的转移所有权(ownership),当所有权从一个线程转移到另一个线程后,同一时间就只会存在一个线程能访问它,这样就避免了数据竞争,从而做到线程安全。ownership的强大又一次显示出来了。通过这种做法,在编译时即可要求所有的代码必须满足这一约定,这种方式方法值得借鉴,trait也是非常强大。

    看起来问题得到了完美的解决,然而由于Send本身是一个不安全的marker trait,并没有实际的API,所以实现它很简单,但没有强制保障,就只能靠开发者自己约束,否则还是可能引发并发安全问题。对于这一点,也不必太过担心,因为Rust中已经存在的类,都已经实现了Send!Send,我们只要使用就行。Send是一个默认应用到所有Rust已存在类的trait,所以我们用!Send显式标明该类没有实现Send。目前几乎所有的原始类型都是Send,例如前面例子中发送的i32。对于开发者而言,我们可能会更关心哪些是非Send,也就是实现了!Send,因为这会导致线程不安全。更全面的信息参见Send官网API。

    对于不是Send的情况(!Send),大致分为两类:

    1. 原始指针,包括*mut T*const T,因为不同线程通过指针都可以访问数据,从而可能引发线程安全问题。
    2. RcWeak也不是,因为引用计数会被共享,但是并没有做并发控制。

    虽然有这些!Send的情况,但是逃不过编译器的火眼金睛,只要你错误地使用了消息类型,编译器都会给出类似于上面的错误提示。我们要担心的不是这些,因为错误更容易出现在新创建的自定义类,有下面两点需要注意:

    1. 如果自定义类的所有字段都是Send,那么这个自定义类也是Send
      反之,如果有一个字段是!Send,那么这个自定义类也是!Send
      如果类的字段存在递归包含的情况,按照该原则以此类推来推论类是Send还是!Send

    2. 在为一个自定义类实现Send或者!Send时,必须确保符合它的约定。

    到此,消息类型的相关知识已经介绍完了,说了这么久,也该让大家自己练习一下了:请实现一个自定义类,该类包含一个Rc字段,让这个类变成可以在通道中发送的消息类型。

    异步通道(Channel)

    在粗略地尝试通道之后,是时候更深入一下了。Rust的标准库其实提供了两种类型的通道:异步通道和同步通道。上面的例子都是使用的异步通道,为此这一小节我们优先进一步介绍异步通道,后续再介绍同步通道。异步通道指的是:不管接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞。为了验证这一点,我们尝试多增加一个线程来发送消息:

    1. use std::sync::mpsc;
    2. use std::thread;
    3. // 线程数量
    4. const THREAD_COUNT :i32 = 2;
    5. fn main() {
    6. // 创建一个通道
    7. let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
    8. // 创建线程用于发送消息
    9. for id in 0..THREAD_COUNT {
    10. // 注意Sender是可以clone的,这样就可以支持多个发送者
    11. let thread_tx = tx.clone();
    12. thread::spawn(move || {
    13. // 发送一个消息,此处是数字id
    14. thread_tx.send(id + 1).unwrap();
    15. println!("send {}", id + 1);
    16. });
    17. }
    18. thread::sleep_ms(2000);
    19. println!("wake up");
    20. // 在主线程中接收子线程发送的消息并输出
    21. for _ in 0..THREAD_COUNT {
    22. println!("receive {}", rx.recv().unwrap());
    23. }
    24. }

    运行结果:

    1. send 1
    2. send 2
    3. wake up
    4. receive 1
    5. receive 2

    在代码中,我们故意让main所在的主线程睡眠2秒,从而让发送者所在线程优先执行,通过结果可以发现,发送者发送消息时确实没有阻塞。还记得在前面提到过很多关于通道的问题吗?从这个例子里面还发现什么没?除了不阻塞之外,我们还能发现另外的三个特征:

    1.通道是可以同时支持多个发送者的,通过clone的方式来实现。
    这类似于Rc的共享机制。
    其实从Channel所在的库名std::sync::mpsc也可以知道这点。
    因为mpsc就是多生产者单消费者(Multiple Producers Single Consumer)的简写。
    可以有多个发送者,但只能有一个接收者,即支持的N:1模式。

    2.异步通道具备消息缓存的功能,因为1和2是在没有接收之前就发了的,在此之后还能接收到这两个消息。

    那么通道到底能缓存多少消息?在理论上是无穷的,尝试一下便知:

    1. use std::sync::mpsc;
    2. use std::thread;
    3. fn main() {
    4. // 创建一个通道
    5. let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();
    6. // 创建线程用于发送消息
    7. let new_thread = thread::spawn(move || {
    8. // 发送无穷多个消息
    9. let mut i = 0;
    10. loop {
    11. i = i + 1;
    12. // add code here
    13. println!("send {}", i);
    14. match tx.send(i) {
    15. Ok(_) => (),
    16. Err(e) => {
    17. println!("send error: {}, count: {}", e, i);
    18. return;
    19. },
    20. }
    21. }
    22. });
    23. // 在主线程中接收子线程发送的消息并输出
    24. new_thread.join().unwrap();
    25. println!("receive {}", rx.recv().unwrap());
    26. }

    最后的结果就是耗费内存为止。

    3.消息发送和接收的顺序是一致的,满足先进先出原则。

    上面介绍的内容大多是关于发送者和通道的,下面开始考察一下接收端。通过上面的几个例子,细心一点的可能已经发现接收者的recv方法应该会阻塞当前线程,如果不阻塞,在多线程的情况下,发送的消息就不可能接收完全。所以没有发送者发送消息,那么接收者将会一直等待,这一点要谨记。在某些场景下,一直等待是符合实际需求的。但某些情况下并不需一直等待,那么就可以考虑释放通道,只要通道释放了,recv方法就会立即返回。

    异步通道的具有良好的灵活性和扩展性,针对业务需要,可以灵活地应用于实际项目中,实在是必备良药!

    同步通道

    同步通道在使用上同异步通道一样,接收端也是一样的,唯一的区别在于发送端,我们先来看下面的例子:

    1. use std::sync::mpsc;
    2. use std::thread;
    3. fn main() {
    4. // 创建一个同步通道
    5. let (tx, rx): (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) = mpsc::sync_channel(0);
    6. // 创建线程用于发送消息
    7. let new_thread = thread::spawn(move || {
    8. // 发送一个消息,此处是数字id
    9. println!("before send");
    10. tx.send(1).unwrap();
    11. println!("after send");
    12. });
    13. println!("before sleep");
    14. thread::sleep_ms(5000);
    15. println!("after sleep");
    16. // 在主线程中接收子线程发送的消息并输出
    17. println!("receive {}", rx.recv().unwrap());
    18. new_thread.join().unwrap();
    19. }

    运行结果:

    1. before sleep
    2. before send
    3. after sleep
    4. receive 1
    5. after send

    除了多了一些输出代码之外,上面这段代码几乎和前面的异步通道的没有什么区别,唯一不同的在于创建同步通道的那行代码。同步通道是sync_channel,对应的发送者也变成了SyncSender。为了显示出同步通道的区别,故意添加了一些打印。和异步通道相比,存在两点不同:

    1. 同步通道是需要指定缓存的消息个数的,但需要注意的是,最小可以是0,表示没有缓存。
    2. 发送者是会被阻塞的。当通道的缓存队列不能再缓存消息时,发送者发送消息时,就会被阻塞。

    对照上面两点和运行结果来分析,由于主线程在接收消息前先睡眠了,从而子线程这个时候会被调度执行发送消息,由于通道能缓存的消息为0,而这个时候接收者还没有接收,所以tx.send(1).unwrap()就会阻塞子线程,直到主线程接收消息,即执行println!("receive {}", rx.recv().unwrap());。运行结果印证了这点,要是没阻塞,那么在before send之后就应该是after send了。

    相比较而言,异步通道更没有责任感一些,因为消息发送者一股脑的只管发送,不管接收者是否能快速处理。这样就可能出现通道里面缓存大量的消息得不到处理,从而占用大量的内存,最终导致内存耗尽。而同步通道则能避免这种问题,把接受者的压力能传递到发送者,从而一直传递下去。