public Connection connect(String s, Properties properties) throws SQLException { if(s.regionMatches(0, "jdbc:default:connection", 0, 23)) { String s1 = "jdbc:oracle:kprb"; int j = s.length(); if(j > 23) s = s1.concat(s.substring(23, s.length())); else s = s1.concat(":"); s1 = null; } int i = oracleAcceptsURL(s); if(i == 1) return null; if(i == 2) { DBError.throwSqlException(67); return null; } Hashtable hashtable = parseUrl(s); if(hashtable == null) return null; String s2 = properties.getProperty("user"); String s3 = properties.getProperty("password"); String s4 = properties.getProperty("database"); if(s4 == null) s4 = properties.getProperty("server"); if(s2 == null) s2 = (String)hashtable.get("user"); s2 = parseLoginOption(s2, properties); if(s3 == null) s3 = (String)hashtable.get("password"); if(s4 == null) s4 = (String)hashtable.get("database"); String s5 = (String)hashtable.get("protocol"); properties.put("protocol", s5); if(s5 == null) { DBError.throwSqlException(40, "Protocol is not specified in URL"); return null; } String s6 = properties.getProperty("dll"); if(s6 == null) properties.put("dll", "ocijdbc9"); String s7 = properties.getProperty("prefetch"); if(s7 == null) s7 = properties.getProperty("rowPrefetch"); if(s7 == null) s7 = properties.getProperty("defaultRowPrefetch"); if(s7 != null && Integer.parseInt(s7) <= 0) s7 = null; String s8 = properties.getProperty("batch"); if(s8 == null) s8 = properties.getProperty("executeBatch"); if(s8 == null) s8 = properties.getProperty("defaultExecuteBatch"); if(s8 != null && Integer.parseInt(s8) <= 0) s8 = null; String s9 = properties.getProperty("remarks"); if(s9 == null) s9 = properties.getProperty("remarksReporting"); String s10 = properties.getProperty("synonyms"); if(s10 == null) s10 = properties.getProperty("includeSynonyms"); String s11 = properties.getProperty("restrictGetTables"); String s12 = properties.getProperty("fixedString"); String s13 = properties.getProperty("dataSizeUnits"); String s14 = properties.getProperty("AccumulateBatchResult"); if(s14 == null) s14 = "true"; Enumeration enumeration; for(enumeration = DriverManager.getDrivers(); enumeration.hasMoreElements();) { Driver driver = (Driver)enumeration.nextElement(); if(driver instanceof OracleDriver) break; } while(enumeration.hasMoreElements()) { Driver driver1 = (Driver)enumeration.nextElement(); if(driver1 instanceof OracleDriver) DriverManager.deregisterDriver(driver1); } /** * s5 为协议如thin * s 整个jdbc url串 * s2 为user用户名 * s3 为密码 * s4 为数据库描述信息 * properties 为其他的参数说明 */ Connection connection = getConnectionInstance(s5, s, s2, s3, s4, properties); if(s7 != null) ((oracle.jdbc.driver.OracleConnection)connection).setDefaultRowPrefetch(Integer.parseInt(s7)); if(s8 != null) ((oracle.jdbc.driver.OracleConnection)connection).setDefaultExecuteBatch(Integer.parseInt(s8)); if(s9 != null) ((oracle.jdbc.driver.OracleConnection)connection).setRemarksReporting(s9.equalsIgnoreCase("true")); if(s10 != null) ((oracle.jdbc.driver.OracleConnection)connection).setIncludeSynonyms(s10.equalsIgnoreCase("true")); if(s11 != null) ((oracle.jdbc.driver.OracleConnection)connection).setRestrictGetTables(s11.equalsIgnoreCase("true")); if(s12 != null) ((oracle.jdbc.driver.OracleConnection)connection).setDefaultFixedString(s12.equalsIgnoreCase("true")); if(s13 != null) ((oracle.jdbc.driver.OracleConnection)connection).setDataSizeUnits(s13); ((oracle.jdbc.driver.OracleConnection)connection).setAccumulateBatchResult(s14.equalsIgnoreCase("true")); hashtable = null; return connection; }
private Connection getConnectionInstance(String s, String s1, String s2, String s3, String s4, Properties properties)
throws SQLException
Object obj = null;
if(s.compareTo("ultra") == 0)
Class aclass[] = null;
Object aobj[] = new Object[6];
aobj[0] = s;
aobj[1] = s1;
aobj[2] = s2;
aobj[3] = s3;
aobj[4] = s4;
aobj[5] = properties;
Class class1 = Class.forName("oracle.jdbc.ultra.client.Driver");
Method amethod[] = class1.getMethods();
for(int i = 0; i < amethod.length; i++)
aclass = amethod[i].getParameterTypes();
Method method = class1.getMethod("getConnection", aclass);
obj = (Connection)method.invoke(class1.newInstance(), aobj);
catch(Exception exception)
} else {
String s5 = null;
if(s.equals("thin") && System.getProperty("oracle.jserver.version") != null)
s5 = "thin-server";
if((s.equals("oci8") || s.equals("oci")) && System.getProperty("oracle.jserver.version") != null)
s5 = "oci-server";
s5 = s;
String s6 = (String)m_driverAccess.get(s5);
if(s6 == null)
DBError.throwSqlException(67, "Invalid protocol " + s);
DBAccess dbaccess = null;
dbaccess = (DBAccess)Class.forName(s6).newInstance();
catch(Exception _ex)
return null;
if(properties.getProperty("is_connection_pooling") == "true")
properties.put("database", s4 != null ? ((Object) (s4)) : "");
obj = new OracleOCIConnection(dbaccess, s1, s2, s3, s4, properties);
} else
obj = new oracle.jdbc.driver.OracleConnection(dbaccess, s1, s2, s3, s4, properties);
return ((Connection) (obj));
如果通常是thin的情况下,代码片段,可以看到s5就是"thin",此时m_driverAccess.get(s5)后,得到s6后,通过Class.forName(s6).newInstance()得到dbAccess的实例,这个dbAccess是非常重要的,虽然它还不是我们想要找的OracleConnection,但是可以看到下面去new OracleConnection的时候,是带上这个实例的,m_driverAccess是什么呢?
private static Properties m_driverAccess; static { m_driverAccess = new Properties(); m_driverAccess.put("thin-server", "oracle.jdbc.thinserver.ServerTTC7Protocol"); m_driverAccess.put("oci-server", "oracle.jdbc.ociserver.ServerOCIDBAccess"); m_driverAccess.put("thin", "oracle.jdbc.ttc7.TTC7Protocol"); m_driverAccess.put("oci8", "oracle.jdbc.oci8.OCIDBAccess"); m_driverAccess.put("oci", "oracle.jdbc.oci8.OCIDBAccess"); m_driverAccess.put("kprb", "oracle.jdbc.kprb.KprbDBAccess");
最后通过new oracle.jdbc.driver.OracleConnection就获取到了相关的Connection对象了
public OracleConnection(DBAccess dbaccess, String s, String s1, String s2, String s3, Properties properties) throws SQLException { //.....各种参数赋值,这里省掉了 if(properties != null) { s4 = (String)properties.get("protocol"); String s6 = properties.getProperty("processEscapes"); if(s6 != null && s6.equalsIgnoreCase("false")) m_process_escapes = false; connectionProperties = (Properties)properties.clone(); connectionProperties.remove("password");//将password在链接参数中去掉,安全措施 } initialize(s, s1, s4, dbaccess, null, null, null, s3); logicalHandle = false; try { needLine(); conversion = db_access.logon(s1, s2, s3, properties);//用户名、密码、database描述、扩展参数 m_warning = DBError.addSqlWarning(m_warning, db_access.getWarnings()); if(properties == null || properties.getProperty("connection_pool") != "connection_pool") { default_row_prefetch = db_access.getDefaultPrefetch(); if(properties != null) { String s5 = properties.getProperty("autoCommit"); if(s5 != null && s5.equalsIgnoreCase("false")) flag = false; } setAutoCommit(flag); db_access.initNls(this); } } catch(IOException ioexception) { DBError.throwSqlException(ioexception); } catch(SQLException sqlexception) { try { db_access.logoff(); } catch(IOException _ex) { } catch(SQLException _ex) { } throw sqlexception; } m_txn_mode = 0; }
private void initialize(String s, String s1, String s2, DBAccess dbaccess, Hashtable hashtable, Map map1, Map map2, String s3) throws SQLException { initClientDataSupport(); statementCache = null; m_stmtClearMetaData = false; database = s3; url = s; if(s1 != null) user = s1.toUpperCase(); else user = s1; db_access = dbaccess; protocol = s2; physicalStatus = true; default_row_prefetch = DEFAULT_ROW_PREFETCH; default_batch = 1; statement_table = new Hashtable(10); if(hashtable != null) descriptorCache = hashtable; else descriptorCache = new Hashtable(10); map = map1; if(map2 != null) m_javaObjectMap = map2; else m_javaObjectMap = new Hashtable(10); closed = false; trans_level = 2; XA_wants_error = false; UsingXA = false; fdo = null; big_endian = null; m_occ = null; m_privData = null; m_clientIdSet = false; m_clientId = null; }
static int DEFAULT_ROW_PREFETCH = 10;所以oracle默认就是每次从服务器端获取10行数据出来,cache在应用端;
/** * s 为用户名 * s1为密码 * s2为数据库描述信息 * */ public synchronized DBConversion logon(String s, String s1, String s2, Properties properties) throws SQLException, IOException { try { if(state > 0) DBError.check_error(428); if(s == null || s1 == null) DBError.check_error(433); if(s.length() == 0 || s1.length() == 0) DBError.check_error(443); if(s2 == null) s2 = "localhost:1521:orcl"; //如果你没有设置连接串,Oracle会自己默认一个,就是一个本地叫orcl的sid,也许oracle认为这个是demo吧 connect(s2, properties);//这个是核心链接类 all7 = new Oall7(MEngine); commoncall = new Ocommoncall(MEngine); opencall = new Oopen(MEngine); close = new Oclose(MEngine); TTCTypeRep _tmp = MEngine.types; describe = (Odscrarr)MEngine.types.newTTIFunObject((byte)1, MEngine); bfileMsg = new v8TTIBfile(MEngine); blobMsg = new v8TTIBlob(MEngine);//建立BLOB通信对象 clobMsg = new v8TTIClob(MEngine);//建立CLOB通信对象 TTCTypeRep _tmp1 = MEngine.types; dty = (TTIdty)MEngine.types.newTTCMsgObject((byte)2, MEngine); dty.marshal();// dty.receive(); //....这里省掉很多代码,是链接创建后续的一些处理,可以继续向下看 //也有挺多东西,但是第一遍不要因为这些代码卡着看整体流程 return MEngine.conv; } catch(SQLException sqlexception) { try { net.disconnect(); } catch(Exception exception) { } state = 0; throw sqlexception; } }
/** * * @param s 数据库地址描述信息 * @param properties * @throws IOException * @throws SQLException */ private void connect(String s, Properties properties) throws IOException, SQLException { if(s == null || properties == null) DBError.check_error(433); net = new NSProtocol(); try { net.connect(s, properties); } catch(NetException netexception) { throw new IOException(netexception.getMessage()); } MEngine = new MAREngine(net); pro = new v8TTIpro(MEngine);//发送一个字节1过去 pro.marshal();//发送字节,获取版本号和字符集 pro.receive();//开启接受 short word0 = pro.getOracleVersion();//获取oracle的版本号码 short word1 = pro.getCharacterSet();//获取oracle字符集 short word2 = TTCConversion.findAccessCharSet(word1, word0); TTCConversion ttcconversion = new TTCConversion(word1, word2, word0, pro.getncharCHARSET()); MEngine.types.setServerConversion(word2 != word1); MEngine.types.setVersion(word0); if(DBConversion.isCharSetMultibyte(word2)) { if(DBConversion.isCharSetMultibyte(pro.getCharacterSet())) MEngine.types.setFlags((byte)1); else MEngine.types.setFlags((byte)2); } else { MEngine.types.setFlags(pro.getFlags()); } MEngine.conv = ttcconversion; }
/** * s为数据库地址描述信息例如: */ public void connect(String s, Properties properties) throws IOException, NetException { if(sAtts.connected) throw new NetException(201); if(s == null) throw new NetException(208); addrRes = new AddrResolution(s, properties);//地址描述信息配置 if(addrRes.connection_revised) {//一般我们不用TNS,在thin模式下 s = addrRes.getTNSAddress(); properties = addrRes.getUp(); } if(addrRes.jndi)//一般用的不是JNDI sAtts.profile = new ClientProfile(properties, addrRes.getJndi()); else sAtts.profile = new ClientProfile(properties);//常规一般走这里,设置一些client属性,大多数我们都是默认 establishConnection(s); Object obj4 = null;
private SessionAtts establishConnection(String s) throws NetException, IOException { sAtts.cOption = addrRes.resolveAndExecute(s);//执行后就能到一个inputStream和outputStream了 sAtts.ntInputStream = sAtts.cOption.nt.getInputStream(); sAtts.ntOutputStream = sAtts.cOption.nt.getOutputStream(); sAtts.setTDU(sAtts.cOption.tdu); sAtts.setSDU(sAtts.cOption.sdu); sAtts.nsOutputStream = new NetOutputStream(sAtts, 255);//255字节大小的package buffer sAtts.nsInputStream = new NetInputStream(sAtts); return sAtts; }
可以看到,到这里我们可以拿到和数据库之间交互的输入流和输出流了;最关键的就是 resolveAndExecute这个方法了
public ConnOption resolveAndExecute(String s) throws NetException, IOException { cs = new ConnStrategy(); if(s.indexOf("//") != -1) resolveUrl(s); else if(s.indexOf(':') != -1 && s.indexOf(')') == -1) resolveSimple(s);//注意这里进去,默认简单的计算,此时判定,有冒号、但是没有括号,其他的方法,是解析不同种类的DB描述符,所以,JDBC的描述符并不是只有一种写法,而是很多 else if(newSyntax) resolveAddrTree(s); else resolveAddr(s); if(!cs.optAvailable()) return cs.execute();//第一次取到时候需要调用这个方法 else return cs.getOption();//后面就直接回去 }
我们看看: resolveSimple这个方法的实现吧(主要是看他的URL怎么解析的):
private void resolveSimple(String s) throws NetException { ConnOption connoption = new ConnOption(); int i = 0; int j = 0; int k = 0; if((i = s.indexOf(':')) == -1 || (j = s.indexOf(':', i + 1)) == -1) throw new NetException(115); if((k = s.indexOf(':', j + 1)) != -1) throw new NetException(115); try { connoption.host = s.substring(0, i); connoption.port = Integer.parseInt(s.substring(i + 1, j)); connoption.addr = "(ADDRESS=(PROTOCOL=tcp)(HOST=" + connoption.host + ")(PORT=" + connoption.port + "))"; connoption.sid = s.substring(j + 1, s.length()); String s1 = "(DESCRIPTION=(CONNECT_DATA=(SID=" + connoption.sid + ")(CID=(PROGRAM=)(HOST=__jdbc__)(USER=)))" + "(ADDRESS=" + "(PROTOCOL=tcp)(HOST=" + connoption.host + ")(PORT=" + connoption.port + ")))"; connoption.protocol = "TCP"; connoption.conn_data = new StringBuffer(s1); cs.addOption(connoption); } catch(NumberFormatException _ex) { throw new NetException(116); } }
可以看到,最创建链接前将会将协议解析为TNS的连接串模式,也就是说,你自己也可以讲这个连接串写到JDBC URL的后面;其次协议也被解析成真正的TCP协议,而不是thin什么的,因为这个时候就涉及到交互了;
if(!cs.optAvailable()) return cs.execute(); else return cs.getOption();
public ConnOption execute() throws NetException { for(int i = 0; i <= cOpts.size() - 1;) try { copt = (ConnOption)cOpts.elementAt(i); copt.connect(); optFound = true; return copt; } catch(IOException _ex) { i++; } throw new NetException(20); }
public void connect() throws IOException { nt = getNT(); nt.connect(); } private NTAdapter getNT() throws NetException { try { nt = new TcpNTAdapter(addr); } catch(NLException _ex) { throw new NetException(501); } return nt; }
public class TcpNTAdapter implements NTAdapter { //构造方法这里就开始解析协议了,以及相关的参数信息,这里可能只关心,协议、HOST、PORT这几个信息 //注意,这里的JDBC URL串是被改完后的,也就是类似TNS中的describe连接串 public TcpNTAdapter(String s) throws NLException { NVNavigator nvnavigator = new NVNavigator(); NVPair nvpair = (new NVFactory()).createNVPair(s); if(nvpair == null) throw new NLException((short)100); NVPair nvpair1 = nvnavigator.findNVPair(nvpair, "PROTOCOL"); NVPair nvpair2 = nvnavigator.findNVPair(nvpair, "HOST"); NVPair nvpair3 = nvnavigator.findNVPair(nvpair, "PORT"); if(nvpair1 == null || nvpair2 == null || nvpair3 == null) throw new NLException((short)100); prot = nvpair1.getAtom(); host = nvpair2.getAtom(); port = Integer.parseInt(nvpair3.getAtom()); if(!prot.equals("TCP") && !prot.equals("tcp")) throw new NLException((short)100); else return; } public void connect() throws IOException { //看到这里的socket是不是很喜悦,因为这个是最基本的socket调用,所以通信 socket = new Socket(host, port); }
接下来说下MySQL的jdbc Driver获取Connection过程,mysql其实也是类似,因为有oracle为前提,所以我们就简单说下mysql就好:
mysql的Driver为:com.mysql.jdbc.Driver 其父类为:com.mysql.jdbc.NonRegisteringDriver,同上,首先来看他的connect方法:
public java.sql.Connection connect(String url, Properties info) throws SQLException { if (url != null) { if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) { return connectLoadBalanced(url, info); } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) { return connectReplicationConnection(url, info); } } Properties props = null; if ((props = parseURL(url, info)) == null) { return null; } try { Connection newConn = new com.mysql.jdbc.Connection(host(props), port(props), props, database(props), url); return newConn; } catch (SQLException sqlEx) { // Don't wrap SQLExceptions, throw // them un-changed. throw sqlEx; } catch (Exception ex) { throw SQLError.createSQLException(Messages .getString("NonRegisteringDriver.17") //$NON-NLS-1$ + ex.toString() + Messages.getString("NonRegisteringDriver.18"), //$NON-NLS-1$ SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } }
Connection(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url) throws SQLException { this.charsetToNumBytesMap = new HashMap(); this.connectionCreationTimeMillis = System.currentTimeMillis(); this.pointOfOrigin = new Throwable(); // Stash away for later, used to clone this connection for Statement.cancel // and Statement.setQueryTimeout(). // this.origHostToConnectTo = hostToConnectTo; this.origPortToConnectTo = portToConnectTo; this.origDatabaseToConnectTo = databaseToConnectTo; try { Blob.class.getMethod("truncate", new Class[] {Long.TYPE}); this.isRunningOnJDK13 = false; } catch (NoSuchMethodException nsme) { this.isRunningOnJDK13 = true; } this.sessionCalendar = new GregorianCalendar(); this.utcCalendar = new GregorianCalendar(); this.utcCalendar.setTimeZone(TimeZone.getTimeZone("GMT")); // // Normally, this code would be in initializeDriverProperties, // but we need to do this as early as possible, so we can start // logging to the 'correct' place as early as possible...this.log // points to 'NullLogger' for every connection at startup to avoid // NPEs and the overhead of checking for NULL at every logging call. // // We will reset this to the configured logger during properties // initialization. // this.log = LogFactory.getLogger(getLogger(), LOGGER_INSTANCE_NAME); // We store this per-connection, due to static synchronization // issues in Java's built-in TimeZone class... this.defaultTimeZone = Util.getDefaultTimeZone(); if ("GMT".equalsIgnoreCase(this.defaultTimeZone.getID())) { this.isClientTzUTC = true; } else { this.isClientTzUTC = false; } this.openStatements = new HashMap(); this.serverVariables = new HashMap(); this.hostList = new ArrayList(); //设置主机 if (hostToConnectTo == null) {//默认主机 this.host = "localhost"; this.hostList.add(this.host); } else if (hostToConnectTo.indexOf(",") != -1) {//多个主机 // multiple hosts separated by commas (failover) StringTokenizer hostTokenizer = new StringTokenizer( hostToConnectTo, ",", false); while (hostTokenizer.hasMoreTokens()) { this.hostList.add(hostTokenizer.nextToken().trim()); } } else {//一个主机,我们通常认为就一个主机 this.host = hostToConnectTo; this.hostList.add(this.host); } this.hostListSize = this.hostList.size(); this.port = portToConnectTo; if (databaseToConnectTo == null) { databaseToConnectTo = ""; } this.database = databaseToConnectTo; this.myURL = url; this.user = info.getProperty(NonRegisteringDriver.USER_PROPERTY_KEY); this.password = info .getProperty(NonRegisteringDriver.PASSWORD_PROPERTY_KEY); if ((this.user == null) || this.user.equals("")) { this.user = ""; } if (this.password == null) { this.password = ""; } this.props = info; initializeDriverProperties(info); try { createNewIO(false); this.dbmd = new DatabaseMetaData(this, this.database); } catch (SQLException ex) { cleanup(ex); // don't clobber SQL exceptions throw ex; } catch (Exception ex) { cleanup(ex); StringBuffer mesg = new StringBuffer(); if (getParanoid()) { mesg.append("Cannot connect to MySQL server on "); mesg.append(this.host); mesg.append(":"); mesg.append(this.port); mesg.append(".\n\n"); mesg.append("Make sure that there is a MySQL server "); mesg.append("running on the machine/port you are trying "); mesg .append("to connect to and that the machine this software is " + "running on "); mesg.append("is able to connect to this host/port " + "(i.e. not firewalled). "); mesg .append("Also make sure that the server has not been started " + "with the --skip-networking "); mesg.append("flag.\n\n"); } else { mesg.append("Unable to connect to database."); } mesg.append("Underlying exception: \n\n"); mesg.append(ex.getClass().getName()); if (!getParanoid()) { mesg.append(Util.stackTraceToString(ex)); } throw SQLError.createSQLException(mesg.toString(), SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE); } }
protected com.mysql.jdbc.MysqlIO createNewIO(boolean isForReconnect) throws SQLException { MysqlIO newIo = null; Properties mergedProps = new Properties(); mergedProps = exposeAsProperties(this.props); long queriesIssuedFailedOverCopy = this.queriesIssuedFailedOver; this.queriesIssuedFailedOver = 0; try { if (!getHighAvailability() && !this.failedOver) {//如果不是高可用,且不是failover(这里指通过连接池自己做,这样会有多个host) boolean connectionGood = false; Exception connectionNotEstablishedBecause = null; int hostIndex = 0; // // TODO: Eventually, when there's enough metadata // on the server to support it, we should come up // with a smarter way to pick what server to connect // to...perhaps even making it 'pluggable' // if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList); } for (; hostIndex < this.hostListSize; hostIndex++) { if (hostIndex == 0) { this.hasTriedMasterFlag = true; } try { String newHostPortPair = (String) this.hostList .get(hostIndex); int newPort = 3306; String[] hostPortPair = NonRegisteringDriver .parseHostPortPair(newHostPortPair); String newHost = hostPortPair[NonRegisteringDriver.HOST_NAME_INDEX]; if (newHost == null || newHost.trim().length() == 0) { newHost = "localhost"; } if (hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) { try { newPort = Integer .parseInt(hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( "Illegal connection port value '" + hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] + "'", SQLError.SQL_STATE_INVALID_CONNECTION_ATTRIBUTE); } } this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout());//获取IO链接 this.io.doHandshake(this.user, this.password, this.database);//登陆 this.connectionId = this.io.getThreadId();//mysql端的线程ID this.isClosed = false; // save state from old connection boolean oldAutoCommit = getAutoCommit(); int oldIsolationLevel = this.isolationLevel; boolean oldReadOnly = isReadOnly(); String oldCatalog = getCatalog(); // Server properties might be different // from previous connection, so initialize // again... initializePropsFromServer(); if (isForReconnect) { // Restore state from old connection setAutoCommit(oldAutoCommit); if (this.hasIsolationLevels) { setTransactionIsolation(oldIsolationLevel); } setCatalog(oldCatalog); } if (hostIndex != 0) { setFailedOverState(); queriesIssuedFailedOverCopy = 0; } else { this.failedOver = false; queriesIssuedFailedOverCopy = 0; if (this.hostListSize > 1) { setReadOnlyInternal(false); } else { setReadOnlyInternal(oldReadOnly); } } connectionGood = true; break; // low-level connection succeeded } catch (Exception EEE) { if (this.io != null) { this.io.forceClose(); } connectionNotEstablishedBecause = EEE; connectionGood = false; if (EEE instanceof SQLException) { SQLException sqlEx = (SQLException)EEE; String sqlState = sqlEx.getSQLState(); // If this isn't a communications failure, it will probably never succeed, so // give up right here and now .... if ((sqlState == null) || !sqlState .equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) { throw sqlEx; } } // Check next host, it might be up... if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList) - 1 /* incremented by for loop next time around */; } else if ((this.hostListSize - 1) == hostIndex) { throw new CommunicationsException(this, (this.io != null) ? this.io .getLastPacketSentTimeMs() : 0, EEE); } } } if (!connectionGood) { // We've really failed! throw SQLError.createSQLException( "Could not create connection to database server due to underlying exception: '" + connectionNotEstablishedBecause + "'." + (getParanoid() ? "" : Util .stackTraceToString(connectionNotEstablishedBecause)), SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } } else { double timeout = getInitialTimeout();//启动延迟,主要是为了保证通信 boolean connectionGood = false; Exception connectionException = null; int hostIndex = 0; if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList); } for (; (hostIndex < this.hostListSize) && !connectionGood; hostIndex++) { if (hostIndex == 0) { this.hasTriedMasterFlag = true; } if (this.preferSlaveDuringFailover && hostIndex == 0) { hostIndex++; } for (int attemptCount = 0; (attemptCount < getMaxReconnects()) && !connectionGood; attemptCount++) { try { if (this.io != null) { this.io.forceClose(); } String newHostPortPair = (String) this.hostList .get(hostIndex); int newPort = 3306; String[] hostPortPair = NonRegisteringDriver .parseHostPortPair(newHostPortPair); String newHost = hostPortPair[NonRegisteringDriver.HOST_NAME_INDEX]; if (newHost == null || newHost.trim().length() == 0) { newHost = "localhost"; } if (hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) { try { newPort = Integer .parseInt(hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( "Illegal connection port value '" + hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] + "'", SQLError.SQL_STATE_INVALID_CONNECTION_ATTRIBUTE); } } this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout()); this.io.doHandshake(this.user, this.password, this.database); pingInternal(false); this.connectionId = this.io.getThreadId(); this.isClosed = false; // save state from old connection boolean oldAutoCommit = getAutoCommit(); int oldIsolationLevel = this.isolationLevel; boolean oldReadOnly = isReadOnly(); String oldCatalog = getCatalog(); // Server properties might be different // from previous connection, so initialize // again... initializePropsFromServer(); if (isForReconnect) {//重新链接,设置老的connection参数 // Restore state from old connection setAutoCommit(oldAutoCommit); if (this.hasIsolationLevels) { setTransactionIsolation(oldIsolationLevel); } setCatalog(oldCatalog); } connectionGood = true; if (hostIndex != 0) { setFailedOverState(); queriesIssuedFailedOverCopy = 0; } else { this.failedOver = false; queriesIssuedFailedOverCopy = 0; if (this.hostListSize > 1) { setReadOnlyInternal(false); } else { setReadOnlyInternal(oldReadOnly); } } break; } catch (Exception EEE) { connectionException = EEE; connectionGood = false; // Check next host, it might be up... if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList) - 1 /* incremented by for loop next time around */; } } if (connectionGood) { break; } if (attemptCount > 0) { try { Thread.sleep((long) timeout * 1000); } catch (InterruptedException IE) { ; } } } // end attempts for a single host } // end iterator for list of hosts if (!connectionGood) { // We've really failed! throw SQLError.createSQLException( "Server connection failure during transaction. Due to underlying exception: '" + connectionException + "'." + (getParanoid() ? "" : Util .stackTraceToString(connectionException)) + "\nAttempted reconnect " + getMaxReconnects() + " times. Giving up.", SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } } if (getParanoid() && !getHighAvailability() && (this.hostListSize <= 1)) { this.password = null; this.user = null; } if (isForReconnect) {//是否为重新链接,如果是,将拷贝原有的statements到这个链接上 // // Retrieve any 'lost' prepared statements if re-connecting // Iterator statementIter = this.openStatements.values() .iterator(); // // We build a list of these outside the map of open statements, // because // in the process of re-preparing, we might end up having to // close // a prepared statement, thus removing it from the map, and // generating // a ConcurrentModificationException // Stack serverPreparedStatements = null; while (statementIter.hasNext()) { Object statementObj = statementIter.next(); if (statementObj instanceof ServerPreparedStatement) { if (serverPreparedStatements == null) { serverPreparedStatements = new Stack(); } serverPreparedStatements.add(statementObj); } } if (serverPreparedStatements != null) { while (!serverPreparedStatements.isEmpty()) { ((ServerPreparedStatement) serverPreparedStatements .pop()).rePrepare(); } } } return newIo; } finally { this.queriesIssuedFailedOver = queriesIssuedFailedOverCopy; } }
this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout()); this.io.doHandshake(this.user, this.password, this.database);
那么首先来看下第一条:new MysqlIO,这个类是:com.mysql.jdbc,MySqlIO,对应的构造方法如下:
public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, com.mysql.jdbc.Connection conn, int socketTimeout) throws IOException, SQLException { this.connection = conn; if (this.connection.getEnablePacketDebug()) { this.packetDebugRingBuffer = new LinkedList(); } this.logSlowQueries = this.connection.getLogSlowQueries(); this.reusablePacket = new Buffer(INITIAL_PACKET_SIZE); this.sendPacket = new Buffer(INITIAL_PACKET_SIZE); this.port = port; this.host = host; this.socketFactoryClassName = socketFactoryClassName; this.socketFactory = createSocketFactory(); this.mysqlConnection = this.socketFactory.connect(this.host, this.port, props); if (socketTimeout != 0) { try {//设置socket超时 this.mysqlConnection.setSoTimeout(socketTimeout); } catch (Exception ex) { /* Ignore if the platform does not support it */ ; } } this.mysqlConnection = this.socketFactory.beforeHandshake(); if (this.connection.getUseReadAheadInput()) { this.mysqlInput = new ReadAheadInputStream(this.mysqlConnection .getInputStream(), 16384, this.connection .getTraceProtocol(), this.connection.getLog()); } else if (this.connection.useUnbufferedInput()) { this.mysqlInput = this.mysqlConnection.getInputStream(); } else { this.mysqlInput = new BufferedInputStream(this.mysqlConnection .getInputStream(), 16384); } this.mysqlOutput = new BufferedOutputStream(this.mysqlConnection .getOutputStream(), 16384); this.isInteractiveClient = this.connection.getInteractiveClient(); this.profileSql = this.connection.getProfileSql(); this.sessionCalendar = Calendar.getInstance(); this.autoGenerateTestcaseScript = this.connection .getAutoGenerateTestcaseScript(); this.needToGrabQueryFromPacket = (this.profileSql || this.logSlowQueries || this.autoGenerateTestcaseScript); if (this.connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable()) { this.useNanosForElapsedTime = true; this.queryTimingUnits = Messages.getString("Nanoseconds"); } else { this.queryTimingUnits = Messages.getString("Milliseconds"); } if (this.connection.getLogSlowQueries()) { calculateSlowQueryThreshold(); } }
protected Socket mysqlConnection = null; private SocketFactory socketFactory = null;
public String getSocketFactoryClassName() { return this.socketFactoryClassName.getValueAsString(); }
private StringConnectionProperty socketFactoryClassName = new StringConnectionProperty( "socketFactory", StandardSocketFactory.class.getName(), "The name of the class that the driver should use for creating socket connections to the server. This class must implement the interface 'com.mysql.jdbc.SocketFactory' and have public no-args constructor.", "3.0.3", CONNECTION_AND_AUTH_CATEGORY, 4);
public Socket connect(String hostname, int portNumber, Properties props) throws SocketException, IOException { if (props != null) { this.host = hostname; this.port = portNumber; Method connectWithTimeoutMethod = null; Method socketBindMethod = null; Class socketAddressClass = null; String localSocketHostname = props .getProperty("localSocketAddress"); String connectTimeoutStr = props.getProperty("connectTimeout");//超时设置 int connectTimeout = 0; boolean wantsTimeout = (connectTimeoutStr != null && connectTimeoutStr.length() > 0 && !connectTimeoutStr .equals("0")); boolean wantsLocalBind = (localSocketHostname != null && localSocketHostname .length() > 0); boolean needsConfigurationBeforeConnect = socketNeedsConfigurationBeforeConnect(props); if (wantsTimeout || wantsLocalBind || needsConfigurationBeforeConnect) { if (connectTimeoutStr != null) { try { connectTimeout = Integer.parseInt(connectTimeoutStr); } catch (NumberFormatException nfe) { throw new SocketException("Illegal value '" + connectTimeoutStr + "' for connectTimeout"); } } try { // Have to do this with reflection, otherwise older JVMs // croak socketAddressClass = Class .forName("java.net.SocketAddress"); connectWithTimeoutMethod = Socket.class.getMethod( "connect", new Class[] { socketAddressClass, Integer.TYPE }); socketBindMethod = Socket.class.getMethod("bind", new Class[] { socketAddressClass }); } catch (NoClassDefFoundError noClassDefFound) { // ignore, we give a better error below if needed } catch (NoSuchMethodException noSuchMethodEx) { // ignore, we give a better error below if needed } catch (Throwable catchAll) { // ignore, we give a better error below if needed } if (wantsLocalBind && socketBindMethod == null) { throw new SocketException( "Can't specify \"localSocketAddress\" on JVMs older than 1.4"); } if (wantsTimeout && connectWithTimeoutMethod == null) { throw new SocketException( "Can't specify \"connectTimeout\" on JVMs older than 1.4"); } } if (this.host != null) { if (!(wantsLocalBind || wantsTimeout || needsConfigurationBeforeConnect)) { InetAddress[] possibleAddresses = InetAddress .getAllByName(this.host); Throwable caughtWhileConnecting = null; // Need to loop through all possible addresses, in case // someone has IPV6 configured (SuSE, for example...) for (int i = 0; i < possibleAddresses.length; i++) { try { this.rawSocket = new Socket(possibleAddresses[i], port); configureSocket(this.rawSocket, props); break; } catch (Exception ex) { caughtWhileConnecting = ex; } } if (rawSocket == null) { unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting); } } else { // must explicitly state this due to classloader issues // when running on older JVMs :( try { InetAddress[] possibleAddresses = InetAddress .getAllByName(this.host); Throwable caughtWhileConnecting = null; Object localSockAddr = null; Class inetSocketAddressClass = null; Constructor addrConstructor = null; try { inetSocketAddressClass = Class .forName("java.net.InetSocketAddress"); addrConstructor = inetSocketAddressClass .getConstructor(new Class[] { InetAddress.class, Integer.TYPE }); if (wantsLocalBind) { localSockAddr = addrConstructor .newInstance(new Object[] { InetAddress .getByName(localSocketHostname), new Integer(0 /* * use ephemeral * port */) }); } } catch (Throwable ex) { unwrapExceptionToProperClassAndThrowIt(ex); } // Need to loop through all possible addresses, in case // someone has IPV6 configured (SuSE, for example...) for (int i = 0; i < possibleAddresses.length; i++) { try { this.rawSocket = new Socket();//创建链接 configureSocket(this.rawSocket, props);//做一些扩展配置 Object sockAddr = addrConstructor .newInstance(new Object[] { possibleAddresses[i], new Integer(port) }); // bind to the local port, null is 'ok', it // means // use the ephemeral port socketBindMethod.invoke(rawSocket, new Object[] { localSockAddr }); connectWithTimeoutMethod.invoke(rawSocket, new Object[] { sockAddr, new Integer(connectTimeout) }); break; } catch (Exception ex) { this.rawSocket = null; caughtWhileConnecting = ex; } } if (this.rawSocket == null) { unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting); } } catch (Throwable t) { unwrapExceptionToProperClassAndThrowIt(t); } } return this.rawSocket; } } throw new SocketException("Unable to create socket"); }
this.io.doHandshake(this.user, this.password , this.database);
void doHandshake(String user, String password, String database) throws SQLException { // Read the first packet this.checkPacketSequence = false; this.readPacketSequence = 0; Buffer buf = readPacket(); // Get the protocol version this.protocolVersion = buf.readByte(); if (this.protocolVersion == -1) {//版本检测如果为-1 try { this.mysqlConnection.close(); } catch (Exception e) { ; // ignore } int errno = 2000; errno = buf.readInt(); String serverErrorMessage = buf.readString(); StringBuffer errorBuf = new StringBuffer(Messages.getString( "MysqlIO.10")); //$NON-NLS-1$ errorBuf.append(serverErrorMessage); errorBuf.append("\""); //$NON-NLS-1$ String xOpen = SQLError.mysqlToSqlState(errno, this.connection.getUseSqlStateCodes()); throw SQLError.createSQLException(SQLError.get(xOpen) + ", " //$NON-NLS-1$ +errorBuf.toString(), xOpen, errno); } this.serverVersion = buf.readString(); // Parse the server version into major/minor/subminor int point = this.serverVersion.indexOf("."); //$NON-NLS-1$ if (point != -1) { try { int n = Integer.parseInt(this.serverVersion.substring(0, point)); this.serverMajorVersion = n; } catch (NumberFormatException NFE1) { ; } String remaining = this.serverVersion.substring(point + 1, this.serverVersion.length()); point = remaining.indexOf("."); //$NON-NLS-1$ if (point != -1) { try { int n = Integer.parseInt(remaining.substring(0, point)); this.serverMinorVersion = n; } catch (NumberFormatException nfe) { ; } remaining = remaining.substring(point + 1, remaining.length()); int pos = 0; while (pos < remaining.length()) { if ((remaining.charAt(pos) < '0') || (remaining.charAt(pos) > '9')) { break; } pos++; } try { int n = Integer.parseInt(remaining.substring(0, pos)); this.serverSubMinorVersion = n; } catch (NumberFormatException nfe) { ; } } } if (versionMeetsMinimum(4, 0, 8)) { this.maxThreeBytes = (256 * 256 * 256) - 1; this.useNewLargePackets = true; } else { this.maxThreeBytes = 255 * 255 * 255; this.useNewLargePackets = false; } this.colDecimalNeedsBump = versionMeetsMinimum(3, 23, 0); this.colDecimalNeedsBump = !versionMeetsMinimum(3, 23, 15); // guess? Not noted in changelog this.useNewUpdateCounts = versionMeetsMinimum(3, 22, 5); threadId = buf.readLong(); //线程ID this.seed = buf.readString(); this.serverCapabilities = 0; if (buf.getPosition() < buf.getBufLength()) { this.serverCapabilities = buf.readInt(); } if (versionMeetsMinimum(4, 1, 1)) { int position = buf.getPosition(); /* New protocol with 16 bytes to describe server characteristics */ this.serverCharsetIndex = buf.readByte() & 0xff; this.serverStatus = buf.readInt(); buf.setPosition(position + 16); String seedPart2 = buf.readString(); StringBuffer newSeed = new StringBuffer(20); newSeed.append(this.seed); newSeed.append(seedPart2); this.seed = newSeed.toString(); } if (((this.serverCapabilities & CLIENT_COMPRESS) != 0) && this.connection.getUseCompression()) { this.clientParam |= CLIENT_COMPRESS; } this.useConnectWithDb = (database != null) && (database.length() > 0) && !this.connection.getCreateDatabaseIfNotExist(); if (this.useConnectWithDb) { this.clientParam |= CLIENT_CONNECT_WITH_DB; } if (((this.serverCapabilities & CLIENT_SSL) == 0) && this.connection.getUseSSL()) { if (this.connection.getRequireSSL()) { this.connection.close(); forceClose(); throw SQLError.createSQLException(Messages.getString("MysqlIO.15"), //$NON-NLS-1$ SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } this.connection.setUseSSL(false);//不采用SSL } if ((this.serverCapabilities & CLIENT_LONG_FLAG) != 0) { // We understand other column flags, as well this.clientParam |= CLIENT_LONG_FLAG; this.hasLongColumnInfo = true; } // return FOUND rows this.clientParam |= CLIENT_FOUND_ROWS; if (this.connection.getAllowLoadLocalInfile()) { this.clientParam |= CLIENT_LOCAL_FILES; } if (this.isInteractiveClient) { this.clientParam |= CLIENT_INTERACTIVE; } // Authenticate if (this.protocolVersion > 9) { this.clientParam |= CLIENT_LONG_PASSWORD; // for long passwords } else { this.clientParam &= ~CLIENT_LONG_PASSWORD; } // // 4.1 has some differences in the protocol // if (versionMeetsMinimum(4, 1, 0)) { if (versionMeetsMinimum(4, 1, 1)) { this.clientParam |= CLIENT_PROTOCOL_41; this.has41NewNewProt = true; // Need this to get server status values this.clientParam |= CLIENT_TRANSACTIONS; // We always allow multiple result sets this.clientParam |= CLIENT_MULTI_RESULTS; // We allow the user to configure whether // or not they want to support multiple queries // (by default, this is disabled). if (this.connection.getAllowMultiQueries()) { this.clientParam |= CLIENT_MULTI_QUERIES; } } else { this.clientParam |= CLIENT_RESERVED; this.has41NewNewProt = false; } this.use41Extensions = true; } int passwordLength = 16; int userLength = (user != null) ? user.length() : 0; int databaseLength = (database != null) ? database.length() : 0; int packLength = ((userLength + passwordLength + databaseLength) * 2) + 7 + HEADER_LENGTH + AUTH_411_OVERHEAD; Buffer packet = null; if (!this.connection.getUseSSL()) { if ((this.serverCapabilities & CLIENT_SECURE_CONNECTION) != 0) { this.clientParam |= CLIENT_SECURE_CONNECTION; //没有使用SSL if (versionMeetsMinimum(4, 1, 1)) { secureAuth411(null, packLength, user, password, database, true); } else { secureAuth(null, packLength, user, password, database, true); } } else { // Passwords can be 16 chars long 这相当于是一个buffer packet = new Buffer(packLength); if ((this.clientParam & CLIENT_RESERVED) != 0) { if (versionMeetsMinimum(4, 1, 1)) { packet.writeLong(this.clientParam);//发送一个0过去,代表要发起一个请求 packet.writeLong(this.maxThreeBytes);//最大字节数 // charset, JDBC will connect as 'latin1', // and use 'SET NAMES' to change to the desired // charset after the connection is established. packet.writeByte((byte) 8); // Set of bytes reserved for future use. packet.writeBytesNoNull(new byte[23]); } else { packet.writeLong(this.clientParam); packet.writeLong(this.maxThreeBytes); } } else { packet.writeInt((int) this.clientParam); packet.writeLongInt(this.maxThreeBytes); } // User/Password data packet.writeString(user, "Cp1252", this.connection); //写入密码 if (this.protocolVersion > 9) { packet.writeString(Util.newCrypt(password, this.seed), "Cp1252", this.connection); } else { packet.writeString(Util.oldCrypt(password, this.seed), "Cp1252", this.connection); } //写入数据库信息 if (this.useConnectWithDb) { packet.writeString(database, "Cp1252", this.connection); } //将packet里面的信息,写入socket发送出去 send(packet, packet.getPosition()); } } else { negotiateSSLConnection(user, password, database, packLength); } // Check for errors, not for 4.1.1 or newer, // as the new auth protocol doesn't work that way // (see secureAuth411() for more details...) if (!versionMeetsMinimum(4, 1, 1)) { checkErrorPacket(); } // // Can't enable compression until after handshake // if (((this.serverCapabilities & CLIENT_COMPRESS) != 0) && this.connection.getUseCompression()) { // The following matches with ZLIB's // compress() this.deflater = new Deflater(); this.useCompression = true; this.mysqlInput = new CompressedInputStream(this.connection, this.mysqlInput); } if (!this.useConnectWithDb) { changeDatabaseTo(database); } }
protected BufferedOutputStream mysqlOutput = null;
最后我们说下,MySql的另一个Driver,是: com.mysql.jdbc.ReplicationDriver,用于集群下,用的时候没在Connection上, setReadOnlytrue|false就进行主备份切换了,他创建的Connection是: com.mysql.jdbc.ReplicationConnection,我们简单看一些代码:
public class ReplicationConnection implements java.sql.Connection, PingTarget { private Connection currentConnection;//当前链接 private Connection masterConnection;//主库链接 private Connection slavesConnection;//备库连接
public synchronized void close() throws SQLException { this.masterConnection.close(); this.slavesConnection.close(); } public synchronized void commit() throws SQLException { this.currentConnection.commit(); } public Statement createStatement() throws SQLException { Statement stmt = this.currentConnection.createStatement(); ((com.mysql.jdbc.Statement) stmt).setPingTarget(this); return stmt; } public synchronized boolean isReadOnly() throws SQLException { return this.currentConnection == this.slavesConnection; } public synchronized void setReadOnly(boolean readOnly) throws SQLException { if (readOnly) { if (currentConnection != slavesConnection) { switchToSlavesConnection(); } } else { if (currentConnection != masterConnection) { switchToMasterConnection(); } } } public synchronized void doPing() throws SQLException { if (this.masterConnection != null) { this.masterConnection.ping();//发送一个14命令号过去,类似于ping命令 } if (this.slavesConnection != null) { this.slavesConnection.ping(); } }