程序员社区

Spring Cloud Stream 简单使用

Hello,欢迎来到程序员社区。 今天聊一聊 Spring Cloud Stream 简单使用,希望对大家有所帮助。

Java面试手册PDF下载:http://117.78.51.75/219-2

文章目录

    • Spring Cloud Stream 简单使用
      • 开启绑定功能
      • 绑定消息通道
      • 注入绑定接口
      • 注入消息通道
      • 消息生产与消费
        • Spring Integration原生支持
      • 消息反馈
    • 参考

Spring Cloud Stream 简单使用

开启绑定功能

在Spring Cloud Stream中,我们需要通过@EnableBinding注解来为应用启动消息驱动的功能,该注解我们在快速入门中已经有了基本的介绍,下面来详细看看它的定义:

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class})
@EnableIntegration
public @interface EnableBinding {
    Class?>[] value() default {};
}

从该注解的定义中我们可以看到,它自身包含了@configuration注解,所以用它注解的类也会成为Spring的基本配置类。另外该注解还通过@Import加载了Spring Cloud Stream运行需要的几个基础配置类。

@EnableBinding 注解只有一个唯一的属性:value。由于该注解@ImportBindingBeansRegistrar实现,所以在加载了基础配置内容之后,它会回调来读取value中的类,以创建消息通道的绑定。另外,由于value是一个Class类型的数组,所以我们可以通过value属性一次性指定多个关于消息通道的配置。

绑定消息通道

在Spring Cloud Steam中,我们可以在接口中通过@Input@Output注解来定义消息通道,而用于定义绑定消息通道的接口则可以被@EnableBinding 注解的value参数来指定,从而在应用启动的时候实现对定义消息通道的绑定。

Sink接口是Spring cloud Steam 提供的一个默认实现,除此之外还有SourceProcessor,可从它们的源码中学习它们的定义方式:

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Processor extends Source, Sink {
}

从上面的源码中,我们可以看到,Sink和Source中分别通过@Input@Output注解定义了输入通道和输出通道,而Processor通过继承Source和sink的方式同时定义了一个输入通道和一个输出通道。

另外,@Input@Output注解都还有一个value属性,该属性可以用来设置消息通道的名称,这里Sink和Source中指定的消息通道名称分别为input和output。如果我们直接使用这两个注解而没有指定具体的value值,将默认使用方法名作为消息通道的名称。

最后,需要注意一点,当我们定义输出通道的时候,需要返回Messagechannel接口对象,该接口定义了向消息通道发送消息的方法;而定义输入通道时,需要返回SubscribableChannel接口对象,该接口继承自MessageChannel接口,它定义了维护消息通道订阅者的方法。

注入绑定接口

在完成了消息通道绑定的定义之后,Spring Cloud Stream会为其创建具体的实例,而开发者只需要通过注入的方式来获取这些实例并直接使用即可,下面可以通过注入的方式实现一个消息生成者,向input消息通道发送数据。

  • 创建一个将Input消息通道作为输出通道的接口,具体如下:
package com.example.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.MessageChannel;

public interface SinkSender {

    @Output(Sink.INPUT)
    MessageChannel output();
}

  • @EnableBinding注解中增加对SinkSender接口的指定,使Spring Cloud Stream能创建出对应的实例。
package com.example.stream;

import org.slf4j.LoJava面试手册gger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding({Sink.class,SinkSender.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received: {}", payload);
    }
}

  • 创建一个单元测试类,通过@Autowired注解注入SinkSender的实例,并在测试用例中调用它的发送消息方法。
package com.example.stream;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootTest
class SinkSenderTest {

    @Autowired
    private SinkSender sinkSender;


    @Test
    public void contextLoads(){
        sinkSender.output().send(MessageBuilder.withPayload("From SinkSender").build());
    }
}
  • 运行该单元测试用例,如果可以在控制台中找到如下输出内容,表明我们的试验已经成功了,消息被正确地发送到了input通道中,并被相对应的消息消费编程电子书汇总者输出。
2020-07-28 09:37:32.007  INFO 17224 --- [           main] com.example.stream.SinkReceiver          : Received: From SinkSender

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo01/

注入消息通道

由于Spring Cloud Stream会根据绑定接口中的@Input@Output注解来创建消息通道实例,所以我们也可以通过直接注入的方式来使用消息通道对象。比如,我们可以通过下面的示例,注入上面例子中SinkSender接口中定义的名为input的消息输入通道。

package com.example.stream;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootTest
class SinkSenderTest {

    @Autowired
    private MessageChannel input;


    @Test
    public void contextLoads(){
        input.send(MessageBuilder.withPayload("From SinkSender").build());
    }
}

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo2/

上面定义的内容,完成了与之前通过注入绑定接口sinksender方式实现的测试用例相同的操作。因为在通过注入绑定接口实现时, sinkSender.output()方法实际获得的就是Sinksender接口中定义的Messagechannel实例,只是在这里我们直接通过注入的方式来实现了而已。这种用法虽然很直接,但是也容易犯错,很多时候我们在一个微服务应用中可能会创建多个不同名的Messagechannel实例,这样通过@Autowired注入时,要注意参数命名需要与通道同名才能被正确注入,或者也可以使用@Qualifier注解来特别指定具体实例的名称,该名称需要与定义Messagechannel@Output中的value参数一致,这样才能被正确注入。比如下面的例子,在一个接口中定义了两个输出通道,分别命名为Output-1Output-2,当要使用Output-1的时候,可以通过
@Qualifier("Output-1")来指定这个具体的实例来注入使用。


import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    String OUTPUT_1 = "Output-1";
    String OUTPUT_2 = "Output-2";

    @Output(MySource.OUTPUT_1)
    MessageChannel output1();


    @Output(MySource.OUTPUT_2)
    MessageChannel output2();
}

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootTest
class StreamAppTest {

    @Qualifier("Output-1")
    @Autowired
    private MessageChannel output1;

    @Qualifier("Output-2")
    @Autowired
    private MessageChannel output2;


    @Test
    public void contextLoads(){
        output1.send(MessageBuilder.withPayload("From Output-1").build());
        output2.send(MessageBuilder.withPayload("From Output-2").build());
    }
}

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo3/

消息生产与消费

由于Spring Cloud Stream是基于Spring Integration构建起来的,所以在使用Spring Cloud Stream构建消息驱动服务的时候,完全可以使用Spring Integration的原生注解来实现各种业务需求。同时,为了简化面向消息的编程模型,Spring Cloud Stream还提供了@StreamListener注解对输入通道的处理做了进一步优化。下面我们分别从这两方面来学习一下对消息的处理。

Spring Integration原生支持

通过之前的内容,我们已经能够通过注入绑定接口和消息通道的方式实现向名为input的消息通道发送信息。接下来,我们通过Spring Integration 原生的@ServiceActivator@InboundChannelAdapter注解来尝试实现相同的功能

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding({Sink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received: {}", payload);
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

import java.util.Date;

@EnableBinding(value = {SinkSender.SinkOutPut.class})
public class SinkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(SinkSender.class);

    @Bean
    @InboundChannelAdapter(value = SinkOutPut.OUTPUT, poller = @Poller(fixedDelay = "2000"))
    public MessageSourceDate> timerMessageSource() {
        return () -> new GenericMessage>(new Date());
    }

    public interface SinkOutPut {

        String OUTPUT = Sink.INPUT;

        @Output(SinkOutPut.OUTPUT)
        MessageChannel output();
    }
}

  • SinkReceiver类属于消息消费者实现,与之前实现的类似,只是做了一些修改:
    使用原生的@ServiceActivator 注解替换了@StreamListener,实现对Sink.INPUT通道的监听处理,而该通道绑定了名为input的主题。
  • SinkSender类属于消息生产者实现,它在内部定义了SinkOutPut接口来将输出通道绑定到名为input的主题中。由于SinkSenderSinkReceiver共用一个主题,所以它们构成了一组生产者与消费者。另外,在SinkSender中还创建了用于生产消息的timerMessageSource方法,该方法会将当前时间作为消息返回。而 @InboundChannelAdapter注解定义了该方法是对SinkOutPut.OUTPUT通道的输出绑定,同时使用poller参数将该方法设置为轮询执行,这里我们定义为2000毫秒,所以它会以2秒的频率向SinkOutPut.OUTPUT通道输出当前时间。

Spring Cloud Stream 简单使用插图

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo4/

另外,还可以通过@Transformer注解对指定通道的消息进行转换。

    @Transformer(inputChannel = MySink.INPUT_1, outputChannel = MySource.OUTPUT_2)
    public Object transformer(Date message) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
    }

Spring Cloud Stream 简单使用插图1

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo5/

注意:如果使用@StreamListener注释从同一channel中消费时,将使用pub-sub模型。带有注释的每种方法均@StreamListener接收其自己的消息副本,并且每种方法都有其自己的消费组。但是,但是如果使用Spring Integration的注解,比如@Transformer@ServiceActivator,这些属于消费竞争模型。没有为每个订阅创建单独的消费者组。

演示代码如下:

  • 使用@StreamListener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding({Sink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive1(Object payload) {
        LOGGER.info("Received1: {}", payload);
    }

    @StreamListener(Sink.INPUT)
    public void receive2(Object payload) {
        LJava面试手册OGGER.info("Received2: {}", payload);
    }
}

Spring Cloud Stream 简单使用插图2

  • 使用@ServiceActivator
import org.slf4j.Logg编程电子书汇总er;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding({Sink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);


    @ServiceActivator(inputChannel = Sink.INPUT)
    public void receive1(Object payload) {
        LOGGER.info("Received1: {}", payload);
    }

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void receive2(Object payload) {
        LOGGER.info("Received2: {}", payload);
    }
}

Spring Cloud Stream 简单使用插图3

源代码参考: https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo6/

消息反馈

很多时候在处理完输入消息之后,需要反馈一个消息给对方,这时候可以通@SendTo注解来指定返回内容的输出通道。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

import java.text.SimpleDateFormat;
import java.util.Date;

@EnableBinding({MySink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);


    @StreamListener(MySink.INPUT_1)
    @SendTo(MySource.OUTPUT_2)
    public Object transformer(Date message) {
        LOGGER.info("Received from input1: {}", message);
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
    }

    @StreamListener(MySink.INPUT_2)
    public void receive2(Object payload) {
        LOGGE编程电子书汇总R.info("Received from input2: {}", payload);
    }
}

Spring Cloud Stream 简单使用插图4

源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo8/

参考

Spring Cloud微服务实战.pdf

Spring Cloud Stream知识点盘点

https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#spring-cloud-stream-reference

时间不一定能证明很多东西,但是一定能看透很多东西。坚信自己的选择,不动摇,使劲跑,明天会更好。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Spring Cloud Stream 简单使用

一个分享Java & Python知识的社区