内容简介:mio是rust实现的一个轻量级的I/O库。其实现基本上就是对不同操作系统底层相关API的封装,抽象出统一的接口供上层使用。Linux下为epoll,Windows下为IOCP,OS X下为kqueue。其使用方法与Linux中epoll差不多,mio底层封装了epoll,使用步骤思路:mio提供可跨平台的sytem selector访问,不同平台如下表,都可调用相同的API。不同平台使用的API开销不尽相同。由于mio是基于readiness(就绪状态)的API,与Linux epoll相似,可以看到很多
mio是rust实现的一个轻量级的I/O库。其实现基本上就是对不同操作系统底层相关API的封装,抽象出统一的接口供上层使用。Linux下为epoll,Windows下为IOCP,OS X下为kqueue。
一、关于mio
1、重要特性
- 非阻塞TCP,UDP
- I/O事件通知epoll,kqeue,IOCP实现
- 运行时零分配
- 平台可扩展
2、基础用法
其使用方法与 Linux 中epoll差不多,mio底层封装了epoll,使用步骤思路:
- 创建Poll
- 注册事件
- 事件循环等待与处理事件
mio提供可跨平台的sytem selector访问,不同平台如下表,都可调用相同的API。不同平台使用的API开销不尽相同。由于mio是基于readiness(就绪状态)的API,与Linux epoll相似,可以看到很多API在Linux上都可以一对一映射。相比之下,Windows IOCP是基于完成(completion-based)而非基于就绪的API,所以两者间会有较多桥接。 同时mio提供自身版本的TcpListener、TcpStream、UdpSocket,这些API封装了底层平台相关API,并设为非阻塞且实现Evented trait。
OS | Selector |
---|---|
Linux | epoll |
OS X, iOS | kqueue |
Windows | IOCP |
FreeBSD | kqueue |
Android | epoll |
mio实现的是一个单线程事件循环,并没有实现线程池及多线程事件循环,如果需要线程池及多线程事件循环等需要自己实现。
二、源码分析
先给出mio的源码目录结构,只列出了关键的部分,如下所示:
mio代码目录结构 mio |---->test |---->src |-------->deprecated //事件循环代码 |-------------->event_loop.rs //EventLoop的实现,内部封装了Poll 【1】 |-------------->handler.rs //供上层实现的接口 |-------->net |------------>mod.rs |------------>tcp.rs |------------>udp.rs |-------->sys //不同系统下的实现 |------------>mod.rs |------------>fuchsia |------------>unix //Linux下封装的epoll |------------------>mod.rs |------------------>epoll.rs 【3】 |------------------>awakener.rs |------------>windows //windows下封装的iocp |-------->lib.rs |-------->poll.rs //定义Poll 【2】 |-------->channel.rs 【4】 |-------->event_imp.rs |-------->timer.rs 【5】 |-------->...... 复制代码
对涉及不同操作系统的部分代码,以Linux操作系统为例。在Linux操作系统中,mio封装了epoll。后面会给出相应的代码。
【1】Eventloop代码分析
结合前面的代码示例给出相应的关键代码如下: EventLoop
事件循环定义,可以看到里面封装了 Poll
,以Linux系统举例, Poll
又封装了 epoll
。在使用 Poll
或Linux中 epoll
时,最重要的代码是 epoll_wait()
等待事件 Event
并针对每个 Event
进行不同的处理。这里 EventLoop
将 epoll_create()
、 epoll_wait()
、 epoll_ctl()
进行进一步的封装,将对 Event
的处理抽象成 Handler
,供上层实现具体的逻辑处理。
// Single threaded IO event loop. //这里是单线程事件循环,更多的时候我们需要加线程池,以此为基础,再进行一次封装,供上层使用 pub struct EventLoop<H: Handler> { run: bool, poll: Poll, events: Events, //对应epoll中的epoll_event timer: Timer<H::Timeout>, notify_tx: channel::SyncSender<H::Message>, notify_rx: channel::Receiver<H::Message>, config: Config, } 复制代码
抽象出接口供上层应用实现不同事件的逻辑处理。这里有点类似于回调函数,上层用户需要在此实现业务逻辑代码,实际运行时需要将函数指针传递给底层事件循环,底层事件循环运行时会调用用户传递过来的函数。在Rust中,可能描述的不是很精准,不过可以这样理解。
pub trait Handler: Sized { type Timeout; type Message; /// Invoked when the socket represented by `token` is ready to be operated /// on. `events` indicates the specific operations that are /// ready to be performed. /// This function will only be invoked a single time per socket per event /// loop tick. fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) { } //【1】 /// Invoked when a message has been received via the event loop's channel. fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) { } //【2】 /// Invoked when a timeout has completed. fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) { } //【3】 /// Invoked when `EventLoop` has been interrupted by a signal interrupt. fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) { } //【4】 /// Invoked at the end of an event loop tick. fn tick(&mut self, event_loop: &mut EventLoop<Self>) { } //【5】 } 复制代码
这里把 Poll
进行了封装,主要实现了 Eventloop::new()
----> Poll::new()
----> epoll_create()
, Eventloop::run()
---> Selecter::select()
----> epoll_wait()
,还有 register()
、 reregister()
、 deregister()
等等......
impl<H: Handler> EventLoop<H> { /// Constructs a new `EventLoop` using the default configuration values. /// The `EventLoop` will not be running. pub fn new() -> io::Result<EventLoop<H>> { EventLoop::configured(Config::default()) } fn configured(config: Config) -> io::Result<EventLoop<H>> { // Create the IO poller let poll = Poll::new()?; //Linux内部调用epoll_create() let timer = timer::Builder::default() .tick_duration(config.timer_tick) .num_slots(config.timer_wheel_size) .capacity(config.timer_capacity) .build(); // Create cross thread notification queue let (tx, rx) = channel::sync_channel(config.notify_capacity); //这里创建的是同步管道,可配置同步管道内部的buffer queue bound size. // Register the notification wakeup FD with the IO poller poll.register(℞, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?; //NOTIFY和TIMER由mio实现 poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?; Ok(EventLoop { run: true, poll: poll, timer: timer, notify_tx: tx, notify_rx: rx, config: config, events: Events::with_capacity(1024), }) } /// Keep spinning the event loop indefinitely, and notify the handler whenever /// any of the registered handles are ready. pub fn run(&mut self, handler: &mut H) -> io::Result<()> { self.run = true; while self.run { // Execute ticks as long as the event loop is running self.run_once(handler, None)?; //Linux下调用epoll_wait() } Ok(()) } pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> { trace!("event loop tick"); // Check the registered IO handles for any new events. Each poll // is for one second, so a shutdown request can last as long as // one second before it takes effect. let events = match self.io_poll(timeout) { Ok(e) => e, Err(err) => { if err.kind() == io::ErrorKind::Interrupted { handler.interrupted(self); //调用Handler::interrupted() 【4】 0 } else { return Err(err); } } }; self.io_process(handler, events); //处理就绪的事件,handler为如何处理各种事件的实例 handler.tick(self); //一轮事件处理后,最后调用Handler::tick() 调用【5】 Ok(()) } #[inline] fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> { self.poll.poll(&mut self.events, timeout) } // Process IO events that have been previously polled fn io_process(&mut self, handler: &mut H, cnt: usize) { let mut i = 0; trace!("io_process(..); cnt={}; len={}", cnt, self.events.len()); // Iterate over the notifications. Each event provides the token // it was registered with (which usually represents, at least, the // handle that the event is about) as well as information about // what kind of event occurred (readable, writable, signal, etc.) while i < cnt { //遍历所有就绪的事件,进行处理 let evt = self.events.get(i).unwrap(); trace!("event={:?}; idx={:?}", evt, i); // mio在epoll之上,增加了NOTIFY和TIMER match evt.token() { NOTIFY => self.notify(handler), //channel处理 ,这个epoll中是没有的,mio实现 TIMER => self.timer_process(handler), //Timer处理, 这个epoll中也是没有的,mio实现 _ => self.io_event(handler, evt) //IO事件的处理, 这个epoll有 } i += 1; } } fn io_event(&mut self, handler: &mut H, evt: Event) { handler.ready(self, evt.token(), evt.readiness()); //调用Handler::ready() 【1】 } fn notify(&mut self, handler: &mut H) { for _ in 0..self.config.messages_per_tick { match self.notify_rx.try_recv() { //从channel中接收数据,内部实现是std::sync::mpsc::sync_channel() Ok(msg) => handler.notify(self, msg), //调用Handler::notify() 【2】 _ => break, } } // Re-register let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot()); //PollOpt::oneshot(),必须重新reregister. } fn timer_process(&mut self, handler: &mut H) { while let Some(t) = self.timer.poll() { handler.timeout(self, t); //调用Handler::timeout() 【3】 } } /// Registers an IO handle with the event loop. pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented { self.poll.register(io, token, interest, opt) } /// Re-Registers an IO handle with the event loop. pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> where E: Evented { self.poll.reregister(io, token, interest, opt) } /// Deregisters an IO handle with the event loop. pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented { self.poll.deregister(io) } /// Returns a sender that allows sending messages to the event loop in a /// thread-safe way, waking up the event loop if needed. pub fn channel(&self) -> Sender<H::Message> { Sender::new(self.notify_tx.clone()) } /// Schedules a timeout after the requested time interval. When the /// duration has been reached, pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> { self.timer.set_timeout(delay, token) } /// If the supplied timeout has not been triggered, cancel it such that it /// will not be triggered in the future. pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool { self.timer.cancel_timeout(&timeout).is_some() } /// Tells the event loop to exit after it is done handling all events in the current iteration. pub fn shutdown(&mut self) { self.run = false; } /// Indicates whether the event loop is currently running. If it's not it has either /// stopped or is scheduled to stop on the next tick. pub fn is_running(&self) -> bool { self.run } } 复制代码
【2】Poll代码分析
Poll
屏蔽了不同系统的实现,给出了统一的抽象。 Poll
的实现代码这里只能列出较为重要的部分代码,有一部分代码省略掉了,详细代码可查看 mio/src/poll.rs :
pub struct Poll { // Platform specific IO selector selector: sys::Selector, // Custom readiness queue // The second readiness queue is implemented in user space by `ReadinessQueue`. It provides a way to implement purely user space `Evented` types. readiness_queue: ReadinessQueue, //区别于系统就绪队列(sys::Selector),这是上层自己实现的就绪队列 // Use an atomic to first check if a full lock will be required. This is a // fast-path check for single threaded cases avoiding the extra syscall lock_state: AtomicUsize, // Sequences concurrent calls to `Poll::poll` lock: Mutex<()>, // Wakeup the next waiter condvar: Condvar, } impl Poll { /// Return a new `Poll` handle. pub fn new() -> io::Result<Poll> { is_send::<Poll>(); is_sync::<Poll>(); let poll = Poll { selector: sys::Selector::new()?, readiness_queue: ReadinessQueue::new()?, lock_state: AtomicUsize::new(0), lock: Mutex::new(()), condvar: Condvar::new(), }; // Register the notification wakeup FD with the IO poller poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge())?; Ok(poll) } /// Wait for readiness events /// /// Blocks the current thread and waits for readiness events for any of the /// `Evented` handles that have been registered with this `Poll` instance. /// The function will block until either at least one readiness event has /// been received or `timeout` has elapsed. A `timeout` of `None` means that /// `poll` will block until a readiness event has been received. pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> { self.poll1(events, timeout, false) //Poll::poll()非常最重要的一个方法, poll()-->poll1()-->poll2() } fn poll1(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> { let zero = Some(Duration::from_millis(0)); let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst); if 0 != curr { ... } //{ ... }代表中间有很多代码被省略掉了. let ret = self.poll2(events, timeout, interruptible); // Release the lock if 1 != self.lock_state.fetch_and(!1, Release) { ... } //{ ... }代表中间有很多代码被省略掉了. ret } #[inline] fn poll2(&self, events: &mut Events, mut timeout: Option<Duration>, interruptible: bool) -> io::Result<usize> { // Compute the timeout value passed to the system selector. If the // readiness queue has pending nodes, we still want to poll the system // selector for new events, but we don't want to block the thread to // wait for new events. if timeout == Some(Duration::from_millis(0)) { // If blocking is not requested, then there is no need to prepare // the queue for sleep // // The sleep_marker should be removed by readiness_queue.poll(). } else if self.readiness_queue.prepare_for_sleep() { // The readiness queue is empty. The call to `prepare_for_sleep` // inserts `sleep_marker` into the queue. This signals to any // threads setting readiness that the `Poll::poll` is going to // sleep, so the awakener should be used. } else { // The readiness queue is not empty, so do not block the thread. timeout = Some(Duration::from_millis(0)); } //poll系统就绪队列 loop { let now = Instant::now(); // First get selector events let res = self.selector.select(&mut events.inner, AWAKEN, timeout); //Linux下调用epoll_wait(),就绪事件放入events中 match res { Ok(true) => { // Some awakeners require reading from a FD. self.readiness_queue.inner.awakener.cleanup(); break; } Ok(false) => break, Err(ref e) if e.kind() == io::ErrorKind::Interrupted && !interruptible => { // Interrupted by a signal; update timeout if necessary and retry if let Some(to) = timeout { let elapsed = now.elapsed(); if elapsed >= to { break; } else { timeout = Some(to - elapsed); } } } Err(e) => return Err(e), } } // Poll custom event queue self.readiness_queue.poll(&mut events.inner); //Poll用户就绪队列 // Return number of polled events Ok(events.inner.len()) } /// Register an `Evented` handle with the `Poll` instance. pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> where E: Evented { validate_args(token)?; // Register interests for this socket handle.register(self, token, interest, opts)?; Ok(()) } /// Re-register an `Evented` handle with the `Poll` instance. pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> where E: Evented { validate_args(token)?; // Register interests for this socket handle.reregister(self, token, interest, opts)?; Ok(()) } /// Deregister an `Evented` handle with the `Poll` instance. pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()> where E: Evented { // Deregister interests for this socket handle.deregister(self)?; Ok(()) } } 复制代码
【3】Selector代码分析
下面这段代码出自 mio/src/sys/unix/epoll.rs 是对底层Linux系统epoll的封装抽象,可以看到 Selector::new()
内部实际上调用了 epoll_create()
, Selector::select()
内部实际上调用了 epoll_wait()
, register()
、 reregister()
、 deregister()
实内部实际上调用了 epoll_ctl()
。如果你非常熟悉 epoll
,就会感觉下面的代码很熟悉,详细代码如下:
pub struct Selector { id: usize, epfd: RawFd, } impl Selector { pub fn new() -> io::Result<Selector> { let epfd = unsafe { // Emulate `epoll_create` by using `epoll_create1` if it's available // and otherwise falling back to `epoll_create` followed by a call to // set the CLOEXEC flag. dlsym!(fn epoll_create1(c_int) -> c_int); match epoll_create1.get() { Some(epoll_create1_fn) => { cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))? } None => { let fd = cvt(libc::epoll_create(1024))?; drop(set_cloexec(fd)); fd } } }; // offset by 1 to avoid choosing 0 as the id of a selector let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1; Ok(Selector { id: id, epfd: epfd, }) } pub fn id(&self) -> usize { self.id } /// Wait for events from the OS pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> { let timeout_ms = timeout .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32) .unwrap_or(-1); // Wait for epoll events for at most timeout_ms milliseconds evts.clear(); unsafe { let cnt = cvt(libc::epoll_wait(self.epfd, evts.events.as_mut_ptr(), evts.events.capacity() as i32, timeout_ms))?; let cnt = cnt as usize; evts.events.set_len(cnt); for i in 0..cnt { if evts.events[i].u64 as usize == awakener.into() { evts.events.remove(i); return Ok(true); } } } Ok(false) } /// Register event interests for the given IO handle with the OS pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { let mut info = libc::epoll_event { events: ioevent_to_epoll(interests, opts), u64: usize::from(token) as u64 }; unsafe { cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?; Ok(()) } } /// Register event interests for the given IO handle with the OS pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> { let mut info = libc::epoll_event { events: ioevent_to_epoll(interests, opts), u64: usize::from(token) as u64 }; unsafe { cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?; Ok(()) } } /// Deregister event interests for the given IO handle with the OS pub fn deregister(&self, fd: RawFd) -> io::Result<()> { // The &info argument should be ignored by the system, // but linux < 2.6.9 required it to be not null. // For compatibility, we provide a dummy EpollEvent. let mut info = libc::epoll_event { events: 0, u64: 0, }; unsafe { cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?; Ok(()) } } } 复制代码
【4】Notify channel代码分析
这个涉及的代码比较多,比较杂,也较为难以理解。
// `ReadinessQueue` is backed by a MPSC queue that supports reuse of linked // list nodes. This significantly reduces the number of required allocations. // Each `Registration` / `SetReadiness` pair allocates a single readiness node // that is used for the lifetime of the registration. // // The readiness node also includes a single atomic variable, `state` that // tracks most of the state associated with the registration. This includes the // current readiness, interest, poll options, and internal state. When the node // state is mutated, it is queued in the MPSC channel. A call to // `ReadinessQueue::poll` will dequeue and process nodes. The node state can // still be mutated while it is queued in the channel for processing. // Intermediate state values do not matter as long as the final state is // included in the call to `poll`. This is the eventually consistent nature of // the readiness queue. // // The readiness node is ref counted using the `ref_count` field. On creation, // the ref_count is initialized to 3: one `Registration` handle, one // `SetReadiness` handle, and one for the readiness queue. Since the readiness queue // doesn't *always* hold a handle to the node, we don't use the Arc type for // managing ref counts (this is to avoid constantly incrementing and // decrementing the ref count when pushing & popping from the queue). When the // `Registration` handle is dropped, the `dropped` flag is set on the node, then // the node is pushed into the registration queue. When Poll::poll pops the // node, it sees the drop flag is set, and decrements it's ref count. // // The MPSC queue is a modified version of the intrusive MPSC node based queue // described by 1024cores [1]. #[derive(Clone)] struct ReadinessQueue { inner: Arc<ReadinessQueueInner>, } struct ReadinessQueueInner { // Used to wake up `Poll` when readiness is set in another thread. awakener: sys::Awakener, // Head of the MPSC queue used to signal readiness to `Poll::poll`. head_readiness: AtomicPtr<ReadinessNode>, // Tail of the readiness queue. // // Only accessed by Poll::poll. Coordination will be handled by the poll fn tail_readiness: UnsafeCell<*mut ReadinessNode>, // Fake readiness node used to punctuate the end of the readiness queue. // Before attempting to read from the queue, this node is inserted in order // to partition the queue between nodes that are "owned" by the dequeue end // and nodes that will be pushed on by producers. end_marker: Box<ReadinessNode>, // Similar to `end_marker`, but this node signals to producers that `Poll` // has gone to sleep and must be woken up. sleep_marker: Box<ReadinessNode>, // Similar to `end_marker`, but the node signals that the queue is closed. // This happens when `ReadyQueue` is dropped and signals to producers that // the nodes should no longer be pushed into the queue. closed_marker: Box<ReadinessNode>, } 复制代码
/// Node shared by a `Registration` / `SetReadiness` pair as well as the node /// queued into the MPSC channel. struct ReadinessNode { // Node state, see struct docs for `ReadinessState` // // This variable is the primary point of coordination between all the // various threads concurrently accessing the node. state: AtomicState, // The registration token cannot fit into the `state` variable, so it is // broken out here. In order to atomically update both the state and token // we have to jump through a few hoops. // // First, `state` includes `token_read_pos` and `token_write_pos`. These can // either be 0, 1, or 2 which represent a token slot. `token_write_pos` is // the token slot that contains the most up to date registration token. // `token_read_pos` is the token slot that `poll` is currently reading from. // // When a call to `update` includes a different token than the one currently // associated with the registration (token_write_pos), first an unused token // slot is found. The unused slot is the one not represented by // `token_read_pos` OR `token_write_pos`. The new token is written to this // slot, then `state` is updated with the new `token_write_pos` value. This // requires that there is only a *single* concurrent call to `update`. // // When `poll` reads a node state, it checks that `token_read_pos` matches // `token_write_pos`. If they do not match, then it atomically updates // `state` such that `token_read_pos` is set to `token_write_pos`. It will // then read the token at the newly updated `token_read_pos`. token_0: UnsafeCell<Token>, token_1: UnsafeCell<Token>, token_2: UnsafeCell<Token>, // Used when the node is queued in the readiness linked list. Accessing // this field requires winning the "queue" lock next_readiness: AtomicPtr<ReadinessNode>, // Ensures that there is only one concurrent call to `update`. // // Each call to `update` will attempt to swap `update_lock` from `false` to // `true`. If the CAS succeeds, the thread has obtained the update lock. If // the CAS fails, then the `update` call returns immediately and the update // is discarded. update_lock: AtomicBool, // Pointer to Arc<ReadinessQueueInner> readiness_queue: AtomicPtr<()>, // Tracks the number of `ReadyRef` pointers ref_count: AtomicUsize, } 复制代码
/// Handle to a user space `Poll` registration. /// /// `Registration` allows implementing [`Evented`] for types that cannot work /// with the [system selector]. A `Registration` is always paired with a /// `SetReadiness`, which allows updating the registration's readiness state. /// When [`set_readiness`] is called and the `Registration` is associated with a /// [`Poll`] instance, a readiness event will be created and eventually returned /// by [`poll`]. pub struct Registration { inner: RegistrationInner, } 复制代码
/// Updates the readiness state of the associated `Registration`. #[derive(Clone)] pub struct SetReadiness { inner: RegistrationInner, } 复制代码
未完,待续......
参考文档: Intrusive MPSC node-based queue
【5】Timer定时器代码分析
pub struct Timer<T> { // Size of each tick in milliseconds tick_ms: u64, // Slab of timeout entries entries: Slab<Entry<T>>, // Timeout wheel. Each tick, the timer will look at the next slot for // timeouts that match the current tick. wheel: Vec<WheelEntry>, // Tick 0's time instant start: Instant, // The current tick tick: Tick, // The next entry to possibly timeout next: Token, // Masks the target tick to get the slot mask: u64, // Set on registration with Poll inner: LazyCell<Inner>, } 复制代码
未完,待续......
三、mio用法示例
下面的2个示例都很简单,其实直接看mio的 测试代码mio/test/ 就好了,不用看下面的2个示例。
1、代码示例1
直接使用 Poll
示例如下:
#[macro_use] extern crate log; extern crate simple_logger; extern crate mio; use mio::*; use mio::tcp::{TcpListener, TcpStream}; use std::io::{Read,Write}; fn main() { simple_logger::init().unwrap(); // Setup some tokens to allow us to identify which event is for which socket. const SERVER: Token = Token(0); const CLIENT: Token = Token(1); let addr = "127.0.0.1:12345".parse().unwrap(); // Setup the server socket let server = TcpListener::bind(&addr).unwrap(); // Create a poll instance let poll = Poll::new().unwrap(); // Start listening for incoming connections poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap(); // Setup the client socket let sock = TcpStream::connect(&addr).unwrap(); // Register the socket poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()).unwrap(); // Create storage for events let mut events = Events::with_capacity(1024); loop { poll.poll(&mut events, None).unwrap(); for event in events.iter() { match event.token() { SERVER => { // Accept and drop the socket immediately, this will close // the socket and notify the client of the EOF. let (stream,addr) = server.accept().unwrap(); info!("Listener accept {:?}",addr); }, CLIENT => { // The server just shuts down the socket, let's just exit // from our event loop. info!("client response."); return; }, _ => unreachable!(), } } } } 复制代码
通过上面的代码示例1,我们可以看到其用法与 epoll
非常相似。
2、代码示例2
上面的代码编程时较为麻烦,下面使用事件循环 EventLoop
的方式,代码能看起来更清晰一些(相对的):
#[macro_use] extern crate log; extern crate simple_logger; extern crate mio; use mio::*; use mio::timer::{Timeout}; use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder}; use std::thread; use std::time::Duration; fn main() { simple_logger::init().unwrap(); let mut event_loop=EventLoop::new().unwrap(); let channel_sender=event_loop.channel(); thread::spawn(move ||{ channel_sender.send(IoMessage::Notify); thread::sleep_ms(5*1000); channel_sender.send(IoMessage::End); }); let timeout = event_loop.timeout(Token(123), Duration::from_millis(3000)).unwrap(); let mut handler=MioHandler::new(); let _ = event_loop.run(&mut handler).unwrap(); } pub enum IoMessage{ Notify, End, } pub struct MioHandler{ } impl MioHandler{ pub fn new()->Self{ MioHandler{} } } impl Handler for MioHandler { type Timeout = Token; type Message = IoMessage; /// Invoked when the socket represented by `token` is ready to be operated on. fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) { } /// Invoked when a message has been received via the event loop's channel. fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) { match msg { IoMessage::Notify=>info!("channel notify"), IoMessage::End=>{ info!("shutdown eventloop."); event_loop.shutdown(); } } } /// Invoked when a timeout has completed. fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) { match timeout{ Token(123)=>info!("time out."), Token(_)=>{}, } } /// Invoked when `EventLoop` has been interrupted by a signal interrupt. fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) { } /// Invoked at the end of an event loop tick. fn tick(&mut self, event_loop: &mut EventLoop<Self>) { } } 复制代码
这个示例说明了超时及channel,围绕 EventLoop
编程,其实与上一个例子没有什么不同,只是 EventLoop
对 Poll
做了封装。
参考文档:
【譯】Tokio 內部機制:從頭理解 Rust 非同步 I/O 框架
My Basic Understanding of mio and Asynchronous IO
mio-github关注微信公众号,定期推文!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数据资本时代
Viktor Mayer-Schnberger / 李晓霞、周涛 / 中信出版集团股份有限公司 / 2018-11-1 / CNY 58.00
【编辑推荐】 大数据除了能对我们的生活、工作、思维产生重大变革外,还能够做什么?畅销书《大数据时代》作者舍恩伯格在新书《数据资本时代》中,展示了大数据将如何从根本上改变经济——这并不是因为数据是一种新型石油,而是因为数据是一种新型润滑脂,它将给市场带来巨大能量,给公司带来巨大压力,使金融资本的作用大大削弱。赢家是市场,而并非资本。 这本书在当下国内出版,可以说恰逢其时。时下,中国经济正......一起来看看 《数据资本时代》 这本书的介绍吧!