序
本文主要研究一下spring cloud netflix的HystrixCommands。
maven
org.springframework.cloud spring-cloud-starter-netflix-hystrix 2.0.0.RELEASE
这个组件对hystrix进行了封装了,2.0.0.RELEASE全面支持了Reactor的Reactive Streams。
spring-cloud-starter-netflix-hystrix/pom.xml
spring-cloud-starter-netflix-hystrix-2.0.0.RELEASE.jar!/META-INF/maven/org.springframework.cloud/spring-cloud-starter-netflix-hystrix/pom.xml
4.0.0 org.springframework.cloud spring-cloud-starter-netflix 2.0.0.RELEASE spring-cloud-starter-netflix-hystrix Spring Cloud Starter Netflix Hystrix Spring Cloud Starter Netflix Hystrix https://projects.spring.io/spring-cloud Pivotal Software, Inc. https://www.spring.io org.springframework.cloud spring-cloud-starter org.springframework.cloud spring-cloud-netflix-core org.springframework.cloud spring-cloud-netflix-ribbon org.springframework.cloud spring-cloud-starter-netflix-archaius com.netflix.hystrix hystrix-core com.netflix.hystrix hystrix-serialization com.netflix.hystrix hystrix-metrics-event-stream com.netflix.hystrix hystrix-javanica io.reactivex rxjava-reactive-streams
这里要讲的HystrixCommands在spring-cloud-netflix-core这个组件里
HystrixCommands
spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/cloud/netflix/hystrix/HystrixCommands.java
/** * Utility class to wrap a {@see Publisher} in a {@see HystrixObservableCommand}. Good for * use in a Spring WebFlux application. Allows more flexibility than the @HystrixCommand * annotation. * @author Spencer Gibb */public class HystrixCommands { public staticPublisherBuilder from(Publisher publisher) { return new PublisherBuilder<>(publisher); } public static class PublisherBuilder { private final Publisher publisher; private String commandName; private String groupName; private Publisher fallback; private Setter setter; private HystrixCommandProperties.Setter commandProperties; private boolean eager = false; private Function , Observable > toObservable; public PublisherBuilder(Publisher publisher) { this.publisher = publisher; } public PublisherBuilder commandName(String commandName) { this.commandName = commandName; return this; } public PublisherBuilder groupName(String groupName) { this.groupName = groupName; return this; } public PublisherBuilder fallback(Publisher fallback) { this.fallback = fallback; return this; } public PublisherBuilder setter(Setter setter) { this.setter = setter; return this; } public PublisherBuilder commandProperties( HystrixCommandProperties.Setter commandProperties) { this.commandProperties = commandProperties; return this; } public PublisherBuilder commandProperties( Function commandProperties) { if (commandProperties == null) { throw new IllegalArgumentException( "commandProperties must not both be null"); } return this.commandProperties( commandProperties.apply(HystrixCommandProperties.Setter())); } public PublisherBuilder eager() { this.eager = true; return this; } public PublisherBuilder toObservable(Function , Observable > toObservable) { this.toObservable = toObservable; return this; } public Publisher build() { if (!StringUtils.hasText(commandName) && setter == null) { throw new IllegalStateException("commandName and setter can not both be empty"); } Setter setterToUse = getSetter(); PublisherHystrixCommand command = new PublisherHystrixCommand<>(setterToUse, this.publisher, this.fallback); Observable observable = getObservableFunction().apply(command); return RxReactiveStreams.toPublisher(observable); } public Function , Observable > getObservableFunction() { Function , Observable > observableFunc; if (this.toObservable != null) { observableFunc = this.toObservable; } else if (this.eager) { observableFunc = cmd -> cmd.observe(); } else { // apply a default onBackpressureBuffer if not eager observableFunc = cmd -> cmd.toObservable().onBackpressureBuffer(); } return observableFunc; } public Setter getSetter() { Setter setterToUse; if (this.setter != null) { setterToUse = this.setter; } else { String groupNameToUse; if (StringUtils.hasText(this.groupName)) { groupNameToUse = this.groupName; } else { groupNameToUse = commandName + "group"; } HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupNameToUse); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(this.commandName); HystrixCommandProperties.Setter commandProperties = this.commandProperties != null ? this.commandProperties : HystrixCommandProperties.Setter(); setterToUse = Setter.withGroupKey(groupKey).andCommandKey(commandKey) .andCommandPropertiesDefaults(commandProperties); } return setterToUse; } public Flux toFlux() { return Flux.from(build()); } public Mono toMono() { return Mono.from(build()); } } private static class PublisherHystrixCommand extends HystrixObservableCommand { private Publisher publisher; private Publisher fallback; protected PublisherHystrixCommand(Setter setter, Publisher publisher, Publisher fallback) { super(setter); this.publisher = publisher; this.fallback = fallback; } @Override protected Observable construct() { return RxReactiveStreams.toObservable(publisher); } @Override protected Observable resumeWithFallback() { if (this.fallback != null) { return RxReactiveStreams.toObservable(this.fallback); } return super.resumeWithFallback(); } }}
从类注释可以看到这个类就是为了方便webflux应用使用hystrix而设计的。
实例
@Test public void testHystrixFallback() throws InterruptedException { MonodelayMono = Mono.just("hello") .delayElement(Duration.ofMillis(500)); Mono result = HystrixCommands.from(delayMono) .commandName("demoCmd") .groupName("demoGroup") .eager() .commandProperties(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) .withExecutionTimeoutInMilliseconds(1000) ) .fallback(Mono.just("from fallback")) .toMono(); System.out.println(result.block()); }
- HystrixCommands.from方法可以对Publisher进行hystrix包装
- commandName用于指定hystrix的command名称
- groupName用于指定hystrix的group名称
- eager是默认方式,表示使用的是observe()方法,相当于hot Observable,只能消费从订阅时刻之后的数据,lazy使用的是toObservable()方法,相当于cold Observable,可以消费订阅之前的数据。
- commandProperties用于指定command的属性,比如executionIsolationStrategy、executionTimeoutInMilliseconds
- fallback用于指定fallback的操作
另外配置文件也可以指定默认的参数,比如
hystrix: command: default: execution: isolation: thread: timeoutInMilliseconds : 6000 circuitBreaker: sleepWindowInMilliseconds: 10000 metrics: rollingStats: timeInMilliseconds : 18000
小结
HystrixCommands就是spring cloud对netflix hystrix的包装,以方便webflux里头使用hystrix,就省得再去使用AOP技术了。