目录
- “Hello World!”
- 同步执行
- 异步执行
- 响应执行
“Hello World!”
下面是一个通过实现接口 HystrixCommand
的一个Hello World 示例:
public class HystrixHelloWorld extends HystrixCommand<String> { private final String name; public HystrixHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { return "Hello " + name + "!"; } }
点击查看详细源码
HystrixObservableCommand
等价于 HystrixCommand
一个等效的Hello World解决方案,使用 HystrixObservableCommand
代替 HystrixCommand
,通过覆盖 construct
方法,如下所示:
public class HystrixObservableHelloWorld extends HystrixObservableCommand<String> { private final String name; public HystrixObservableHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> observer) { try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
同步执行
通过 execute()
方法同步调用 HystrixCommand
的实现,示例如下:
String s = new HystrixHelloWorld("World").execute();
单元测试如下:
@Test public void testSynchronous() { //测试同步 assertEquals("Hello World!", new HystrixHelloWorld("World").execute()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").execute()); }
关于实现了HystrixObservableCommand
的方法示例如下:
关于HystrixObservableCommand
的实现没有简单的 execute
方法调用,如果清楚通过一个命令产生的 Observable
必定仅仅产生一个单一的值,则可以对 Observable
应用RXjava的操作 .toBlocking().toFuture().get()
模拟 execute
方法。
异步执行
我们可以通过使用 queue()
方法异步执行 HystrixCommand
,示例如下:
Future<String> fWorld = new HystrixHelloWorld("World").queue();
我们可以通过 Future获取到命令的结果集
String fw=fWorld.get();
通过单元测试模拟操作如下:
@Test public void testAsynchronous1() throws Exception { //测试异步 assertEquals("Hello World!", new HystrixHelloWorld("World").queue().get()); assertEquals("Hello Bob!", new HystrixHelloWorld("Bob").queue().get()); } @Test public void testAsynchronous2() throws Exception { //测试异步 Future<String> fWorld = new HystrixHelloWorld("World").queue(); Future<String> fBob = new HystrixHelloWorld("Bob").queue(); assertEquals("Hello World!", fWorld.get()); assertEquals("Hello Bob!", fBob.get()); }
下面的操作是等价的:
String s1 = new HystrixHelloWorld("World").execute(); String s2 = new HystrixHelloWorld("World").queue().get();
关于实现了HystrixObservableCommand
的方法示例如下:
HystrixObservableCommand
没有 queue
这种简单实现异步的方法 ,如果清楚通过一个命令产生的 Observable
必定仅仅产生一个单一的值,则可以对 Observable
应用RxJava操作 .toBlocking().toFuture()
模拟 queue
方法。
响应执行
可以通过一下任意方法监听 HystrixCommand
的结果:
observe()
— 执行这个命令会返回一个热 Observable立刻执行hystrix的命令 ,因为这个 Observable 通过 ReplaySubject
过滤,咱们不会有丢失订阅之前的任何东西的危险。 toObservable()
— 执行这个命令会返回一个“冷“ Observable,直到订阅 Observable 才会开始执行命令和发送结果 。
Observable<String> fWorld = new HystrixHelloWorld("World").observe();
执行完上面的代码,我们可以通过订阅 Observable 获取到它的值
fWorld.subscribe(new Action1<String>() { @Override public void call(String s) { // value emitted here } });
下面的单元测试示例如下:
@Test public void testObservable() throws Exception { Observable<String> fWorld = new HystrixHelloWorld("World").observe(); Observable<String> fBob = new HystrixHelloWorld("Bob").observe(); // blocking assertEquals("Hello World!", fWorld.toBlocking().single()); assertEquals("Hello Bob!", fBob.toBlocking().single()); fWorld.subscribe(new Observer<String>() { @Override public void onCompleted() { // 这里可以什么都不做 } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(String v) { System.out.println("onNext: " + v); } }); fBob.subscribe(new Action1<String>() { @Override public void call(String v) { System.out.println("onNext: " + v); } }); }
使用java8的 lambda 表达式,示例如下:
fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }) // - or while also including error handling fWorld.subscribe((v) -> { System.out.println("onNext: " + v); }, (exception) -> { exception.printStackTrace(); })
点击链接了解更多关于RXjava中 的 Observable