TIP:相应的代码在step-8文件夹中(https://github.com/vert-x3/vertx-guide-for-java-devs)
到目前为止,我们已经探讨了Vert.x的多个领域,使用基于APIs的回调方法,这种编程模型在多钟编程语言中可以良好实现。然后,它会变得有的冗长乏味,尤其是处理多个时间源或者多个数据流。
这就是RxJava闪亮的地方,Vert.x可以与之无缝连接。
使用RxJava APIs
除了回调的API,Vert.x模块还提供了一个"Rxified"API,需要我们在maven的pom.xml中加入依赖:
然后我们把Verticles修改一下,需要将原来类中继承的io.vertx.core.AbstractVerticle修改成io.vertx.rxjava.core.AbstractVerticle,这有什么不同呢?前面的类继承了后者并且加入io.vertx.rxjava.core.Vertx属性。
io.vertx.rxjava.core.Vertx定义了额外的rxSomething(…)方法,等同于callback-based的存在。
让我们看看MainVerticle,来找一找在实践中更好的实现:
rxDeploy方法没有把Handler<AsyncResult<String>>作为一个final的参数,并放回一个Single<String>
以及,这个操作并不是在方法被调用执行,在你订阅这个Single执行,当这个操作完成,它返回部署的 id 或者异常抛出的信息。
依次部署verticles
重构MainVerticle,我们需要确定部署操作触发有序:
1.flatMap中执行dbVerticleDeployment方法的结果,HttpServerVerticle部署的任务日常调度。
2.当被订阅的时候执行,成功或者失败,MainVerticle的future会是完成或者失败。
Rxifying" HttpServerVerticle
如果你按序看这个guide,编辑这个代码,那么你的HttpServerVerticle使用的还是基于callback-based的API。在使用RxJava API执行异步的操作之前,你需要先重构HttpServerVerticle
Import RxJava versions of Vert.x classes
1.我们的backupHandler()方法依旧使用HttpResponse类,因此这是必须导入的。RxJava版本中提供的HttpResponse在某些情况下可以被替换,在step-8文件夹中的没有导入这个类,因为通过lambda表达式推到的原因。
委托"Rxified" vertx实例
当你有一个io.vertx.rxjava.core.Vertx的时候可以调用一个io.vertx.core.Vertx实例,需要调整Verticle’s srart()创建WikiDatabaseService实例的方法:
同时执行授权查询
在前面的例子中,我们看到使用RxJava operators 和 Rxified Vert.x API 来依次执行异步操作。但是有时候这是不需要的,或者你只是想让他们同时运行出于性能的原因。
对于这样的情景,HttpServerVerticle中的JWT 贴可能生成程序是个好的例子。为了创建一个token,我们需要权限查询完成,但是查询是独立的:
1.创建了三个Single 对象,代表了不同的权限查询
2.当三个操作完成,zip操作带着results回调。
数据库连接
从连接池中获取一个数据库连接,需要做的事情就是JDBCClient调用rxGetConnection
Single<SQLConnection> connection = dbClient.rxGetConnection();
这个方法返回Single<Connection>,这可以使你容易的sql查询
Single<ResultSet> resultSet = dbClient.rxQueryWithParams( sqlQueries.get(SqlQuery.GET_PAGE_BY_ID), new JsonArray().add(id));
当SQLConnection没有用的时候,我们怎么释放这个连接呢?一个简单方便的方式是调用关闭close方法当
Single<SQLConnection>没有被订阅:
1.当连接被请求的时候,我们把它放入Single对象中
2.当没有被订阅的时候,Single调用close方法
现在当我们需要的额时候就可以使用getConnection来执行sql查询
为callbacks和RxJava之间搭桥
这个时候,你可能混淆了RxJava 代码和callback-based API,例如service proxy 接口定义为callbacks,但是实现使用了Vert.x Rxified API。
在这个例子中,io.vertx.rx.java.RxHelper可以适配Handler<AsyncResult<T>>到RxJava Subscriber<T>:
1.fetchAllPagesData是一个一个异步的service proxy操作,在Handler<AsyncResult<List<JsonObject>>>调用中定义
2.toSubscriber适配resultHandler到Subscriber<List<JsonObject>>
数据流
RxJava不仅仅在组合不同的时间源做的很好,对于数据流也是非常好的,不像Vert.x或者JDK中的fututre,Observable可以发布事件流,而不是单个,它配备了一套广泛的数据操作,我们可以使用其中的把一部分来重构我们的fetchAllPages database verticle 方法:
1.利用flatMapObservable我们可以从Single<Result>发起中创建Observable
2.from方法将数据库results迭代入Observable
3.因为我们只需要网页名称,可以映射的每个JSONObject到第一列
4.客户端期望数据是按字母顺序排列
5.event bus service由一个JsonArray组成,collect用来JsonArray::new创建一个新的,然后添加JsonArray::add
原文链接:http://vertx.io/docs/guide-for-java-devs/
我的微信公众号: