标签:
Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。
Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。
通常,一个典型的同步远程调用应该是这样的:
3, 客户端收到结果,然后当前线程继续往后执行
需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。
另外:
服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理
同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调
com.taobao.remoting.impl.DefaultClient.java //同步调用远程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; }
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋锁 while (!isDone) { // 是否有结果了 wait(); //没结果是释放锁,让当前线程处于等待状态 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //将远程调用结果设置到callback中来 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了 notifyAll(); // 唤醒处于等待的线程 } }
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread; CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); } public void run() { // 预防这种情况:业务提供的Executor,让调用者线程来执行任务 if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; } if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //设置调用结果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //处理调用异常 } } }
标签:
原文地址:http://www.cnblogs.com/dengzy/p/5677571.html