当前客户端调用一个异步dubbo服务,客户端拿到一个Future对象,通过Future.get()获取服务调用结果。在服务端未返回前,客户端调用Future.get()的线程处于wait状态,实际上还是同步的。
期望提供另一种异步dubbo服务调用方式,客户端直接传入一个callback接口实例,当服务端处理完毕后,直接将服务结果作为参数调用callback。
当前客户端调用一个异步dubbo服务,客户端拿到一个Future对象,通过Future.get()获取服务调用结果。在服务端未返回前,客户端调用Future.get()的线程处于wait状态,实际上还是同步的。
期望提供另一种异步dubbo服务调用方式,客户端直接传入一个callback接口实例,当服务端处理完毕后,直接将服务结果作为参数调用callback。
很好的提议,这个特性已经在我们的功能规划中了,甚至也包括provider端的异步执行。可以将这个特性提前安排在近两个版本来做
@chickenlj 可以考虑支持 Reative
自己实现一个Filter就可以了
先自定义一个callback
public interface DubboCallback<T> {
void callback(T retValue, Throwable exception, Object[] args);
}
然后实现一个Filter
@Activate(group = Constants.CONSUMER, order = -10001)
public class DubboCallbackFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Object[] args = invocation.getArguments();
DubboCallback<Object> callback = null;
if (args != null) {
for (Object arg : args) {
if (arg instanceof DubboCallback) {
callback = (DubboCallback) arg;
break;
}
}
}
callback = callback != null ? callback : (DubboCallback) RpcContext.getContext().get("callback");
try {
Result ret = invoker.invoke(invocation);
boolean isOneWay = RpcUtils.isOneway(invoker.getUrl(), invocation);
if (isOneWay) {
return ret;
}
boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
Object retValue = null;
if (isAsync) {
Future<Object> f = RpcContext.getContext().getFuture();
retValue = f == null ? null : f.get();
}
if (callback != null) {
callback.callback(retValue, ret.getException(), args);
}
return ret;
} catch (InterruptedException | ExecutionException e) {
throw new RpcException(e);
}
}
}
配置一下
使用方式
RpcContext.getContext().set("callback", new DubboCallback<Token>() {
@Override
public void callback(Token retValue, Throwable exception, Object[] args) {
logger.info("retValue:{} userId:{}", JSON.toJSONString(retValue), collectReq.getUserId());
}
});
creditService.token(collectReq);
future 提供addListener方法, 注册callback, 参考netty future的实现
@blackshadowwalker if (isAsync) { Future
@chickenlj 我发现dubbo在异步调用的时候,使用future.get()的方式,会立刻进行一次判断是否isDone,虽然这个属于极端情况,但是个人不太建议这么干。 原因一为了防止并发需要考虑加锁。 原因二万一触发执行会占用执行线程触发业务逻辑,在很多netty+dubbo的场景下,执行线程很可能是netty的IO线程,导致有限的IO线程资源耗尽。 我基于2.5.3版本修改了dubbo的异步工作模式,在context中新增了传入callback方法并在调用方使用netty的nio直接异步执行的方式。 public class RpcContext { ...... // 新增回调传入 private ResponseCallback callback; // 新增getset ...... }
// 这里注释掉了对外暴露的future引用,个人认为没有必要,仅为个人意见【吐舌】
public class DubboInvoker
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(null);
// 此处将future向外传递,但是我认为其实没有必要,所有操作都通过异步方式调用回调处理了,暴露future反而会引起不必要的误操作
// ResponseFuture future = currentClient.request(inv, timeout) ;
// RpcContext.getContext().setFuture(new FutureAdapter final class HeaderExchangeChannel implements ExchangeChannel { } public class DefaultFuture implements ResponseFuture {
// future中setCallback()方法的lock可以被去除,原因是异步状态没必要占用执行线程。
// invokeCallback(ResponseCallback c)仅在doReceived(Response res)中执行,保留锁避免timeout和receive逻辑中重复执行即可。
// 当然这仅仅是代码优化~
public void setCallback(ResponseCallback callback) {
this.callback = callback;
}
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (! isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
} @kirayunfei 1. 应该不需要加锁,判断超时的逻辑是单独一个线程在执行检查
2.整个业务执行的逻辑应该不是在netty的io线程内,而是后面的业务的客户端业务线程,io线程只负责底层的数据收发,连接管理 @vihunk @kirayunfei 不会出现你说的极端情况执行两次doRecevied方法。
对外都是调用下面的rececved方法,此方法会从全局mapping中移除之前的任务,只会有一个线程移除成功,不过肯定会出现(不可避免)刚认为超时而服务器有响应。
另外:加锁的目的是为了实现线程通知机制,因为有线程在等待服务器的结果(一般是业务处理线程池的线程),如果如你所说在netty的io线程发起rpc,那这种问题本身是用框架不熟悉机制产生的后果,一般情况下也没人这么用。
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at " it's already been supported on 2.7.0......
public ResponseFuture request(Object request, int timeout) throws RemotingException {
......
// 此处将callback传入future等待回调
ResponseCallback callback = null;
if (request instanceof Invocation) {
callback = ((Invocation)request).getCallback();
}
DefaultFuture future = new DefaultFuture(channel, req, timeout, (+)callback);
......
}
@kirayunfei 1. 应该不需要加锁,判断超时的逻辑是单独一个线程在执行检查 2.整个业务执行的逻辑应该不是在netty的io线程内,而是后面的业务的客户端业务线程,io线程只负责底层的数据收发,连接管理
@vihunk
@kirayunfei 不会出现你说的极端情况执行两次doRecevied方法。 对外都是调用下面的rececved方法,此方法会从全局mapping中移除之前的任务,只会有一个线程移除成功,不过肯定会出现(不可避免)刚认为超时而服务器有响应。 另外:加锁的目的是为了实现线程通知机制,因为有线程在等待服务器的结果(一般是业务处理线程池的线程),如果如你所说在netty的io线程发起rpc,那这种问题本身是用框架不熟悉机制产生的后果,一般情况下也没人这么用。 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at "
it's already been supported on 2.7.0