说到java,就不得不说起多线程的使用,在 java8 的时代,可以使用多线程的方式已经非常多了,这里主要记录一下 ListenableFurure 的使用。
ListenableFurure 是guava库中的一员,主要是扩展原生的Future没有回调函数的诟病,通过ListenableFurure ,可以轻松的为异步方法添加回调、统一处理、链式处理等。
Listenablefuture vs Completablefuture
说道Future,就要提一下Completablefuture,这是java8中提供的一种新类型,他用与Listenablefuture 类似的方式扩展了基础的Future,那么他们有什么异同呢?
首先,和原始的Future比起来,这两者都具有一个优势。即允许给异步一个操作注册一个回调方法,这个方法将会在异步方法完成之后执行。
如果用原始的Future:
1
2
3
|
ExecutorService executor = ...;
Future f = executor.submit(...);
f.get();
|
f.get()
会阻塞住线程,直到异步方法结束。
如果使用Listenablefuture,就可以注册一个回调方法:
1
2
3
4
5
6
7
8
9
10
11
12
|
ListenableFuture listenable = service.submit(...);
Futures.addCallback(listenable, new FutureCallback<Object>() {
@Override
public void onSuccess(Object o) {
//handle on success
}
@Override
public void onFailure(Throwable throwable) {
//handle on failure
}
})
|
如果使用Completablefuture,也可以注册一个回调方法供异步方法完成之后调用。
1
2
3
4
5
6
7
8
|
CompletableFuture completableFuture = new CompletableFuture();
completableFuture.whenComplete(new BiConsumer() {
@Override
public void accept(Object o, Object o2) {
//handle complete
}
}); // complete the task
completableFuture.complete(new Object())
|
不过当然他们也有一些区别,这些区别我们可以放到以后再研究,本篇还是记录一下ListenableFuture的使用。
ListenableFuture的使用
线程池
ListenableFuture 使用的线程池为ListeningExecutorService,可以通过MoreExecutors工具类方便的封装一个原生线程池得到:
1
|
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
|
这个线程池也是对ExecutorService的一种实现,可以当做只是对原生线程池的一种封装。
ListenableFuture
假设有一个耗时的方法,所以把他写成一个异步的方法,他的返回值是 string,可是我们希望使用它的封装类 Document,那么,我们可以这样实现
1
2
3
4
5
6
7
8
|
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {
@Override
public Document apply(String contents) {
return parse(contents);
}
});
|
或者也可以将转换方法分离出来,增加可读性:
1
2
3
4
5
6
7
8
9
10
|
final Function<String, Document> parseFun = new Function<String, Document>() {
@Override
public Document apply(String contents) {
return parse(contents);
}
};
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
|
Futures.transform 方法就相当于注册了一个回调方法,当异步方法执行结束之后,就会调用它。当我们希望继续转换的时候,可以继续使用transform 方法:
1
2
3
4
5
6
7
8
9
10
11
|
final Function<Document, Double> relevanceFun = new Function<Document, Double>() {
@Nullable
@Override
public Double apply(@Nullable Document input) {
return calculateRelevance(input);
}
};
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceFun);
|
看起来是不是还不错,我们从获取到一个 String 首先转换成Document,然后计算relevance的值,这一切都是异步的。如果使用Future,就需要不断的get和submit,ListenableFuture让这个过程方便了很多。
如果我们有很多Future,也可以轻松的获取到他们执行都结束以后的Future:
1
2
|
final List<ListenableFuture<Double>> relevanceFutures = //...;
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);
|
在最后,可以增加一个callback方法进行处理:
1
2
3
4
5
6
7
8
9
10
11
12
|
Futures.addCallback(maxRelevanceFuture, new FutureCallback<List<Double>>() {
@Override
public void onSuccess(List<Double> result) {
Double result = Collections.max(relevanceList);
log.debug("Result: {}", result);
}
@Override
public void onFailure(Throwable t) {
log.error("Error :-(", t);
}
});
|
这样就获取到了最终的结果,是不是看起来还不错呢。
当使用异步方法处理rest请求的时候,一般需要配合DeferredResult,以便于等待结果处理好之后再返回,防止提前返回结果。