Rust 实战 - 使用套接字联网API(二)

栏目: 编程语言 · Rust · 发布时间: 5年前

内容简介:上一节,我们已经实现了一个最小可运行版本。之所以使用Rust而不是C,是因为Rust具备了必要的抽象能力,还能获得跟C差不多的性能。这一节,我们对上一节的代码做必要的封装,顺便还能把我将上一节的源码放到了还记得上一节,我们把使用到的

上一节,我们已经实现了一个最小可运行版本。之所以使用Rust而不是C,是因为Rust具备了必要的抽象能力,还能获得跟C差不多的性能。这一节,我们对上一节的代码做必要的封装,顺便还能把 unsafe 的代码包装成 safe 的API。

我将上一节的源码放到了 这里 ,你可以去查看。

还记得上一节,我们把使用到的 libc 中的函数 socketbindconnect 和结构体 sockaddrsockaddr_inin_addr 等,在Rust这边定义了出来。实际上,几乎 libc 中的函数, libc 这个crate都帮我们定义好了。你可以去 这里 查看。编译器和标准库本身也使用了这个crate,我们也使用这个。

首先在 Cargo.toml 文件的 [dependencies] 下面加入 libc = "0.2" :

[dependencies]
libc = "0.2"

接着在 main.rs 文件上方加入 use libc; ,也可以 use libc as c; 。或者你直接简单粗暴 use libc::* ,并不推荐这样,除非你明确知道你使用的函数来自哪里。并将我们定义的与 libc 中对用的常量、函数、结构体删除。再添加 libc::c:: 到我们使用那些常量、结构体、函数的地方。如果你是直接 use libc::* ,除了直接删除那部分代码外,几乎什么都不用做。目前的代码:

use std::ffi::c_void;
use libc as c;

fn main() {
    use std::io::Error;
    use std::mem;
    use std::thread;
    use std::time::Duration;

    thread::spawn(|| {

        // server
        unsafe {
            let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP);
            if socket < 0 {
                panic!("last OS error: {:?}", Error::last_os_error());
            }

            let servaddr = c::sockaddr_in {
                sin_family: c::AF_INET as u16,
                sin_port: 8080u16.to_be(),
                sin_addr: c::in_addr {
                    s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be()
                },
                sin_zero: mem::zeroed()
            };

            let result = c::bind(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32);
            if result < 0 {
                println!("last OS error: {:?}", Error::last_os_error());
                c::close(socket);
            }

            c::listen(socket, 128);

            loop {
                let mut cliaddr: c::sockaddr_storage = mem::zeroed();
                let mut len = mem::size_of_val(&cliaddr) as u32;

                let client_socket = c::accept(socket, &mut cliaddr as *mut c::sockaddr_storage as *mut c::sockaddr, &mut len);
                if client_socket < 0 {
                    println!("last OS error: {:?}", Error::last_os_error());
                    break;
                }

                thread::spawn(move || {
                    loop {
                        let mut buf = [0u8; 64];
                        let n = c::read(client_socket, &mut buf as *mut _ as *mut c_void, buf.len());
                        if n <= 0 {
                            break;
                        }

                        println!("{:?}", String::from_utf8_lossy(&buf[0..n as usize]));

                        let msg = b"Hi, client!";
                        let n = c::write(client_socket, msg as *const _ as *const c_void, msg.len());
                        if n <= 0 {
                            break;
                        }
                    }

                    c::close(client_socket);
                });
            }

            c::close(socket);
        }

    });

    thread::sleep(Duration::from_secs(1));

    // client
    unsafe {
        let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP);
        if socket < 0 {
            panic!("last OS error: {:?}", Error::last_os_error());
        }

        let servaddr = c::sockaddr_in {
            sin_family: c::AF_INET as u16,
            sin_port: 8080u16.to_be(),
            sin_addr: c::in_addr {
                s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be()
            },
            sin_zero: mem::zeroed()
        };

        let result = c::connect(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32);
        if result < 0 {
            println!("last OS error: {:?}", Error::last_os_error());
            c::close(socket);
        }

        let msg = b"Hello, server!";
        let n = c::write(socket, msg as *const _ as *const c_void, msg.len());
        if n <= 0 {
            println!("last OS error: {:?}", Error::last_os_error());
            c::close(socket);
        }

        let mut buf = [0u8; 64];
        let n = c::read(socket, &mut buf as *mut _ as *mut c_void, buf.len());
        if n <= 0 {
            println!("last OS error: {:?}", Error::last_os_error());
        }

        println!("{:?}", String::from_utf8_lossy(&buf[0..n as usize]));

        c::close(socket);
    }
}

你编译运行,应该能得到与上一节同样的结果。

接下来,我们尝试把上面代码中函数,封装成更具Rust风格的API,除了TCP外,也还要考虑之后把UDP、UNIX域和SCTP也增加进来。同时,我们跟标准库里 net 相关的API保持一致的风格。我们暂时不考虑跨平台,只考虑Linux,因此可以大胆的将一些 linux 独有的API添加进来。

UNIX中一切皆文件,套接字也不例外。字节流套接字上的read和write函数所表现出来的行为,不同于通常的文件I/O。字节流套接字上调用read和write输入或输出字节数可能比请求的要少,这个现象的原因在于内核中用于套接字的缓冲区可能已经达到了极限。不过,这并不是我们正真关心的。我们来看看标准库中 File 的实现:

pub struct File(FileDesc);

impl File {
    ...
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
            self.0.read(buf)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
    }

    pub fn duplicate(&self) -> io::Result<File> {
            self.0.duplicate().map(File)
    }
    ...
}

File 是一个元组结构体,标准库已经实现了 readwrite ,以及 duplicateduplicate 很有用,用于复制出一个新的描述符。我们继续看 File 中"包裹的 FileDesc :

pub struct FileDesc {
    fd: c_int,
}

impl File {
    ...
    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
            let ret = cvt(unsafe {
               libc::read(self.fd,
                       buf.as_mut_ptr() as *mut c_void,
                       cmp::min(buf.len(), max_len()))
            })?;
            Ok(ret as usize)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
            let ret = cvt(unsafe {
                    libc::write(self.fd,
                        buf.as_ptr() as *const c_void,
                        cmp::min(buf.len(), max_len()))
            })?;
            Ok(ret as usize)
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
            unsafe {
                    cvt(libc::ioctl(self.fd, libc::FIOCLEX))?;
                    Ok(())
            }
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
            unsafe {
                    let v = nonblocking as c_int;
                    cvt(libc::ioctl(self.fd, libc::FIONBIO, &v))?;
                    Ok(())
            }
    }
}

这一层应该是到头了,你可以看到,Rust中的 File 也是直接对 libc 的封装,不过你不用担心,一开始就提到,Rust 的ABI与C的ABI是兼容的,也就意味着Rust和C互相调用是几乎是零开销的。 FileDescreadwrite 中的实现,与我们之前对 sockfdreadwrite 基本是一样的。除了 readwrite 外,还有两个很有用的方法 set_cloexecset_nonblocking

我把“依附于”某个类型的函数叫做方法,与普通函数不同的是,依附于某个类型的函数,必须通过它所依附的类型调用。Rust通过这种方式来实现OOP,但是与某些语言的OOP不同的是,Rust的这种实现是零开销的。也就是,你将一些函数依附到某个类型上,并不会对运行时造成额外的开销,这些都在编译时去处理。

set_cloexec 方法会对描述符设置 FD_CLOEXEC 。我们经常会碰到需要fork子进程的情况,而且子进程很可能会继续exec新的程序。对描述符设置 FD_CLOEXEC ,就意味着,我们fork子进程时,父子进程中相同的文件描述符指向系统文件表的同一项,但是,我们如果调用exec执行另一个程序,此时会用全新的程序替换子进程的正文。为了较少不必要的麻烦,我们以后要对打开的描述符设置 FD_CLOEXEC ,除非遇到特殊情况。

set_nonblocking 用于将描述符设置为非阻塞模式,如果我们要使用poll、epoll等api的话。

既然标准库已经封装好了 FileDesc ,我想直接使用的,然而 FileDesc 在标准库之外是不可见的。如果使用 File 的话, set_cloexecset_nonblocking 还是要我们再写一次,但是 File 并不是“我自己”的类型,我没法直接给 File 附加方法,为此还需要一个额外的Tarit或者用一个“我自己”的类型,去包裹它。挺绕的。那既然这样,我们还是自己来吧。不过我们已经有了参考,可以将标准库里的 FileDecs 直接复制出来,然后去掉与Linux无关的代码,当然你也可以自由发挥一下。

要注意的是,这段代码中还调用了一个函数 cvt ,我们把相关代码也复制过来:

use std::io::{self, ErrorKind};

#[doc(hidden)]
pub trait IsMinusOne {
    fn is_minus_one(&self) -> bool;
}

macro_rules! impl_is_minus_one {
    ($($t:ident)*) => ($(impl IsMinusOne for $t {
        fn is_minus_one(&self) -> bool {
            *self == -1
        }
    })*)
}

impl_is_minus_one! { i8 i16 i32 i64 isize }

pub fn cvt<T: IsMinusOne>(t: T) -> io::Result<T> {
    if t.is_minus_one() {
        Err(io::Error::last_os_error())
    } else {
        Ok(t)
    }
}

pub fn cvt_r<T, F>(mut f: F) -> io::Result<T>
    where T: IsMinusOne,
          F: FnMut() -> T
{
    loop {
        match cvt(f()) {
            Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
            other => return other,
        }
    }
}

还记得上一节我们使用过的 last_os_error() 方法么,这段代码通过宏 impl_is_minus_onei32 等常见类型实现了 IsMinusOne 这个 Tarit ,然后我们就可以使用 cvt 函数更便捷得调用 last_os_error() 取得错误。 我将这段代码放到 util.rs 文件中,并在 main.rs 文件上方加入 pub mod util;

然后再来看 FileDesc 最终的实现:

use std::mem;
use std::io;
use std::cmp;
use std::os::unix::io::FromRawFd;

use libc as c;

use crate::util::cvt;

#[derive(Debug)]
pub struct FileDesc(c::c_int);

pub fn max_len() -> usize {
    <c::ssize_t>::max_value() as usize
}

impl FileDesc {
    pub fn raw(&self) -> c::c_int {
        self.0
    }

    pub fn into_raw(self) -> c::c_int {
        let fd = self.0;
        mem::forget(self);
        fd
    }

    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        let ret = cvt(unsafe {
            c::read(
                self.0,
                buf.as_mut_ptr() as *mut c::c_void,
                cmp::min(buf.len(), max_len())
            )
        })?;

        Ok(ret as usize)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        let ret = cvt(unsafe {
            c::write(
                self.0,
                buf.as_ptr() as *const c::c_void,
                cmp::min(buf.len(), max_len())
            )
        })?;

        Ok(ret as usize)
    }

    pub fn get_cloexec(&self) -> io::Result<bool> {
        unsafe {
            Ok((cvt(libc::fcntl(self.0, c::F_GETFD))? & libc::FD_CLOEXEC) != 0)
        }
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
        unsafe {
            cvt(c::ioctl(self.0, c::FIOCLEX))?;
            Ok(())
        }
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
        unsafe {
            let v = nonblocking as c::c_int;
            cvt(c::ioctl(self.0, c::FIONBIO, &v))?;
            Ok(())
        }
    }

    pub fn duplicate(&self) -> io::Result<FileDesc> {
        cvt(unsafe { c::fcntl(self.0, c::F_DUPFD_CLOEXEC, 0) }).and_then(|fd| {
            let fd = FileDesc(fd);
            Ok(fd)
        })
    }
}

impl FromRawFd for FileDesc {
    unsafe fn from_raw_fd(fd: c::c_int) -> FileDesc {
        FileDesc(fd)
    }
}

impl Drop for FileDesc {
    fn drop(&mut self) {
        let _ = unsafe { c::close(self.0) };
    }
}

我已经将与 Linux 不相关的代码删除掉了。之所以原有 duplicate 那么冗长,是因为旧的Linux内核不支持 F_DUPFD_CLOEXEC 这个设置。 fcntl 这个函数,用来设置控制文件描述符的选项,我们稍后还会遇到用来设置和获取套接字的 getsockoptsetsockopt 。还有 read_atwrite_at 等实现比较复杂的函数,我们用不到,也将他们删除。还有 impl<'a> Read for &'a FileDesc ,因为内部使了一个Unstable的API,我也将其去掉了。

我自由发挥了一下,把:

pub struct FileDesc {
    fd: c_int,
}

替换成了:

pub struct FileDesc(c::c_int);

它们是等效的。不知你注意到没有,我把 pub fn new(...) 函数给去掉了,因为这个函数是 unsafe 的----如果我们今后将这些代码作为库让别人使用的话,他可能传入了一个不存在的描述符,并由此可能引起程序崩溃----但他们并不一定知道。我们可以通过在这个函数前面加 unsafe 来告诉使用者这个函数是 unsafe 的: pub unsafe fn new(...) 。不过,Rust的开发者们已经考虑到了这一点,我们用约定俗成的 from_raw_fd 来代替 pub unsafe fn new(...) ,于是才有了下面这一段:

impl FromRawFd for FileDesc {
    unsafe fn from_raw_fd(fd: c::c_int) -> FileDesc {
        FileDesc(fd)
    }
}

最后,还利用Rust的 drop 实现了 close 函数,也就意味着,描述符离开作用域后,会自动 close ,就不再需要我们手动 close 了。与之先关的是 into_raw 方法,意思是把 FileDesc 转换为“未加工的”或者说是“裸的”描述符,也就是C的描述符。这个方法里面调用了 forget ,之后变量离开作用域后,就不会调用 drop 了。当你使用这个方法拿到描述符,使用完请不要忘记手动 close 或者再次 from_raw_fd

pub fn into_raw(self) -> c::c_int {
        let fd = self.0;
        mem::forget(self);
        fd
}

我将这段代码放到了一个新的文件 fd.rs 中,并在 main.rs 文件上方加入 pub mod fd;

接着,我们还需一个 Socket 类型,将 socketbindconnect 等函数附加上去。这一步应该简单多了。同时你也会发现,我们已经把 unsafe 的代码,封装成了 safe 的代码。

use std::io;
use std::mem;
use std::os::unix::io::{RawFd, AsRawFd, FromRawFd};

use libc as c;

use crate::fd::FileDesc;
use crate::util::cvt;

pub struct Socket(FileDesc);

impl Socket {
    pub fn new(family: c::c_int, ty: c::c_int, protocol: c::c_int) -> io::Result<Socket> {
        unsafe {
            cvt(c::socket(family, ty | c::SOCK_CLOEXEC, protocol))
                .map(|fd| Socket(FileDesc::from_raw_fd(fd)))
        }
    }

    pub fn bind(&self, storage: *const c::sockaddr, len: c::socklen_t) -> io::Result<()> {
        self.setsockopt(c::SOL_SOCKET, c::SO_REUSEADDR, 1)?;

        cvt(unsafe { c::bind(self.0.raw(), storage, len) })?;

        Ok(())
    }

    pub fn listen(&self, backlog: c::c_int) -> io::Result<()> {
        cvt(unsafe { c::listen(self.0.raw(), backlog) })?;
        Ok(())
    }

    pub fn accept(&self, storage: *mut c::sockaddr, len: *mut c::socklen_t) -> io::Result<Socket> {
        let fd = cvt(unsafe { c::accept4(self.0.raw(), storage, len, c::SOCK_CLOEXEC) })?;
        Ok(Socket(unsafe { FileDesc::from_raw_fd(fd) }))
    }

    pub fn connect(&self, storage: *const c::sockaddr, len: c::socklen_t) -> io::Result<()> {
        cvt(unsafe { c::connect(self.0.raw(), storage, len) })?;
        Ok(())
    }

    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }

    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
        self.0.set_nonblocking(nonblocking)
    }

    pub fn get_cloexec(&self) -> io::Result<bool> {
        self.0.get_cloexec()
    }

    pub fn set_cloexec(&self) -> io::Result<()> {
        self.0.set_cloexec()
    }

    pub fn setsockopt<T>(&self, opt: libc::c_int, val: libc::c_int, payload: T) -> io::Result<()> {
        unsafe {
            let payload = &payload as *const T as *const libc::c_void;

            cvt(libc::setsockopt(
                self.0.raw(),
                opt,
                val,
                payload,
                mem::size_of::<T>() as libc::socklen_t
            ))?;

            Ok(())
        }
    }

    pub fn getsockopt<T: Copy>(&self, opt: libc::c_int, val: libc::c_int) -> io::Result<T> {
        unsafe {
            let mut slot: T = mem::zeroed();
            let mut len = mem::size_of::<T>() as libc::socklen_t;

            cvt(libc::getsockopt(
                self.0.raw(),
                opt,
                val,
                &mut slot as *mut T as *mut libc::c_void,
                &mut len
            ))?;

            assert_eq!(len as usize, mem::size_of::<T>());
            Ok(slot)
        }
    }
}

impl FromRawFd for Socket {
    unsafe fn from_raw_fd(fd: RawFd) -> Socket {
        Socket(FileDesc::from_raw_fd(fd))
    }
}

impl AsRawFd for Socket {
    fn as_raw_fd(&self) -> RawFd {
        self.0.raw()
    }
}

我已经将上一节中我们使用到的socket相关的主要的5个函数,外加 readwrite ,等几个描述符设置的函数,“依附”到了 Socket 上。保存在 socket.rs 文件里。

要说明的是,我在 newaccept 方法中,通过 flags 直接为新创建的描述符设置了 SOCK_CLOEXEC 选项,如果不想一步设置的话,就需要创建出描述符后,再调用 set_cloexec 方法。 bind 中,在调用 c::bind 之前,我给套接字设置了个选项 SO_REUSEADDR ,意为允许重用本地地址,这里不展开讲,如果你细心的话就会发现,上一节的例子,如果没有正常关闭socket的话,就可能会出现 error:98,Address already in use ,等一会儿才会好。 accept4 不是个标准的方法,只有Linux才支持,我们暂时不考虑兼容性。 setsockoptgetsockopt 方法中涉及到了类型转换,结合前面的例子,这里应该难不倒你了。除了 from_raw_fd ,我还又给 Socket 实现了又一个约定俗成的方法 as_raw_fd

我已经将远吗放到了 这里 ,你可以去查看。你还可以尝试将上一节的例子,修改成我们今天封装的 Socket 。这一节到这里就结束了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

社交电商

社交电商

[美] Stephan Spencer(斯蒂芬.斯宾塞)、[美] Jimmy Harding(吉米.哈丁)、[美] Jennifer Sheahan(詹尼弗.希汉) / 谭磊 / 电子工业出版社 / 2015-3 / 69.00元

你想要在互联网上赚钱吗?想要做好电子商务吗?那么你一定不能忽视社交媒体的力量。不管你想要营销的是实物商品、电子类产品还是本地的服务,这本书会教你怎么做。 《社交电商》全面介绍形形色色的社交媒体以及如何利用这些社交媒体来为你的企业做好服务。如果你经营得不好,在社交媒体上散发出的只是噪声而不是真正的信息。 而如果做得好,社交媒体会成为你最有效的营销工具,帮助你赢得老客户的拥戴,获得新的客户。 ......一起来看看 《社交电商》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具