内容简介:每日一博 | 使用 t-io 实现简单的 rpc 调用
1. 先从最基础的开始:编写接口及实现类
/**
 * Service contract that is exposed over the RPC channel.
 */
public interface IUserService {

    /**
     * Returns the user list.
     *
     * @return the list rendered as a string
     */
    String getList();

    /**
     * Returns the user list for the given arguments.
     *
     * @param id   the user id
     * @param name the user name
     * @return the list rendered as a string
     */
    String getList(Integer id, String name);
}
import cn.ensoft.service.IUserService;

/**
 * Sample {@link IUserService} implementation that answers with hand-built,
 * single-quoted JSON-like strings.
 */
public class UserServiceImpl implements IUserService {

    /**
     * Returns a fixed sample record.
     */
    @Override
    public String getList() {
        return "{'id':'abc','name':'hello'}";
    }

    /**
     * Echoes the given id and name back inside a JSON-like record. The values
     * are inserted verbatim (no escaping); {@code null} is rendered as the
     * text {@code null}.
     */
    @Override
    public String getList(Integer id, String name) {
        StringBuilder record = new StringBuilder();
        record.append("{'id':'").append(id);
        record.append("','name':'").append(name);
        record.append("'}");
        return record.toString();
    }
}
2.t-io自定义公共类
2.1业务消息包
import java.io.UnsupportedEncodingException;

import org.tio.core.intf.Packet;

/**
 * Business message packet: a thin wrapper around the raw body bytes that are
 * sent over the connection.
 */
public class MsgPacket extends Packet {

    /** Name of the charset used whenever body text is converted to or from bytes. */
    public static final String CHARSET = "GB18030";

    private byte[] body;

    /**
     * Creates a packet without a body.
     */
    public MsgPacket() {
    }

    /**
     * Creates a packet whose body is {@code msg} encoded with {@link #CHARSET}.
     *
     * @param msg the body text
     * @throws IllegalStateException if the running JVM does not support {@link #CHARSET}
     */
    public MsgPacket(String msg) {
        try {
            this.body = msg.getBytes(MsgPacket.CHARSET);
        } catch (UnsupportedEncodingException e) {
            // Previously swallowed, which silently produced a packet with a null body.
            throw new IllegalStateException("Charset " + CHARSET + " is not supported by this JVM", e);
        }
    }

    /**
     * @return the body
     */
    public byte[] getBody() {
        return body;
    }

    /**
     * @param body the body to set
     */
    public void setBody(byte[] body) {
        this.body = body;
    }
}
2.2编码&解码
import java.nio.ByteBuffer; import org.tio.core.ChannelContext; import org.tio.core.GroupContext; import org.tio.core.exception.AioDecodeException; import org.tio.core.intf.AioHandler; /** * 服务器端和客户端的编码解码算法是一样的,所以抽象一个公共的父类出来 * @author tanyaowu */ public abstract class MsgHandler implements AioHandler<Object, MsgPacket, Object> { /** * 编码:把业务消息包编码为可以发送的ByteBuffer * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public ByteBuffer encode(MsgPacket packet, GroupContext<Object, MsgPacket, Object> groupContext, ChannelContext<Object, MsgPacket, Object> channelContext){ byte[] body = packet.getBody(); int bodyLen = 0; if (body != null){ bodyLen = body.length; } //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度 int allLen = bodyLen; //创建一个新的bytebuffer ByteBuffer buffer = ByteBuffer.allocate(allLen); //设置字节序 buffer.order(groupContext.getByteOrder()); //写入消息体 if (body != null){ buffer.put(body); } return buffer; } /** * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包 */ @Override public MsgPacket decode(ByteBuffer buffer, ChannelContext<Object, MsgPacket, Object> channelContext) throws AioDecodeException{ int readableLength = buffer.limit() - buffer.position();//真实数据长度 if (readableLength == 0){ return null; } byte[] dst = new byte[readableLength]; MsgPacket imPacket = new MsgPacket(); buffer.get(dst, 0, readableLength); imPacket.setBody(dst); return imPacket; } }
2.3 服务端与客户端约定的端口、IP 及客户端等待响应的超时时间
/**
 * Connection settings agreed between the server and the client.
 */
public class MsgConst {

    /** Address the server binds to and the client connects to. */
    public static final String IP = "127.0.0.1";

    /** TCP port of the server. */
    public static final int PORT = 6789;

    /** Client-side timeout, in milliseconds. */
    public static final int TIME_OUT = 10000;
}
2.4 内容的编码与解码(转换为十六进制字符串)
import java.io.ByteArrayOutputStream; import java.io.UnsupportedEncodingException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * 进制之间的转换 * @author jwzhangjie * */ public class HexadecimalConver { private static String hexString="0123456789ABCDEF;{}\"'"; /** * 将字符串编码成16进制数字,适用于所有字符(包括中文) * @throws UnsupportedEncodingException */ public static String encode(String str) throws UnsupportedEncodingException{ //根据默认编码获取字节数组 byte[] bytes= str.getBytes(MsgPacket.CHARSET); StringBuilder sb = new StringBuilder(bytes.length*2); //将字节数组中每个字节拆解成2位16进制整数 for(int i=0;i<bytes.length;i++){ sb.append(hexString.charAt((bytes[i]&0xf0)>>4)); sb.append(hexString.charAt((bytes[i]&0x0f)>>0)); } return sb.toString(); } /** * 将16进制数字解码成字符串,适用于所有字符(包括中文) * @throws UnsupportedEncodingException */ public static String decode(String bytes) throws UnsupportedEncodingException{ ByteArrayOutputStream baos=new ByteArrayOutputStream(bytes.length()/2); //将每2位16进制整数组装成一个字节 for(int i=0;i<bytes.length();i+=2) baos.write((hexString.indexOf(bytes.charAt(i))<<4 |hexString.indexOf(bytes.charAt(i+1)))); return new String(baos.toByteArray(), MsgPacket.CHARSET); } public static String genWriteJson(String inf,String method,Class<?>[] argsType,Object[] args){ JSONObject jsonObject = new JSONObject(); jsonObject.put("inf", inf); jsonObject.put("method", method); jsonObject.put("argsType", argsType); jsonObject.put("args", args); return JSON.toJSONString(jsonObject); } public static String genWriteCode(String json){ try { return HexadecimalConver.encode(json); } catch (UnsupportedEncodingException e) { return null; } } }
3.编写tio-server用于客户端远程调用
3.1 服务端入口
import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.tio.server.AioServer; import org.tio.server.ServerGroupContext; import org.tio.server.intf.ServerAioHandler; import org.tio.server.intf.ServerAioListener; public class MsgServerStarter{ public static Map<String,Class<?>> regist; //handler, 包括编码、解码、消息处理 public static ServerAioHandler<Object, MsgPacket, Object> aioHandler = new MsgServerAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ServerAioListener<Object, MsgPacket, Object> aioListener = null; //一组连接共用的上下文对象 public static ServerGroupContext<Object, MsgPacket, Object> serverGroupContext = new ServerGroupContext<>(aioHandler,aioListener); //aioServer对象 public static AioServer<Object, MsgPacket, Object> aioServer = new AioServer<>(serverGroupContext); //有时候需要绑定ip,不需要则null public static String serverIp = MsgConst.IP; //监听的端口 public static int serverPort = MsgConst.PORT; /** * 启动程序入口 * @throws IOException * @throws IllegalAccessException * @throws InstantiationException */ public static void main(String[] args) throws IOException { regist = new HashMap<String,Class<?>>(); regist.put(IUserService.class.getName(),UserServiceImpl.class); aioServer.start(serverIp, serverPort); } }
3.2消息处理器
import java.io.UnsupportedEncodingException; import org.nutz.log.Log; import org.nutz.log.Logs; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.server.intf.ServerAioHandler; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; public class MsgServerAioHandler extends MsgHandler implements ServerAioHandler<Object, MsgPacket, Object> { private static final Log log = Logs.get(); /** * 处理消息 * @throws UnsupportedEncodingException */ @Override public Object handler(MsgPacket packet, ChannelContext<Object, MsgPacket, Object> channelContext) throws UnsupportedEncodingException { MsgPacket resppacket = new MsgPacket(); byte[] body = packet.getBody(); if (body != null) { String encryCode = new String(body, MsgPacket.CHARSET); log.debugf("服务端收到消息:%s", encryCode); try { String requestJson = HexadecimalConver.decode(encryCode); log.debugf("解析数据:%s", requestJson); JSONObject requestObj = JSON.parseObject(requestJson); String inf = requestObj.getString("inf"); String method = requestObj.getString("method"); JSONArray argsTypeB = requestObj.getJSONArray("argsType"); Class<?>[] argsType = new Class[argsTypeB.size()]; for (int i = 0; i < argsTypeB.size(); i++) { String classzz = argsTypeB.getString(i); argsType[i] = Class.forName(classzz); } JSONArray argsB = requestObj.getJSONArray("args"); Object[] args = new Object[argsB.size()]; for (int i = 0; i < argsB.size(); i++) { Object atp = argsB.get(i); args[i] = atp; } Class<?> bindClazz = MsgServerStarter.regist.get(inf); Object bindObj = bindClazz.newInstance(); String responseMsg = (String)bindClazz.getMethod(method,argsType).invoke(bindObj,args); String responseCode = HexadecimalConver.genWriteCode(responseMsg); resppacket.setBody(responseCode.getBytes(MsgPacket.CHARSET)); } catch (Exception e) { log.error("该消息无法解析",e); log.errorf("msg", e); resppacket.setBody(FAIL.getBytes(MsgPacket.CHARSET)); } Aio.send(channelContext, resppacket); } return 
null; } }
4.编写tio-client客户端
4.1客户端入口
import java.util.HashMap; import java.util.Map; import org.tio.client.AioClient; import org.tio.client.ClientChannelContext; import org.tio.client.ClientGroupContext; import org.tio.client.ReconnConf; import org.tio.client.intf.ClientAioHandler; import org.tio.client.intf.ClientAioListener; import org.tio.core.Node; public class MsgClientStarter { public static Map<String, String> result; //服务器节点 public static Node serverNode = new Node(MsgConst.IP,MsgConst.PORT); //handler, 包括编码、解码、消息处理 public static ClientAioHandler<Object, MsgPacket, Object> aioClientHandler = new MsgClientAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ClientAioListener<Object, MsgPacket, Object> aioListener = null; //new MsgClientAioListener(); //断链后自动连接的,不想自动连接请设为null private static ReconnConf<Object, MsgPacket, Object> reconnConf = null; //private static ReconnConf<Object, MsgPacket, Object> reconnConf = new ReconnConf<Object, MsgPacket, Object>(5000L); //一组连接共用的上下文对象 public static ClientGroupContext<Object, MsgPacket, Object> clientGroupContext = new ClientGroupContext<>(aioClientHandler, aioListener, reconnConf); public static AioClient<Object, MsgPacket, Object> aioClient = null; public static ClientChannelContext<Object, MsgPacket, Object> clientChannelContext = null; /** * 启动程序入口 * @throws Exception */ public static void instance() throws Exception { if(aioClient==null){ aioClient = new AioClient<Object, MsgPacket, Object>(clientGroupContext); clientChannelContext = aioClient.connect(serverNode); clientChannelContext.setUserid(java.util.UUID.randomUUID().toString().toUpperCase()); result = new HashMap<String,String>(); } } }
4.2消息处理器
import org.tio.client.intf.ClientAioHandler; import org.tio.core.ChannelContext; import cn.ensoft.tio.MsgHandler; import cn.ensoft.tio.MsgPacket; import cn.ensoft.util.HexadecimalConver; public class MsgClientAioHandler extends MsgHandler implements ClientAioHandler<Object, MsgPacket, Object> { /** * 处理消息 */ @Override public Object handler(MsgPacket packet, ChannelContext<Object, MsgPacket, Object> channelContext) throws Exception { byte[] body = packet.getBody(); if (body != null) { String str = new String(body, MsgPacket.CHARSET); System.out.println("CLIENT=>>收到消息:" + str); System.out.println("CLIENT=>>收到消息:" + HexadecimalConver.decode(str)); MsgClientStarter.result.put(channelContext.getUserid(), HexadecimalConver.decode(str)); } return null; } @Override public MsgPacket heartbeatPacket() { return null; } }
5.代理
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import org.nutz.log.Log; import org.nutz.log.Logs; import org.tio.core.Aio; import cn.ensoft.client.MsgClientStarter; import cn.ensoft.tio.MsgPacket; import cn.ensoft.tio.MsgConst; public class ClientProxy<T> implements InvocationHandler{ private static final Log log = Logs.get(); private Object targetClazz; public ClientProxy(final Class<T> clazz) throws InstantiationException, IllegalAccessException { this.targetClazz = clazz.newInstance(); } public Object bind() { return Proxy.newProxyInstance(targetClazz.getClass().getClassLoader(),targetClazz.getClass().getInterfaces(), this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MsgClientStarter.instance(); String impl = targetClazz.getClass().getCanonicalName(); String inf = targetClazz.getClass().getInterfaces()[0].getName(); String methodName = method.getName(); Class<?>[] argsType = method.getParameterTypes(); log.debugf("参数-IMPL=%s", impl); log.debugf("参数-INF=%s", inf); log.debugf("参数-method=%s", methodName); log.debugf("参数-argsType=%s",JSON.toJSONString(argsType)); log.debugf("参数-args=%s", JSON.toJSONString(args)); String requestJson = HexadecimalConver.genWriteJson(inf, methodName,argsType, args); String requestCode = HexadecimalConver.genWriteCode(requestJson); log.debugf("requestJson=%s", requestJson); log.debugf("requestCode=%s", requestCode); log.debugf("requestCodeD=%s", HexadecimalConver.decode(requestCode)); MsgPacket packet = new MsgPacket(); packet.setBody(requestCode.getBytes(MsgPacket.CHARSET)); Aio.send(MsgClientStarter.clientChannelContext, packet); long stimes = System.currentTimeMillis(); String result = null; while(true){ result = MsgClientStarter.result.get(MsgClientStarter.clientChannelContext.getUserid()); if(null != result){ break; }else{ if((System.currentTimeMillis()-stimes)>=MsgConst.TIME_OUT){ break; } } } 
Aio.remove(MsgClientStarter.clientChannelContext, "sucess"); return result; } }
6.测试
/**
 * Manual end-to-end check: calls {@code getList} through the RPC proxy and
 * prints the reply. The server has to be running first.
 */
public class Test {

    /**
     * Performs one remote call and exits the JVM on success; on failure the
     * stack trace is printed.
     */
    public static void main(String[] args) {
        try {
            ClientProxy<UserServiceImpl> handler = new ClientProxy<UserServiceImpl>(UserServiceImpl.class);
            IUserService userService = (IUserService) handler.bind();

            String name = "hello-rpc-" + java.util.UUID.randomUUID();
            String msg = userService.getList(888, name);

            System.out.println("============" + msg);
            System.exit(0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
先启动服务端
10:28:26,196 WARN AioServer:90 - t-io server started, listen on 127.0.0.1:6789
然后运行Test
10:34:18,356 INFO AioClient:338 - [1]: curr:0, closed:0, received:(0p)(0b), handled:0, sent:(0p)(0b) 10:34:18,356 INFO ConnectionCompletionHandler:98 - connected to 127.0.0.1:6789 10:34:18,356 DEBUG ClientProxy:39 - 参数-IMPL=cn.ensoft.service.impl.UserServiceImpl 10:34:18,356 DEBUG ClientProxy:40 - 参数-INF=cn.ensoft.service.IUserService 10:34:18,356 DEBUG ClientProxy:41 - 参数-method=getList 10:34:18,387 DEBUG ClientProxy:42 - 参数-argsType=["java.lang.Integer","java.lang.String"] 10:34:18,387 DEBUG ClientProxy:43 - 参数-args=[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"] 10:34:18,402 DEBUG ClientProxy:46 - requestJson={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]} 10:34:18,402 DEBUG ClientProxy:47 - requestCode=7B22696E66223A22636E2E656E736F66742E736572766963652E495573657253657276696365222C2261726773223A5B3838382C2268656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536225D2C226D6574686F64223A226765744C697374222C226172677354797065223A5B226A6176612E6C616E672E496E7465676572222C226A6176612E6C616E672E537472696E67225D7D 10:34:18,402 DEBUG ClientProxy:48 - requestCodeD={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]} CLIENT=>>收到消息:7B276964273A27383838272C276E616D65273A2768656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536277D CLIENT=>>收到消息:{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'} 10:34:18,408 INFO DecodeRunnable:151 - 0.0.0.0:57205 收到消息 10:34:18,408 DEBUG DecodeRunnable:168 - 0.0.0.0:57205,组包后,数据刚好用完 ============{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'} 10:34:18,408 INFO CloseRunnable:120 - 准备关闭连接:0.0.0.0:57205, isNeedRemove:true, sucess 10:34:18,408 INFO Aio:78 - 0.0.0.0:57205 正在等待被关闭
写得不好,请赐教,勿喷 谢谢!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- golang实现RPC调用
- 调用腾讯优图开放平台进行人脸识别:Java调用API实现
- 实现react组件的递归调用
- Ajax客户端异步调用服务端的实现方法(js调用cs文件)
- 徒手撸框架--实现 RPC 远程调用
- angular 用Observable实现异步调用
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
PHP和MySQL Web开发
Luke Welling、Laura Thomson / 武欣、邵煜 / 机械工业出版社 / 2005-12 / 78.00元
本书将PHP开发与MySQL应用相结合,分别对PHP和MySQL做了深入浅出的分析,不仅介绍PHP和MySQL的一般概念,而且对PHP和MySQL的Web应用做了较全面的阐述,并包括几个经典且实用的例子。一起来看看 《PHP和MySQL Web开发》 这本书的介绍吧!