Hystrix的个人理解
Spring cloud 提供的Hystrix为服务的延时超时, 网络延迟导致的服务的请求提供了一个容错的解决方案, 在集群里面服务相互调用的时候,
不会因为一个节点的调用失败而影响到其他服务的业务调用, hystrix 的熔断机制为解决降级,服务熔断,线程和信号隔离,请求缓存,请求合并
以及服务监控提供强大的功能。
hystrix 熔断的使用
pom配置
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>
yml配置如下:
server:
port: 8763 #修改启动服务在不同的端口 8762,8763 两个端口
spring:
application:
name: eurka-client
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
management:
endpoints:
web:
exposure:
include: '*'
cors:
allowed-origins: '*'
allowed-methods: '*'
java代码如下:
//----------------------服务提供的接口
package com.example.demo;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@RestController
@EnableHystrix
@EnableHystrixDashboard
@EnableCircuitBreaker
public class EurekaClientApplication {
/**
* 访问地址 http://localhost:8762/actuator/hystrix.stream
*
* @param args
*/
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
@Value("${server.port}")
String port;
@RequestMapping("/hi")
@HystrixCommand(fallbackMethod = "hiError")
public String home(@RequestParam(value = "name", defaultValue = "ninuxGithub") String name) {
try {
if (port==8763){ //8763 端口的服务睡眠 8762 的补睡眠
TimeUnit.SECONDS.sleep(3);
//加入3秒的睡眠, hystrix 因为调用客户端的api超时而产生短路---
//从而调用hiError 方法
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hi " + name + " i am from port: " + port;
}
public String hiError(String name) {
return "hi," + name + ",sorry,error!";
}
}
ribbon服务端java代码参考:https://ninuxgithub.github.io/2018/12/scpost4/
这个博客配置的是权重的负载均衡的策略
当发现一个请求的消耗的时间长的时候会调整权重
所以在8762 端口的权重会加重 8763因为短路器的调用 发现服务调用延时而权重降低
请求到8763端口的服务会由于超时而导致服务不可以走 处理熔断的方法 (可以理解为一种补偿的方法)
hystrix开启异步调用
client服务端
@RequestMapping("/getMessage")
public Message asyncRequest() {
return new Message(Integer.valueOf(port), "这是一个异步调用的方式,来自 " + port);
}
//ribbon 服务端
//异步方法的调用
@HystrixCommand(fallbackMethod = "getMessageError")
public Future<Message> getMessage() {
return new AsyncResult<Message>() {
@Override
public Message invoke() {
System.out.println(10/0);;
return restTemplate.getForObject("http://EURKA-CLIENT/getMessage", Message.class);
}
};
}
//异常的处理
public Message getMessageError(Throwable throwable) {
return new Message(1, "default message"+ throwable.getMessage());
}
hystrix 请求合并
请求地址:http://localhost:8764/getMessageById?msgId=2 修改msgId , 3s以内发起的请求将合并起来作为整体请求
@RestController
public class HelloControler {
@Autowired
HelloService helloService;
@RequestMapping(value = "/getMessageById")
public Message getMessageById(@RequestParam(value = "msgId") Integer msgId) throws ExecutionException, InterruptedException {
Future<Message> message = helloService.findMessage(msgId);
Message msg = message.get();
return msg;
}
}
@Service
public class HelloService {
@Autowired
RestTemplate restTemplate;
/**
* 请求合并----------------------
* 参考的博客
* //https://blog.csdn.net/zlt995768025/article/details/81663500 demo
* //https://blog.csdn.net/amosryan/article/details/54019479 RestTemplate.getForObject将PO中List的泛型变成LinkedHashMap问题的解决
* ResponseEntity<List<Message>> entity = restTemplate.getForEntity("http://EURKA-CLIENT/getMessages?messageID={1}", List<Message>.class, id);
*/
@HystrixCollapser(batchMethod = "findMessages",
collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "2000"), //延迟请求的毫秒
@HystrixProperty(name = "maxRequestsInBatch",value = "200")//最大合并请求的个数
},
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL )
public Future<Message> findMessage(Integer messageId) {
System.out.println("执行hystrix 请求合并");
throw new RuntimeException("执行异常");
}
@HystrixCommand(fallbackMethod = "findMessageFail")
public List<Message> findMessages(List<Integer> messageIds) {
Set<Integer> disId = new HashSet<>(messageIds);
System.out.println("批量请求" + disId);
String url = "http://EURKA-CLIENT/getMessages?messageID=" + StringUtils.join(messageIds, ",");
ResponseEntity<List<Message>> exchange = restTemplate.exchange(url, HttpMethod.GET, null, new ParameterizedTypeReference<List<Message>>() {});
List<Message> body = exchange.getBody();
//请求的个数和responseList的个数一定要一致 , 否则会报异常
Map<Integer, Message> id2beanMap = body.stream().collect(Collectors.toMap(Message::getId, msg -> msg));
List<Message> responseList = new ArrayList<>();
for(Integer id : messageIds){
responseList.add(id2beanMap.get(id));
}
return responseList;
}
public List<Message> findMessageFail(List<Integer> messageIds) {
List<Message> list = new ArrayList<>();
list.add(new Message(-1,"----请求失败---"));
return list;
}
}
//EURKA-CLIENT 的服务如下
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
static List<Message> list = new ArrayList<>();
static {
list.add(new Message(1, "系统初始化"));
list.add(new Message(2, "初始化BeanFactory"));
list.add(new Message(3, "创建表映射关系"));
list.add(new Message(4, "加载缓存数据到内存"));
list.add(new Message(5, "系统退出销毁创建的资源"));
}
@RequestMapping("/getMessages")
public List<Message> getMessages(@RequestParam("messageID") String messageIDs) {
List<Integer> messageIdList = new ArrayList<>();
Arrays.stream(messageIDs.split(",")).forEach(r -> {
Integer id = Integer.valueOf(r);
if(id<=5 && id >=0){
messageIdList.add(id);
}
});
return list.stream().filter(r -> messageIdList.contains(r.getId())).collect(Collectors.toList());
}
}