public void execute(RouteResultset rrs, int type) { // clear prev execute resources clearHandlesResources(); if (LOGGER.isDebugEnabled()) { StringBuilder s = new StringBuilder(); LOGGER.debug(s.append(source).append(rrs).toString() + " rrs "); } // 检查路由结果是否为空 RouteResultsetNode[] nodes = rrs.getNodes(); if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) { source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR, "No dataNode found ,please check tables defined in schema:" + source.getSchema()); return; } if (nodes.length == 1) { singleNodeHandler = new SingleNodeHandler(rrs, this); try { singleNodeHandler.execute(); } catch (Exception e) { LOGGER.warn(new StringBuilder().append(source).append(rrs), e); source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } } else { boolean autocommit = source.isAutocommit(); SystemConfig sysConfig = MycatServer.getInstance().getConfig() .getSystem(); int mutiNodeLimitType = sysConfig.getMutiNodeLimitType(); multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this); try { multiNodeHandler.execute(); } catch (Exception e) { LOGGER.warn(new StringBuilder().append(source).append(rrs), e); source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } } }
public void execute() throws Exception { startTime=System.currentTimeMillis(); ServerConnection sc = session.getSource(); this.isRunning = true; this.packetId = 0; final BackendConnection conn = session.getTarget(node); if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection MycatConfig conf = MycatServer.getInstance().getConfig(); PhysicalDBNode dn = conf.getDataNodes().get(node.getName()); dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this, node); } }
如果session已经有该datanode关联的后端连接(session.tryExistsCon(conn, node)返回true),则调用_execute()方法下发SQL指令;反之,则调用dn.getConnection()方法从连接池中获取一个可用连接或新建一个连接,并且由于第4个参数将this作为ResponseHandler对象传入,获取到连接后会在PhysicalDatasource.takeCon()中调用handler.connectionAcquired(conn)完成回调,即SingleNodeHandler.connectionAcquired():
public void connectionAcquired(final BackendConnection conn) { session.bindConnection(node, conn); _execute(conn); }
private void _execute(BackendConnection conn) { if (session.closed()) { endRunning(); session.clearResources(true); return; } conn.setResponseHandler(this); try { conn.execute(node, session.getSource(), session.getSource() .isAutocommit()); } catch (Exception e1) { executeException(conn, e1); return; } }
public void execute(RouteResultsetNode rrn, ServerConnection sc, boolean autocommit) throws UnsupportedEncodingException { if (!modifiedSQLExecuted && rrn.isModifySQL()) { modifiedSQLExecuted = true; } String xaTXID = sc.getSession2().getXaTXID(); synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(), autocommit); } private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn, int clientCharSetIndex, int clientTxIsoLation, boolean clientAutoCommit) { String xaCmd = null; boolean conAutoComit = this.autocommit; String conSchema = this.schema; // never executed modify sql,so auto commit boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB() || clientAutoCommit; if (expectAutocommit == false && xaTxID != null && xaStatus == 0) { clientTxIsoLation = Isolations.SERIALIZABLE; xaCmd = "XA START " + xaTxID + ‘;‘; } int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1; int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1; int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1; int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1; int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn; if (synCount == 0) { // not need syn connection sendQueryCmd(rrn.getStatement()); return; } CommandPacket schemaCmd = null; StringBuilder sb = new StringBuilder(); if (schemaSyn == 1) { schemaCmd = getChangeSchemaCommand(conSchema); // getChangeSchemaCommand(sb, conSchema); } if (charsetSyn == 1) { getCharsetCommand(sb, clientCharSetIndex); } if (txIsoLationSyn == 1) { getTxIsolationCommand(sb, clientTxIsoLation); } if (autoCommitSyn == 1) { getAutocommitCommand(sb, expectAutocommit); } if (xaCmd != null) { sb.append(xaCmd); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("con need syn ,total syn cmd " + synCount + " commands " + sb.toString() + "schema change:" + (schemaCmd != null) + " con:" + this); } metaDataSyned = false; statusSync = new StatusSync(xaCmd != null, conSchema, clientCharSetIndex, clientTxIsoLation, expectAutocommit, synCount); // syn schema if (schemaCmd != null) { schemaCmd.write(this); } // and our query sql to multi command at last sb.append(rrn.getStatement()); // syn and execute others this.sendQueryCmd(sb.toString()); // waiting syn result... }
public void execute() throws Exception { final ReentrantLock lock = this.lock; lock.lock(); try { this.reset(rrs.getNodes().length); this.fieldsReturned = false; this.affectedRows = 0L; this.insertId = 0L; } finally { lock.unlock(); } MycatConfig conf = MycatServer.getInstance().getConfig(); startTime = System.currentTimeMillis(); for (final RouteResultsetNode node : rrs.getNodes()) { BackendConnection conn = session.getTarget(node); if (session.tryExistsCon(conn, node)) { _execute(conn, node); } else { // create new connection PhysicalDBNode dn = conf.getDataNodes().get(node.getName()); dn.getConnection(dn.getDatabase(), autocommit, node, this, node); } } }