广播聊天应用程序

在本练习中,我们想要使用我们的新知识来实现一个广播聊天应用。我们有一个聊天服务器,客户端连接到该服务器并发布他们的消息。客户端从标准输入读取用户消息,并将其发送到服务器。聊天服务器将收到的每条消息广播给所有客户端。

For this, we use a broadcast channel on the server, and tokio_websockets for the communication between the client and the server.

创建一个新的 Cargo 项目并添加以下依赖:

Cargo.toml:

[package]
name = "chat-async"
version = "0.1.0"
edition = "2021"

[dependencies]
futures-util = { version = "0.3.28", features = ["sink"] }
http = "0.2.9"
tokio = { version = "1.28.1", features = ["full"] }
tokio-websockets = { version = "0.4.0", features = ["client", "fastrand", "server", "sha1_smol"] }

所需的API

You are going to need the following functions from tokio and tokio_websockets. Spend a few minutes to familiarize yourself with the API.

两个可执行文件

通常在一个Cargo项目中,你只能有一个二进制文件,和一个src/main.rs文件。在这个项目中,我们需要两个二进制文件。一个用于客户端,另一个用于服务器。你可能会考虑将它们制作成两个单独的Cargo项目,但我们将它们放在一个包含两个二进制文件的Cargo项目中。为了使这个工作,客户端和服务器的代码应该放在src/bin下(参见文档)。

将以下服务器和客户端代码分别复制到 src/bin/server.rssrc/bin/client.rs 中。您的任务是按照下面的描述完成这些文件。

src/bin/server.rs:

use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::error::Error;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::{channel, Sender};
use tokio_websockets::{Message, ServerBuilder, WebsocketStream};

async fn handle_connection(
    addr: SocketAddr,
    mut ws_stream: WebsocketStream<TcpStream>,
    bcast_tx: Sender<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {

    // TODO: For a hint, see the description of the task below.

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    let (bcast_tx, _) = channel(16);

    let listener = TcpListener::bind("127.0.0.1:2000").await?;
    println!("listening on port 2000");

    loop {
        let (socket, addr) = listener.accept().await?;
        println!("New connection from {addr:?}");
        let bcast_tx = bcast_tx.clone();
        tokio::spawn(async move {
            // Wrap the raw TCP stream into a websocket.
            let ws_stream = ServerBuilder::new().accept(socket).await?;

            handle_connection(addr, ws_stream, bcast_tx).await
        });
    }
}

src/bin/client.rs:

use futures_util::stream::StreamExt;
use futures_util::SinkExt;
use http::Uri;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_websockets::{ClientBuilder, Message};

#[tokio::main]
async fn main() -> Result<(), tokio_websockets::Error> {
    let (mut ws_stream, _) =
        ClientBuilder::from_uri(Uri::from_static("ws://127.0.0.1:2000"))
            .connect()
            .await?;

    let stdin = tokio::io::stdin();
    let mut stdin = BufReader::new(stdin).lines();


    // TODO: For a hint, see the description of the task below.

}

运行可执行文件

使用以下命令运行服务器:

cargo run --bin server

and the client with:

cargo run --bin client

任务

  • src/bin/server.rs 中实现 handle_connection 函数。
    • 提示:使用 tokio::select! 在一个连续的循环中并发执行两个任务。一个任务从客户端接收消息并广播它们。另一个任务将服务器接收到的消息发送给客户端。
  • 完成 src/bin/client.rs 中的 main 函数。
    • Hint: As before, use tokio::select! in a continuous loop for concurrently performing two tasks: (1) reading user messages from standard input and sending them to the server, and (2) receiving messages from the server, and displaying them for the user.
  • Optional: Once you are done, change the code to broadcast messages to all clients, but the sender of the message.