Spring Cloud Stream集成RibbitMQ

消息发送端

Spring Cloud为我们在分布式环境下提供了向消息队列推送和拉去消息的方法

消息发送方添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

application.properties

eureka.client.service-url.defaultZone=https://localhost:8080/eureka
spring.application.name=stream-provider
server.port=8081

spring.rabbitmq.host=192.168.159.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

创建一个接口,用来向ribbitmq发消息的

public interface SendService {
    @Output("myInput")
    SubscribableChannel sendMsg();
}

@Output时消息发送方在接口上添加的注解,消息消费方用@Input

myInput时消息通道,消费方要配置成@Input("myInput")才能接收到这个通道的消息

创建controller,请求这个接口就向ribbitmq发一次消息

@RestController
public class SendMsgController {
    @Autowired
    private SendService sendService;

    @RequestMapping("/sendMsg")
    public String sendMsg() {
        Message<byte[]> msg = MessageBuilder.withPayload("Hello RibbitMQ".getBytes()).build();
        sendService.sendMsg().send(msg);
        return "Send Message To RibbitMQ Success";
    }
}

在启动类上添加 @EnableBinding(SendService.class) 注解,指定 SendService 接口

消息消费端

也需要添加 spring-cloud-starter-stream-rabbit 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

application.properties

eureka.client.service-url.defaultZone=https://localhost:8080/eureka
spring.application.name=stream-consumer
server.port=8082

spring.rabbitmq.host=192.168.159.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

在消费者端也需要声明接口,和消息发送端不同的是,这里是 @Input,同样指定 myInput 通道

public interface ReceiveService {
    @Input("myInput")
    SubscribableChannel receiveMsg();
}

再创建个组件,用来监听消息的

@Component
public class ReceiveMsg {
    /**
     * 接收router key为myInput的ribbitmq消息
     */
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println(new String(msg));
    }
}

和消息发送端一样,在启动类上添加 @EnableBinding(ReceiveService.class)

测试

访问消息发送端的 /sendMsg 接口,消息消费端控制台就会打印 Hello RibbitMQ


源码:https://codox.coding.net/public/springcloud-learning/springcloud-learning/git/files

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×