内容简介:基于JMS实现RPC功能
前言
JMS的点对点消息传送模式允许JMS客户端通过队列(queue)这个虚拟通道来同步和异步发送、接收消息;点对点消息传送模式既支持异步“即发即弃”消息传送方式,又支持同步请求/应答消息传送方式;点对点消息传送模式支持负载均衡,它允许多个接收者监听同一队列,并以此来分配负载;所以完全可以以JMS的点对点消息传送模式实现一套RPC框架。
准备
jdk:jdk1.7.0_80
jms:apache-activemq-5.10.0
serialize:protostuff
整体结构
整个实现包括:rpc-jms-common,rpc-jms-client,rpc-jms-server;
以及测试模块:rpc-jms-test-api,rpc-jms-test-client,rpc-jms-test-server
整体如下图所示:
rpc-jms-common:公共模块
rpc-jms-client:给客户端使用的支持库
rpc-jms-server:给服务端使用的支持库
客户端实现
客户端其实核心思想都是动态代理,生成的代理类将接口名称,版本信息,方法名,参数类型,参数值等信息封装成RpcRequest,然后使用protostuff对其进行序列化操作,将序列化后的二进制数据放入BytesMessage中,发送给消息队列;然后等待消息的返回,从BytesMessage中获取二进制数据,使用protostuff进行反序列化操作;最后将结果或者异常展示给用户,具体代码如下:
public class RpcClient { private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class); private QueueConnection qConnection; private QueueSession qSession; private Queue requestQ; private Queue responseQ; public RpcClient(String rpcFactory, String rpcRequest, String rpcResponse) { try { Context ctx = new InitialContext(); QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup(rpcFactory); qConnection = factory.createQueueConnection(); qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); requestQ = (Queue) ctx.lookup(rpcRequest); responseQ = (Queue) ctx.lookup(rpcResponse); qConnection.start(); } catch (Exception e) { LOGGER.error("init rpcproxy error", e); } } public <T> T create(final Class<?> interfaceClass) { return create(interfaceClass, ""); } @SuppressWarnings("unchecked") public <T> T create(final Class<?> interfaceClass, final String serviceVersion) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(method.getDeclaringClass().getName()); request.setServiceVersion(serviceVersion); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); BytesMessage requestMessage = qSession.createBytesMessage(); requestMessage.writeBytes(SerializationUtil.serialize(request)); requestMessage.setJMSReplyTo(responseQ); QueueSender qsender = qSession.createSender(requestQ); qsender.send(requestMessage); String filter = "JMSCorrelationID = '" + requestMessage.getJMSMessageID() + "'"; QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter); BytesMessage responseMessage = (BytesMessage) qReceiver.receive(30000); byte messByte[] = new byte[(int) responseMessage.getBodyLength()]; responseMessage.readBytes(messByte); RpcResponse rpcResponse = SerializationUtil.deserialize(messByte, RpcResponse.class); if (rpcResponse.hasException()) { throw rpcResponse.getException(); } else { return rpcResponse.getResult(); } } }); } }
RpcClient被创建的时候就和jms建立了连接,相关jms的配置信息在测试部分讲解。
其中封装请求类RpcRequest代码如下:
public class RpcRequest { private String requestId; private String interfaceName; private String serviceVersion; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; //省略get和set方法 }
封装响应类RpcResponse代码如下:
public class RpcResponse{ private String requestId; private Exception exception; private Object result; //省略get和set方法 }
服务端实现
服务器端首先加载所有需要对外提供服务的类对象;然后监听消息队列,从BytesMessage中获取二进制数据,通过protostuff反序列化为RpcRequest对象;最后通过反射的方式调用服务类,获取的结果封装成RpcResponse,然后序列化二进制写入BytesMessage中,发送给客户端,具体代码如下:
public class RpcServer implements ApplicationContextAware, InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class); private QueueConnection qConnection; private QueueSession qSession; private Queue requestQ; private String rpcFactory; private String rpcRequest; /** * 存放 服务名 与 服务对象 之间的映射关系 */ private Map<String, Object> serviceMap = new HashMap<String, Object>(); public RpcServer(String rpcFactory, String rpcRequest) { this.rpcFactory = rpcFactory; this.rpcRequest = rpcRequest; } @Override public void afterPropertiesSet() throws Exception { try { Context ctx = new InitialContext(); QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup(rpcFactory); qConnection = factory.createQueueConnection(); qSession = qConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); requestQ = (Queue) ctx.lookup(rpcRequest); qConnection.start(); QueueReceiver receiver = qSession.createReceiver(requestQ); receiver.setMessageListener(new RpcMessageListener(qSession, serviceMap)); LOGGER.info("ready receiver message"); } catch (Exception e) { if (qConnection != null) { try { qConnection.close(); } catch (JMSException e1) { } } LOGGER.error("server start error", e); } } @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); if (serviceBeanMap != null && serviceBeanMap.size() > 0) { for (Object serviceBean : serviceBeanMap.values()) { RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class); String serviceName = rpcService.value().getName(); String serviceVersion = rpcService.version(); if (serviceVersion != null && !serviceVersion.equals("")) { serviceName += "-" + serviceVersion; } serviceMap.put(serviceName, serviceBean); } } } }
消息监听器类RpcMessageListener:
public class RpcMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(RpcMessageListener.class); private QueueSession qSession; /** * 存放 服务名 与 服务对象 之间的映射关系 */ private Map<String, Object> serviceMap = new HashMap<String, Object>(); public RpcMessageListener(QueueSession qSession, Map<String, Object> serviceMap) { this.qSession = qSession; this.serviceMap = serviceMap; } @Override public void onMessage(Message message) { try { LOGGER.info("receiver message : " + message.getJMSMessageID()); RpcResponse response = new RpcResponse(); BytesMessage responeByte = qSession.createBytesMessage(); responeByte.setJMSCorrelationID(message.getJMSMessageID()); QueueSender sender = qSession.createSender((Queue) message.getJMSReplyTo()); try { BytesMessage byteMessage = (BytesMessage) message; byte messByte[] = new byte[(int) byteMessage.getBodyLength()]; byteMessage.readBytes(messByte); RpcRequest rpcRequest = SerializationUtil.deserialize(messByte, RpcRequest.class); response.setRequestId(rpcRequest.getRequestId()); String serviceName = rpcRequest.getInterfaceName(); String serviceVersion = rpcRequest.getServiceVersion(); if (serviceVersion != null && !serviceVersion.equals("")) { serviceName += "-" + serviceVersion; } Object serviceBean = serviceMap.get(serviceName); if (serviceBean == null) { throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName)); } Class<?> serviceClass = serviceBean.getClass(); String methodName = rpcRequest.getMethodName(); Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); Object[] parameters = rpcRequest.getParameters(); Method method = serviceClass.getMethod(methodName, parameterTypes); method.setAccessible(true); Object result = method.invoke(serviceBean, parameters); response.setResult(result); } catch (Exception e) { response.setException(e); LOGGER.error("onMessage error", e); } responeByte.writeBytes(SerializationUtil.serialize(response)); sender.send(responeByte); } catch (Exception e) { LOGGER.error("send message error", e); } } }
测试
1.rpc-jms-test-api:接口模块,被rpc-jms-test-client和rpc-jms-test-server共同使用
IHelloService类:
public interface IHelloService { String hello(String name); }
2.rpc-jms-test-server:服务器端测试模块,依赖rpc-jms-server
jms相关信息配置在jndi.properties中,如下所示:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616 java.naming.security.principal=system java.naming.security.credentials=manager connectionFactoryNames=RPCFactory queue.RPCRequest=jms.RPCRequest queue.RPCResponse=jms.RPCResponse
服务器端主要依赖的spring-server.xml配置文件,主要用于实例化RpcServer
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="zh.rpc.jms.test.server.impl" /> <context:property-placeholder location="classpath:jms.properties" /> <bean id="rpcServer" class="zh.rpc.jms.server.RpcServer"> <constructor-arg name="rpcFactory" value="${rpc.rpc_factory}" /> <constructor-arg name="rpcRequest" value="${rpc.rpc_request}" /> </bean> </beans>
IHelloService的具体实现类:
@RpcService(IHelloService.class) public class HelloServiceImpl implements IHelloService { @Override public String hello(String name) { return "REQ+" + name; } }
3.rpc-jms-test-client:客户端测试模块,依赖rpc-jms-client
客户端同样需要连接消息队列,所以也提供了配置文件jndi.properties;
客户端主要依赖的spring-client.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder location="classpath:jms.properties" /> <bean id="rpcProxy" class="zh.rpc.jms.client.RpcClient"> <constructor-arg name="rpcFactory" value="${rpc.rpc_factory}" /> <constructor-arg name="rpcRequest" value="${rpc.rpc_request}" /> <constructor-arg name="rpcResponse" value="${rpc.rpc_response}" /> </bean> </beans>
客户端测试类:
public class ClientTest { private static ApplicationContext context; public static void main(String[] args) throws Exception { context = new ClassPathXmlApplicationContext("spring-client.xml"); RpcClient rpcProxy = context.getBean(RpcClient.class); IHelloService helloService = rpcProxy.create(IHelloService.class); String result = helloService.hello("World"); System.out.println(result); System.exit(0); } }
4.测试
首先启动事先准备的activemq,运行bin\win64\activemq.bat即可;
然后启动服务器ServerTest,ready receiver message
最后运行ClientTest,发送消息验证结果,结果如下:
REQ+World
以上只列出了部分较为重要的代码,更多详细的可以参考: http://code.taobao.org/svn/rpc-jms/trunk/rpc-jms
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
疯狂Java讲义
李刚 / 电子工业出版社 / 2012-1-1 / 109.00元
《疯狂Java讲义(附光盘第2版)》是《疯狂Java讲义》的第2版,第2版保持了第1版系统、全面、讲解浅显、细致的特性,全面介绍了新增的Java 7的新特性。 《疯狂Java讲义(附光盘第2版)》深入介绍了Java编程的相关方面,全书内容覆盖了Java的基本语法结构、Java的面向对象特征、Java集合框架体系、Java泛型、异常处理、Java GUI编程、JDBC数据库编程、Java注释、......一起来看看 《疯狂Java讲义》 这本书的介绍吧!