内容简介:Java中线程模型大致可以分为:单线程模型中,server端使用一个线程来处理所有的请求,所有的请求必须串行化处理,效率低下。 多线程模型中,server端会为每个请求分配一个线程去处理请求,相对单线程模型而言多线程模型效率更高,但是多线程模型的缺点也很明显:server端为每个请求都开辟一个线程来处理请求,如果请求数量很大,则会造成大量线程被创建,造成内存溢出。Java中线程是比较昂贵的对象。线程的数量不应该无限量的增大,当线程数超过一定数目后,增加线程不仅不能提高效率,反而会降低效率。 借助"对象复用"
Java中线程模型大致可以分为:
- 单线程模型
- 多线程模型
- 线程池模型(executor)
- Reactor线程模型
单线程模型中,server端使用一个线程来处理所有的请求,所有的请求必须串行化处理,效率低下。 多线程模型中,server端会为每个请求分配一个线程去处理请求,相对单线程模型而言多线程模型效率更高,但是多线程模型的缺点也很明显:server端为每个请求都开辟一个线程来处理请求,如果请求数量很大,则会造成大量线程被创建,造成内存溢出。Java中线程是比较昂贵的对象。线程的数量不应该无限量的增大,当线程数超过一定数目后,增加线程不仅不能提高效率,反而会降低效率。 借助"对象复用"的思想,线程池应运而生,线程池中一般有一定数目的线程,当请求数目超过线程数之后需要排队等待。这样就避免了线程会不停地增长。这里不打算对 Java 的线程池做过多介绍,有兴趣的可以去看我之前的文章:java线程池。
Reactor是一种处理模式。Reactor模式是处理并发I/O比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的IO事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。
Reactor也是一种实现机制。Reactor利用事件驱动机制实现,和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。
为什么需要Reactor模型
Reactor模型实质上是对I/O多路复用的一层包装,理论上来说I/O多路复用的效率已经够高了,为什么还需要Reactor模型呢?答案是I/O多路复用虽然性能已经够高了,但是编码复杂,在工程效率上还是太低。因此出现了Reactor模型。
一个个网络请求可能涉及到多个I/O请求,相比传统的单线程完整处理请求生命期的方法,I/O复用在人的大脑思维中并不自然,因为,程序员编程中,处理请求A的时候,假定A请求必须经过多个I/O操作A1-An(两次IO间可能间隔很长时间),每经过一次I/O操作,再调用I/O复用时,I/O复用的调用返回里,非常可能不再有A,而是返回了请求B。即请求A会经常被请求B打断,处理请求B时,又被C打断。这种思维下,编程容易出错。
Reactor模型
一般来说处理一个网络请求需要经过以下五个步骤:
- 读取请求数据(read request)
- 解码数据(decode request)
- 计算,生成响应(compute)
- 编码响应(encode response)
- 发送响应数据(send response) 如下图所示:
不难看出,上面的模型中读取请求数据和发送响应和业务处理逻辑耦合在一起,都是用一个handler线程处理,当发生读写事件时线程将被阻塞,无法处理其他的事情。既然不能耦合在一起,那自然解决方案是将读写操作和其他三个步骤进行分离:有专门的线程或者线程池负责连接请求,然后使用多路复用器来监测Socket上的读写事件,其他的处理委派给业务线程池进行处理。读写操作不阻塞主线程。
Reactor模型有三种线程模型:
- 单线程模型
- 多线程模型(单Reactor)
- 多线程模型(多Reactor)
单线程Reactor模型
单线程模型中Reactor既负责Accept新的连接请求,又负责分派请求到具体的handler中进行处理,一般不使用这种模型,因为单线程效率比较低下。
下面是基于Java NIO单线程Reactor模型的实现:
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { // Reactor设置 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind( new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 监听Socket连接事件 sk.attach(new Acceptor()); } public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); } catch (IOException ex) { /* ... */ } } } } final class Handler implements Runnable { static final int READING = 0, SENDING = 1; final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(1024); ByteBuffer output = ByteBuffer.allocate(1024); int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } } 复制代码
多线程模型(单Reactor)
该模型在事件处理器(Handler)链部分采用了多线程(线程池),也是后端程序常用的模型。模型图如下:
其实现大致代码如下: ``` class Handler implements Runnable { // 使用线程池 static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3; // ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } } ```
多线程模型(多Reactor)
比起多线程单Rector模型,它是将Reactor分成两部分,mainReactor负责监听并Accept新连接,然后将建立的socket通过多路复用器(Acceptor)分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据;业务处理功能,其交给worker线程池完成。通常,subReactor个数上可与CPU个数等同。其模型图如下:
Netty线程模型
Netty的线程模型类似于Reactor模型。Netty中ServerBootstrap用于创建服务端,下图是它的结构:
ServerBootstrap继承自AbstractBootstrap,AbstractBootstrap中的group属性就是Netty中的Acceptor,用于接受请求。而ServerBootstrap中的childGroup对应于Reactor模型中的worker线程池。 请求过来后Netty从group线程池中选出一个线程来建立连接,连接建立后委派给childGroup中的worker线程处理。 服务端线程模型工作原理如下图:
下面是一个完整的Netty服务端的例子:
public class TimeServer { public void bind(int port) { // Netty的多Reactor线程模型,bossGroup是Acceptor线程池,用于接受连接。workGroup是Worker线程池,处理业务。 // bossGroup是Acceptor线程池 EventLoopGroup bossGroup = new NioEventLoopGroup(); // workGroup是Worker线程池 EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } } } public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // msg转Buf ByteBuf buf = (ByteBuf) msg; // 创建缓冲中字节数的字节数组 byte[] req = new byte[buf.readableBytes()]; // 写入数组 buf.readBytes(req); String body = new String(req, "UTF-8"); String currenTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date( System.currentTimeMillis()).toString() : "BAD ORDER"; // 将要返回的信息写入Buffer ByteBuf resp = Unpooled.copiedBuffer(currenTime.getBytes()); // buffer写入通道 ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // write读入缓冲数组后通过invoke flush写入通道 ctx.flush(); } } 复制代码
参考资料
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python算法教程
[挪威] Magnus Lie Hetland 赫特兰 / 凌杰、陆禹淳、顾俊 / 人民邮电出版社 / 2016-1-1 / 69.00元
本书用Python语言来讲解算法的分析和设计。本书主要关注经典的算法,但同时会为读者理解基本算法问题和解决问题打下很好的基础。全书共11章。分别介绍了树、图、计数问题、归纳递归、遍历、分解合并、贪心算法、复杂依赖、Dijkstra算法、匹配切割问题以及困难问题及其稀释等内容。本书在每一章结束的时候均有练习题和参考资料,这为读者的自我检查以及进一步学习提供了较多的便利。在全书的最后,给出了练习题的提......一起来看看 《Python算法教程》 这本书的介绍吧!