Hystrix使用

Hystrix的个人理解

Spring Cloud 提供的 Hystrix 为服务调用超时、网络延迟等导致的请求失败提供了一个容错的解决方案。在集群里面服务相互调用的时候,
不会因为一个节点的调用失败而影响到其他服务的业务调用。Hystrix 提供了服务降级、服务熔断、线程和信号量隔离、请求缓存、请求合并
以及服务监控等强大的功能。

hystrix 熔断的使用

pom配置
<dependencies>
    <!-- Eureka client starter (service registration / discovery) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <!-- Spring Boot web starter (REST controllers) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Boot Actuator starter (management endpoints) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Hystrix starter (circuit breaker, @HystrixCommand) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
    <!-- Hystrix dashboard starter (monitoring UI) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
    </dependency>
</dependencies>
yml配置如下:
# Configuration of the Eureka client service instance.
server:
  port: 8763 # change this to start instances on different ports (8762 and 8763)
spring:
  application:
    name: eurka-client # service name of this application
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/ # address of the Eureka server
management:
  endpoints:
    web:
      exposure:
        include: '*' # expose all actuator endpoints over HTTP
      cors:
        # allow cross-origin requests to the actuator endpoints from any origin / with any method
        allowed-origins: '*'
        allowed-methods: '*'

java代码如下:


//----------------------服务提供的接口


package com.example.demo;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * Eureka client demo application that also exposes a Hystrix-protected REST endpoint.
 *
 * <p>Hystrix stream address: http://localhost:8762/actuator/hystrix.stream
 */
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@RestController
@EnableHystrix
@EnableHystrixDashboard
@EnableCircuitBreaker
public class EurekaClientApplication {

    /** Port of the instance that is artificially slowed down; the 8762 instance does not sleep. */
    private static final String SLOW_INSTANCE_PORT = "8763";

    /** Artificial delay, in seconds, applied on the slow instance. */
    private static final long SLOW_INSTANCE_DELAY_SECONDS = 3;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }

    /** Port this instance listens on, injected from {@code server.port}. */
    @Value("${server.port}")
    String port;

    /**
     * Greets the caller and reports which instance (port) served the request.
     *
     * <p>On the instance running on port 8763 the call sleeps for 3 seconds so that the
     * Hystrix command times out and {@link #hiError(String)} is invoked instead.
     *
     * @param name name to greet; defaults to {@code ninuxGithub}
     * @return the greeting including the serving port
     */
    @RequestMapping("/hi")
    @HystrixCommand(fallbackMethod = "hiError")
    public String home(@RequestParam(value = "name", defaultValue = "ninuxGithub") String name) {
        // port is injected as a String, so compare it as a String (an int comparison does not compile).
        if (SLOW_INSTANCE_PORT.equals(port)) {
            try {
                TimeUnit.SECONDS.sleep(SLOW_INSTANCE_DELAY_SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
        return "hi " + name + " i am from port: " + port;
    }

    /**
     * Fallback for {@link #home(String)}.
     *
     * @param name name that was passed to the failed call
     * @return a fixed error greeting
     */
    public String hiError(String name) {
        return "hi," + name + ",sorry,error!";
    }

}


ribbon服务端java代码参考:https://ninuxgithub.github.io/2018/12/scpost4/
这个博客配置的是基于权重的负载均衡策略,
当发现某个请求消耗的时间较长时,会调整对应服务实例的权重。



所以 8762 端口的权重会增加;8763 端口因为服务调用延时(触发断路器)而权重降低。
请求到 8763 端口的服务会由于超时而无法正常返回,转而执行熔断的处理方法(可以理解为一种补偿的方法)。

hystrix开启异步调用

client服务端
/**
 * Builds the message returned by {@code /getMessage}: its id is this instance's port and its
 * text names the port the response came from.
 *
 * @return a new {@code Message} built from the {@code port} field
 */
@RequestMapping("/getMessage")
public Message asyncRequest() {
    Integer portNumber = Integer.valueOf(port);
    String text = "这是一个异步调用的方式,来自 " + port;
    return new Message(portNumber, text);
}
    



//ribbon 服务端
 //异步方法的调用
 
 
    /**
     * Asynchronously fetches a message from the EURKA-CLIENT service.
     *
     * <p>If the asynchronous body throws, {@code getMessageError} is configured as the fallback.
     *
     * @return a future that yields the remote message
     */
    @HystrixCommand(fallbackMethod = "getMessageError")
    public Future<Message> getMessage() {
        final String endpoint = "http://EURKA-CLIENT/getMessage";
        return new AsyncResult<Message>() {
            @Override
            public Message invoke() {
                // NOTE(review): division by zero always throws here, presumably on purpose
                // to demonstrate the fallback; the remote call below is never reached.
                System.out.println(10 / 0);
                return restTemplate.getForObject(endpoint, Message.class);
            }
        };
    }

    //异常的处理
    /**
     * Fallback for {@code getMessage}: wraps the failure reason in a default message.
     *
     * @param throwable the error raised by the failed command
     * @return a message with id 1 whose text is "default message" followed by the error message
     */
    public Message getMessageError(Throwable throwable) {
        String reason = throwable.getMessage();
        return new Message(1, "default message" + reason);
    }




hystrix 请求合并

请求地址:http://localhost:8764/getMessageById?msgId=2  修改 msgId 多次发起请求, 2s(timerDelayInMilliseconds = 2000)以内发起的请求将合并为一次批量请求

/**
 * REST entry point for the request-collapsing demo; delegates to {@code HelloService}.
 */
@RestController
public class HelloControler {

    @Autowired
    HelloService helloService;

    /**
     * Looks up a single message by id and blocks until the result is available.
     *
     * @param msgId id of the message to fetch
     * @return the message produced by {@code HelloService.findMessage}
     * @throws ExecutionException if the underlying computation failed
     * @throws InterruptedException if the waiting thread was interrupted
     */
    @RequestMapping(value = "/getMessageById")
    public Message getMessageById(@RequestParam(value = "msgId") Integer msgId) throws ExecutionException, InterruptedException {
        return helloService.findMessage(msgId).get();
    }

}



/**
 * Service demonstrating Hystrix request collapsing: single-id lookups are merged into one
 * batched call to the EURKA-CLIENT service.
 */
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;

    /**
     * Request collapsing entry point: individual calls are batched and served by
     * {@link #findMessages(List)}.
     *
     * <p>Reference blogs:
     * <ul>
     *   <li>https://blog.csdn.net/zlt995768025/article/details/81663500 (demo)</li>
     *   <li>https://blog.csdn.net/amosryan/article/details/54019479 (how to stop
     *       {@code RestTemplate.getForObject} from turning the list's element type into
     *       {@code LinkedHashMap}); a plain
     *       {@code restTemplate.getForEntity(url, List.class, id)} loses the element type,
     *       which is why {@code findMessages} uses a {@code ParameterizedTypeReference}.</li>
     * </ul>
     *
     * <p>NOTE(review): with javanica the body of a collapser method is presumably never
     * executed (the batch method produces the result) — confirm against the javanica docs.
     *
     * @param messageId id of the message to look up
     * @return a future yielding the message for {@code messageId}
     */
    @HystrixCollapser(batchMethod = "findMessages",
            collapserProperties = {
                    @HystrixProperty(name = "timerDelayInMilliseconds", value = "2000"), // collapsing window in milliseconds
                    @HystrixProperty(name = "maxRequestsInBatch",value = "200") // maximum number of requests per batch
            },
            scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL )
    public Future<Message> findMessage(Integer messageId) {
        System.out.println("执行hystrix 请求合并");
        throw new RuntimeException("执行异常");
    }

    /**
     * Batch method for {@link #findMessage(Integer)}: fetches all requested messages in one
     * HTTP call and returns them in the same order as {@code messageIds}.
     *
     * <p>The returned list always has exactly one entry per requested id, because the number
     * of responses must match the number of collapsed requests. An id the remote service
     * does not return yields a {@code null} entry at that position.
     *
     * @param messageIds ids of the collapsed requests, in request order
     * @return one message (or {@code null}) per requested id, in request order
     * @throws IllegalStateException if the remote service returns no body
     */
    @HystrixCommand(fallbackMethod = "findMessageFail")
    public List<Message> findMessages(List<Integer> messageIds) {
        Set<Integer> disId = new HashSet<>(messageIds);
        System.out.println("批量请求" + disId);
        String url = "http://EURKA-CLIENT/getMessages?messageID=" + StringUtils.join(messageIds, ",");
        ResponseEntity<List<Message>> exchange = restTemplate.exchange(url, HttpMethod.GET, null, new ParameterizedTypeReference<List<Message>>() {});
        List<Message> body = exchange.getBody();
        if (body == null) {
            // Fail explicitly (instead of with a NullPointerException) so the fallback runs.
            throw new IllegalStateException("empty response body from " + url);
        }

        // Index by id; keep the first message if the service ever returns duplicates.
        Map<Integer, Message> id2beanMap = body.stream()
                .collect(Collectors.toMap(Message::getId, msg -> msg, (first, second) -> first));
        List<Message> responseList = new ArrayList<>(messageIds.size());
        for (Integer id : messageIds) {
            responseList.add(id2beanMap.get(id));
        }
        return responseList;
    }

    /**
     * Fallback for {@link #findMessages(List)}.
     *
     * <p>Returns one placeholder message per requested id. A fixed single-element list would
     * be shorter than the request list whenever more than one request was collapsed, and the
     * responses could then not be mapped back to their requests.
     *
     * @param messageIds ids of the collapsed requests
     * @return a list with one failure placeholder per requested id
     */
    public List<Message> findMessageFail(List<Integer> messageIds) {
        List<Message> list = new ArrayList<>(messageIds.size());
        for (int i = 0; i < messageIds.size(); i++) {
            list.add(new Message(-1,"----请求失败---"));
        }
        return list;
    }
}





//EURKA-CLIENT  的服务如下
/**
 * EURKA-CLIENT side of the request-collapsing demo: serves messages from a fixed in-memory list.
 */
public class EurekaClientApplication {

    /** In-memory message store, filled once by the static initializer below. */
    static List<Message> list = new ArrayList<>();

    static {
        list.add(new Message(1, "系统初始化"));
        list.add(new Message(2, "初始化BeanFactory"));
        list.add(new Message(3, "创建表映射关系"));
        list.add(new Message(4, "加载缓存数据到内存"));
        list.add(new Message(5, "系统退出销毁创建的资源"));
    }

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }

    /**
     * Returns the stored messages whose ids appear in a comma-separated id list.
     *
     * <p>Only ids in the range 0..5 are considered; results are in the order of the store,
     * not in the order the ids were requested.
     *
     * @param messageIDs comma-separated message ids, e.g. {@code "1,3,5"}
     * @return the matching messages
     * @throws NumberFormatException if a token is not a valid integer
     */
    @RequestMapping("/getMessages")
    public List<Message> getMessages(@RequestParam("messageID") String messageIDs) {
        List<Integer> requestedIds = new ArrayList<>();
        for (String token : messageIDs.split(",")) {
            Integer id = Integer.valueOf(token);
            if (id >= 0 && id <= 5) {
                requestedIds.add(id);
            }
        }

        List<Message> matches = new ArrayList<>();
        for (Message candidate : list) {
            if (requestedIds.contains(candidate.getId())) {
                matches.add(candidate);
            }
        }
        return matches;
    }

}