springcloud(一)——spring-cloud-alibaba集成rocketmq

栏目: Java · 发布时间: 5年前

内容简介:在之前的工作中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demo第一步:下载:第二步:解压

在之前的工作中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demo

一些概念

  • 官方对 Spring Cloud Stream 的一段介绍:Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。基于 SpringBoot 创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
  • Binder :Components responsible to provide integration with the external messaging systems.【与外部消息中间件进行集成】
  • Binding:Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).【在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,开发者只需使用应用程序的 生产者或消费者生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。】
  • Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).【生产者和消费者用于与目标绑定器通信的规范数据结构。】

快速在本地启动rocketmq

第一步:下载: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip

第二步:解压

第三步:修改三个配置文件:runbroker.sh,runserver.sh,tools.sh,将其中JAVA_HOME改成自己电脑的环境配置,修改完如下

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=自己的地址
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

第四步:依次执行命令

./mqnamesrv
./mqbroker -n localhost:9876
./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic

如果启动成功,没有报错,代表启动成功哈,下面就可以开发了

开发demo

第一步:导入相关的pom

<dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>0.2.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
 </dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 <!-- 为了Endpoint 信息查看 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>3.2.6</version>
</dependency>

第二步:建一个springboot项目,启动类如下:

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

第三步:创建provider

@Service
public class RocketmqProducer {
    public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("test-topic", "test-tag", message.getBytes());
        producer.send(msg);
    }
}

第四步:创建consumer

@Service
public class ReceiveService {

    /**
     * 默认是input,在Sink类中指定,如果想要多个input,需要写一个实现Sink的类
     * @param receiveMsg
     */
    @StreamListener("input")
    public void receiveInput1(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }

}

第五步:加入配置文件:

server.port=8087
spring.application.name=spring-cloud-alibaba-rocketmq-demo

# 配置rocketmq的nameserver地址
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
# 定义name为output的binding
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
#定义name为input的binding
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group

management.endpoint.health.show-details=always

第六步:写一个controller,启动项目,访问接口

@RestController
@RequestMapping(value = "/api/demo/test")
public class TestController {

    @Autowired
    RocketmqProducer rocketmqProducer;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        rocketmqProducer.send("test rocketmq message");
        return "success";
    }
}

会看到控制台输出:input receive: test rocketmq message


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算群体智能基础

计算群体智能基础

恩格尔伯里特 / 谭营 / 2009-10 / 69.00元

《计算群体智能基础》全面系统地介绍了计算群体智能中的粒子群优化(PSO)和蚁群优化(ACO)的基本概念、基本模型、理论分析及其应用。在简要介绍基本优化理论和总结各类优化问题之后,重点介绍了社会网络结构如何在个体间交换信息以及个体聚集行为如何形成一个功能强大的有机体。在概述了进化计算后,重点论述了粒子群优化和蚁群优化的基本模型及其各种变体,给出了分析粒子群优化模型的一种通用方法,证明了基于蚂蚁行为实......一起来看看 《计算群体智能基础》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具