每日一博 | 使用 t-io 实现简单的 rpc 调用

栏目: 服务器 · 发布时间: 8年前

内容简介:每日一博 | 使用 t-io 实现简单的 rpc 调用

1. 先从最基础的开始:编写接口及实现类

/**
 * Service contract that the server registers and the client calls remotely.
 */
public interface IUserService {

	/**
	 * No-argument variant of the list query.
	 *
	 * @return the result as a string
	 */
	String getList();

	/**
	 * Two-argument variant of the list query.
	 *
	 * @param id   identifier argument
	 * @param name name argument
	 * @return the result as a string
	 */
	String getList(Integer id, String name);
}
import cn.ensoft.service.IUserService;

/**
 * Sample {@link IUserService} implementation that returns fixed-format strings.
 */
public class UserServiceImpl implements IUserService{

	/** Constant record returned by the no-argument query. */
	private static final String SAMPLE_RECORD = "{'id':'abc','name':'hello'}";

	/**
	 * @return the constant sample record
	 */
	@Override
	public String getList() {
		return SAMPLE_RECORD;
	}

	/**
	 * Builds a record string that echoes the supplied arguments.
	 *
	 * @param id   value placed in the {@code id} slot
	 * @param name value placed in the {@code name} slot
	 * @return the assembled record string
	 */
	@Override
	public String getList(Integer id, String name) {
		StringBuilder record = new StringBuilder();
		record.append("{'id':'").append(id);
		record.append("','name':'").append(name);
		record.append("'}");
		return record.toString();
	}
}

2.t-io自定义公共类

2.1业务消息包

import java.io.UnsupportedEncodingException;
import org.tio.core.intf.Packet;

/**
 * Business message packet: carries the payload as a raw byte array.
 */
public class MsgPacket extends Packet{

    /** Name of the charset used to convert between text and body bytes. */
    public static final String CHARSET = "GB18030";

    /** Raw payload; stays {@code null} until a body is supplied. */
    private byte[] body;

    /** Creates a packet without a body. */
    public MsgPacket() {
    }

    /**
     * Creates a packet whose body is {@code msg} encoded with {@link #CHARSET}.
     *
     * @param msg text to encode as the body
     * @throws IllegalStateException if the runtime does not support {@link #CHARSET}
     */
    public MsgPacket(String msg) {
        try {
            this.body = msg.getBytes(MsgPacket.CHARSET);
        } catch (UnsupportedEncodingException e) {
            // Fail loudly: swallowing this would silently yield a packet with a null body.
            throw new IllegalStateException("Unsupported charset: " + MsgPacket.CHARSET, e);
        }
    }

    /**
     * @return the body
     */
    public byte[] getBody(){
        return body;
    }

    /**
     * @param body the body to set
     */
    public void setBody(byte[] body){
        this.body = body;
    }
}

2.2编码&解码

import java.nio.ByteBuffer;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.AioHandler;

/**
 * Shared encode/decode logic: server and client use the same wire format, so it lives in a
 * common parent class.
 * <p>
 * Wire format: a 4-byte header holding the body length, followed by the body bytes.
 *
 * @author tanyaowu
 */
public abstract class MsgHandler implements AioHandler<Object, MsgPacket, Object>
{
    /** Size in bytes of the length header that precedes every message body. */
    private static final int HEADER_LENGTH = 4;

    /**
     * Encodes a business packet into a ByteBuffer that can be sent.
     * Layout: header (4 bytes, body length) + body (the packet's raw bytes).
     *
     * @param packet         packet to encode; a {@code null} body is sent as length 0
     * @param groupContext   supplies the byte order used for the header
     * @param channelContext channel the packet is sent on (not used here)
     * @return a buffer containing the header followed by the body
     */
    @Override
    public ByteBuffer encode(MsgPacket packet, GroupContext<Object, MsgPacket, Object> groupContext, ChannelContext<Object, MsgPacket, Object> channelContext){
        byte[] body = packet.getBody();
        int bodyLen = (body == null) ? 0 : body.length;

        // Total size = header + body.
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + bodyLen);
        buffer.order(groupContext.getByteOrder());

        // Header first, so the receiver knows where this message ends.
        buffer.putInt(bodyLen);
        if (body != null){
            buffer.put(body);
        }
        return buffer;
    }

    /**
     * Decodes one business packet from the received bytes.
     * <p>
     * Without a length header a message split across reads, or two messages arriving together,
     * would be mistaken for a single message; the header makes the boundaries explicit.
     *
     * @param buffer         received bytes, read from its current position
     * @param channelContext channel the bytes arrived on (not used here)
     * @return the decoded packet, or {@code null} when the buffer does not yet hold a complete
     *         message (presumably t-io then retries once more data arrives — confirm against
     *         the t-io version in use)
     * @throws AioDecodeException if the header announces a negative body length
     */
    @Override
    public MsgPacket decode(ByteBuffer buffer, ChannelContext<Object, MsgPacket, Object> channelContext) throws AioDecodeException{
        int start = buffer.position();
        int readableLength = buffer.limit() - start;
        if (readableLength < HEADER_LENGTH){
            return null;
        }

        // Absolute read: the position is left untouched in case the body is still incomplete.
        int bodyLen = buffer.getInt(start);
        if (bodyLen < 0){
            throw new AioDecodeException("bodyLength [" + bodyLen + "] is not right");
        }
        if (readableLength - HEADER_LENGTH < bodyLen){
            return null;
        }

        buffer.position(start + HEADER_LENGTH);
        MsgPacket imPacket = new MsgPacket();
        if (bodyLen > 0){
            byte[] dst = new byte[bodyLen];
            buffer.get(dst);
            imPacket.setBody(dst);
        }
        return imPacket;
    }
}

2.3 服务端与客户端约定的端口、IP 及客户端等待超时时间

/**
 * Connection settings shared by the server and the client.
 */
public final class MsgConst
{
	/** TCP port the server listens on and the client connects to. */
	public static final int PORT = 6789;

	/** IP address the server binds to and the client connects to. */
	public static final String IP = "127.0.0.1";

	/** Maximum time, in milliseconds, the client waits for a response. */
	public static final int TIME_OUT = 10000;

	/** Constants holder; not meant to be instantiated. */
	private MsgConst()
	{
	}
}

2.4 内容的编码与解码(转为十六进制 hex 字符串)

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * Hexadecimal text encoding helpers, plus builders for the RPC request payload.
 *
 * @author jwzhangjie
 */
public class HexadecimalConver {

	/** Upper-case hex digits, indexed by nibble value. */
	private static final String HEX_DIGITS = "0123456789ABCDEF";

	/**
	 * Encodes a string as upper-case hexadecimal digits, two per byte. Works for any
	 * characters (including Chinese) representable in {@link MsgPacket#CHARSET}.
	 *
	 * @param str text to encode
	 * @return the hex representation of the text's bytes
	 * @throws UnsupportedEncodingException if {@link MsgPacket#CHARSET} is not supported
	 */
	public static String encode(String str) throws UnsupportedEncodingException{
		// Bytes are taken in the protocol charset, not the platform default.
		byte[] bytes = str.getBytes(MsgPacket.CHARSET);
		StringBuilder sb = new StringBuilder(bytes.length * 2);
		// Split every byte into its high and low nibble.
		for (byte b : bytes) {
			sb.append(HEX_DIGITS.charAt((b & 0xf0) >> 4));
			sb.append(HEX_DIGITS.charAt(b & 0x0f));
		}
		return sb.toString();
	}

	/**
	 * Decodes a hexadecimal string produced by {@link #encode(String)} back into text.
	 * Both upper- and lower-case digits are accepted.
	 *
	 * @param bytes hex text; must have an even length and contain only hex digits
	 * @return the decoded text
	 * @throws IllegalArgumentException if the length is odd or a non-hex character is found
	 * @throws UnsupportedEncodingException if {@link MsgPacket#CHARSET} is not supported
	 */
	public static String decode(String bytes) throws UnsupportedEncodingException{
		int length = bytes.length();
		if ((length & 1) != 0) {
			throw new IllegalArgumentException("Hex string must have an even length, got " + length);
		}
		ByteArrayOutputStream baos = new ByteArrayOutputStream(length / 2);
		// Combine every pair of hex digits into one byte.
		for (int i = 0; i < length; i += 2) {
			int high = hexValue(bytes.charAt(i), i);
			int low = hexValue(bytes.charAt(i + 1), i + 1);
			baos.write((high << 4) | low);
		}
		return new String(baos.toByteArray(), MsgPacket.CHARSET);
	}

	/**
	 * Returns the numeric value (0-15) of one hex digit.
	 *
	 * @throws IllegalArgumentException if {@code c} is not a hex digit
	 */
	private static int hexValue(char c, int index) {
		if (c >= '0' && c <= '9') {
			return c - '0';
		}
		if (c >= 'A' && c <= 'F') {
			return c - 'A' + 10;
		}
		if (c >= 'a' && c <= 'f') {
			return c - 'a' + 10;
		}
		throw new IllegalArgumentException("Invalid hex character '" + c + "' at index " + index);
	}

	/**
	 * Builds the JSON text of an RPC request.
	 *
	 * @param inf      stored under the key {@code inf}
	 * @param method   stored under the key {@code method}
	 * @param argsType stored under the key {@code argsType}
	 * @param args     stored under the key {@code args}
	 * @return the JSON string of the assembled object
	 */
	public static String genWriteJson(String inf,String method,Class<?>[] argsType,Object[] args){
		JSONObject jsonObject = new JSONObject();
		jsonObject.put("inf", inf);
		jsonObject.put("method", method);
		jsonObject.put("argsType", argsType);
		jsonObject.put("args", args);
		return JSON.toJSONString(jsonObject);
	}

	/**
	 * Hex-encodes a JSON string for transmission.
	 *
	 * @param json text to encode
	 * @return the hex text, or {@code null} if {@link MsgPacket#CHARSET} is not supported
	 */
	public static String genWriteCode(String json){
		try {
			return HexadecimalConver.encode(json);
		} catch (UnsupportedEncodingException e) {
			// Kept as a null return so existing callers see the same contract.
			return null;
		}
	}
}

3.编写tio-server用于客户端远程调用

3.1 服务端入口

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.tio.server.AioServer;
import org.tio.server.ServerGroupContext;
import org.tio.server.intf.ServerAioHandler;
import org.tio.server.intf.ServerAioListener;

/**
 * Server entry point: registers the available services and starts the t-io server.
 */
public class MsgServerStarter{

	/** Maps an interface's class name to the class that serves it; assigned in {@link #main}. */
	public static Map<String,Class<?>> regist;

    /** Handler covering encoding, decoding and message processing. */
    public static ServerAioHandler<Object, MsgPacket, Object> aioHandler = new MsgServerAioHandler();

    /** Event listener; {@code null} here, although providing an implementation is suggested. */
    public static ServerAioListener<Object, MsgPacket, Object> aioListener = null;

    /** Context object shared by a group of connections. */
    public static ServerGroupContext<Object, MsgPacket, Object> serverGroupContext = new ServerGroupContext<>(aioHandler,aioListener);

    /** The server instance. */
    public static AioServer<Object, MsgPacket, Object> aioServer = new AioServer<>(serverGroupContext);

    /** IP to bind to; use {@code null} when no specific binding is needed. */
    public static String serverIp = MsgConst.IP;

    /** Port to listen on. */
    public static int serverPort = MsgConst.PORT;

    /**
     * Program entry point: fills the service registry, then starts listening.
     *
     * @param args command-line arguments (unused)
     * @throws IOException declared for the call that starts the server
     */
    public static void main(String[] args) throws IOException
    {
    	// Populate a local map first and publish it once it is complete.
    	Map<String,Class<?>> services = new HashMap<String,Class<?>>();
    	services.put(IUserService.class.getName(), UserServiceImpl.class);
    	regist = services;

        aioServer.start(serverIp, serverPort);
    }
}

3.2消息处理器

import java.io.UnsupportedEncodingException;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.server.intf.ServerAioHandler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
 * Server-side message handler: decodes an RPC request, invokes the registered service through
 * reflection and sends the hex-encoded result back on the same channel.
 */
public class MsgServerAioHandler extends MsgHandler implements ServerAioHandler<Object, MsgPacket, Object>
{
	private static final Log log = Logs.get();

	/**
	 * Marker text sent back (hex-encoded, like every other reply) when a request cannot be served.
	 * NOTE(review): the original code referenced FAIL without a visible definition; the value
	 * chosen here is an assumption — confirm against the intended protocol.
	 */
	private static final String FAIL = "FAIL";

    /**
     * Handles one request packet and replies on the same channel.
     * Packets without a body are ignored.
     *
     * @param packet         the received request
     * @param channelContext channel used for the reply
     * @return always {@code null}
     * @throws UnsupportedEncodingException if {@link MsgPacket#CHARSET} is not supported
     */
    @Override
    public Object handler(MsgPacket packet, ChannelContext<Object, MsgPacket, Object> channelContext) throws UnsupportedEncodingException
    {
        byte[] body = packet.getBody();
        if (body == null)
        {
            return null;
        }

        MsgPacket resppacket = new MsgPacket();
        String encryCode = new String(body, MsgPacket.CHARSET);
        log.debugf("服务端收到消息:%s", encryCode);
        try {
            String requestJson = HexadecimalConver.decode(encryCode);
            log.debugf("解析数据:%s", requestJson);
            JSONObject requestObj = JSON.parseObject(requestJson);
            String responseMsg = invokeService(requestObj);
            String responseCode = HexadecimalConver.genWriteCode(responseMsg);
            resppacket.setBody(responseCode.getBytes(MsgPacket.CHARSET));
        } catch (Exception e) {
            log.error("该消息无法解析", e);
            // The client hex-decodes every reply, so the failure marker is hex-encoded as well.
            resppacket.setBody(HexadecimalConver.encode(FAIL).getBytes(MsgPacket.CHARSET));
        }
        Aio.send(channelContext, resppacket);
        return null;
    }

    /**
     * Looks up the service registered for the request's {@code inf} and invokes the requested method.
     *
     * @param requestObj parsed request with the keys {@code inf}, {@code method}, {@code argsType}, {@code args}
     * @return the String returned by the invoked method
     * @throws Exception if the service or method is unknown, or the invocation fails
     */
    private static String invokeService(JSONObject requestObj) throws Exception
    {
        String inf = requestObj.getString("inf");
        String method = requestObj.getString("method");

        Class<?> bindClazz = MsgServerStarter.regist.get(inf);
        if (bindClazz == null)
        {
            throw new IllegalArgumentException("No service registered for interface: " + inf);
        }

        // Either array may be absent from the JSON (e.g. a call without arguments); treat as empty.
        JSONArray argsTypeB = requestObj.getJSONArray("argsType");
        JSONArray argsB = requestObj.getJSONArray("args");
        int typeCount = (argsTypeB == null) ? 0 : argsTypeB.size();
        int argCount = (argsB == null) ? 0 : argsB.size();
        if (typeCount != argCount)
        {
            throw new IllegalArgumentException("argsType has " + typeCount + " entries but args has " + argCount);
        }

        String[] typeNames = new String[typeCount];
        Object[] args = new Object[argCount];
        for (int i = 0; i < typeCount; i++)
        {
            typeNames[i] = argsTypeB.getString(i);
            args[i] = argsB.get(i);
        }

        java.lang.reflect.Method target = findMethod(bindClazz, method, typeNames);
        Object bindObj = bindClazz.getDeclaredConstructor().newInstance();
        return (String) target.invoke(bindObj, args);
    }

    /**
     * Finds a public method of {@code clazz} by name and parameter type names.
     * Matching on names avoids calling {@code Class.forName} on class names supplied by the peer.
     *
     * @throws NoSuchMethodException if no public method matches
     */
    private static java.lang.reflect.Method findMethod(Class<?> clazz, String name, String[] typeNames) throws NoSuchMethodException
    {
        for (java.lang.reflect.Method candidate : clazz.getMethods())
        {
            if (!candidate.getName().equals(name))
            {
                continue;
            }
            Class<?>[] declared = candidate.getParameterTypes();
            if (declared.length != typeNames.length)
            {
                continue;
            }
            boolean sameTypes = true;
            for (int i = 0; i < declared.length; i++)
            {
                if (!declared[i].getName().equals(typeNames[i]))
                {
                    sameTypes = false;
                    break;
                }
            }
            if (sameTypes)
            {
                return candidate;
            }
        }
        throw new NoSuchMethodException(clazz.getName() + "." + name + " with " + typeNames.length + " parameter(s)");
    }
}

4.编写tio-client客户端

4.1客户端入口

import java.util.HashMap;
import java.util.Map;
import org.tio.client.AioClient;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.ReconnConf;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Node;

/**
 * Holds the shared t-io client objects and lazily opens the connection to the server.
 */
public class MsgClientStarter
{
	/** Responses keyed by the channel's user id; created by {@link #instance()}. */
	public static Map<String, String> result;

    /** Server node to connect to. */
    public static Node serverNode = new Node(MsgConst.IP,MsgConst.PORT);

    /** Handler covering encoding, decoding and message processing. */
    public static ClientAioHandler<Object, MsgPacket, Object> aioClientHandler = new MsgClientAioHandler();

    /** Event listener; {@code null} here, although providing an implementation is suggested. */
    public static ClientAioListener<Object, MsgPacket, Object> aioListener = null;

    /** Automatic reconnect settings; {@code null} disables reconnecting. */
    private static ReconnConf<Object, MsgPacket, Object> reconnConf = null;

    /** Context object shared by a group of connections. */
    public static ClientGroupContext<Object, MsgPacket, Object> clientGroupContext = new ClientGroupContext<>(aioClientHandler, aioListener, reconnConf);

    /** Client instance; {@code null} until {@link #instance()} has connected successfully. */
    public static AioClient<Object, MsgPacket, Object> aioClient = null;

    /** Channel to the server; {@code null} until {@link #instance()} has connected successfully. */
    public static ClientChannelContext<Object, MsgPacket, Object> clientChannelContext = null;

    /**
     * Connects to the server on first use; later calls return immediately.
     * <p>
     * Synchronized so that concurrent first calls open only one connection. The static fields
     * are published only after the connection succeeded, so a failed attempt can be retried
     * instead of leaving a client without a channel.
     *
     * @throws Exception if connecting fails
     */
    public static synchronized void instance() throws Exception
    {
    	if (aioClient != null) {
    		return;
    	}
    	// Written by the handler thread and read by the calling thread, hence a concurrent map.
    	// It exists before connecting so that it is ready when the first response arrives.
    	result = new java.util.concurrent.ConcurrentHashMap<String, String>();

    	AioClient<Object, MsgPacket, Object> client = new AioClient<Object, MsgPacket, Object>(clientGroupContext);
    	ClientChannelContext<Object, MsgPacket, Object> context = client.connect(serverNode);
    	context.setUserid(java.util.UUID.randomUUID().toString().toUpperCase());

    	clientChannelContext = context;
    	aioClient = client;
    }

}

4.2消息处理器

import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import cn.ensoft.tio.MsgHandler;
import cn.ensoft.tio.MsgPacket;
import cn.ensoft.util.HexadecimalConver;

/**
 * Client-side message handler: decodes a response and stores it for the waiting caller.
 */
public class MsgClientAioHandler extends MsgHandler implements ClientAioHandler<Object, MsgPacket, Object>
{
    /**
     * Prints the received hex text and its decoded form, then stores the decoded text in
     * {@code MsgClientStarter.result} under the channel's user id. Packets without a body
     * are ignored.
     *
     * @return always {@code null}
     */
    @Override
    public Object handler(MsgPacket packet, ChannelContext<Object, MsgPacket, Object> channelContext) throws Exception
    {
        byte[] payload = packet.getBody();
        if (payload == null)
        {
            return null;
        }

        String hexText = new String(payload, MsgPacket.CHARSET);
        System.out.println("CLIENT=>>收到消息:" + hexText);

        String plainText = HexadecimalConver.decode(hexText);
        System.out.println("CLIENT=>>收到消息:" + plainText);

        MsgClientStarter.result.put(channelContext.getUserid(), plainText);
        return null;
    }

    /**
     * @return {@code null}; no heartbeat packet is provided
     */
    @Override
    public MsgPacket heartbeatPacket()
    {
        return null;
    }
}

5.代理

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.tio.core.Aio;
import cn.ensoft.client.MsgClientStarter;
import cn.ensoft.tio.MsgPacket;
import cn.ensoft.tio.MsgConst;

/**
 * Dynamic-proxy invocation handler that forwards interface calls to the RPC server and
 * waits for the response.
 *
 * @param <T> the service interface, or a class implementing it
 */
public class ClientProxy<T> implements InvocationHandler{

	private static final Log log = Logs.get();

	/** Pause between two polls of the shared result map, in milliseconds. */
	private static final long POLL_INTERVAL_MILLIS = 1L;

	/** Class handed to the constructor: a service interface or an implementation of one. */
	private final Class<T> targetClazz;

	/**
	 * Creates a proxy handler for the given service type.
	 * <p>
	 * The class is no longer instantiated: only its interfaces are needed, so passing the
	 * interface itself works too. The {@code throws} clause is kept for source compatibility
	 * with existing callers.
	 *
	 * @param clazz a service interface, or a class implementing at least one interface
	 * @throws IllegalArgumentException if {@code clazz} is {@code null}
	 */
	public  ClientProxy(final Class<T> clazz) throws InstantiationException, IllegalAccessException {
		if (clazz == null) {
			throw new IllegalArgumentException("clazz must not be null");
		}
		this.targetClazz = clazz;
	}

	/**
	 * Creates the proxy object implementing the service interface(s).
	 *
	 * @return a proxy whose calls are routed to {@link #invoke}
	 * @throws IllegalArgumentException if the target class implements no interface
	 */
	public Object bind() {
        return Proxy.newProxyInstance(targetClazz.getClassLoader(), serviceInterfaces(), this);
	}

	/**
	 * Sends the call to the server and waits for its response.
	 *
	 * @return the response text, or {@code null} if none arrived within {@code MsgConst.TIME_OUT}
	 *         milliseconds or the waiting thread was interrupted
	 */
	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		MsgClientStarter.instance();
		String impl = targetClazz.getCanonicalName();
		String inf = serviceInterfaces()[0].getName();
		String methodName = method.getName();
		Class<?>[] argsType = method.getParameterTypes();
		log.debugf("参数-IMPL=%s", impl);
		log.debugf("参数-INF=%s",  inf);
		log.debugf("参数-method=%s", methodName);
		log.debugf("参数-argsType=%s",JSON.toJSONString(argsType));
		log.debugf("参数-args=%s", JSON.toJSONString(args));
		String requestJson = HexadecimalConver.genWriteJson(inf, methodName,argsType, args);
		String requestCode = HexadecimalConver.genWriteCode(requestJson);
		log.debugf("requestJson=%s", requestJson);
		log.debugf("requestCode=%s", requestCode);
		log.debugf("requestCodeD=%s", HexadecimalConver.decode(requestCode));
		MsgPacket packet = new MsgPacket();
        packet.setBody(requestCode.getBytes(MsgPacket.CHARSET));

        String userid = MsgClientStarter.clientChannelContext.getUserid();
        // Drop a response left over from an earlier call; otherwise it would be returned at once.
        MsgClientStarter.result.remove(userid);
        Aio.send(MsgClientStarter.clientChannelContext, packet);

        String result = awaitResult(userid);

        // NOTE(review): the channel is removed after every call while MsgClientStarter.instance()
        // connects only once; presumably a second call in the same JVM would use a removed
        // channel — verify against t-io's behavior.
        Aio.remove(MsgClientStarter.clientChannelContext, "sucess");
		return result;
	}

	/** Returns the interfaces to proxy: the target itself if it is an interface, else the ones it implements. */
	private Class<?>[] serviceInterfaces() {
		if (targetClazz.isInterface()) {
			return new Class<?>[] { targetClazz };
		}
		Class<?>[] interfaces = targetClazz.getInterfaces();
		if (interfaces.length == 0) {
			throw new IllegalArgumentException(targetClazz.getName() + " implements no interface to proxy");
		}
		return interfaces;
	}

	/** Polls the shared result map until a response for {@code userid} appears or the timeout elapses. */
	private static String awaitResult(String userid) {
		long stimes = System.currentTimeMillis();
		while (true) {
			String response = MsgClientStarter.result.get(userid);
			if (response != null) {
				// Consume the entry so that it cannot satisfy a later call.
				MsgClientStarter.result.remove(userid);
				return response;
			}
			if ((System.currentTimeMillis() - stimes) >= MsgConst.TIME_OUT) {
				log.debugf("no response within %s ms", MsgConst.TIME_OUT);
				return null;
			}
			try {
				// Short sleep instead of a tight spin that would occupy a CPU core.
				Thread.sleep(POLL_INTERVAL_MILLIS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return null;
			}
		}
	}
}

6.测试

/**
 * Manual smoke test: makes one remote call through {@link ClientProxy} and prints the reply.
 */
public class Test {
	public static void main(String[] args)  {
		try {
			ClientProxy<UserServiceImpl> proxy = new ClientProxy<UserServiceImpl>(UserServiceImpl.class);
			IUserService userService = (IUserService) proxy.bind();

			String name = "hello-rpc-" + java.util.UUID.randomUUID();
			String msg = userService.getList(888, name);

			System.out.println("============" + msg);
			System.exit(0);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

先启动服务端

10:28:26,196 WARN AioServer:90 - t-io server started, listen on 127.0.0.1:6789

然后运行Test

10:34:18,356 INFO AioClient:338 - [1]: curr:0, closed:0, received:(0p)(0b), handled:0, sent:(0p)(0b)
10:34:18,356 INFO ConnectionCompletionHandler:98 - connected to 127.0.0.1:6789
10:34:18,356 DEBUG ClientProxy:39 - 参数-IMPL=cn.ensoft.service.impl.UserServiceImpl
10:34:18,356 DEBUG ClientProxy:40 - 参数-INF=cn.ensoft.service.IUserService
10:34:18,356 DEBUG ClientProxy:41 - 参数-method=getList
10:34:18,387 DEBUG ClientProxy:42 - 参数-argsType=["java.lang.Integer","java.lang.String"]
10:34:18,387 DEBUG ClientProxy:43 - 参数-args=[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"]
10:34:18,402 DEBUG ClientProxy:46 - requestJson={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]}
10:34:18,402 DEBUG ClientProxy:47 - requestCode=7B22696E66223A22636E2E656E736F66742E736572766963652E495573657253657276696365222C2261726773223A5B3838382C2268656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536225D2C226D6574686F64223A226765744C697374222C226172677354797065223A5B226A6176612E6C616E672E496E7465676572222C226A6176612E6C616E672E537472696E67225D7D
10:34:18,402 DEBUG ClientProxy:48 - requestCodeD={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]}
CLIENT=>>收到消息:7B276964273A27383838272C276E616D65273A2768656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536277D
CLIENT=>>收到消息:{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'}
10:34:18,408 INFO DecodeRunnable:151 - 0.0.0.0:57205 收到消息 
10:34:18,408 DEBUG DecodeRunnable:168 - 0.0.0.0:57205,组包后,数据刚好用完
============{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'}
10:34:18,408 INFO CloseRunnable:120 - 准备关闭连接:0.0.0.0:57205, isNeedRemove:true, sucess
10:34:18,408 INFO Aio:78 - 0.0.0.0:57205 正在等待被关闭

写得不好,请赐教,勿喷 谢谢!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Elements of Programming

Elements of Programming

Alexander A. Stepanov、Paul McJones / Addison-Wesley Professional / 2009-6-19 / USD 39.99

Elements of Programming provides a different understanding of programming than is presented elsewhere. Its major premise is that practical programming, like other areas of science and engineering, mus......一起来看看 《Elements of Programming》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具