不耐烦的生锈学习者的多线程。
#编程 #node #rust #multithreading

作为全栈开发人员,我总是试图获得新的知识。我
几年前听说了生锈的编程语言,但双手没有伸出援手尝试真实的例子。几天前,我花了一些时间在Rust上,我想与您分享一些经验。

首先,我想指出,本文主要适用于Nodejs/JavaScript开发人员,他们在尝试第一步或尝试第一个步骤的人都没有听说过生锈。

第二,我将仅描述我解决的一个问题,而我不打算为您提供最终的知识集。尽管如此,我希望我的榜样能使您与学习生锈有关。

我想,在这一点上,您问我一个问题。作为网络开发人员,我为什么要学习生锈?我预测了这个问题。老实说,这篇文章是我上一篇的逻辑延续。请阅读Node & Rust: Friendship Forever. The NAPI-rs Way.

是时候告诉你几个关于生锈的单词。

Wikipedia告诉我们有关生锈的信息。

“ Rust是一种多范式,高级,通用编程语言。锈强调性能,类型安全性和并发。生锈执行记忆安全性,即所有参考都指向有效记忆需要使用其他内存安全语言中存在垃圾收集器或参考计数。为了同时执行记忆安全并防止并发数据竞赛,Rust的“借用检查器”跟踪了编译过程中所有参考文献的对象。对于系统编程,但还提供了高级功能,包括一些功能性编程结构。软件开发人员Graydon Hoare在2006年在Mozilla Research工作时创建了Rust作为个人项目。Mozilla于2009年正式赞助了该项目。自2015年5月首次稳定发布以来,Rust已被包括亚马逊,Discord,Dropbox,Facebook(Meta),Google(Alphabet)和Microsoft等公司采用。”

不要将Rust视为JavaScript类似的。这种语言完全不同。请注意以下要点。

  1. Rust是一种可编译的语言。
  2. 尽管有一般性,但它看起来像C ++(甚至C)中的竞争对手。如果您是golang-afimliariar的人,请不要将Rust与Golang进行比较!他们也不同。
  3. 主要功能之一是安全的多线程。
  4. 对于JavaScript Folk来说,"References and Borrowing" topic可能有点困难。请关注它!
  5. 请阅读此resource

我想专注于本文中的安全多线程,因为了解此功能是理解这种美丽语言的数字一种方法。我还了解到,安全的多线程主题是Rust之外的语言中最棘手的主题之一。 Java,Goarsng和C ++是一个很好的例子。 Rust语言有一个庞大的社区,并且有很多资源(例如this one)。但是我面临缺乏实际的例子和简单的解释。尽管如此,我找到了一个经典的例子,可以帮助您尽快输入此主题。

认识餐饮哲学家!

Dining Philosophers Problem是描述here的经典多线程任务。

“五个沉默的哲学家坐在圆桌上,用一碗意大利面条。叉子都放在每对相邻的哲学家之间。每个哲学家都必须交替思考和吃。但是,哲学家只能在左右两者左右吃意大利面时吃意大利面叉子。每个叉子只能由一位哲学家持有,因此哲学家只有在不被另一个哲学家使用时才可以使用叉子。在个人哲学家完成饮食后,他们需要放下两个叉子,以使叉子可用对他人而言。哲学家可以在右边或左边的叉子上拿起叉子,但在左上是左侧的叉子,但在获得叉子之前不能开始进食。饮食不受剩余的意大利面条或胃部空间的限制;无限的供应和无限的供应假设有无限的需求。问题是如何设计行为学科(一种并发算法),以至于没有哲学家会饿死;即,每个人都能永远继续在饮食和思维之间继续交替,假设没有其他哲学家可以知道其他人何时可以知道何时可以想吃或思考。”

最初,Rust的解决方案看起来像下面的解决方案。您可以找到它here

use std::sync::{Arc, Mutex};
use std::{thread, time};

struct Philosopher {
    name: String,
    left: usize,
    right: usize,
}

impl Philosopher {
    fn new(name: &str, left: usize, right: usize) -> Philosopher {
        Philosopher {
            name: name.to_string(),
            left: left,
            right: right,
        }
    }

    fn eat(&self, table: &Table) {
        let _left = table.forks[self.left].lock().unwrap();
        let _right = table.forks[self.right].lock().unwrap();

        println!("{} is eating.", self.name);

        let delay = time::Duration::from_millis(1000);

        thread::sleep(delay);

        println!("{} is done eating.", self.name);
    }
}

struct Table {
    forks: Vec<Mutex<()>>,
}

fn main() {
    let table = Arc::new(Table {
        forks: vec![
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
        ],
    });


    let philosophers = vec![
        Philosopher::new("Donald", 0, 1),
        Philosopher::new("Larry", 1, 2),
        Philosopher::new("Mark", 2, 3),
        Philosopher::new("John", 3, 4),
        Philosopher::new("Bruce", 0, 4),
    ];

    let handles: Vec<_> = philosophers
        .into_iter()
        .map(|p| {
            let table = table.clone();

            thread::spawn(move || {
                p.eat(&table);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}

我不想经常挖掘上述代码,我想您会喜欢从您身边做。但是我需要专注于一些基本要点。

主要的多线程任务是防止数据碰撞,这已不是什么秘密。在我们的示例中,碰撞意味着邻居哲学家同时采用了同样的叉子,因为每个哲学家都有他的线索,并且正在与他人同时饮食和思考。 Mutex解决了。静音意味着相互排斥,“一次只有一个”。这就是为什么叉子与相关静音是一个好主意的原因。


在这一点上,我想打断我的故事,并告诉您一个重要的笔记。老实说,这个例子比我解释的要复杂。我只是打算激发新的生锈人。对不起,亲爱的专家,告诉您这一点。尽管Mutex是金色并发标准,但它不是灵丹妙药。即使在这里也可能存在一些问题。我将在文章的路线图部分中提供更多有用的信息。请仔细阅读有关“僵局”,“ livelock”和“饥饿”的信息。


相关代码是以下内容。

    let table = Arc::new(Table {
        forks: vec![
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
        ],
    });


        let _left = table.forks[self.left].lock().unwrap();
        let _right = table.forks[self.right].lock().unwrap();

上面的代码字面意思是以下内容。

哲学家拿了几个叉子并持有它们。

目前他的邻居会发生什么?

他们想拿叉子。但是叉子已经被拿走了。

在这种情况下,邻居会发生什么?

他们(含义相关线程)在第一个哲学家释放叉子(解锁他的静音)时正在等待。

他们为什么要等?

因为静音!

查看下面的代码。

    fn eat(&self, table: &Table) {
        let _left = table.forks[self.left].lock().unwrap();
        let _right = table.forks[self.right].lock().unwrap();

        println!("{} is eating.", self.name);

        let delay = time::Duration::from_millis(1000);

        thread::sleep(delay);

        println!("{} is done eating.", self.name);
    }

当哲学家离开叉子时?

他抓住叉子,等待1秒钟。 eat函数完成后,静音将发布。

其他幸运的邻居(左右)抓住相关的叉子。请注意在单独线程中运行的邻居(即同时)。

另外,让我们看一下与多线程直接相关的以下代码。

    let handles: Vec<_> = philosophers
        .into_iter()
        .map(|p| {
            let table = table.clone();

            thread::spawn(move || {
                p.eat(&table);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

是时候运行原始示例了。

git clone git@github.com:buchslava/dining-philosophers-problem.git
cd dining-philosophers-problem
git checkout original-version
cargo build
./target/debug/dining-philosophers

Original result

我问自己。

可以在程序执行期间收集所有结果(我的意思是打印消息),并在整个逻辑完成后立即提供?

我发现这项任务具有挑战性,因为我从其他技术中知道,交叉互动总是很痛苦。作为JavaScript的人,我首先想到了Promise.all Technique之类的东西。

令人惊讶的是,我在YOSHUA WUYTS博客中找到了类似的技术。我强烈建议阅读此资源。

请查看以下article

此外,我找到了下表的JavaScript Folk。

时短路 时短路 时短路
javascript Rust 描述
Promise.allsettled 未来::加入 不短路
Promise.all 未来:: try_join 拒绝输入值
Promise.race 未来:: SELECT 在解决输入值
Promise.any 未来:: try_select 当输入值满足

根据上面的信息,新解决方案应该看起来像下面的解决方案。

use async_std::future;

let a = future::ready(Ok(1));
let b = future::ready(Ok(2));

let c = future::try_join(a, b);
assert_eq!(c.await?, (1, 2));

作为一个不耐烦的学习者,我同时介绍了期货方法和起源示例。我有点失望,因为我无法以一种基于以后的方式重新完成原始版本。老实说,我没有足够的时间。此外,根据挖掘结果,未来的概念与线程大不相同。这就是为什么我不确定这个想法是否好。我推迟了这项活动。

我决定选择另一种方式。我进行了更多研究,发现了Using Message Passing to Transfer Data Between Threads

我认为您会同意以下方式对原始的用餐哲学家实施最友好。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

是时候向前迈进并修改我们的解决方案了。我想提供完整的源代码并逐步解释。

use std::sync::{Arc, Mutex};
use std::{thread, time};
use std::sync::mpsc::{Sender};
use std::sync::mpsc;

struct Philosopher {
    name: String,
    left: usize,
    right: usize,
}

impl Philosopher {
    fn new(name: &str, left: usize, right: usize) -> Philosopher {
        Philosopher {
            name: name.to_string(),
            left: left,
            right: right,
        }
    }

    fn eat(&self, table: &Table, sender: &Sender<String>) {
        let _left = table.forks[self.left].lock().unwrap();
        let _right = table.forks[self.right].lock().unwrap();

        // println!("{} is eating.", self.name);
        sender.send(format!("{} is eating.", self.name).to_string()).unwrap();

        let delay = time::Duration::from_millis(1000);

        thread::sleep(delay);

        // println!("{} is done eating.", self.name);
        sender.send(format!("{} is done eating.", self.name).to_string()).unwrap();
    }
}

struct Table {
    forks: Vec<Mutex<()>>,
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let table = Arc::new(Table {
        forks: vec![
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
        ],
    });


    let philosophers = vec![
        Philosopher::new("Donald", 0, 1),
        Philosopher::new("Larry", 1, 2),
        Philosopher::new("Mark", 2, 3),
        Philosopher::new("John", 3, 4),
        Philosopher::new("Bruce", 0, 4),
    ];

    let handles: Vec<_> = philosophers
        .into_iter()
        .map(|p| {
            let table = table.clone();
            let sender = tx.clone();

            thread::spawn(move || {
                p.eat(&table, &sender);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    tx.send("Done".to_string()).unwrap();

    let mut result: String = String::from("");

    for received in rx {
        if received == "Done" {
            break;
        }
        result.push_str(&received);
        result.push_str("\n");
    }
    println!("{}", result);
}

添加相关软件包

use std::sync::mpsc::{Sender};
use std::sync::mpsc;

初始化通道

fn main() {
    let (tx, rx) = mpsc::channel();
    // ...
}

将发件人传递到eat函数

            thread::spawn(move || {
                p.eat(&table, &sender);
            })

发送预期信息而不是立即打印

    fn eat(&self, table: &Table, sender: &Sender<String>) {
        let _left = table.forks[self.left].lock().unwrap();
        let _right = table.forks[self.right].lock().unwrap();

        // println!("{} is eating.", self.name);
        sender.send(format!("{} is eating.", self.name).to_string()).unwrap();

        let delay = time::Duration::from_millis(1000);

        thread::sleep(delay);

        // println!("{} is done eating.", self.name);
        sender.send(format!("{} is done eating.", self.name).to_string()).unwrap();
    }

收集最终结果

    for h in handles {
        h.join().unwrap();
    }

    tx.send("Done".to_string()).unwrap();

    let mut result: String = String::from("");

    for received in rx {
        if received == "Done" {
            break;
        }
        result.push_str(&received);
        result.push_str("\n");
    }
    println!("{}", result);

请注意“完成”消息。这是过程结束的标准。

是时候运行最终解决方案了。

git checkout main
cargo build
./target/debug/dining-philosophers

Final result

看起来不错!


路线图。

正如所承诺的,我将为此任务提供一些关键的补充。

任务限制。

当所有哲学家完全持有一个叉子时,在此任务中可能会发生僵局。目前在文章中通过忽略the philosophers sit at a round table正在解决这一问题:由于唐纳德(Donald)和布鲁斯(Bruce首先尝试离开)。
如果我们考虑圆桌会部部分,布鲁斯的左叉应为4,他的右叉应为0:

Philosopher::new("Bruce", 4, 0),

现在,这引入了可能的僵局。很难复制,但是一旦我们实施“假定无限的供应和无限需求”的一部分,就很容易复制。与无限时期一样,我们最终将陷入困境,所有哲学家将永远被困在思考中。

复制它的一种简单方法是在每个哲学家的左叉拾音器上等待5ms等待5ms;它将死锁。

use std::sync::{Arc, Mutex};
use std::{thread, time};

struct Philosopher {
    name: String,
    left: usize,
    right: usize,
}

impl Philosopher {
    fn new(name: &str, left: usize, right: usize) -> Philosopher {
        Philosopher {
            name: name.to_string(),
            left: left,
            right: right,
        }
    }

    fn eat(&self, table: &Table) {
        println!("{} is picking up the left fork.", self.name);
        let _left = table.forks[self.left].lock().unwrap();

        // added 5ms duration
        thread::sleep(time::Duration::from_millis(5));

        println!("{} is picking up the right fork.", self.name);
        let _right = table.forks[self.right].lock().unwrap();

        println!("{} is eating.", self.name);

        let delay = time::Duration::from_millis(1000);

        thread::sleep(delay);

        println!("{} is done eating.", self.name);
    }
}

struct Table {
    forks: Vec<Mutex<()>>,
}

fn main() {
    let table = Arc::new(Table {
        forks: vec![
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
            Mutex::new(()),
        ],
    });                                         


    let philosophers = vec![
        Philosopher::new("Donald", 0, 1),
        Philosopher::new("Larry", 1, 2),
        Philosopher::new("Mark", 2, 3),
        Philosopher::new("John", 3, 4),
        // changed from Philosopher::new("Bruce", 0, 4),
        Philosopher::new("Bruce", 4, 0),
    ];

    let handles: Vec<_> = philosophers
        .into_iter()
        .map(|p| {
            let table = table.clone();

            thread::spawn(move || {
                p.eat(&table);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}

您可以找到一个工作(但实际上不起作用...)示例here

有用的信息和参考

作为并发的人,您应该开始考虑Deadlock, Livelock,Starvation.,请阅读here

处理Deadlock, Livelock,Starvation并不容易,这里没有银色子弹。尽管您可以研究该主题的不同现有解决方案。例如,This one

另外,最好学习原子计算。要成为并发的忍者,请开始从Rust Atomics and Locks Low-Level Concurrency in Practice挖掘。

nodejs

最后,我会满足您的好奇心,并给您解决方案的NodeJS version。它取自here并进行了纠正...

您可以阅读并运行它。

cd nodejs
node index

我希望它也会扩大您的nodejs视野。

快乐黑客!


ps:感谢Eduardo Speroni的非常有用的评论以及在我在文章上的工作中提供的帮助。