考虑到 MonolithAi 的例子,不丢脸出为什么学习如何在 Web 编程中实现排队不仅有用,而且还为开拓职员供应了另一种办理方案,增加了他们可以办理的问题数量。
在本章中,我们将谈论以下主题:
支配排队项目,描述所需的组件和方法

构建 HTTP 做事器
建立投票事情职员
让我们的运用程序与 Redis 一起运行
为工人定义任务
为 Redis 行列步队定义
在 HTTP 做事器中集成路由
在 Docker 中运行所有做事器和事情职员
到本章结束时,您将能够构建一个 Rust 程序,该程序可以是事情程序,也可以是做事器程序,详细取决于通报给它的环境变量。 您还可以以不同构造的形式序列化一系列任务,并将它们插入到 Redis 行列步队中,从而使这些构造能够在不同的做事器之间排队和传输。 这不仅为您供应了实现行列步队的技能,还可以利用 Redis 来实现许多其他办理方案,例如多个做事器通过 Redis 发布/订阅通道通过广播吸收。
技能哀求在本章中,我们将纯粹关注如何在 Redis 行列步队上利用 Tokio 和 Hyper 构建事情线程。 因此,在构建我们自己的新做事器时,我们将不会依赖任何以前的代码。
分解我们的项目在我们的系统中,我们有一系列须要实行的任务。 然而,这些任务须要很永劫光才能完成。 如果我们只有一个普通的做事器来处理任务,做事器终极将被壅塞,并且多个用户将收到延迟的体验。 如果任务太长,则用户的连接可能会超时。
为了避免在须要永劫光任务时降落用户体验,我们利用了排队系统。 这是 HTTP 做事器吸收来自用户的要求的地方。 然后,与要求关联的长任务被发送到前辈先出行列步队,由事情池处理。 由于任务在行列步队中,因此 HTTP 做事器除了响运用户任务已发送且要求已被处理之外,无能为力。 由于流量的起伏,当流量较低时,我们不须要所有的事情职员和 HTTP 做事器。 但是,当流量增加时,我们须要创建并连接额外的 HTTP 做事器和事情线程,如下图所示:
图 18.1 – 我们处理冗长任务的方法
考虑到上图,我们将须要以下根本举动步伐:
Redis数据库:用于存储行列步队中的任务
HTTP做事器:将任务发送到行列步队中进行处理
Worker:从行列步队中拉取/弹出/轮询/处理任务
我们可以为事情职员和 HTTP 做事器构建单独的运用程序。 然而,这会增加繁芜性而没有任何好处。 对付两个独立的运用程序,我们必须掩护两个独立的 Docker 镜像。 我们还会重复大量代码,由于 HTTP 做事器发送到 Redis 行列步队的任务必须与事情线程拾取和处理的任务相同。 对付特界说务,从 HTTP 做事器通报到事情职员的字段之间可能会不匹配。 我们可以通过利用具有一系列输入字段的任务构造和一个利用这些字段实行任务的运行函数来防止这种不匹配。 这些任务构造的序列化特色使我们能够通过队列传递字段并吸收它们。
在构建 HTTP 做事器和事情线程时,我们可以构建做事器,以便在程序启动时检说情况变量。 如果环境变量声明运用程序是事情线程,则运用程序可以启动轮询行列步队的参与者。 如果环境变量声明运用程序是 HTTP 做事器,则运用程序可以运行 HTTP 做事器并侦听要求。
对付我们的任务行列步队项目,我们有以下概要:
├── Cargo.toml├── docker-compose.yml└── src ├── main.rs └── tasks ├── add.rs ├── mod.rs ├── multiply.rs └── subtract.rs
我们将在 src/main.rs 文件中定义做事器入口点。 然后,我们将在 src/tasks/ 目录中定义任务构造。 就 Cargo.toml 文件中的依赖项而言,我们有以下内容:
[dependencies]bincode = "1.0"bytes = "1.2.1"redis = "0.22.1"serde_json = "1.0.86"tokio = { version = "1", features = ["full"] }hyper = { version = "0.14.20", features = ["full"] }serde = { version = "1.0.136", features = ["derive"] }
除了字节和二进制码包之外,这些依赖项对您来说都不应该是新的。 我们将利用字节将构造转换为 HTTP 相应,并利用 bincode 将构造序列化为二进制以存储在 Redis 中。
通过我们在本节中刚刚提出的方法,我们将能够构建一个大略的任务处理行列步队,在个中我们可以确保做事器和事情职员之间的任务定义始终保持同步。 定义了我们的方法后,我们可以连续实行任务旅程的第一部分,即 HTTP 做事器。
构建 HTTP 做事器对付我们的 HTTP 做事器,我们须要实行以下步骤:
定义一个反序列化 HTTP 要求正文的构造体。
定义一个处理传入要求的函数。
根据环境变量定义程序运行的路径。
运行一个监听传入要求的做事器。
我们不会为每个步骤划分单独的部分,由于我们已经在上一章中先容了所有这些步骤/过程。 在实行所有步骤之前,我们必须将以下内容导入到 src/main.rs 文件中:
use hyper::{Body, Request, Response, Server};use hyper::body;use hyper::service::{make_service_fn, service_fn};use std::net::SocketAddr;use std::env;use serde::{Serialize, Deserialize};use serde_json;use bytes::{BufMut, BytesMut};
除了 bytes 导入之外,您该当熟习所有这些导入,我们将在定义 HTTP 句柄函数时先容这些导入。 首先,我们将利用以下代码定义一个大略的构造来序列化传入的 HTTP 要求主体:
#[derive(Debug, Clone, Serialize, Deserialize)]pub struct IncomingBody { pub one: String, pub two: i32}
这与我们的 Actix Web 运用程序的方法相同。 我们将能够利用序列化和反序列化特色来注释我们的任务构造。
现在我们已经定义了 IncomingBody 构造体,我们可以利用以下代码定义我们的句柄函数:
async fn handle(req: Request<Body>) -> Result<Response<Body>, &'static str> { let bytes = body::to_bytes(req.into_body()).await .unwrap(); let response_body: IncomingBody = serde_json::from_slice(&bytes).unwrap(); let mut buf = BytesMut::new().writer(); serde_json::to_writer(&mut buf, &response_body).unwrap(); Ok(Response::new(Body::from(buf.into_inner().freeze())))}
必须把稳的是,我们在返回主体时调用了 freeze 函数。 此冻结函数将可变字节转换为不可变字节,从而防止任何缓冲区修正。 在这里,我们可以看到我们正在接管带有要求的通用主体。 然后,我们可以利用 serde 序列化主体,并利用 BytesMut 构造(实质上只是一个连续的内存片段)将主体返回给用户,实质上创建了一个回显做事器。
我们现在可以利用以下代码定义 main 函数,它是入口点:
#[tokio::main]async fn main() { let app_type = env::var("APP_TYPE").unwrap(); match app_type.as_str() { "server" => { . . . }, "worker" => { println!("worker not defined yet"); } _ => { panic!("{} app type not supported", app_type); } }}
这里我们可以看到环境变量“APP_TYPE”被提取出来。 根据运用程序类型的不同,我们会实行不同的代码块。 现在,如果运用程序类型是“worker”,我们将只打印一条声明,表明未定义worker。 我们还声明,如果运用程序类型既不是“做事器”也不是“事情职员”类型,则程序将会涌现惶恐。
在我们的做事器块中,我们利用以下代码定义了 addr 和 server:
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));let server = Server::bind(&addr).serve(make_service_fn( |_conn| { async { Ok::<_, hyper::Error>(service_fn( move |req| { async {handle(req).await} })) }}));if let Err(e) = server.await { eprintln!("server error: {}", e);}
这与上一章中的做事器代码非常相似。
然后我们利用以下命令运行做事器:
APP_TYPE=server cargo run
然后我们可以发送以下要求:
图 18.2 – 对 HTTP 做事器的要求
在这里,我们可以看到我们的做事器正在事情并回显发送到做事器的相同正文。 我们现在可以连续构建我们的事情运用程序。
构建投票事情职员我们的事情职员实质上是在 Redis 中循环和轮询行列步队。 如果行列步队中有,worker就会实行从行列步队中提取的任务。 为了构建轮询事情器部分,事情器将创建一个构造体,将该构造体插入到 Redis 行列步队中,然后从行列步队中提取插入的构造体以进行打印。 这不是我们想要的行为,但这确实意味着我们可以测试以理解行列步队插入如何快速事情。 到本章结束时,我们的 HTTP 做事器将插入任务,而我们的事情线程将花费任务。
我们不肯望事情线程不间断地不断轮询 Redis 行列步队。 为了将轮询降落到合理的速率,我们须要让事情线程在每个循环期间休眠。 因此,我们必须在 src/main.rs 文件中导入以下内容,以使我们的事情线程休眠:
use std::{thread, time};
现在,我们可以转到运行事情程序的部分,在主函数的以下部分中定义我们的事情程序代码:
match app_type.as_str() { "server" => { . . . }, "worker" => { // worker code is going to be inserted here . . . } _ => { panic!("{} app type not supported", app_type); }}
我们的工人代码采取以下总体轮廓:
let client = redis::Client::open("redis://127.0.0.1/").unwrap();loop { . . .}
在这里,我们可以看到我们定义了 Redis 客户端,然后无限循环地运行事情线程。 在此循环中,我们将与 Redis 建立连接,轮询 Redis 中的行列步队,然后删除连接。 我们可以在循环中建立和删除连接,由于该任务将花费很永劫光。 在全体任务期间保持 Redis 连接是没故意义的。
不幸的是,在撰写本书时,Rust Redis 箱还没有大略的行列步队实现。 然而,这不应该阻碍我们。 如果我们知道让 Redis 实现我们的行列步队所需的原始命令,我们就可以实现我们自己的行列步队。 Redis 的性能类似于 SQL 数据库。 如果您理解这些命令,您就可以像 SQL 一样实现自己的逻辑。
在无限循环中,我们将创建一个实现了序列化和反序列化特色的通用构造,然后利用以下代码将该构造序列化为二进制:
let body = IncomingBody{one: "one".to_owned(), two: 2};let bytes = bincode::serialize(&body).unwrap();
我们的构造现在是一个字节向量。 然后我们将与Redis建立连接,并利用“LPUSH”命令将“some_queue”推送到行列步队,这会将值插入到行列步队的头部,代码如下:
let outcome: Option<Vec<u8>>;{ let mut con = client.get_connection().unwrap(); let _ : () = redis::cmd("LPUSH").arg("some_queue") .arg(bytes.clone()) .query(&mut con) .unwrap(); // pop our task from the queue outcome = redis::cmd("LPOP").arg("some_queue") .query(&mut con) .unwrap();}
我们有 Option<Vec<u8>> 由于行列步队中可能没有任何内容。 如果行列步队中没有任何内容,则结果将为“无”。 现在,我们永久不会得到一个“无”,由于我们在从行列步队中提取任务之前直接将任务插入行列步队中。 然而,在流量较低的期间,我们的事情职员将轮询可能会空一段韶光的行列步队。
现在我们有了却果,我们可以利用以下匹配语句来处理它:
match outcome { Some(data) => { . . . }, None => { . . . }}
如果我们有一些数据,我们只需反序列化二进制数据并利用以下代码打印出构造体:
let deserialized_struct: IncomingBody = bincode::deserialize(&data).unwrap();println!("{:?}", deserialized_struct);
如果行列步队中没有任何内容,则结果为 None,我们可以在利用以下代码再次运行循环之前休眠五秒钟:
let five_seconds = time::Duration::from_secs(5);tokio::time::sleep(five_seconds).await;
这样,我们的工人就可以接管测试了。 在构建这样的异步程序时,您总是可以做更多的事情。 然而,为了避免本章臃肿,我们将坚持我们的基本运用程序。 如果您想进一步理解 Redis,您可以研究构建一个发布/订阅系统,个中一个事情职员不断轮询行列步队,而其他事情职员则关闭,并由一个参与者侦听通道上的。 当主worker收到新任务时,主worker可以向通道发布,唤醒其他worker。 如果你真的想推动自己,你可以研究 Kubernetes 掌握器,让一个主事情线程启动并根据流量销毁事情节点。 然而,这些项目将超出本书的范围。
为了让我们的运用程序在一章的范围内事情,我们必须连续让我们的运用程序与 Redis 一起运行。
让我们的运用程序与 Redis 一起运行在本地利用 Redis 运行我们的运用程序须要我们将 Redis 与 Docker 结合利用,将 APP_TYPE 环境变量导出为“worker”,然后利用 Cargo 运行我们的运用程序。 对付我们的 Redis,我们的 docker-compose.yml 文件采取以下形式:
version: "3.7"services: redis: container_name: 'queue-redis' image: 'redis' ports: - '6379:6379'
然后我们可以利用以下命令导出 APP_TYPE 环境变量:
export APP_TYPE=worker
然后我们可以利用以下命令运行我们的运用程序:
cargo run
当我们运行我们的运用程序时,我们将得到以下打印输出:
IncomingBody { one: "one", two: 2 }IncomingBody { one: "one", two: 2 }IncomingBody { one: "one", two: 2 }IncomingBody { one: "one", two: 2 }. . .
IncomingBody 构造的打印输出将是无限的,由于我们正在运行无限循环。 然而,这表明以下机制正在运行和事情:
图 18.3 – 我们如何从 Redis 行列步队插入和提取数据的过程
只管我们的事情线程正在利用 Redis 行列步队,但它只是打印出放入 Redis 行列步队的构造。 不才一节中,我们将功能构建到插入 Redis 行列步队的构造中,以便我们的事情职员可以实行任务。
为worker定义任务当谈到运行我们的任务时,我们须要字段,以便我们可以将它们作为输入通报给正在运行的任务。 我们的任务还须要一个运行函数,这样我们就可以选择何时运行任务,由于运行任务须要很永劫光。 我们可以利用以下代码在 src/tasks/add.rs 文件中定义基本的添加任务:
use std::{thread, time};use serde::{Serialize, Deserialize};#[derive(Debug, Clone, Serialize, Deserialize)]pub struct AddTask { pub one: i32, pub two: i32}impl AddTask { pub fn run(self) -> i32 { let duration = time::Duration::from_secs(20); thread::sleep(duration); return self.one + self.two }}
这些代码都不应该令人震荡。 我们将实现序列化和反序列化特色,以便我们可以将任务插入到 Redis 行列步队中。 然后我们可以利用就寝函数来仿照永劫光任务。 末了,我们只需将两个数字相加即可。 对付 src/tasks/multiply.rs 文件中的任务,run 函数采取以下形式:
impl MultiplyTask { pub fn run(self) -> i32 { let duration = time::Duration::from_secs(20); thread::sleep(duration); return self.one self.two }}
创造 src/tasks/subtract.rs 文件中的 run 函数具有以下构造并不奇怪:
impl SubtractTask { pub fn run(self) -> i32 { let duration = time::Duration::from_secs(20); thread::sleep(duration); return self.one - self.two }}
现在,我们想要实现一个任务,看看是否可以从 Redis 行列步队中提取任务构造并运行它。 我们利用 src/tasks/mod.rs 文件中的以下代码使任务可以从模块访问:
pub mod add;pub mod multiply;pub mod subtract;
在我们的 src/main.rs 文件中,我们最初利用以下代码导入任务:
mod tasks;use tasks::{ add::AddTask, subtract::SubtractTask, multiply::MultiplyTask};
我们现在可以在事情代码块中实现我们的任务之一。 在此事情代码块的开头,我们将利用以下代码将 IncomingBody 构造与 AddTask 构造交流:
let body = AddTask{one: 1, two: 2};
除了我们对结果匹配语句的 Some 部分所做的操作之外,不须要变动其他任何内容,该部分现在采取以下形式:
let deserialized_struct: AddTask = bincode::deserialize(&data).unwrap();println!("{:?}", deserialized_struct.run());
在这里,我们可以看到我们将二进制数据反序列化为 AddTask 构造,运行 run 函数,然后打印结果。 在实际运用程序中,我们会将结果插入数据库或利用 HTTP 将结果发送到另一台做事器。 然而,在本章中,我们只想理解排队任务是如何实行的。 我们在书中多次先容了数据库插入和 HTTP 要求。
如果我们现在运行我们的事情运用程序,我们将得到 15 秒的延迟,然后得到以下打印输出:
3
如果再等 15 秒,我们将得到另一个相同的打印输出。 这表明我们的任务正在从 Redis 行列步队中提取、反序列化,然后以与我们期望的完备相同的办法运行,由于一加二即是三。 然而,这里有一个问题。 我们只能发送和吸收 AddTask 构造。 这没有用,由于我们还有其余两项任务,并且我们希望支持所有这些任务。 因此,我们必须连续定义可以支持一系列任务的。
为 Redis 行列步队定义为了支持多个任务,我们必须采取两步方法来打包要插入到 Redis 行列步队中的任务。 这意味着我们将把任务构造序列化为 Vec<u8>,然后将此字节向量添加到另一个构造中,该构造有一个字段表示中任务的类型。 我们可以通过首先利用以下代码在 src/tasks/mod.rs 文件中导入序列化和反序列化特色来定义此过程:
use serde::{Serialize, Deserialize};
然后我们可以利用以下代码定义列举任务类型和构造:
#[derive(Debug, Clone, Serialize, Deserialize)]use add::AddTask;use multiply::MultiplyTask;use subtract::SubtractTask;#[derive(Debug, Clone, Serialize, Deserialize)]pub enum TaskType { ADD(AddTask), MULTIPLY(MultiplyTask), SUBTRACT(SubtractTask)}#[derive(Debug, Clone, Serialize, Deserialize)]pub struct TaskMessage { pub task: TaskType}
我们的构造现在已准备好打包一系列要插入到 Redis 行列步队中的任务。 在我们的 src/main.rs 文件中,我们可以利用以下代码导入 TaskType 和 TaskMessage 构造:
mod tasks;use tasks::{ add::AddTask, TaskType, TaskMessage};
我们现在准备重写事情代码块中的无限循环。 我们首先创建AddTask,序列化AddTask,然后将这个序列化的任务打包到TaskMessage中,代码如下:
let body = AddTask{one: 1, two: 2};let message = TaskMessage{task: TaskType::ADD(body)};let serialized_message = bincode::serialize(&message).unwrap();
然后我们将建立一个 Redis 连接,然后利用以下代码将序列化的推送到 Redis 行列步队:
let mut con = client.get_connection().unwrap();let _ : () = redis::cmd("LPUSH").arg("some_queue") .arg(serialized_message .clone()) .query(&mut con).unwrap();
然后,我们将从 Redis 行列步队中弹出任务并利用以下代码删除连接:
let outcome: Option<Vec<u8>> = redis::cmd("RPOP").arg("some_queue").query(&mut con) .unwrap();std::mem::drop(con);
我们现在将 TaskMessage 构造移入和移出 Redis 行列步队。 如果有 TaskMessage,我们必须对其进行处理。 在 Results 的 Some 语句的 match 块中,我们必须反序列化从 Redis 行列步队中获取的字节,然后利用以下代码匹配任务类型:
let deserialized_message: TaskMessage = bincode::deserialize(&data).unwrap();match deserialized_message.task { TaskType::ADD(task) => { println!("{:?}", task.run()); }, TaskType::MULTIPLY(task) => { println!("{:?}", task.run()); }, TaskType::SUBTRACT(task) => { println!("{:?}", task.run()); }}
现在,这使我们能够处理从 Redis 行列步队中提取并运行的各个任务。
我们的事情职员现在支持我们的所有三项任务!
然而,我们目前只是创建,然后直接在worker中消费这些。 我们须要使 HTTP 做事器能够接管一系列不同的要求,以将一系列不同的任务发送到 Redis 行列步队以供事情职员利用。
我们现在正处于让 HTTP 做事器接管传入要求以根据 URI 来创建一系列任务的阶段。 为了让我们的 HTTP 支持多个任务,我们实质上必须重写 src/main.rs 文件中的句柄函数。 在我们重写 main 函数之前,我们必须利用以下代码导入我们须要的内容:
use hyper::body;use hyper::http::StatusCode;
我们导入这些东西是由于如果通报了缺点的 URI,我们将返回 NOT_FOUND 状态代码。 我们还将从传入要求的正文中提取数据。 在重构句柄函数之前,我们须要变动 IncomingBody 构造以接管以下形式的两个整数:
#[derive(Debug, Clone, Serialize, Deserialize)]pub struct IncomingBody { pub one: i32, pub two: i32}
在我们的句柄函数中,我们可以定义我们的 Redis 客户端,通过删除尾部斜杠来清理我们的 URI,并利用以下代码从传入要求中提取数据:
let client = redis::Client::open("redis://127.0.0.1/").unwrap();let task_type = req.uri().to_string().replace("/", "")"");let body_bytes = body::to_bytes(req.into_body()).await.unwrap();let body: IncomingBody = _json::from_slice(&body_bytes).unwrap();
我们可以看到我们可以从 URI 中提取任务类型。 现在,我们将支持加法、减法和乘法。 现在我们已经从传入的要求中得到了所需的统统; 我们可以利用以下代码根据 URI 布局适当的任务:
let message_type: TaskType;match task_type.as_str() { "add" => { let body = AddTask{one: body.one, two: body.two}; message_type = TaskType::ADD(body); }, "multiply" => { let body = MultiplyTask{one: body.one, two: body.two}; message_type = TaskType::MULTIPLY(body); }, "subtract" => { let body = SubtractTask{one: body.one, two: body.two}; message_type = TaskType::SUBTRACT(body); }, _ => { . . . }}
我们可以看到,无论任务是什么,我们都须要将任务构造打包到 TaskType 列举中,该列举可以序列化为二进制向量,以便将发送到 Redis 行列步队。 对付 match 语句的末了一部分,它捕获所有与“add”、“multiply”或“subtract”不匹配的任务要求,我们仅返回一个 NOT_FOUND HTTP 相应,代码如下:
let response = Response::builder().status(StatusCode::NOT_FOUND) .body(Body::from("task not found"));return Ok(response.unwrap())
现在,我们已拥有创建可插入 Redis 行列步队的通用任务所需的统统。 有了这些信息,我们就可以创建 TaskMessage 构造并在我们刚刚用以下代码覆盖的 match 语句之后序列化 TaskMessage:
let message = TaskMessage{task_type: message_type, task: bytes};let serialized_message = bincode::serialize(&message).unwrap();
然后,我们将建立一个 Redis 连接,将序列化的推送到 Redis 行列步队,然后利用以下代码删除 Redis 连接:
let mut con = client.get_connection().unwrap();let _ : () = redis::cmd("LPUSH").arg("some_queue") .arg(serialized_message .clone()) .query(&mut con).unwrap();
末了,我们返回一个 Ok HTTP 相应,表明任务已发送,代码如下:
Ok(Response::new(Body::from("task sent")))
我们的句柄功能现在已经完成。 我们现在须要做的便是删除事情代码块中将 AddTask 构造插入 Redis 行列步队的代码。 我们将从事情职员代码块中删除任务插入代码,由于我们不再须要事情职员插入任务。 插入代码的删除采取以下形式:
let client = redis::Client::open("redis://127.0.0.1/").unwrap();loop { let outcome: Option<Vec<u8>> = { let mut con = client.get_connection() .unwrap(); redis::cmd("RPOP").arg("some_queue") .query(&mut con) .unwrap() }; match outcome { . . . }}
现在,我们已准备好将这些事情职员和 HTTP 做事器打包到 Docker 中,以便我们可以利用任意数量的事情职员来运行我们的运用程序。
全部在 Docker 中运行我们现在正处于可以在 Docker 中运行全体运用程序的阶段。 这使我们能够让多个事情职员从同一个 Redis 行列步队中提取数据。 首先,我们须要定义 Dockerfile 来构建我们的事情/做事器镜像。 我们将利用以下代码为 Docker 构建进行无发行版构建:
FROM rust:1.62.1 as buildENV PKG_CONFIG_ALLOW_CROSS=1WORKDIR /appCOPY . .cargo build --release FROM gcr.io/distroless/cc-debian10COPY --from=build /app/target/release/task_queue /usr/local/bin/task_queueEXPOSE 3000ENTRYPOINT ["task_queue"]
在本书的这一点上,这种无发行版本该当不足为奇。 我们只是编译运用程序,然后将静态二进制文件复制到 distroless 映像中。 在以任何办法运行构建之前,我们必须确保不会将过多的文件从目标目录复制到我们的 Docker 构建中,并在 .dockerignore 文件中利用以下代码:
./target.github
我们的构建现已准备就绪。 我们可以利用以下概要定义 docker-compose.yml:
version: "3.7"services: server_1: . . . worker_1: . . . worker_2: . . . worker_3: . . . redis: container_name: 'queue-redis' image: 'redis' ports: - '6379:6379'
在这里,我们可以看到我们有三个事情职员和一台做事器。 我们的做事器采取以下形式:
server_1: container_name: server_1 image: server_1 build: context: . environment: - 'APP_TYPE=server' - 'REDIS_URL=redis://redis:6379' depends_on: redis: condition: service_started restart: on-failure ports: - "3000:3000" expose: - 3000
在这里,我们可以看到我们可以公开端口,指出构建高下文位于当前目录中,并且我们的容器该当在 Redis 启动后启动。
标准事情职员采取以下形式:
worker_1: container_name: worker_1 image: worker_1 build: context: . environment: - 'APP_TYPE=worker' - 'REDIS_URL=redis://redis:' depends_on: redis: condition: service_started restart: on-failure
我们可以想象其他工人具有与前一个工人相同的构造,这是事实。 如果我们想添加另一个事情职员,我们可以利用与worker_1完备相同的规格,除非我们只是增加附加到图像和容器名称的数字,从而导致新事情职员被称为worker_2。 您可能已经把稳到,我们已将 REDIS_URL 添加到环境变量中。 这是由于事情职员和做事器必须访问其容器外部的 Redis 数据库。 将 localhost 通报到 Redis 客户端将导致无法连接到 Redis。 因此,我们必须删除对 Redis 客户真个所有引用,并将这些引用更换为以下代码:
let client = redis::Client::open(env::var("REDIS_URL").unwrap()) .unwrap();
如果我们现在启动 docker_compose 并向做事器发送一系列不同的 HTTP 要求,我们会得到以下打印输出:
. . .queue-redis | 1:M 30 Oct 2022 18:42:52.334 RDB memory usage when created 0.85 Mbqueue-redis | 1:M 30 Oct 2022 18:42:52.334 Done loading RDB, keys loaded: 0, keys expired: 0.queue-redis | 1:M 30 Oct 2022 18:42:52.334 DB loaded from disk: 0.002 secondsqueue-redis | 1:M 30 Oct 2022 18:42:52.334 Ready to accept connectionsworker_1 | empty queueworker_3 | empty queueworker_1 | empty queueworker_3 | empty queueworker_2 | multiply: 9worker_3 | multiply: 25worker_1 | multiply: 8worker_3 | empty queueworker_3 | empty queueworker_2 | multiply: 4worker_2 | empty queue. . .
这是一个很大的打印输出,但我们可以看到 Redis 开始旋转,并且有多个事情线程轮询 Redis 行列步队。 我们还可以看到多个worker同时处理多个任务。 这里描述了如何向做事器发出要求的示例:
图 18.4 – 向我们的做事器发送乘法要求的示例
图 18.5 – 向我们的做事器发送减法要求的示例
图 18.6 – 向我们的做事器发送添加要求的示例
在这里,我们有它!
我们有一个接管要求的做事器。 根据 URI,我们的做事器布局一个任务,将其打包成,然后将其发送到 Redis 行列步队。 然后,我们让多个事情职员轮询 Redis 行列步队来处理长任务。
在本章中,我们构建了一个可以作为事情线程或做事器运行的运用程序。 然后,我们构建了可以序列化并插入到 Redis 行列步队中的构造。 这使得我们的事情职员能够花费这些任务,然后在自己的韶光处理它们。 您现在可以构建处理永劫光任务的系统,而无需占用 HTTP 做事器。 序列化 Rust 构造并将其插入 Redis 的机制不仅仅止于处理大型任务。 我们可以序列化 Rust 构造,并通过 Redis 中的 pub/sub 通道将它们发送到其他 Rust 做事器,实质上是创建更大规模的参与者模型方法。
通过我们的 distroless 镜像,这些 Rust 做事器的大小大约只有 50 MB,使得这个观点具有可扩展性。 我们还探索了将原始命令运用于 Redis,这让您可以自由且自傲地完备接管 Redis 所供应的功能。 进一步阅读部分给出了可以对 Redis 实行的所有命令的高等列表。 您会对自己能做的事情感到震荡,我希望您在查看可用命令时想到利用 Redis 可以实现的所有办理方案时会像我一样愉快。
我们已经读到了本书的结尾。 我很感激你能走到这一步,当读者伸出援手时我总是很高兴。 Rust 确实是一种革命性的编程措辞。 借助 Rust,我们已经能够构建和支配快速的小型做事器。 我们探索了异步编程和参与者模型。 我们已经构建了支配管道。 你的旅程还没有结束; 总是有更多东西须要学习。 然而,我希望我已经让您理解了基本观点,以便您可以连续阅读更多文档、实践,并有朝一日打破 Web 编程的界线。