A sofa client accesses the server in two steps: first it initializes itself, then it establishes a connection. Typical code looks like this:
// 1. create a rpc client
RpcClient client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.startup();
// 4. invoke
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
// 5. close
client.shutdown();
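RpcClient.startup performs the initialization, which covers connection management (ConnectionManager), monitoring (DefaultConnectionMonitor), and reconnection (ReconnectManager).
ConnectionManager manages connections. Each url maps to one poolKey, one ConnectionPool is created per poolKey, and each ConnectionPool maintains multiple connections. When a connection is needed, ConnectionManager picks one from the ConnectionPool according to the connection selection strategy, ConnectionSelectStrategy.
Initializing the connection manager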
com.alipay.remoting.rpc.RpcClient#startup
ConnectionSelectStrategy connectionSelectStrategy = option(BoltGenericOption.CONNECTION_SELECT_STRATEGY);
if (connectionSelectStrategy == null) {
connectionSelectStrategy = new RandomSelectStrategy(switches());
}
this.connectionManager = new DefaultClientConnectionManager(connectionSelectStrategy,
new RpcConnectionFactory(userProcessors, this), connectionEventHandler,
connectionEventListener, switches());
this.connectionManager.setAddressParser(this.addressParser);
this.connectionManager.startup();
Connection selection strategy: ConnectionSelectStrategy
When a Connection is requested, ConnectionPool calls ConnectionSelectStrategy to pick one of its connections.
com.alipay.remoting.ConnectionPool#get
public Connection get() {
// update the last access time
markAccess();
if (null != connections) {
List<Connection> snapshot = new ArrayList<Connection>(connections);
if (snapshot.size() > 0) {
// pick a Connection according to the selection strategy
return strategy.select(snapshot);
} else {
return null;
}
} else {
return null;
}
}
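Take the random strategy, RandomSelectStrategy, as an example. It draws random entries from the given connection list until it finds a healthy connection, giving up after MAX_TIMES attempts.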
com.alipay.remoting.RandomSelectStrategy#select
public Connection select(List<Connection> connections) {
// randomly pick a healthy connection from the connections list
Connection result = randomGet(connections);
return result;
}
com.alipay.remoting.RandomSelectStrategy#randomGet
private Connection randomGet(List<Connection> connections) {
if (null == connections || connections.isEmpty()) {
return null;
}
int size = connections.size();
int tries = 0;
Connection result = null;
while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
result = connections.get(this.random.nextInt(size));
}
if (result != null && !result.isFine()) {
result = null;
}
return result;
}
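The bounded random pick above can be tried in isolation. The following is a minimal sketch, not Bolt's code: the Conn class, its fine flag, and the value of MAX_TIMES are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class RandomPickSketch {
    /** Hypothetical stand-in for a connection; only its health flag matters here. */
    static final class Conn {
        final String name;
        final boolean fine;

        Conn(String name, boolean fine) {
            this.name = name;
            this.fine = fine;
        }
    }

    static final int MAX_TIMES = 5; // assumed retry bound

    /** Draws random entries until a healthy one is found or the retry budget is spent. */
    static Conn randomGet(List<Conn> conns, Random random) {
        if (conns == null || conns.isEmpty()) {
            return null;
        }
        Conn result = null;
        int tries = 0;
        while ((result == null || !result.fine) && tries++ < MAX_TIMES) {
            result = conns.get(random.nextInt(conns.size()));
        }
        // The last draw may still be unhealthy, so filter it out.
        return (result != null && result.fine) ? result : null;
    }

    public static void main(String[] args) {
        List<Conn> conns = Arrays.asList(new Conn("a", true), new Conn("b", false));
        Conn picked = randomGet(conns, new Random());
        System.out.println(picked == null ? "no healthy connection found" : picked.name);
    }
}
```

Because the number of draws is bounded, the pick can return null even when a healthy connection exists, for example when most of the list is unhealthy.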
Address parser: RpcAddressParser
RpcAddressParser parses a url string into a Url object.
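To illustrate what parsing an address string into a structured object involves, here is a minimal sketch. It is not RpcAddressParser: the ParsedUrl class, the assumed "ip:port?key=value&key=value" layout, and the parameter names connNum and warmup are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class AddressParseSketch {
    /** Hypothetical result type holding the pieces of an address. */
    static final class ParsedUrl {
        final String ip;
        final int port;
        final Map<String, String> props;

        ParsedUrl(String ip, int port, Map<String, String> props) {
            this.ip = ip;
            this.port = port;
            this.props = props;
        }
    }

    /** Splits "ip:port?k1=v1&k2=v2" into ip, port and a parameter map. */
    static ParsedUrl parse(String address) {
        String hostPort = address;
        Map<String, String> props = new LinkedHashMap<>();
        int q = address.indexOf('?');
        if (q >= 0) {
            hostPort = address.substring(0, q);
            for (String pair : address.substring(q + 1).split("&")) {
                if (pair.isEmpty()) {
                    continue; // tolerate a trailing '?' or '&'
                }
                int eq = pair.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Bad parameter: " + pair);
                }
                props.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("Expected ip:port but got: " + hostPort);
        }
        String ip = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new ParsedUrl(ip, port, props);
    }

    public static void main(String[] args) {
        ParsedUrl url = parse("127.0.0.1:8080?connNum=2&warmup=true");
        System.out.println(url.ip + " " + url.port + " " + url.props);
    }
}
```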
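Connection factory: RpcConnectionFactory
Creates the Connection objects that are placed into a ConnectionPool.
Event handler: ConnectionEventHandler
Implements ChannelHandler and is registered in netty's pipeline. When a connection-open or connection-close event occurs, it forwards the event to ConnectionEventListener.
Event listener: ConnectionEventListener
Works together with ConnectionEventHandler and carries out the actual business handling when an event occurs.
Starting the connection manager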
com.alipay.remoting.DefaultClientConnectionManager#startup
@Override
public void startup() throws LifeCycleException {
// mark the state as started
super.startup();
this.connectionEventHandler.setConnectionManager(this);
this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
this.connectionFactory.init(connectionEventHandler);
}
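The startup method performs the following steps in order:
mark the state as started
initialize the event handler
initialize the connection factory
The connectionFactory.init method configures netty on the client side.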
com.alipay.remoting.connection.AbstractConnectionFactory#init
public void init(final ConnectionEventHandler connectionEventHandler) {
bootstrap = new Bootstrap();
bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
.option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
.option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
.option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
// init netty write buffer water mark
initWriteBufferWaterMark();
// init byte buf allocator
if (ConfigManager.netty_buffer_pooled()) {
this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
} else {
this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", codec.newDecoder());
pipeline.addLast("encoder", codec.newEncoder());
boolean idleSwitch = ConfigManager.tcp_idle_switch();
if (idleSwitch) {
pipeline.addLast("idleStateHandler",
new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0,
TimeUnit.MILLISECONDS));
pipeline.addLast("heartbeatHandler", heartbeatHandler);
}
pipeline.addLast("connectionEventHandler", connectionEventHandler);
pipeline.addLast("handler", handler);
}
});
}
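As shown, the client-side netty pipeline is almost identical to the server-side one. The small difference is that the client has to send heartbeats, while the server has to detect them.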
//client
pipeline.addLast("heartbeatHandler", heartbeatHandler);
//server
pipeline.addLast("serverIdleHandler", serverIdleHandler);
Initializing the connection monitor
com.alipay.remoting.rpc.RpcClient#startup
if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
if (monitorStrategy == null) {
connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
this.connectionManager);
} else {
connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
this.connectionManager);
}
connectionMonitor.startup();
logger.warn("Switch on connection monitor");
}
DefaultConnectionMonitor monitors connections. Its startup method creates a ScheduledThreadPoolExecutor that periodically calls ConnectionMonitorStrategy.monitor on the connection pools.
@Override
public void startup() throws LifeCycleException {
super.startup();
/* initial delay to execute schedule task, unit: ms */
long initialDelay = ConfigManager.conn_monitor_initial_delay();
/* period of schedule task, unit: ms*/
long period = ConfigManager.conn_monitor_period();
this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
"ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
this.executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager
.getConnPools();
strategy.monitor(connPools);
} catch (Exception e) {
logger.warn("MonitorTask error", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
}
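The try/catch inside the scheduled task matters: with scheduleAtFixedRate, a run that throws suppresses all subsequent runs. The sketch below demonstrates the pattern with plain JDK classes; the startMonitor and runsDespiteFailure names and the delay values are made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class MonitorScheduleSketch {
    /** Schedules 'check' on a single daemon thread and shields the schedule from its exceptions. */
    static ScheduledThreadPoolExecutor startMonitor(Runnable check, long initialDelayMs, long periodMs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "monitor-sketch");
            t.setDaemon(true); // do not keep the JVM alive for monitoring
            return t;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                check.run();
            } catch (Exception e) {
                // Swallow and log; an escaping exception would cancel the periodic task.
                System.err.println("monitor task error: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Returns true if a check that always throws still gets executed at least three times. */
    static boolean runsDespiteFailure() {
        CountDownLatch threeRuns = new CountDownLatch(3);
        ScheduledThreadPoolExecutor executor = startMonitor(() -> {
            threeRuns.countDown();
            throw new IllegalStateException("simulated monitor failure");
        }, 0, 10);
        try {
            return threeRuns.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("kept running after failures: " + runsDespiteFailure());
    }
}
```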
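For example, ScheduledDisconnectStrategy watches the number of in-service connections in each pool. When that number exceeds the configured threshold, it marks one randomly chosen connection as out of service, and it closes out-of-service connections once they have no unfinished invocations.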
public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
try {
if (connPools == null || connPools.size() == 0) {
return;
}
for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : connPools
.entrySet()) {
String poolKey = entry.getKey();
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
List<Connection> serviceOnConnections = new ArrayList<Connection>();
List<Connection> serviceOffConnections = new ArrayList<Connection>();
for (Connection connection : pool.getAll()) {
if (isConnectionOn(connection)) {
serviceOnConnections.add(connection);
} else {
serviceOffConnections.add(connection);
}
}
if (serviceOnConnections.size() > connectionThreshold) {
Connection freshSelectConnect = serviceOnConnections.get(random
.nextInt(serviceOnConnections.size()));
freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
Configs.CONN_SERVICE_STATUS_OFF);
serviceOffConnections.add(freshSelectConnect);
} else {
if (logger.isInfoEnabled()) {
logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
poolKey, serviceOnConnections.size(), connectionThreshold);
}
}
for (Connection offConn : serviceOffConnections) {
if (offConn.isInvokeFutureMapFinish()) {
if (offConn.isFine()) {
offConn.close();
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Address={} won't close at this schedule turn",
RemotingUtil.parseRemoteAddress(offConn.getChannel()));
}
}
}
}
} catch (Exception e) {
logger.error("ScheduledDisconnectStrategy monitor error", e);
}
}
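The two-phase behavior (first mark a connection as off, then close it only when it is idle) can be sketched for a single pool as follows. This is a simplified model under stated assumptions: the Conn class with its serviceOn, open, and pendingRequests fields is hypothetical, and closed connections are not removed from the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class DisconnectSketch {
    /** Hypothetical connection model. */
    static final class Conn {
        boolean serviceOn = true; // still eligible to serve requests
        boolean open = true;      // underlying channel still open
        int pendingRequests;      // invocations that have not finished yet
    }

    /** One monitoring pass over a single pool; returns how many connections were closed. */
    static int monitor(List<Conn> pool, int threshold, Random random) {
        List<Conn> on = new ArrayList<>();
        List<Conn> off = new ArrayList<>();
        for (Conn c : pool) {
            (c.serviceOn ? on : off).add(c);
        }
        if (on.size() > threshold) {
            // Too many in-service connections: retire one at random.
            Conn victim = on.get(random.nextInt(on.size()));
            victim.serviceOn = false;
            off.add(victim);
        }
        int closed = 0;
        for (Conn c : off) {
            // Close only idle connections; busy ones wait for a later pass.
            if (c.pendingRequests == 0 && c.open) {
                c.open = false;
                closed++;
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        List<Conn> pool = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            pool.add(new Conn());
        }
        System.out.println("closed in this pass: " + monitor(pool, 2, new Random()));
    }
}
```

A connection with pending requests stays marked as off and is closed by a later pass, which mirrors the "won't close at this schedule turn" log line in the code above.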
Initializing the reconnect manager
com.alipay.remoting.rpc.RpcClient#startup
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
reconnectManager = new ReconnectManager(connectionManager);
reconnectManager.startup();
connectionEventHandler.setReconnector(reconnectManager);
logger.warn("Switch on reconnect manager");
}
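Unlike the monitor, the reconnect manager starts a plain Thread in its startup method. That thread acts as a consumer and processes the ReconnectTask objects added to the queue.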
com.alipay.remoting.ReconnectManager#startup
@Override
public void startup() throws LifeCycleException {
super.startup();
this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
}
com.alipay.remoting.ReconnectManager.HealConnectionRunner#run
@Override
public void run() {
while (isStarted()) {
long start = -1;
ReconnectTask task = null;
try {
if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
Thread.sleep(HEAL_CONNECTION_INTERVAL);
}
try {
task = ReconnectManager.this.tasks.take();
} catch (InterruptedException e) {
// ignore
}
if (task == null) {
continue;
}
start = System.currentTimeMillis();
if (!canceled.contains(task.url)) {
task.run();
} else {
logger.warn("Invalid reconnect request task {}, cancel list size {}",
task.url, canceled.size());
}
this.lastConnectTime = System.currentTimeMillis() - start;
} catch (Exception e) {
if (start != -1) {
this.lastConnectTime = System.currentTimeMillis() - start;
}
if (task != null) {
logger.warn("reconnect target: {} failed.", task.url, e);
tasks.add(task);
}
}
}
}
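Creating a ReconnectTask
A ReconnectTask is created in two cases: after a connection is dropped unexpectedly, and after a reconnect attempt fails. The second case is handled by the catch block of HealConnectionRunner#run shown above, which puts the failed task back into the queue with tasks.add(task).
1. The connection is dropped unexpectedly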
com.alipay.remoting.ConnectionEventHandler#channelInactive
// add reconnect task
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.reconnect(conn.getUrl());
}
}
Cancelling a ReconnectTask
When the client forcibly closes a connection, reconnection is cancelled by adding the url to the ReconnectManager#canceled collection.
com.alipay.remoting.rpc.RpcClient#closeConnection
@Override
public void closeConnection(Url url) {
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
reconnectManager.disableReconnect(url);
}
this.connectionManager.remove(url.getUniqueKey());
}
com.alipay.remoting.ReconnectManager#disableReconnect
@Override
public void disableReconnect(Url url) {
canceled.add(url);
}
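The interplay between the task queue, the consumer thread, and the cancel list can be sketched with JDK classes alone. This is an illustrative model rather than ReconnectManager itself: urls are plain strings, the "reconnect" just records the url, and the demo helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class ReconnectQueueSketch {
    private final LinkedBlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Set<String> canceled = ConcurrentHashMap.newKeySet();
    final List<String> reconnected = new CopyOnWriteArrayList<>();
    private volatile boolean started = true;
    private final Thread worker = new Thread(this::drain, "reconnect-sketch");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    void stop() {
        started = false;
        worker.interrupt(); // wake the worker if it is blocked in take()
    }

    void reconnect(String url) {
        tasks.add(url);
    }

    void disableReconnect(String url) {
        canceled.add(url);
    }

    /** Consumer loop: take a task, skip it if cancelled, otherwise "reconnect". */
    private void drain() {
        while (started) {
            try {
                String url = tasks.take();
                if (!canceled.contains(url)) {
                    reconnected.add(url); // stands in for the real reconnect attempt
                }
            } catch (InterruptedException e) {
                // fall through; the loop condition decides whether to exit
            }
        }
    }

    /** Queues a, b, c with b cancelled and returns the urls that were actually processed. */
    static List<String> demo() {
        ReconnectQueueSketch manager = new ReconnectQueueSketch();
        manager.disableReconnect("b");
        manager.start();
        manager.reconnect("a");
        manager.reconnect("b");
        manager.reconnect("c");
        long deadline = System.currentTimeMillis() + 2000;
        // Tasks are consumed in FIFO order, so once "c" is done, "b" has been handled too.
        while (!manager.reconnected.contains("c") && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        manager.stop();
        return new ArrayList<>(manager.reconnected);
    }

    public static void main(String[] args) {
        System.out.println("reconnected: " + demo());
    }
}
```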
Back to the client code from the beginning:
RequestBody req = new RequestBody(2, "hello world sync");
try {
String res = (String) client.invokeSync(addr, req, 3000);
System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
String errMsg = "RemotingException caught in sync!";
logger.error(errMsg, e);
Assert.fail(errMsg);
} catch (InterruptedException e) {
logger.error("interrupted!");
}
The client calls the invokeSync method to send a request to the server.
com.alipay.remoting.rpc.RpcClient#invokeSync
@Override
public Object invokeSync(final String address, final Object request,
final InvokeContext invokeContext, final int timeoutMillis) throws RemotingException,InterruptedException {
return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcRemoting#invokeSync
public Object invokeSync(final String addr, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,InterruptedException {
// use addressParser to parse the address into a Url object
Url url = this.addressParser.parse(addr);
return this.invokeSync(url, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcClientRemoting#invokeSync
@Override
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) throws RemotingException,InterruptedException {
// 1. get the connection, creating it if absent
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
// 2. check the connection
this.connectionManager.check(conn);
// 3. perform the invocation
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
Getting or creating the connection object
com.alipay.remoting.rpc.RpcClientRemoting#getConnectionAndInitInvokeContext
protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext)
throws RemotingException,
InterruptedException {
long start = System.currentTimeMillis();
Connection conn;
try {
// ask ConnectionManager for a Connection, creating one if absent
conn = this.connectionManager.getAndCreateIfAbsent(url);
} finally {
if (null != invokeContext) {
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME,
(System.currentTimeMillis() - start));
}
}
return conn;
}
com.alipay.remoting.DefaultConnectionManager#getAndCreateIfAbsent
@Override
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
// 1. get or create the connection pool; the poolKey is the Url's uniqueKey
ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
new ConnectionPoolCall(url));
if (null != pool) {
// 2. get a connection from the pool
return pool.get();
} else {
logger.error("[NOTIFYME] bug detected! pool here must not be null!");
return null;
}
}
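For how a connection is picked from the pool, see the connection selection strategy section above.
Creating the connection pool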
com.alipay.remoting.DefaultConnectionManager#getConnectionPoolAndCreateIfAbsent
private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
Callable<ConnectionPool> callable)throws RemotingException,InterruptedException {
RunStateRecordedFutureTask<ConnectionPool> initialTask;
ConnectionPool pool = null;
int retry = Constants.DEFAULT_RETRY_TIMES;
int timesOfResultNull = 0;
int timesOfInterrupt = 0;
for (int i = 0; (i < retry) && (pool == null); ++i) {
initialTask = this.connTasks.get(poolKey);
if (null == initialTask) {
RunStateRecordedFutureTask<ConnectionPool> newTask = new RunStateRecordedFutureTask<ConnectionPool>(
callable);
initialTask = this.connTasks.putIfAbsent(poolKey, newTask);
if (null == initialTask) {
initialTask = newTask;
// run the task, which actually calls ConnectionPoolCall.call()
initialTask.run();
}
}
try {
pool = initialTask.get();
...
} catch (ExecutionException e) {
...
}
}
return pool;
}
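The core of this method is the FutureTask plus putIfAbsent idiom, which guarantees that the pool for a given key is built once even if several threads ask for it at the same time. Below is a minimal generic sketch of that idiom. The CreateOnceSketch class and its demo helper are illustrative names, and the retry loop and failure handling that the excerpt elides are deliberately left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

final class CreateOnceSketch<V> {
    private final ConcurrentHashMap<String, FutureTask<V>> tasks = new ConcurrentHashMap<>();

    /** Returns the value for 'key', running 'factory' at most once per key. */
    V getOrCreate(String key, Callable<V> factory) throws InterruptedException, ExecutionException {
        FutureTask<V> task = tasks.get(key);
        if (task == null) {
            FutureTask<V> newTask = new FutureTask<>(factory);
            task = tasks.putIfAbsent(key, newTask);
            if (task == null) {
                // This thread won the race, so it is the one that runs the factory.
                task = newTask;
                task.run();
            }
        }
        // Threads that lost the race block here until the winner has finished.
        return task.get();
    }

    /** Calls getOrCreate twice for the same key and checks that the factory ran once. */
    static boolean demo() {
        CreateOnceSketch<Object> pools = new CreateOnceSketch<>();
        AtomicInteger created = new AtomicInteger();
        Callable<Object> factory = () -> {
            created.incrementAndGet();
            return new Object();
        };
        try {
            Object first = pools.getOrCreate("127.0.0.1:8080", factory);
            Object second = pools.getOrCreate("127.0.0.1:8080", factory);
            return first == second && created.get() == 1;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("created exactly once: " + demo());
    }
}
```

Storing the task rather than the finished value is what lets a second caller wait for a creation that is still in progress instead of starting its own.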
com.alipay.remoting.DefaultConnectionManager.ConnectionPoolCall#call
@Override
public ConnectionPool call() throws Exception {
final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
if (whetherInitConnection) {
try {
// initialize the connection pool
doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
} catch (Exception e) {
pool.removeAllAndTryClose();
throw e;
}
}
return pool;
}
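The doCreate method creates the connections in the pool. With warmup enabled, all connections are created synchronously; otherwise a given number are created synchronously and the remainder asynchronously.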
private void doCreate(final Url url, final ConnectionPool pool, final String taskName,final int syncCreateNumWhenNotWarmup) throws RemotingException {
final int actualNum = pool.size();
final int expectNum = url.getConnNum();
if (actualNum >= expectNum) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum,
taskName);
}
if (url.isConnWarmup()) {
for (int i = actualNum; i < expectNum; ++i) {
Connection connection = create(url);
pool.add(connection);
}
} else {
if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
throw new IllegalArgumentException(
"sync create number when not warmup should be [0," + url.getConnNum() + "]");
}
// create connections synchronously
if (syncCreateNumWhenNotWarmup > 0) {
for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
Connection connection = create(url);
pool.add(connection);
}
if (syncCreateNumWhenNotWarmup >= url.getConnNum()) {
return;
}
}
pool.markAsyncCreationStart();// mark the start of async
try {
// create the remaining connections asynchronously
this.asyncCreateConnectionExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = pool.size(); i < url.getConnNum(); ++i) {
Connection conn = null;
try {
conn = create(url);
} catch (RemotingException e) {
}
pool.add(conn);
}
} finally {
pool.markAsyncCreationDone();// mark the end of async
}
}
});
} catch (RejectedExecutionException e) {
pool.markAsyncCreationDone();// mark the end of async when reject
throw e;
}
} // end of NOT warm up
}
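The non-warmup branch boils down to "create a few connections now so the caller can proceed, and fill the rest in the background". A minimal sketch of that split follows. It differs from the code above in stated ways: connections are plain strings, the fill and demo names are hypothetical, and it returns a Future so the caller can wait, where the original marks the start and end of async creation on the pool.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class PoolFillSketch {
    /** Adds syncNum entries immediately, then tops the pool up to expectNum in the background. */
    static Future<?> fill(List<String> pool, int expectNum, int syncNum, ExecutorService executor) {
        if (syncNum < 0 || syncNum > expectNum) {
            throw new IllegalArgumentException("syncNum should be in [0," + expectNum + "]");
        }
        for (int i = 0; i < syncNum; i++) {
            pool.add("conn-sync-" + i); // stands in for a blocking connect
        }
        return executor.submit(() -> {
            for (int i = pool.size(); i < expectNum; i++) {
                pool.add("conn-async-" + i);
            }
        });
    }

    /** Fills a pool of 4 with 1 synchronous connection and returns the final size. */
    static int demo() {
        List<String> pool = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> rest = fill(pool, 4, 1, executor);
            // At this point at least the synchronous connection is usable.
            rest.get(2, TimeUnit.SECONDS);
            return pool.size();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after fill: " + demo());
    }
}
```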
Creating the connection object
The connection object is created through the connection factory.
com.alipay.remoting.DefaultConnectionManager#create
public Connection create(Url url) throws RemotingException {
Connection conn;
try {
conn = this.connectionFactory.createConnection(url);
} catch (Exception e) {
throw new RemotingException("Create connection failed. The address is "
+ url.getOriginUrl(), e);
}
return conn;
}
com.alipay.remoting.connection.AbstractConnectionFactory#createConnection
@Override
public Connection createConnection(Url url) throws Exception {
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
url.getVersion(), url);
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
return conn;
}
com.alipay.remoting.connection.AbstractConnectionFactory#doCreateConnection
protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
throws Exception {
// prevent unreasonable value, at least 1000
connectTimeout = Math.max(connectTimeout, 1000);
String address = targetIP + ":" + targetPort;
if (logger.isDebugEnabled()) {
logger.debug("connectTimeout of address [{}] is [{}].", address, connectTimeout);
}
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
future.awaitUninterruptibly();
...
return future.channel();
}
Checking the connection object
com.alipay.remoting.DefaultConnectionManager#check
public void check(Connection connection) throws RemotingException {
if (connection == null) {
throw new RemotingException("Connection is null when do check!");
}
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
this.remove(connection);
throw new RemotingException("Check connection failed for address: "
+ connection.getUrl());
}
if (!connection.getChannel().isWritable()) {
// No remove. Most of the time it is unwritable temporarily.
throw new RemotingException("Check connection failed for address: "
+ connection.getUrl() + ", maybe write overflow!");
}
}
Performing the invocation
public Object invokeSync(final Connection conn, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,InterruptedException {
RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,
timeoutMillis);
preProcessInvokeContext(invokeContext, requestCommand, conn);
ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
timeoutMillis);
responseCommand.setInvokeContext(invokeContext);
Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
RemotingUtil.parseRemoteAddress(conn.getChannel()));
return responseObject;
}
Sending the request means writing requestCommand to the channel.
com.alipay.remoting.BaseRemoting#invokeSync
protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,final int timeoutMillis) throws RemotingException, InterruptedException {
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
conn.addInvokeFuture(future);
final int requestId = request.getId();
try {
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
...
}
});
} catch (Exception e) {
...
}
RemotingCommand response = future.waitResponse(timeoutMillis);
if (response == null) {
conn.removeInvokeFuture(requestId);
response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
logger.warn("Wait response, request id={} timeout!", requestId);
}
return response;
}
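The synchronous call is built from an asynchronous write plus a future that the caller blocks on, keyed by request id. The sketch below models that with a CountDownLatch. It is an illustration under stated assumptions: PendingCall, onResponse, the string payloads, and the "TIMEOUT" marker are hypothetical stand-ins for InvokeFuture, the response handling path, and the timeout response.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class InvokeFutureSketch {
    /** Hypothetical future: one latch and one response slot per request. */
    static final class PendingCall {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            response = r;
            done.countDown();
        }

        /** Returns the response, or null if none arrived within the timeout. */
        String waitResponse(long timeoutMs) throws InterruptedException {
            done.await(timeoutMs, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    private final ConcurrentHashMap<Integer, PendingCall> pending = new ConcurrentHashMap<>();

    /** Called from the I/O side when the response for requestId arrives. */
    void onResponse(int requestId, String body) {
        PendingCall call = pending.remove(requestId);
        if (call != null) {
            call.putResponse(body);
        }
    }

    /** Registers the future, sends the request, then blocks until response or timeout. */
    String invokeSync(int requestId, Runnable send, long timeoutMs) throws InterruptedException {
        PendingCall call = new PendingCall();
        pending.put(requestId, call); // register before sending so a fast reply is not lost
        send.run();
        String response = call.waitResponse(timeoutMs);
        if (response == null) {
            pending.remove(requestId);
            response = "TIMEOUT";
        }
        return response;
    }

    /** Runs one call against a fake server that either answers "pong" or stays silent. */
    static String demo(boolean serverAnswers, long timeoutMs) {
        InvokeFutureSketch remoting = new InvokeFutureSketch();
        Runnable send = () -> {
            if (serverAnswers) {
                new Thread(() -> remoting.onResponse(1, "pong")).start();
            }
        };
        try {
            return remoting.invokeSync(1, send, timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true, 2000));
        System.out.println(demo(false, 100));
    }
}
```

Removing the pending entry on timeout matters for the same reason conn.removeInvokeFuture(requestId) appears above: a response that never arrives would otherwise leave its future in the map indefinitely.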
Original article: https://www.cnblogs.com/huiyao/p/12404208.html