广播聊天应用程序
在本练习中,我们想要使用我们的新知识来实现一个广播聊天应用。我们有一个聊天服务器,客户端连接到该服务器并发布他们的消息。客户端从标准输入读取用户消息,并将其发送到服务器。聊天服务器将收到的每条消息广播给所有客户端。
For this, we use a broadcast channel on the server, and tokio_websockets
for the communication between the client and the server.
创建一个新的 Cargo 项目并添加以下依赖:
Cargo.toml:
[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"
[dependencies]
futures-util = { version = "0.3.28", features = ["sink"] }
http = "0.2.9"
tokio = { version = "1.28.1", features = ["full"] }
tokio-websockets = { version = "0.4.0", features = ["client", "fastrand", "server", "sha1_smol"] }
所需的API
You are going to need the following functions from tokio
and tokio_websockets
. Spend a few minutes to familiarize yourself with the API.
- StreamExt::next() implemented by
WebsocketStream
: for asynchronously reading messages from a Websocket Stream. - SinkExt::send() 由
WebsocketStream
实现:用于在Websocket流上异步发送消息。 - Lines::next_line(): for asynchronously reading user messages from the standard input.
- Sender::subscribe():用于订阅广播频道。
两个可执行文件
通常在一个Cargo项目中,你只能有一个二进制文件,和一个src/main.rs
文件。在这个项目中,我们需要两个二进制文件。一个用于客户端,另一个用于服务器。你可能会考虑将它们制作成两个单独的Cargo项目,但我们将它们放在一个包含两个二进制文件的Cargo项目中。为了使这个工作,客户端和服务器的代码应该放在src/bin
下(参见文档)。
将以下服务器和客户端代码分别复制到 src/bin/server.rs
和 src/bin/client.rs
中。您的任务是按照下面的描述完成这些文件。
src/bin/server.rs:
use futures_util::sink::SinkExt; use futures_util::stream::StreamExt; use std::error::Error; use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::broadcast::{channel, Sender}; use tokio_websockets::{Message, ServerBuilder, WebsocketStream}; async fn handle_connection( addr: SocketAddr, mut ws_stream: WebsocketStream<TcpStream>, bcast_tx: Sender<String>, ) -> Result<(), Box<dyn Error + Send + Sync>> { // TODO: For a hint, see the description of the task below. } #[tokio::main] async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { let (bcast_tx, _) = channel(16); let listener = TcpListener::bind("127.0.0.1:2000").await?; println!("listening on port 2000"); loop { let (socket, addr) = listener.accept().await?; println!("New connection from {addr:?}"); let bcast_tx = bcast_tx.clone(); tokio::spawn(async move { // Wrap the raw TCP stream into a websocket. let ws_stream = ServerBuilder::new().accept(socket).await?; handle_connection(addr, ws_stream, bcast_tx).await }); } }
src/bin/client.rs:
use futures_util::stream::StreamExt; use futures_util::SinkExt; use http::Uri; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio_websockets::{ClientBuilder, Message}; #[tokio::main] async fn main() -> Result<(), tokio_websockets::Error> { let (mut ws_stream, _) = ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000")) .connect() .await?; let stdin = tokio::io::stdin(); let mut stdin = BufReader::new(stdin).lines(); // TODO: For a hint, see the description of the task below. }
运行可执行文件
使用以下命令运行服务器:
cargo run --bin server
and the client with:
cargo run --bin client
任务
- 在
src/bin/server.rs
中实现handle_connection
函数。- 提示:使用
tokio::select!
在一个连续的循环中并发执行两个任务。一个任务从客户端接收消息并广播它们。另一个任务将服务器接收到的消息发送给客户端。
- 提示:使用
- 完成
src/bin/client.rs
中的main
函数。- Hint: As before, use
tokio::select!
in a continuous loop for concurrently performing two tasks: (1) reading user messages from standard input and sending them to the server, and (2) receiving messages from the server, and displaying them for the user.
- Hint: As before, use
- Optional: Once you are done, change the code to broadcast messages to all clients, but the sender of the message.