SpringCloud Stream 使用

介绍stream

Spring Cloud Stream 是一个用于构建消息驱动微服务应用的框架。
它整合了 Spring Boot 与 Spring Integration,用来连接消息代理中间件,以实现消息事件驱动。
核心概念有三个:发布-订阅、消费组、分区。

具体使用介绍

引入pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <!-- Coordinates of this demo module -->
    <artifactId>spring-cloud-stream-simple</artifactId>
    <groupId>com.ninuGithub</groupId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-stream-simple</name>
    <description>Demo project for Spring Boot</description>


    <!-- Inherit plugin and dependency management from the Spring Boot parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <!-- Referenced by the spring-cloud-dependencies import in dependencyManagement below -->
        <spring-cloud.version>Finchley.SR1</spring-cloud.version>
    </properties>

    <!-- No versions are declared here; they come from the parent and the imported BOM -->
    <dependencies>
        <!-- Spring Cloud Stream starter for the RabbitMQ binder -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!-- NOTE(review): spring-boot-starter-web normally brings in embedded Tomcat already;
             confirm whether this explicit declaration is still needed -->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Import the Spring Cloud BOM (type pom, scope import) to manage Spring Cloud dependency versions -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- Additional artifact repository: Spring milestone builds -->
    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

核心的配置yml:
# HTTP port of the demo application
server:
  port: 8190

#eureka:
#  client:
#    serviceUrl:
#      defaultZone: http://localhost:8761/eureka/

spring:
  application:
      name: stream-simple
  cloud:
    stream:
      bindings:
        
        # "input"/"output" bindings: consumer and producer share the destination queue.messages
        input:
          group: inputGroup
          destination: queue.messages
          binder: defaultRabbit
          content-type: application/json
          consumer:
            max-attempts: 1 # max processing attempts; NOTE(review): per the Spring Cloud Stream docs, 1 means no in-process retry - confirm intended
        output:
          destination: queue.messages
          binder: defaultRabbit
          content-type: application/json
        
        # "sinkInput"/"sinkOutput" bindings: consumer and producer share the destination sink.messages
        sinkInput:
          group: sinkGroup
          destination: sink.messages
          binder: defaultRabbit
          content-type: application/json
          consumer:
            max-attempts: 1
        sinkOutput:
          destination: sink.messages
          binder: defaultRabbit
          content-type: application/json
      # RabbitMQ-specific consumer settings per binding
      rabbit:
        bindings:
          input:
            consumer:
              auto-bind-dlq: true
          sinkInput:
            consumer:
              requeue-rejected: true # requeue messages whose consumption failed ⑤
              #auto-bind-dlq: true  # send failed messages to a dead-letter queue instead ④
      
      # Define a binder named defaultRabbit that uses RabbitMQ as the middleware
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                username: admin
                password: admin
                virtual-host: /
                host: 10.1.51.96
                port: 5672

# Expose all actuator endpoints over the web
management:
  endpoints:
    web:
      exposure:
        include: "*"

核心的代码如下:
1. 自己消费自己生产的消息
       定义 sinkInput、sinkOutput 两个通道,对应同一个队列:sink.messages;并定义一个 group:sinkGroup(用于处理降级消费)。
       binder 使用我们自己定义的 defaultRabbit(RabbitMQ)作为中间件。

2. 自动重试

3. 消费失败自定义错误处理 参考③
 
4. 消费失败进入死信队列 , 然后对死信队列的消息进行特定的消费  参考④
    死信队列的描述请参考:https://www.jianshu.com/p/9211d1136341

5. 重入队列: 异常发生时,没有被成功消费的消息会重新进入队列;如果消费一直报异常,消息会被反复投递,直到该消息被成功消费为止  参考⑤
/**
 * Spring Boot entry point of the demo application.
 */
@SpringBootApplication
public class StreamSimpleApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments handed through to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) helper.
        SpringApplication application = new SpringApplication(StreamSimpleApplication.class);
        application.run(args);
    }
}

/**
 * REST controller that publishes messages through the {@link SinkSender} output channel.
 */
@RestController
@EnableBinding(value = {SinkSender.class})
public class TestController {

    /** Binding interface that provides the outbound channel. */
    @Autowired
    private SinkSender sinkSender;

    /**
     * Wraps the given text in a {@link Message} stamped with the current time and
     * sends it to the sender's output channel.
     *
     * @param message text taken from the {@code message} request parameter
     * @return the literal {@code "ok"}; the result of the send call is not inspected
     */
    @RequestMapping(value = "/sendMessage")
    public String sendMessage(@RequestParam String message) {
        Message payload = new Message(message, new Date());
        sinkSender.output().send(MessageBuilder.withPayload(payload).build());
        return "ok";
    }
}

/**
 * Binding contract for the consuming side: declares one subscribable input channel.
 */
@Component
public interface Sink {

    /** Name of the input channel. */
    String INPUT = "sinkInput";

    /**
     * @return the subscribable channel registered under {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel input();
}

/**
 * Consumer for the {@link Sink} input channel.
 *
 * <p>Counts how often each message text has been delivered and deliberately fails
 * the first deliveries of the text {@code "java"}, so that the failure handling
 * of the channel can be observed.
 */
@Component
@EnableBinding(Sink.class)
public class SinkReceiver {


    /** Number of deliveries seen so far, keyed by message text. */
    Map<String,Integer> diffMap = new HashMap<>();

    /**
     * Handles one delivery from the {@link Sink#INPUT} channel.
     *
     * <p>Every delivery increments the counter of its message text exactly once.
     * A message whose text is {@code "java"} is rejected until its fifth delivery,
     * i.e. deliveries one to four throw.
     *
     * @param message the received payload
     * @throws RuntimeException while a {@code "java"} message has been delivered fewer than five times
     */
    @StreamListener(Sink.INPUT)
    public void receive(Message message) {
        System.out.println("SinkReciver: " + message);

        String key = message.getMsg();
        // Single increment per delivery; the first delivery is recorded as 1.
        int deliveries = diffMap.merge(key, 1, Integer::sum);

        // Trigger with: http://localhost:8190/sendMessage?message=java
        // Constant-first equals keeps a null message text from raising a NullPointerException.
        if ("java".equals(key) && deliveries < 5) {
            throw new RuntimeException("11111111111111消息消费失败!");
        }
    }

    /**
     * ③ Fallback handler for failed consumption (currently disabled).
     *
     * @param message the message whose consumption failed
     */
    /*@ServiceActivator(inputChannel = "sink.messages.sinkGroup.errors")
    public void error(Message message) {
        System.out.println("11111111111111111sink 降级消费 异常处理");
    }
*/
}

/**
 * Binding contract for the producing side: declares one output channel.
 */
@Component
public interface SinkSender {

    /** Name of the output channel. */
    String OUTPUT = "sinkOutput";

    /**
     * @return the message channel registered under {@link #OUTPUT}
     */
    @Output(OUTPUT)
    MessageChannel output();
}

/**
 * Mutable payload exchanged over the demo channels: a text and a timestamp.
 */
public class Message implements Serializable {

    /** Message text. */
    private String msg;

    /** Timestamp attached to the message; stored by reference, not copied. */
    private Date time;

    /** Creates an empty message; both fields start out as {@code null}. */
    public Message() {
    }

    /**
     * Creates a message with the given text and timestamp.
     *
     * @param msg  message text
     * @param time timestamp to attach
     */
    public Message(String msg, Date time) {
        this.msg = msg;
        this.time = time;
    }

    /** @return the message text */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg the new message text */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return the attached timestamp */
    public Date getTime() {
        return this.time;
    }

    /** @param time the new timestamp */
    public void setTime(Date time) {
        this.time = time;
    }

    /**
     * Renders the message as {@code Message{msg='<text>', time=<timestamp>}}.
     *
     * @return the textual form of this message
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("msg='").append(msg).append('\'');
        text.append(", time=").append(time);
        return text.append('}').toString();
    }
}

reference

https://blog.csdn.net/u014677702/article/details/85886522
http://blog.didispace.com/spring-cloud-starter-finchley-7-1/
https://blog.csdn.net/mameng1988/article/details/83900293
https://www.jianshu.com/p/b2c3ae8a9ca7
https://blog.csdn.net/peterwanghao/article/details/80304382