[apache/dubbo]支持callback方式的纯异步方式

2024-04-11 472 views
5

当前客户端调用一个异步dubbo服务,客户端拿到一个Future对象,通过Future.get()获取服务调用结果。在服务端未返回前,客户端调用Future.get()的线程处于wait状态,实际上还是同步的。

期望提供另一种异步dubbo服务调用方式,客户端直接传入一个callback接口实例,当服务端处理完毕后,直接将服务结果作为参数调用callback。

回答

7

很好的提议,这个特性已经在我们的功能规划中了,甚至也包括provider端的异步执行。可以将这个特性提前安排在近两个版本来做

7

@chickenlj 可以考虑支持 Reative

4

自己实现一个Filter就可以了

先自定义一个callback

public interface DubboCallback<T> {

    void callback(T retValue, Throwable exception, Object[] args);

}

然后实现一个Filter

@Activate(group = Constants.CONSUMER, order = -10001)
public class DubboCallbackFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Object[] args = invocation.getArguments();
        DubboCallback<Object> callback = null;
        if (args != null) {
            for (Object arg : args) {
                if (arg instanceof DubboCallback) {
                    callback = (DubboCallback) arg;
                    break;
                }
            }
        }
        callback = callback != null ? callback : (DubboCallback) RpcContext.getContext().get("callback");
        try {
            Result ret = invoker.invoke(invocation);
            boolean isOneWay = RpcUtils.isOneway(invoker.getUrl(), invocation);
            if (isOneWay) {
                return ret;
            }
            boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
            Object retValue = null;
            if (isAsync) {
                Future<Object> f = RpcContext.getContext().getFuture();
                retValue = f == null ? null : f.get();
            }
            if (callback != null) {
                callback.callback(retValue, ret.getException(), args);
            }
            return ret;
        } catch (InterruptedException | ExecutionException e) {
            throw new RpcException(e);
        }
    }
}

配置一下 image

使用方式

RpcContext.getContext().set("callback", new DubboCallback<Token>() {
            @Override
            public void callback(Token retValue, Throwable exception, Object[] args) {
                logger.info("retValue:{} userId:{}", JSON.toJSONString(retValue), collectReq.getUserId());
            }
        });
        creditService.token(collectReq);
3

future 提供addListener方法, 注册callback, 参考netty future的实现

2

@blackshadowwalker if (isAsync) { Future f = RpcContext.getContext().getFuture(); retValue = f == null ? null : f.get(); } 使用了future.get() 还是同步在等待,没有使用到异步 只有通过dubbo的future设置callback才是异步 比如: java.util.concurrent.Future future = RpcContext.getContext().getFuture(); FutureAdapter futureAdapter = (FutureAdapter) future; futureAdapter.getFuture().setCallback(new ResponseCallback() { @Override public void done(Object response) { try { Object result = ((Result) response).getValue(); ..... } catch (Exception e) { .... } } @Override public void caught(Throwable exception){ ..... } });

或者需要dubbo future 开放注册 callback

@chickenlj 我发现dubbo在异步调用的时候,使用future.get()的方式,会立刻进行一次判断是否isDone,虽然这个属于极端情况,但是个人不太建议这么干。 原因一为了防止并发需要考虑加锁。 原因二万一触发执行会占用执行线程触发业务逻辑,在很多netty+dubbo的场景下,执行线程很可能是netty的IO线程,导致有限的IO线程资源耗尽。 我基于2.5.3版本修改了dubbo的异步工作模式,在context中新增了传入callback方法并在调用方使用netty的nio直接异步执行的方式。 public class RpcContext { ...... // 新增回调传入 private ResponseCallback callback; // 新增getset ...... }

// 这里注释掉了对外暴露的future引用,个人认为没有必要,仅为个人意见【吐舌】 public class DubboInvoker extends AbstractInvoker { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(null);
            // 此处将future向外传递,但是我认为其实没有必要,所有操作都通过异步方式调用回调处理了,暴露future反而会引起不必要的误操作

// ResponseFuture future = currentClient.request(inv, timeout) ; // RpcContext.getContext().setFuture(new FutureAdapter(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }

final class HeaderExchangeChannel implements ExchangeChannel {

......

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    ......
    // 此处将callback传入future等待回调
    ResponseCallback callback = null;
    if (request instanceof Invocation) {
        callback = ((Invocation)request).getCallback();
    }
    DefaultFuture future = new DefaultFuture(channel, req, timeout, (+)callback);
    ......
}

}

public class DefaultFuture implements ResponseFuture { // future中setCallback()方法的lock可以被去除,原因是异步状态没必要占用执行线程。 // invokeCallback(ResponseCallback c)仅在doReceived(Response res)中执行,保留锁避免timeout和receive逻辑中重复执行即可。 // 当然这仅仅是代码优化~ public void setCallback(ResponseCallback callback) { this.callback = callback; } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } }

@kirayunfei 1. 应该不需要加锁,判断超时的逻辑是单独一个线程在执行检查 2.整个业务执行的逻辑应该不是在netty的io线程内,而是后面的业务的客户端业务线程,io线程只负责底层的数据收发,连接管理

@vihunk

  1. 超时判断的线程是扫描future,如果触发了timeout,会执行doReceived()方法,如果极端情况下超时触发的同时,异步返回到达,doReceived()会执行两次,所以加了锁,个人理解,这个锁其实是去不掉的。 2.实际上setCallBack()方法体中存在一个if(isDone()) {invokeCallback()}逻辑,所以在极端情况下,存在挂上回调立刻执行的可能性。处理异步返回的线程池通常和我们发送请求的线程池并不是同一个,所以这个invoke会在发送请求的线程中执行,个人是希望统一由处理返回的线程池统一处理,这样逻辑代码和风格都是收束的,看起来风格统一些。IO线程池只是发送线程池的一个特例,很不幸的是我看到过许多使用netty的项目在pipeline的最后一环handler直接发起了dubbo调用,这种特例就出现了hhh。这只是个建议,理论上也是极端情况,我在dubbo停止维护期间,修改代码时无意间发现的,因此提了出来~另外同时,这个invoke也是有锁的,因为我觉得这段逻辑不需要。。所以这个锁我也干掉了,个人习惯~

@kirayunfei 不会出现你说的极端情况执行两次doRecevied方法。 对外都是调用下面的rececved方法,此方法会从全局mapping中移除之前的任务,只会有一个线程移除成功,不过肯定会出现(不可避免)刚认为超时而服务器有响应。 另外:加锁的目的是为了实现线程通知机制,因为有线程在等待服务器的结果(一般是业务处理线程池的线程),如果如你所说在netty的io线程发起rpc,那这种问题本身是用框架不熟悉机制产生的后果,一般情况下也没人这么用。 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at "

  • (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
  • ", response " + response
  • (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    • " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }

it's already been supported on 2.7.0

5

@chickenlj 我发现dubbo在异步调用的时候,使用future.get()的方式,会立刻进行一次判断是否isDone,虽然这个属于极端情况,但是个人不太建议这么干。 原因一为了防止并发需要考虑加锁。 原因二万一触发执行会占用执行线程触发业务逻辑,在很多netty+dubbo的场景下,执行线程很可能是netty的IO线程,导致有限的IO线程资源耗尽。 我基于2.5.3版本修改了dubbo的异步工作模式,在context中新增了传入callback方法并在调用方使用netty的nio直接异步执行的方式。 public class RpcContext { ...... // 新增回调传入 private ResponseCallback callback; // 新增getset ...... }

// 这里注释掉了对外暴露的future引用,个人认为没有必要,仅为个人意见【吐舌】 public class DubboInvoker extends AbstractInvoker { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(null);
            // 此处将future向外传递,但是我认为其实没有必要,所有操作都通过异步方式调用回调处理了,暴露future反而会引起不必要的误操作

// ResponseFuture future = currentClient.request(inv, timeout) ; // RpcContext.getContext().setFuture(new FutureAdapter(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } }

final class HeaderExchangeChannel implements ExchangeChannel {

......

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    ......
    // 此处将callback传入future等待回调
    ResponseCallback callback = null;
    if (request instanceof Invocation) {
        callback = ((Invocation)request).getCallback();
    }
    DefaultFuture future = new DefaultFuture(channel, req, timeout, (+)callback);
    ......
}

}

public class DefaultFuture implements ResponseFuture { // future中setCallback()方法的lock可以被去除,原因是异步状态没必要占用执行线程。 // invokeCallback(ResponseCallback c)仅在doReceived(Response res)中执行,保留锁避免timeout和receive逻辑中重复执行即可。 // 当然这仅仅是代码优化~ public void setCallback(ResponseCallback callback) { this.callback = callback; } public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); try { while (! isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } }

@kirayunfei 1. 应该不需要加锁,判断超时的逻辑是单独一个线程在执行检查 2.整个业务执行的逻辑应该不是在netty的io线程内,而是后面的业务的客户端业务线程,io线程只负责底层的数据收发,连接管理

@vihunk

  1. 超时判断的线程是扫描future,如果触发了timeout,会执行doReceived()方法,如果极端情况下超时触发的同时,异步返回到达,doReceived()会执行两次,所以加了锁,个人理解,这个锁其实是去不掉的。 2.实际上setCallBack()方法体中存在一个if(isDone()) {invokeCallback()}逻辑,所以在极端情况下,存在挂上回调立刻执行的可能性。处理异步返回的线程池通常和我们发送请求的线程池并不是同一个,所以这个invoke会在发送请求的线程中执行,个人是希望统一由处理返回的线程池统一处理,这样逻辑代码和风格都是收束的,看起来风格统一些。IO线程池只是发送线程池的一个特例,很不幸的是我看到过许多使用netty的项目在pipeline的最后一环handler直接发起了dubbo调用,这种特例就出现了hhh。这只是个建议,理论上也是极端情况,我在dubbo停止维护期间,修改代码时无意间发现的,因此提了出来~另外同时,这个invoke也是有锁的,因为我觉得这段逻辑不需要。。所以这个锁我也干掉了,个人习惯~

@kirayunfei 不会出现你说的极端情况执行两次doRecevied方法。 对外都是调用下面的rececved方法,此方法会从全局mapping中移除之前的任务,只会有一个线程移除成功,不过肯定会出现(不可避免)刚认为超时而服务器有响应。 另外:加锁的目的是为了实现线程通知机制,因为有线程在等待服务器的结果(一般是业务处理线程池的线程),如果如你所说在netty的io线程发起rpc,那这种问题本身是用框架不熟悉机制产生的后果,一般情况下也没人这么用。 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at "

  • (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
  • ", response " + response
  • (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    • " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }

it's already been supported on 2.7.0

0

@kirayunfei 1. 应该不需要加锁,判断超时的逻辑是单独一个线程在执行检查 2.整个业务执行的逻辑应该不是在netty的io线程内,而是后面的业务的客户端业务线程,io线程只负责底层的数据收发,连接管理

9

@vihunk

  1. 超时判断的线程是扫描future,如果触发了timeout,会执行doReceived()方法,如果极端情况下超时触发的同时,异步返回到达,doReceived()会执行两次,所以加了锁,个人理解,这个锁其实是去不掉的。 2.实际上setCallBack()方法体中存在一个if(isDone()) {invokeCallback()}逻辑,所以在极端情况下,存在挂上回调立刻执行的可能性。处理异步返回的线程池通常和我们发送请求的线程池并不是同一个,所以这个invoke会在发送请求的线程中执行,个人是希望统一由处理返回的线程池统一处理,这样逻辑代码和风格都是收束的,看起来风格统一些。IO线程池只是发送线程池的一个特例,很不幸的是我看到过许多使用netty的项目在pipeline的最后一环handler直接发起了dubbo调用,这种特例就出现了hhh。这只是个建议,理论上也是极端情况,我在dubbo停止维护期间,修改代码时无意间发现的,因此提了出来~另外同时,这个invoke也是有锁的,因为我觉得这段逻辑不需要。。所以这个锁我也干掉了,个人习惯~
8

@kirayunfei 不会出现你说的极端情况执行两次doRecevied方法。 对外都是调用下面的rececved方法,此方法会从全局mapping中移除之前的任务,只会有一个线程移除成功,不过肯定会出现(不可避免)刚认为超时而服务器有响应。 另外:加锁的目的是为了实现线程通知机制,因为有线程在等待服务器的结果(一般是业务处理线程池的线程),如果如你所说在netty的io线程发起rpc,那这种问题本身是用框架不熟悉机制产生的后果,一般情况下也没人这么用。 public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at "

  • (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
  • ", response " + response
  • (channel == null ? "" : ", channel: " + channel.getLocalAddress()
    • " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
2

it's already been supported on 2.7.0