内容简介:在上一篇的在学习Netty之前呢我们首先得了解IO和NIOIO编程模型简单来说就是上一篇我写的服务端与客户端的连接,客户端与服务端建立连接通信后,必须等待服务端返回信息才能进行下一步的动作,这期间线程一直是等待状态。IO模型在客户端较少的情况下是没问题的,但是一旦有大量客户端与服务端进行连接,那么就会出问题。我们简单的分析一下原因。
徒手撸一个简单的RPC框架(2)——项目改造
在上一篇的 徒手撸一个简单的RPC框架 中再最后的服务器和客户端连接的时候只是简单的写了Socket连接,觉得有些不妥。正好最近学习了Netty,在平时工作中没机会运用,于是自己就给自己出需求将之前的项目改造一下。
Netty是什么?
在学习Netty之前呢我们首先得了解IO和NIO
IO模型
IO编程模型简单来说就是上一篇我写的服务端与客户端的连接,客户端与服务端建立连接通信后,必须等待服务端返回信息才能进行下一步的动作,这期间线程一直是等待状态。IO模型在客户端较少的情况下是没问题的,但是一旦有大量客户端与服务端进行连接,那么就会出问题。我们简单的分析一下原因。
while
IO模型有这么多的问题,于是在JDK1.4中提出了NIO的概念,就是为了解决以上的问题
NIO模型
我们一一对应上面的问题来看NIO用什么技术来解决的
第一个问题
第一个问题是代码的问题,我们就不讨论了
第二个问题
线程有限的问题:NIO中提出了 Selector
概念,IO中是每个连接都会由一个线程阻塞来维护,NIO中用 Selector
来管理这些连接,如果有消息的传入或传出,那么就建立相应的线程了处理。这样服务器只需要阻塞一个 Selector
线程,就可以管理多个连接了。
具体的 Selector
文章可以看我之前的 NIO中选择器Selector ,里面有介绍详细的 Selector
用法。
这里举个例子应该就明白的,好比你去钓鱼,IO就是一人一个鱼竿,等着鱼上来,中间哪也不能去,而NIO就是一个人能守着好几个鱼竿。
这就是NIO模型解决操作系统中线程有限的问题。
第三个问题
CPU在线程之间频繁切换,由于NIO中只管理了一个 Selector
线程,那么这个问题也就相应的解决了
第四个问题
NIO中提出了 Channel
和 Buffer
的概念,就好比在向往的生活第一季中摘玉米中,是用竹筐一次一次背快呢?还是接一辆车子来回运送快?当然是车子来回运送快了,而这里的 Buffer
就好比车子。具体的 Channel
和 Buffer
的解释可以看我之前的文章 Java中IO和NIO 和 JAVA中NIO再深入 。
Netty
那么为什么就和Netty扯上关系了呢?其实我觉得NIO之于Netty的关系就好比Servlet之于Tomcat的关系,Netty只是对于NIO进行了进一步的封装,让使用者更加简便的编程。
改造
这次改造分为服务端和客户端的改造
服务端
接下来我们就利用Netty将我们的服务器端与客户端连接通信部分进行改造一下,首先我们先加上对于Netty的依赖
compile 'io.netty:netty-all:4.1.6.Final'
然后编写服务端的代码,服务端的代码非常简单
ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup boos = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap .group(boos, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获得实现类处理过后的返回值 String invokeMethodMes = CommonDeal.getInvokeMethodMes(msg); ByteBuf encoded = ctx.alloc().buffer(4 * invokeMethodMes.length()); encoded.writeBytes(invokeMethodMes.getBytes()); ctx.writeAndFlush(encoded); } }); } }).bind(20006);
这和我们平常写的Socket连接有些区别,可以看到我们建了两个 NioEventLoopGroup
一个boss一个worker,为什么会有两个呢?
从这个图里面我们可以看到,boss是专门用来对外连接的,worker则是像NIO中 Selector
用来处理各种读写的请求。
客户端
其实难点就是在客户端,因为Netty是异步事件驱动的框架,什么是异步呢?
客户端与服务端的任何I/O操作都将立即返回,等待服务端处理完成以后会调用指定的回调函数进行处理。在这个过程中客户端一直没有阻塞。所以我们在客户端与服务端请求处理时,如果获得异步处理的结果呢?Netty提供有一种获取异步回调结果的,但是那是添加监听器。而我们的RPC调用在最后返回结果的时候必须得阻塞等待结果的返回,所以我们需要自己写一个简单的获取异步回调结果的程序。想法如下。
- 想要获得服务端返回的消息时,阻塞等待。
- Netty客户端读取到客户端消息时,唤醒等待的线程
那么我们就围绕这两步来进行编码。
客户端想要获取服务端消息时如何等待呢?这里我们就可以用wait()
public Response getMessage(){ synchronized (object){ while (!success){ try { object.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } }
那么读到消息以后如何唤醒呢?
public void setMessage(Response response){ synchronized (object){ this.response = response; this.success = true; object.notify(); } }
这样就解决了我们上面提出的两个问题了。接下来编写客户端的代码
private final Map<Long,MessageFuture> futureMap = new ConcurrentHashMap<>(); private CountDownLatch countDownLatch = new CountDownLatch(1); public void connect(String requestJson,Long threadId){ Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline(). addLast(new StringDecoder()). addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Gson gson = new Gson(); Response response = gson.fromJson(msg, Response.class); MessageFuture messageFuture = futureMap.get(threadId); messageFuture.setMessage(response); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { futureMap.put(threadId,new MessageFuture()); countDownLatch.countDown(); ByteBuf encoded = ctx.alloc().buffer(4 * requestJson.length()); encoded.writeBytes(requestJson.getBytes()); ctx.writeAndFlush(encoded); } }); } }).connect("127.0.0.1", 20006); } public Response getResponse(Long threadId){ MessageFuture messageFuture = null; try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } messageFuture = futureMap.get(threadId); return messageFuture.getMessage(); }
这里面我们用到了 CountDownLatch
类,即等待发送完消息以后通知我能获取数据了。这里面的代码和服务端的差不多,其中有区别的地方就是在发送数据的时候将线程ID和 MessageFuture
放入Map中,在得到服务端发送的数据时取出并放入得到的Response。
总结
到目前为止我们就完成了我们的项目改造,只是简单的应用了一下Netty的客户端和服务端的通信,因为在学习的过程中如果没有运用的话,那么感觉记忆没有那么牢靠,所以就有了此次的项目改造的计划。虽然完成了简单的通信,但是我知道还有些地方需要优化,例如用 synchronized
在以后学习了 AQS
以后希望也能够学以致用将这里给改一下。
完整的项目地址
徒手撸一个简单的RPC框架
徒手撸一个简单的IOC
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
疯狂Java讲义
李刚 / 电子工业出版社 / 2014-7-1 / 109.00元
《疯狂Java讲义(第3版)(含CD光盘1张)》是《疯狂Java讲义》的第3版,第3版保持了前两版系统、全面、讲解浅显、细致的特性,全面新增介绍了Java 8的新特性,《疯狂Java讲义(第3版)(含CD光盘1张)》大部分示例程序都采用Lambda表达式、流式API进行了改写,因此务必使用Java 8的JDK来编译、运行。 《疯狂Java讲义(第3版)(含CD光盘1张)》深入介绍了Java编......一起来看看 《疯狂Java讲义》 这本书的介绍吧!