13 KiB
构建一个单线程Web服务器
Web服务器涉及的两个主要协议是超文本传输协议(HTTP)和传输控制协议(TCP)。这两个协议都是请求-响应协议,意味着客户端发起请求,服务器监听请求并提供响应给客户端。这些请求和响应的内容由这些协议定义。
监听TCP连接
use std::net::TcpListener;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!("Connection established!");
}
}
//当访问此端口:
// Connection established!
// Connection established!
// Connection established!
读取请求
use std::{
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let http_request: Vec<_> = buf_reader
.lines()
.map(|result| result.unwrap())
.take_while(|line| !line.is_empty())
.collect();
println!("Request: {:#?}", http_request);
}
此时当访问端口后,会输出以下内容:
take_while
和 for
循环都可以用于迭代集合中的元素,但它们有不同的用途和行为:
for
循环:for
循环是一种通用的迭代工具,它会遍历整个集合,处理每个元素。- 你可以在
for
循环中执行任何你想要的操作,例如处理集合中的每个元素,筛选元素,转换元素等。 for
循环通常用于遍历整个集合,而不是根据特定条件选择性地处理元素。
take_while
方法:take_while
是一个迭代器方法,它根据条件选择性地从集合中取元素,并返回一个新的迭代器。- 它会从集合的开头开始,一直遍历元素,直到条件不再满足为止。
- 通常,
take_while
用于处理集合中的一部分元素,而不是整个集合。
总之,for
循环通常用于遍历整个集合,而 take_while
通常用于处理集合的部分元素,具体取决于满足的条件。
返回实际 HTML 页面
首先在项目根目录创建一个hello.html文件:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
修改handle_connection,添加返回信息:
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let http_request: Vec<_> = buf_reader
.lines()
.map(|result| result.unwrap())
.take_while(|line| !line.is_empty())
.collect();
let status_line = "HTTP/1.1 200 OK";
let contents = fs::read_to_string("hello.html").unwrap();
let length = contents.len();
let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
stream.write_all(response.as_bytes()).unwrap();
println!("Request: {:#?}", http_request);
println!("Response: {:#?}", response);
}
验证请求并选择性响应
根据请求自定义响应,仅对 /
路径的请求返回HTML文件。修改handle_connection函数:
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
// 只查看HTTP请求的第一行,调用 next 来从迭代器中获取第一个。
println!("Request: {:#?}", request_line);
// let http_request: Vec<_> = buf_reader
// .lines()
// .map(|result| result.unwrap())
// .take_while(|line| !line.is_empty())
// .collect();
if request_line == "GET / HTTP/1.1" {
let status_line = "HTTP/1.1 200 OK";
let contents = fs::read_to_string("hello.html").unwrap();
let length = contents.len();
let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
stream.write_all(response.as_bytes()).unwrap();
}else {
// other
}
}
效果图:
在根目录添加404页面,当请求不存在时显示:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
添加else分支代码:
else {
let status_line = "HTTP/1.1 404 NOT FOUND";
let contents = fs::read_to_string("404.html").unwrap();
let length = contents.len();
let response = format!(
"{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"
);
stream.write_all(response.as_bytes()).unwrap();
}
效果图:
重构代码
重构代码,提取重复代码并使404更合理。
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = if request_line == "GET / HTTP/1.1" {
("HTTP/1.1 200 OK", "hello.html")
}else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
stream.write_all(response.as_bytes()).unwrap();
}
效果图:
创建多线程服务器
当前服务器会逐个处理每个请求,在第一个请求处理完成之前,它不会处理第二个连接。
模拟慢请求
当执行sleep满请求时,其他请求都不会执行。
使用线程池改善吞吐量
线程池(thread pool)是一组预先分配的等待或准备处理任务的线程。可通过限制线程池中线程的数量,防止耗尽服务器的所有资源,将请求的处理推向停滞。
每个请求创建一个线程(当请求过多时,会耗尽系统资源)
修改main函数:
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
效果图:
使用线程池管理线程
thread::spawn 函数定义:
#[stable(feature = "rust1", since = "1.0.0")]
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}
where
子句中每个约束的含义:
F: FnOnce() -> T
: 这个约束表示泛型参数F
必须是一个闭包(函数),并且这个闭包不需要任何参数(FnOnce()
),它的返回值类型必须与T
相匹配。也就是说,F
必须是一个能够执行一次的函数,而返回值类型必须是T
。F: Send + 'static
: 这个约束要求泛型参数F
必须实现Send
特性。Send
特性表示类型是可以安全地在不同线程之间传递的。此外,'static
生命周期表示F
必须具有静态生命周期,也就是说,它的生命周期可以持续到整个程序的运行期间。T: Send + 'static
: 同样,这个约束要求泛型参数T
必须实现Send
特性并且必须具有静态生命周期。
根据spawn函数参数及返回值,编写以下线程池文件:
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
// sender可以多个、receiver只能有一个,使用Arc在多个线程下使用同一个receiver
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id,Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
主函数代码:
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
use hello::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
println!("request: {}", request_line);
// 使用match匹配多个请求
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(10));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response = format!("{}\r\nContent-Length: {}\r\n\r\n{}", status_line, length, contents);
stream.write_all(response.as_bytes()).unwrap();
}
new函数会同时创建一组Worker线程,Worker线程在创建后,因发送端无数据会阻塞当前线程,等待直到有数据可用。当执行线程池 execute方法时,job会被发送给接收端,然后Worker线程会竞争获取队列中的任务来执行。
因为Worker的new方法中使用创建了新的线程,所以是异步执行,故for可以继续创建Worker实例而不会阻塞主线程。同时每当一个工作线程完成了它的任务,就会创建一个新的具有相同标识符id的新worker实例,以继续等待任务的到来。运行图:
当发送端有job发送时,以及执行完任务后,效果图:
若使用while-let替换let来实现new:
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
在使用let job = receiver.lock().unwrap().recv().unwrap();的代码中,通过使用let,右侧表达式中使用的任何临时值在let语句结束时会被立即丢弃。然而,while let(以及if let和match)不会丢弃临时值,直到关联块的末尾。while-let中,锁会一直被持有,直到job()函数执行完毕,这会导致其他线程无法获取锁并执行任务。
只有第一个线程在工作,效果图: