You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

13 KiB

构建一个单线程Web服务器

Web服务器涉及的两个主要协议是超文本传输协议(HTTP)和传输控制协议(TCP)。这两个协议都是请求-响应协议,意味着客户端发起请求,服务器监听请求并提供响应给客户端。这些请求和响应的内容由这些协议定义。

监听TCP连接

use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        println!("Connection established!");
    }
}

//当访问此端口:
// Connection established!
// Connection established!
// Connection established!

读取请求

use std::{
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let http_request: Vec<_> = buf_reader
        .lines()
        .map(|result| result.unwrap())
        .take_while(|line| !line.is_empty())
        .collect();

    println!("Request: {:#?}", http_request);
}

此时当访问端口后,会输出以下内容:

image-20231030092459777

take_whilefor 循环都可以用于迭代集合中的元素,但它们有不同的用途和行为:

  1. for 循环:
    • for 循环是一种通用的迭代工具,它会遍历整个集合,处理每个元素。
    • 你可以在 for 循环中执行任何你想要的操作,例如处理集合中的每个元素,筛选元素,转换元素等。
    • for 循环通常用于遍历整个集合,而不是根据特定条件选择性地处理元素。
  2. take_while 方法:
    • take_while 是一个迭代器方法,它根据条件选择性地从集合中取元素,并返回一个新的迭代器。
    • 它会从集合的开头开始,一直遍历元素,直到条件不再满足为止。
    • 通常,take_while 用于处理集合中的一部分元素,而不是整个集合。

总之,for 循环通常用于遍历整个集合,而 take_while 通常用于处理集合的部分元素,具体取决于满足的条件。

返回实际 HTML 页面

首先在项目根目录创建一个hello.html文件:

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

修改handle_connection,添加返回信息:

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}
fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let http_request: Vec<_> = buf_reader
        .lines()
        .map(|result| result.unwrap())
        .take_while(|line| !line.is_empty())
        .collect();

    let status_line = "HTTP/1.1 200 OK";
    let contents = fs::read_to_string("hello.html").unwrap();
    let length = contents.len();
    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();
    println!("Request: {:#?}", http_request);
    println!("Response: {:#?}", response);
}

image-20231030162924786

验证请求并选择性响应

根据请求自定义响应,仅对 / 路径的请求返回HTML文件。修改handle_connection函数:

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    // 只查看HTTP请求的第一行,调用 next 来从迭代器中获取第一个。
    println!("Request: {:#?}", request_line);
    // let http_request: Vec<_> = buf_reader
    //     .lines()
    //     .map(|result| result.unwrap())
    //     .take_while(|line| !line.is_empty())
    //     .collect();
    if request_line == "GET / HTTP/1.1" {
        let status_line = "HTTP/1.1 200 OK";
        let contents = fs::read_to_string("hello.html").unwrap();
        let length = contents.len();

        let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
        stream.write_all(response.as_bytes()).unwrap();
    }else {
        // other
    }
}

效果图:

image-20231030165417178

在根目录添加404页面,当请求不存在时显示:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>

添加else分支代码:

else {
        let status_line = "HTTP/1.1 404 NOT FOUND";
        let contents = fs::read_to_string("404.html").unwrap();
        let length = contents.len();

        let response = format!(
            "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"
        );
        stream.write_all(response.as_bytes()).unwrap();
    }

效果图:

image-20231030170414132

重构代码

重构代码,提取重复代码并使404更合理。

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}
fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
        ("HTTP/1.1 200 OK", "hello.html")
    }else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();

}

效果图:

image-20231030171544102

创建多线程服务器

当前服务器会逐个处理每个请求,在第一个请求处理完成之前,它不会处理第二个连接。

模拟慢请求

当执行sleep满请求时,其他请求都不会执行。

image-20231030172734366

使用线程池改善吞吐量

线程池thread pool)是一组预先分配的等待或准备处理任务的线程。可通过限制线程池中线程的数量,防止耗尽服务器的所有资源,将请求的处理推向停滞。

每个请求创建一个线程(当请求过多时,会耗尽系统资源)

修改main函数:

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

效果图:

image-20231031103731348

使用线程池管理线程

thread::spawn 函数定义:

#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}

where 子句中每个约束的含义:

  1. F: FnOnce() -> T: 这个约束表示泛型参数 F 必须是一个闭包(函数),并且这个闭包不需要任何参数(FnOnce()),它的返回值类型必须与 T 相匹配。也就是说,F 必须是一个能够执行一次的函数,而返回值类型必须是 T
  2. F: Send + 'static: 这个约束要求泛型参数 F 必须实现 Send 特性。Send 特性表示类型是可以安全地在不同线程之间传递的。此外,'static 生命周期表示 F 必须具有静态生命周期,也就是说,它的生命周期可以持续到整个程序的运行期间。
  3. T: Send + 'static: 同样,这个约束要求泛型参数 T 必须实现 Send 特性并且必须具有静态生命周期。

根据spawn函数参数及返回值,编写以下线程池文件:

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        // sender可以多个、receiver只能有一个,使用Arc在多个线程下使用同一个receiver
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id,Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
        where F: FnOnce() + Send + 'static,
    {
        let job =  Box::new(f);
        self.sender.send(job).unwrap();
    }
}
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Worker {id} got a job; executing.");
            job();
        });
        Worker { id, thread }
    }
}

主函数代码:

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
use hello::ThreadPool;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        pool.execute(|| {
            handle_connection(stream);
        });
    }
}
fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();
    println!("request: {}", request_line);
    // 使用match匹配多个请求
    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(10));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
    stream.write_all(response.as_bytes()).unwrap();

}

new函数会同时创建一组Worker线程,Worker线程在创建后,因发送端无数据会阻塞当前线程,等待直到有数据可用。当执行线程池 execute方法时,job会被发送给接收端,然后Worker线程会竞争获取队列中的任务来执行。

因为Worker的new方法中使用创建了新的线程,所以是异步执行,故for可以继续创建Worker实例而不会阻塞主线程。同时每当一个工作线程完成了它的任务,就会创建一个新的具有相同标识符id的新worker实例,以继续等待任务的到来。运行图:

image-20231101162658607

当发送端有job发送时,以及执行完任务后,效果图:

image-20231101163138635

若使用while-let替换let来实现new:

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");
                job();
            }
        });
        Worker { id, thread }
    }
}

在使用let job = receiver.lock().unwrap().recv().unwrap();的代码中,通过使用let,右侧表达式中使用的任何临时值在let语句结束时会被立即丢弃。然而,while let(以及if let和match)不会丢弃临时值,直到关联块的末尾。while-let中,锁会一直被持有,直到job()函数执行完毕,这会导致其他线程无法获取锁并执行任务。

只有第一个线程在工作,效果图:

image-20231101172020795