内容简介:在之前的工作中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demo第一步:下载:第二步:解压
在之前的工作中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demo
一些概念
- 官方对 Spring Cloud Stream 的一段介绍:Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。基于 SpringBoot 创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
- Binder :Components responsible to provide integration with the external messaging systems.【与外部消息中间件进行集成】
- Binding:Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).【在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,开发者只需使用应用程序的 生产者或消费者生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。】
- Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).【生产者和消费者用于与目标绑定器通信的规范数据结构。】
快速在本地启动rocketmq
第一步:下载: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
第二步:解压
第三步:修改三个配置文件:runbroker.sh,runserver.sh,tools.sh,将其中JAVA_HOME改成自己电脑的环境配置,修改完如下
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=自己的地址 #[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
第四步:依次执行命令
./mqnamesrv ./mqbroker -n localhost:9876 ./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
如果启动成功,没有报错,代表启动成功哈,下面就可以开发了
开发demo
第一步:导入相关的pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>0.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 为了Endpoint 信息查看 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> <version>3.2.6</version> </dependency>
第二步:建一个springboot项目,启动类如下:
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
第三步:创建provider
@Service public class RocketmqProducer { public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("test_producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("test-topic", "test-tag", message.getBytes()); producer.send(msg); } }
第四步:创建consumer
@Service public class ReceiveService { /** * 默认是input,在Sink类中指定,如果想要多个input,需要写一个实现Sink的类 * @param receiveMsg */ @StreamListener("input") public void receiveInput1(String receiveMsg) { System.out.println("input receive: " + receiveMsg); } }
第五步:加入配置文件:
server.port=8087 spring.application.name=spring-cloud-alibaba-rocketmq-demo # 配置rocketmq的nameserver地址 spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 # 定义name为output的binding spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=application/json #定义name为input的binding spring.cloud.stream.bindings.input.destination=test-topic spring.cloud.stream.bindings.input.content-type=application/json spring.cloud.stream.bindings.input.group=test-group management.endpoint.health.show-details=always
第六步:写一个controller,启动项目,访问接口
@RestController @RequestMapping(value = "/api/demo/test") public class TestController { @Autowired RocketmqProducer rocketmqProducer; @RequestMapping(value = "/send", method = RequestMethod.GET) public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { rocketmqProducer.send("test rocketmq message"); return "success"; } }
会看到控制台输出:input receive: test rocketmq message
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 持续集成:数据库集成及快速构建
- ShareSDK集成及集成后遇到的一些问题【原创】
- 持续集成与持续部署宝典Part 3:创建集成环境
- 持续集成与持续部署宝典Part 2:创建持续集成流水线
- 禅道 12.3.stable 版本发布,全面集成八种单元测试框架,打通持续集成闭环
- 持续集成将死
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Probability and Computing
Michael Mitzenmacher、Eli Upfal / Cambridge University Press / 2005-01-31 / USD 66.00
Assuming only an elementary background in discrete mathematics, this textbook is an excellent introduction to the probabilistic techniques and paradigms used in the development of probabilistic algori......一起来看看 《Probability and Computing》 这本书的介绍吧!