张坤的个人博客

  • 首页
  • 分类
  • 标签
  • 日志

  • 搜索
Jenkins RabbitMQ Zookeeper IDEA Logstash Kibana ELK NIO Netty Spring Cloud Golang DataX Elasticsearch React Native Mysql H2 Socket Spring Boot Kafka Mybatis Sqlmap Vue Postgresql Docker Vert.x Flutter Flink Redis

Spring Cloud Stream集成RibbitMQ

发表于 2020-06-13 | 分类于 Spring Cloud | 0 | 阅读次数 31

消息发送端

Spring Cloud为我们在分布式环境下提供了向消息队列推送和拉去消息的方法

消息发送方添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

application.properties

eureka.client.service-url.defaultZone=https://localhost:8080/eureka
spring.application.name=stream-provider
server.port=8081

spring.rabbitmq.host=192.168.159.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

创建一个接口,用来向ribbitmq发消息的

public interface SendService {
    @Output("myInput")
    SubscribableChannel sendMsg();
}

@Output时消息发送方在接口上添加的注解,消息消费方用@Input

myInput时消息通道,消费方要配置成@Input("myInput")才能接收到这个通道的消息

创建controller,请求这个接口就向ribbitmq发一次消息

@RestController
public class SendMsgController {
    @Autowired
    private SendService sendService;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        Message<byte[]> msg = MessageBuilder.withPayload("Hello RibbitMQ".getBytes()).build();
        sendService.sendMsg().send(msg);
        return "Send Message To RibbitMQ Success";
    }
}

在启动类上添加 @EnableBinding(SendService.class) 注解,指定 SendService 接口

消息消费端

也需要添加 spring-cloud-starter-stream-rabbit 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

application.properties

eureka.client.service-url.defaultZone=https://localhost:8080/eureka
spring.application.name=stream-consumer
server.port=8082

spring.rabbitmq.host=192.168.159.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

在消费者端也需要声明接口,和消息发送端不同的是,这里是 @Input,同样指定 myInput 通道

public interface ReceiveService {
    @Input("myInput")
    SubscribableChannel receiveMsg();
}

再创建个组件,用来监听消息的

@Component
public class ReceiveMsg {
    /**
     * 接收router key为myInput的ribbitmq消息
     */
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println(new String(msg));
    }
}

和消息发送端一样,在启动类上添加 @EnableBinding(ReceiveService.class)

测试

访问消息发送端的 /sendMsg 接口,消息消费端控制台就会打印 Hello RibbitMQ


源码:https://codox.coding.net/public/springcloud-learning/springcloud-learning/git/files

# Jenkins # RabbitMQ # Zookeeper # IDEA # Logstash # Kibana # ELK # NIO # Netty # Spring Cloud # Golang # DataX # Elasticsearch # React Native # Mysql # H2 # Socket # Spring Boot # Kafka # Mybatis # Sqlmap # Vue # Postgresql # Docker # Vert.x # Flutter # Flink # Redis
Spring Cloud集成Zuul
Spring Cloud服务跟踪
  • 文章目录
  • 站点概览
会Coding的猴子

会Coding的猴子

57 日志
19 分类
28 标签
RSS
Github
Creative Commons
© 2021 会Coding的猴子
由 Halo 强力驱动
|
主题 - NexT.Gemini v5.1.4

湘ICP备18011740号