WebSocket 在握手请求/响应中通过 Sec-WebSocket-Extensions
头协商使用的扩展及其参数。
下面是握手请求示例:
在该示例中,客户端请求使用 permessage-deflate 扩展,其中 client_max_windows_bits 是 permessage-deflate 的参数。
下面是上面的请求对应的响应示例:
最终双方协商出使用 permessage-deflate 扩展。接下来介绍 permessage-deflate 的参数。
Me:
请介绍 WebSocket permessage-deflate 扩展有哪些参数。
ChatGPT:
WebSocket 的 permessage-deflate 扩展是一种用于在 WebSocket 通信中进行数据压缩的扩展。它使用了 zlib 压缩库来提供数据的压缩和解压缩功能。permessage-deflate 扩展支持以下参数:
server_no_context_takeover
(服务器端无上下文接管):该参数用于服务器端,表示服务器在每个消息上不会保持上下文(compression context),即不会在消息之间共享压缩上下文。默认情况下,服务器可以在消息之间保留上下文以提高压缩效率。
client_no_context_takeover
(客户端无上下文接管):该参数用于客户端,与 server_no_context_takeover
类似,表示客户端在每个消息上不会保持上下文。
server_max_window_bits
(服务器最大窗口位数):该参数用于服务器端,指定服务器使用的最大窗口位数(window size)。窗口位数表示用于压缩的滑动窗口的大小,影响压缩算法的效率和压缩比。默认值为 15,表示窗口大小为 2^15。
client_max_window_bits
(客户端最大窗口位数):该参数用于客户端,与 server_max_window_bits
类似,表示客户端使用的最大窗口位数。
这些参数可以在 WebSocket 握手过程中的握手扩展字段中使用,以便客户端和服务器之间共同协商压缩参数的配置。通过设置这些参数,可以控制压缩上下文的保持和窗口大小,从而影响压缩算法的性能和压缩比。
需要注意的是,这些参数是可选的,具体的参数配置取决于 WebSocket 客户端和服务器的支持和协商。在使用 permessage-deflate 扩展时,了解并配置这些参数可以帮助优化数据的传输和压缩效果。
上面的示例 WebSocket 服务是一个 Echo Server,也就是说,服务端会将客户端发送给它的文本原样地发回给客户端。
下面是客户端发送给服务端的帧示例:
RFC 规定客户端发送给服务端的数据必须进行掩码处理,7a b6 ... 是解除掩码后的十六进制形式的 Payload。
服务端发送给客户端的 Payload 相同,只是未进行掩码处理,因为 RFC 规定服务端发送给客户端的数据必须不进行掩码处理。
接下来,再发送相同的文本,得到 Payload 如下所示:
可见,客户端两次发送相同的文本,但是客户端压缩得到的内容不同。上次是 7a b6 ...,共 15 个字节,而这次是 7a 86 ...,共 5 个字节,压缩比例进一步提升。
服务端也类似,压缩比例更高。
这说明,客户端未启用 client_no_context_takeover
参数,服务端未启用 server_no_context_takeover
参数。通过握手报文可知,服务端和客户端使用的 max window bits 都是 15(默认值)。接下来介绍如何使用 Rust 进行解压缩。
Cargo.toml:
[package]
name = "websocket_permessage_deflate"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libc = "0.2.40"
libz-sys = "1.0.18"
src/deflate.rs:
use libc::{c_char, c_int, c_uint};
use libz_sys as ffi;
use std::mem;
use std::mem::MaybeUninit;
use std::slice;
const ZLIB_VERSION: &'static str = "1.2.7\0";
pub trait Context {
fn stream(&mut self) -> &mut ffi::z_stream;
fn stream_apply<F>(&mut self, input: &[u8], output: &mut Vec<u8>, each: F) -> Result<(), String>
where
F: Fn(&mut ffi::z_stream) -> Option<Result<(), String>>,
{
debug_assert!(output.len() == 0, "Output vector is not empty.");
let stream = self.stream();
stream.next_in = input.as_ptr() as *mut _;
stream.avail_in = input.len() as c_uint;
let mut output_size;
loop {
output_size = output.len();
if output_size == output.capacity() {
output.reserve(input.len())
}
let out_slice = unsafe {
slice::from_raw_parts_mut(
output.as_mut_ptr().offset(output_size as isize),
output.capacity() - output_size,
)
};
stream.next_out = out_slice.as_mut_ptr();
stream.avail_out = out_slice.len() as c_uint;
let before = stream.total_out;
let cont = each(stream);
unsafe {
output.set_len((stream.total_out - before) as usize + output_size);
}
if let Some(result) = cont {
return result;
}
}
}
}
pub struct Compressor {
// Box the z_stream to ensure it isn't moved. Moving the z_stream
// causes zlib to fail, because it maintains internal pointers.
stream: Box<ffi::z_stream>,
}
impl Compressor {
pub fn new(window_bits: i8) -> Compressor {
debug_assert!(window_bits >= 9, "Received too small window size.");
debug_assert!(window_bits <= 15, "Received too large window size.");
unsafe {
let mut stream: Box<MaybeUninit<ffi::z_stream>> = Box::new(MaybeUninit::zeroed());
let result = ffi::deflateInit2_(
stream.as_mut_ptr(),
9,
ffi::Z_DEFLATED,
-window_bits as c_int,
9,
ffi::Z_DEFAULT_STRATEGY,
ZLIB_VERSION.as_ptr() as *const c_char,
mem::size_of::<ffi::z_stream>() as c_int,
);
assert!(result == ffi::Z_OK, "Failed to initialize compressor.");
let stream = Box::from_raw(Box::into_raw(stream) as *mut ffi::z_stream);
Compressor { stream }
}
}
pub fn compress(&mut self, input: &[u8], output: &mut Vec<u8>) -> Result<(), String> {
self.stream_apply(input, output, |stream| unsafe {
match ffi::deflate(stream, ffi::Z_SYNC_FLUSH) {
ffi::Z_OK | ffi::Z_BUF_ERROR => {
if stream.avail_in == 0 && stream.avail_out > 0 {
Some(Ok(()))
} else {
None
}
}
code => Some(Err(format!("Failed to perform compression: {}", code))),
}
})
}
pub fn reset(&mut self) -> Result<(), String> {
match unsafe { ffi::deflateReset(self.stream.as_mut()) } {
ffi::Z_OK => Ok(()),
code => Err(format!("Failed to reset compression context: {}", code)),
}
}
}
impl Context for Compressor {
fn stream(&mut self) -> &mut ffi::z_stream {
self.stream.as_mut()
}
}
impl Drop for Compressor {
fn drop(&mut self) {
match unsafe { ffi::deflateEnd(self.stream.as_mut()) } {
// Compression stream encountered bad state
ffi::Z_STREAM_ERROR => {}
// Ignore discarded data error because we are raw.
// Deallocated compression context
ffi::Z_OK | ffi::Z_DATA_ERROR => {}
// Bad zlib status encountered: {code}
code => {}
}
}
}
pub struct Decompressor {
stream: Box<ffi::z_stream>,
}
impl Decompressor {
pub fn new(window_bits: i8) -> Decompressor {
debug_assert!(window_bits >= 8, "Received too small window size.");
debug_assert!(window_bits <= 15, "Received too large window size.");
unsafe {
let mut stream: Box<MaybeUninit<ffi::z_stream>> = Box::new(MaybeUninit::zeroed());
let result = ffi::inflateInit2_(
stream.as_mut_ptr(),
-window_bits as c_int,
ZLIB_VERSION.as_ptr() as *const c_char,
mem::size_of::<ffi::z_stream>() as c_int,
);
assert!(result == ffi::Z_OK, "Failed to initialize decompressor.");
let stream = Box::from_raw(Box::into_raw(stream) as *mut ffi::z_stream);
Decompressor { stream }
}
}
pub fn decompress(&mut self, input: &[u8], output: &mut Vec<u8>) -> Result<(), String> {
self.stream_apply(input, output, |stream| unsafe {
match ffi::inflate(stream, ffi::Z_SYNC_FLUSH) {
ffi::Z_OK | ffi::Z_BUF_ERROR => {
if stream.avail_in == 0 && stream.avail_out > 0 {
Some(Ok(()))
} else {
None
}
}
code => Some(Err(format!("Failed to perform decompression: {}", code))),
}
})
}
pub fn reset(&mut self) -> Result<(), String> {
match unsafe { ffi::inflateReset(self.stream.as_mut()) } {
ffi::Z_OK => Ok(()),
code => Err(format!("Failed to reset compression context: {}", code)),
}
}
}
impl Context for Decompressor {
fn stream(&mut self) -> &mut ffi::z_stream {
self.stream.as_mut()
}
}
impl Drop for Decompressor {
fn drop(&mut self) {
match unsafe { ffi::inflateEnd(self.stream.as_mut()) } {
// Decompression stream encountered bad state
ffi::Z_STREAM_ERROR => {}
// Deallocated decompression context
ffi::Z_OK => {}
// Bad zlib status encountered: {code}
code => {}
}
}
}
src/main.rs:
mod deflate;
use deflate::*;
fn hex_string_to_bytes(hex_string: &str) -> Vec<u8> {
let mut bytes = Vec::new();
let mut chars = hex_string.chars().peekable();
while let Some(c) = chars.next() {
if let Some(next_c) = chars.peek() {
let byte_str: String = vec![c, *next_c].into_iter().collect();
if let Ok(byte) = u8::from_str_radix(&byte_str, 16) {
bytes.push(byte);
chars.next(); // 跳过下一个字符
}
}
}
bytes
}
fn main() {
let mut dec = Decompressor::new(15);
let input = "7ab6b5fbc5faa9cfa6b53f9bb30600";
let mut input = hex_string_to_bytes(input);
input.extend(&[0, 0, 255, 255]);
println!("input {:?}", input);
let mut output = Vec::with_capacity(input.len() * 2);
dec.decompress(&input[..], &mut output).unwrap();
println!("{:?}", String::from_utf8(output).unwrap());
let input = "42660300";
// let input = "7a86c40600";
let mut input = hex_string_to_bytes(input);
input.extend(&[0, 0, 255, 255]);
println!("input {:?}", input);
let mut output = Vec::with_capacity(input.len() * 2);
dec.decompress(&input[..], &mut output).unwrap();
println!("{:?}", String::from_utf8(output).unwrap());
}
解析 WebSocket 协议时,处理 permessage-deflate 扩展的流程如下:
根据握手请求/响应的 Sec-WebSocket-Extensions
头确定是否启用 permessage-deflate 扩展,以及扩展参数
使用扩展参数为客户端和服务端分别生成解码器
如果启用 server_no_context_takeover
参数,那么服务端解码器在每次解码消息后,需要 reset,否则不能 reset。client_no_context_takeover
同理