使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的:
public interface MyService {
public String sayHello(String to);
}
本文将实现如下样式的Service,使其异步化:
public interface MyServiceAsync {
public CompletionStage<String> sayHello(String to);
}
当前ignite对上边这样的异步的service方法并没有remote支持。当调用端与服务部署再同一节点时,ignite会发起一个本地方法调用,这样是没有问题的,但是当服务部署端与调用端在不同节点时,ignite通过发起一个distributed task,将调用通过消息方式发布到服务部署节点,由于服务实现是异步的,通常来说,会返回一个未完成状态的CompletionStage,后续当真正complete的时候,调用端的CompletionStage并不会被notify,即调用端永远无法得到真正的调用结果。
为了能够支持CompletionStage的远程状态专递,我们需要对ignite进行如下改动:
...
// line 192
if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) {
CompletableFuture<Object> cs = new CompletableFuture<>();
//call async and notify completion stage
ctx.closure().callAsyncNoFailover(
GridClosureCallMode.BROADCAST,
new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
Collections.singleton(node),
false,
waitTimeout,
true).listen(f -> {
if(f.error() != null) {
cs.completeExceptionally(f.error());
}else if(f.isCancelled()) {
cs.cancel(false);
}
if(f.isDone()) {
try {
Object result = f.get();
if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) {
cs.completeExceptionally((IgniteException)result);
}else {
cs.complete(f.get());
}
} catch (IgniteCheckedException e) {
cs.completeExceptionally(e);
}
}
});
return cs;
}
...
这段代码做了如下的事情:检测当服务方法返回值是一个CompletionStage的时候,则创建一个CompletableFuture作为代理对象返回给调用端。随后监听服务的远程调用的结果,并且用这个结果来更新这个CompletableFuture。到这里,调用端的service proxy的改造就完成了。接下来,我们还需要改造服务节点这一端:
finally {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}
}
finally {
if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
final boolean sendResult = sndRes;
final IgniteException igException = ex;
@SuppressWarnings("unchecked")
CompletionStage<Object> cs = (CompletionStage<Object>)res;
cs.exceptionally(t->{
return new IgniteException(t);
}).thenAccept(r->{
if (!HOLD.get()) {
IgniteException e = igException;
finishJob(r, e, sendResult);
} else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
});
} else {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}
}
这里做的事情是:当在服务部署节点上拿到执行结果的时候,如果发现服务返回结果是一个CompletionStage,那么处理这个CompletionStage的exceptionally和thenAccept, 把结果发送给remote的调用端。
就这样,通过简单的改装,我们使ignite有了处理异步服务方法调用的能力。下边我们实现一个服务来看看改装结果:
import java.util.concurrent.CompletionStage;
import org.apache.ignite.services.Service;
public interface MyService extends Service {
public CompletionStage<String> sayHelloAsync(String to);
public String sayHelloSync(String to);
}
import java.util.concurrent.CompletionStage;
public class MyServiceImpl implements MyService {
private ScheduledExecutorService es;
@Override public void init(ServiceContext ctx) throws Exception {
es = Executors.newSingleThreadScheduledExecutor();
}
@Override public CompletionStage<String> sayHelloAsync(String to){
CompletableFuture<String> ret = new CompletableFuture<>();
//return "async hello $to" after 3 secs
es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS);
return ret;
}
@Override public String sayHelloSync(String to){
return "sync hello " + to;
}
...
}
然后将服务部署在Service grid中:
...
ServiceConfiguration sConf = new ServiceConfiguration();
sConf.setName("myservice.version.1");
sConf.setService(new MyServiceImpl());
sConf.setMaxPerNodeCount(2);
sConf.setTotalCount(4);
ignite.services().deploy(sConf);
...
然后启动一个客户端节点进行服务调用:
MyService service = ignite.services().serviceProxy("myservice.version.1", MyService.class, false);
//test async service
service.sayHelloAsync("nathan").thenAccept(r->{
System.out.println(r);
});
//test sync service
System.out.println(service.sayHelloSync("nathan"));
...
输出结果:
sync hello nathan
async hello nathan
可以看到先输出了sync的结果,大约3秒后输出async的结果。
Apache Ignite 改装(一) -- 服务异步化支持
原文地址:http://blog.51cto.com/12274728/2097094