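The previous article covered basic driver registration: http://blog.csdn.net/xieyuooo/article/details/8502585
This article focuses on a few methods of the core classes. A later article, time permitting, will cover JDBC-level errors.
Note: this article walks through how a Connection is created. Space does not allow pasting all of the code, so treat it as a route for reading the source in more detail yourself.
The previous article described how a Driver is registered. We continue with Connection creation, which means going into the individual drivers. We use Oracle's driver as the main example, starting from the driver class: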
oracle.jdbc.driver.OracleDriver
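Look at its connect method. URL parsing was covered in the previous article; the point here is that it calls getConnectionInstance to obtain the connection and then applies the relevant settings to it: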
public Connection connect(String s, Properties properties) throws SQLException {
    if(s.regionMatches(0, "jdbc:default:connection", 0, 23)) {
        String s1 = "jdbc:oracle:kprb";
        int j = s.length();
        if(j > 23)
            s = s1.concat(s.substring(23, s.length()));
        else
            s = s1.concat(":");
        s1 = null;
    }
    int i = oracleAcceptsURL(s);
    if(i == 1)
        return null;
    if(i == 2) {
        DBError.throwSqlException(67);
        return null;
    }
    Hashtable hashtable = parseUrl(s);
    if(hashtable == null)
        return null;
    String s2 = properties.getProperty("user");
    String s3 = properties.getProperty("password");
    String s4 = properties.getProperty("database");
    if(s4 == null)
        s4 = properties.getProperty("server");
    if(s2 == null)
        s2 = (String)hashtable.get("user");
    s2 = parseLoginOption(s2, properties);
    if(s3 == null)
        s3 = (String)hashtable.get("password");
    if(s4 == null)
        s4 = (String)hashtable.get("database");
    String s5 = (String)hashtable.get("protocol");
    properties.put("protocol", s5);
    if(s5 == null) {
        DBError.throwSqlException(40, "Protocol is not specified in URL");
        return null;
    }
    String s6 = properties.getProperty("dll");
    if(s6 == null)
        properties.put("dll", "ocijdbc9");
    String s7 = properties.getProperty("prefetch");
    if(s7 == null)
        s7 = properties.getProperty("rowPrefetch");
    if(s7 == null)
        s7 = properties.getProperty("defaultRowPrefetch");
    if(s7 != null && Integer.parseInt(s7) <= 0)
        s7 = null;
    String s8 = properties.getProperty("batch");
    if(s8 == null)
        s8 = properties.getProperty("executeBatch");
    if(s8 == null)
        s8 = properties.getProperty("defaultExecuteBatch");
    if(s8 != null && Integer.parseInt(s8) <= 0)
        s8 = null;
    String s9 = properties.getProperty("remarks");
    if(s9 == null)
        s9 = properties.getProperty("remarksReporting");
    String s10 = properties.getProperty("synonyms");
    if(s10 == null)
        s10 = properties.getProperty("includeSynonyms");
    String s11 = properties.getProperty("restrictGetTables");
    String s12 = properties.getProperty("fixedString");
    String s13 = properties.getProperty("dataSizeUnits");
    String s14 = properties.getProperty("AccumulateBatchResult");
    if(s14 == null)
        s14 = "true";
    Enumeration enumeration;
    for(enumeration = DriverManager.getDrivers(); enumeration.hasMoreElements();) {
        Driver driver = (Driver)enumeration.nextElement();
        if(driver instanceof OracleDriver)
            break;
    }
    while(enumeration.hasMoreElements()) {
        Driver driver1 = (Driver)enumeration.nextElement();
        if(driver1 instanceof OracleDriver)
            DriverManager.deregisterDriver(driver1);
    }
    /**
     * s5 is the protocol, e.g. thin
     * s  is the whole JDBC URL string
     * s2 is the user name
     * s3 is the password
     * s4 is the database descriptor
     * properties holds the remaining parameters
     */
    Connection connection = getConnectionInstance(s5, s, s2, s3, s4, properties);
    if(s7 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setDefaultRowPrefetch(Integer.parseInt(s7));
    if(s8 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setDefaultExecuteBatch(Integer.parseInt(s8));
    if(s9 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setRemarksReporting(s9.equalsIgnoreCase("true"));
    if(s10 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setIncludeSynonyms(s10.equalsIgnoreCase("true"));
    if(s11 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setRestrictGetTables(s11.equalsIgnoreCase("true"));
    if(s12 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setDefaultFixedString(s12.equalsIgnoreCase("true"));
    if(s13 != null)
        ((oracle.jdbc.driver.OracleConnection)connection).setDataSizeUnits(s13);
    ((oracle.jdbc.driver.OracleConnection)connection).setAccumulateBatchResult(s14.equalsIgnoreCase("true"));
    hashtable = null;
    return connection;
}
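The connect method above accepts several alias names for the same setting (for example prefetch, rowPrefetch and defaultRowPrefetch) and ignores non-positive values. A minimal sketch of that lookup pattern follows; the class PropertyFallback and its methods are hypothetical helpers written for illustration and are not part of the Oracle driver.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of the Oracle driver.
class PropertyFallback {
    /** Returns the value of the first key that is set, or null if none is set. */
    static String firstSet(Properties props, String... keys) {
        for (String key : keys) {
            String value = props.getProperty(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    /** Mirrors the prefetch handling shown above: three aliases, non-positive values ignored. */
    static Integer rowPrefetch(Properties props) {
        String raw = firstSet(props, "prefetch", "rowPrefetch", "defaultRowPrefetch");
        if (raw == null || Integer.parseInt(raw) <= 0) {
            return null;
        }
        return Integer.valueOf(raw);
    }
}
```

As in the original, the first alias that is present wins even if its value is invalid, so a prefetch of 0 hides a valid defaultRowPrefetch.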
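Now step into getConnectionInstance.
For the parameter list, see the comment in the previous method. Note that here s is the protocol, which is usually thin: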
private Connection getConnectionInstance(String s, String s1, String s2, String s3, String s4, Properties properties)
throws SQLException
{
Object obj = null;
if(s.compareTo("ultra") == 0)
{
try
{
Class aclass[] = null;
Object aobj[] = new Object[6];
aobj[0] = s;
aobj[1] = s1;
aobj[2] = s2;
aobj[3] = s3;
aobj[4] = s4;
aobj[5] = properties;
Class class1 = Class.forName("oracle.jdbc.ultra.client.Driver");
Method amethod[] = class1.getMethods();
for(int i = 0; i < amethod.length; i++)
{
if(!amethod[i].getName().equals("getConnection"))
continue;
aclass = amethod[i].getParameterTypes();
break;
}
Method method = class1.getMethod("getConnection", aclass);
obj = (Connection)method.invoke(class1.newInstance(), aobj);
}
catch(Exception exception)
{
exception.printStackTrace();
DBError.throwSqlException(1);
}
} else {
String s5 = null;
if(s.equals("thin") && System.getProperty("oracle.jserver.version") != null)
s5 = "thin-server";
else
if((s.equals("oci8") || s.equals("oci")) && System.getProperty("oracle.jserver.version") != null)
s5 = "oci-server";
else
s5 = s;
String s6 = (String)m_driverAccess.get(s5);
if(s6 == null)
DBError.throwSqlException(67, "Invalid protocol " + s);
DBAccess dbaccess = null;
try
{
dbaccess = (DBAccess)Class.forName(s6).newInstance();
}
catch(Exception _ex)
{
return null;
}
if(properties.getProperty("is_connection_pooling") == "true")
{
properties.put("database", s4 != null ? ((Object) (s4)) : "");
obj = new OracleOCIConnection(dbaccess, s1, s2, s3, s4, properties);
} else
{
obj = new oracle.jdbc.driver.OracleConnection(dbaccess, s1, s2, s3, s4, properties);
}
}
return ((Connection) (obj));
}
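In the usual thin case, s5 is "thin". m_driverAccess.get(s5) yields the class name s6, and Class.forName(s6).newInstance() yields the DBAccess instance. This dbaccess object is very important: it is not yet the OracleConnection we are looking for, but it is passed into the OracleConnection constructor a few lines later. So what is m_driverAccess?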
private static Properties m_driverAccess;
static {
    m_driverAccess = new Properties();
    m_driverAccess.put("thin-server", "oracle.jdbc.thinserver.ServerTTC7Protocol");
    m_driverAccess.put("oci-server", "oracle.jdbc.ociserver.ServerOCIDBAccess");
    m_driverAccess.put("thin", "oracle.jdbc.ttc7.TTC7Protocol");
    m_driverAccess.put("oci8", "oracle.jdbc.oci8.OCIDBAccess");
    m_driverAccess.put("oci", "oracle.jdbc.oci8.OCIDBAccess");
    m_driverAccess.put("kprb", "oracle.jdbc.kprb.KprbDBAccess");
}
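As the snippet shows, it is a Properties object, that is, a Map from protocol name to the class that actually handles that protocol. In thin mode a dedicated class handles the wire protocol, and this is where that class gets instantiated.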
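The lookup-then-instantiate pattern described above can be sketched in isolation. This is a minimal sketch, not the driver's code: the class ProtocolRegistry is hypothetical, and it maps to JDK classes as stand-ins so that it runs without the Oracle jar.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the m_driverAccess lookup; not the driver's code.
class ProtocolRegistry {
    private static final Map<String, String> HANDLERS = new HashMap<>();

    static {
        // Stand-in class names from the JDK. The real driver maps protocol
        // names such as "thin" to its own DBAccess implementations.
        HANDLERS.put("list", "java.util.ArrayList");
        HANDLERS.put("map", "java.util.HashMap");
    }

    /** Looks up the handler class name for a protocol and instantiates it reflectively. */
    static Object create(String protocol) {
        String className = HANDLERS.get(protocol);
        if (className == null) {
            throw new IllegalArgumentException("Invalid protocol " + protocol);
        }
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

One difference worth noting: the original getConnectionInstance returns null when instantiation fails, whereas this sketch throws, which makes a missing class easier to diagnose.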
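Finally, new oracle.jdbc.driver.OracleConnection(...) produces the Connection object.
Like me, you may want to know what OracleConnection really is, since by this point it should have started talking to the database. It has, so let's keep looking inside. Keep in mind what we have seen so far (OracleConnection, TTC7Protocol, thin, and the connection-string information), otherwise what follows gets confusing.
From here on I mostly paste excerpts, because the method bodies are too long.
First, the constructor that gets called: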
public OracleConnection(DBAccess dbaccess, String s, String s1, String s2, String s3, Properties properties)
    throws SQLException {
    // ... various field assignments omitted here
    if(properties != null) {
        s4 = (String)properties.get("protocol");
        String s6 = properties.getProperty("processEscapes");
        if(s6 != null && s6.equalsIgnoreCase("false"))
            m_process_escapes = false;
        connectionProperties = (Properties)properties.clone();
        connectionProperties.remove("password"); // drop the password from the stored connection properties, as a safety measure
    }
    initialize(s, s1, s4, dbaccess, null, null, null, s3);
    logicalHandle = false;
    try {
        needLine();
        conversion = db_access.logon(s1, s2, s3, properties); // user name, password, database descriptor, extra properties
        m_warning = DBError.addSqlWarning(m_warning, db_access.getWarnings());
        if(properties == null || properties.getProperty("connection_pool") != "connection_pool") {
            default_row_prefetch = db_access.getDefaultPrefetch();
            if(properties != null) {
                String s5 = properties.getProperty("autoCommit");
                if(s5 != null && s5.equalsIgnoreCase("false"))
                    flag = false;
            }
            setAutoCommit(flag);
            db_access.initNls(this);
        }
    } catch(IOException ioexception) {
        DBError.throwSqlException(ioexception);
    } catch(SQLException sqlexception) {
        try {
            db_access.logoff();
        } catch(IOException _ex) {
        } catch(SQLException _ex) {
        }
        throw sqlexception;
    }
    m_txn_mode = 0;
}
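The constructor keeps a copy of the caller's properties with the password removed, so the original Properties object is left untouched. A minimal sketch of that step, using a hypothetical class name ConnectionPropsCopy:

```java
import java.util.Properties;

// Hypothetical helper for illustration; mirrors the clone-and-remove step above.
class ConnectionPropsCopy {
    /** Returns a copy of the given properties with the "password" entry removed. */
    static Properties withoutPassword(Properties original) {
        Properties copy = (Properties) original.clone();
        copy.remove("password");
        return copy;
    }
}
```

Because clone() copies the table, removing the entry from the copy does not affect the Properties instance that is still passed on to logon.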
Before the core method, look at what initialize does:
private void initialize(String s, String s1, String s2, DBAccess dbaccess, Hashtable hashtable, Map map1, Map map2, String s3) throws SQLException { initClientDataSupport(); statementCache = null; m_stmtClearMetaData = false; database = s3; url = s; if(s1 != null) user = s1.toUpperCase(); else user = s1; db_access = dbaccess; protocol = s2; physicalStatus = true; default_row_prefetch = DEFAULT_ROW_PREFETCH; default_batch = 1; statement_table = new Hashtable(10); if(hashtable != null) descriptorCache = hashtable; else descriptorCache = new Hashtable(10); map = map1; if(map2 != null) m_javaObjectMap = map2; else m_javaObjectMap = new Hashtable(10); closed = false; trans_level = 2; XA_wants_error = false; UsingXA = false; fdo = null; big_endian = null; m_occ = null; m_privData = null; m_clientIdSet = false; m_clientId = null; }
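One important setting here is default_row_prefetch, the number of rows read from the database per fetch. It defaults to DEFAULT_ROW_PREFETCH, a static field that acts as the global default:
static int DEFAULT_ROW_PREFETCH = 10;
So by default the Oracle driver fetches 10 rows at a time from the server and caches them on the application side.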
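To see why this default matters, here is a rough back-of-the-envelope sketch. It only counts how many batches of rows are needed for a result set of a given size; the class PrefetchMath is hypothetical, and the real number of network round trips made by the driver may differ (for example, an extra fetch to detect the end of the data).

```java
// Hypothetical arithmetic helper; it does not talk to a database.
class PrefetchMath {
    /** Number of fetch batches needed to read totalRows with the given prefetch size. */
    static long fetchBatches(long totalRows, int prefetch) {
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch must be positive");
        }
        return (totalRows + prefetch - 1) / prefetch; // ceiling division
    }
}
```

With the default of 10, reading 1000 rows takes 100 batches; with a prefetch of 100 it takes 10. As the connect method shows, the value can be changed through the prefetch, rowPrefetch or defaultRowPrefetch connection properties.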
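Next comes the logon method. It is fairly complex, even a bit messy, so take a sip of water before reading on.
We already know that the concrete DBAccess class is TTC7Protocol, so logon must be in that class or one of its parent classes. The source is: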
/**
 * s  is the user name
 * s1 is the password
 * s2 is the database descriptor
 */
public synchronized DBConversion logon(String s, String s1, String s2, Properties properties)
    throws SQLException, IOException {
    try {
        if(state > 0)
            DBError.check_error(428);
        if(s == null || s1 == null)
            DBError.check_error(433);
        if(s.length() == 0 || s1.length() == 0)
            DBError.check_error(443);
        if(s2 == null)
            s2 = "localhost:1521:orcl"; // with no descriptor set, the driver falls back to a local SID named orcl; perhaps Oracle treats this as the demo setup
        connect(s2, properties); // the core connect method
        all7 = new Oall7(MEngine);
        commoncall = new Ocommoncall(MEngine);
        opencall = new Oopen(MEngine);
        close = new Oclose(MEngine);
        TTCTypeRep _tmp = MEngine.types;
        describe = (Odscrarr)MEngine.types.newTTIFunObject((byte)1, MEngine);
        bfileMsg = new v8TTIBfile(MEngine);
        blobMsg = new v8TTIBlob(MEngine); // message object for BLOB traffic
        clobMsg = new v8TTIClob(MEngine); // message object for CLOB traffic
        TTCTypeRep _tmp1 = MEngine.types;
        dty = (TTIdty)MEngine.types.newTTCMsgObject((byte)2, MEngine);
        dty.marshal();
        dty.receive();
        // ... a lot of code is omitted here: follow-up processing after the connection is created.
        // There is plenty in it, but on a first pass do not let it block your view of the overall flow.
        return MEngine.conv;
    } catch(SQLException sqlexception) {
        try {
            net.disconnect();
        } catch(Exception exception) {
        }
        state = 0;
        throw sqlexception;
    }
}
Now step into this class's core connect method:
/**
 * @param s database address descriptor
 * @param properties
 * @throws IOException
 * @throws SQLException
 */
private void connect(String s, Properties properties) throws IOException, SQLException {
    if(s == null || properties == null)
        DBError.check_error(433);
    net = new NSProtocol();
    try {
        net.connect(s, properties);
    } catch(NetException netexception) {
        throw new IOException(netexception.getMessage());
    }
    MEngine = new MAREngine(net);
    pro = new v8TTIpro(MEngine); // sends a single byte with value 1
    pro.marshal(); // send the bytes to obtain the version and character set
    pro.receive(); // start receiving
    short word0 = pro.getOracleVersion(); // Oracle version number
    short word1 = pro.getCharacterSet(); // Oracle character set
    short word2 = TTCConversion.findAccessCharSet(word1, word0);
    TTCConversion ttcconversion = new TTCConversion(word1, word2, word0, pro.getncharCHARSET());
    MEngine.types.setServerConversion(word2 != word1);
    MEngine.types.setVersion(word0);
    if(DBConversion.isCharSetMultibyte(word2)) {
        if(DBConversion.isCharSetMultibyte(pro.getCharacterSet()))
            MEngine.types.setFlags((byte)1);
        else
            MEngine.types.setFlags((byte)2);
    } else {
        MEngine.types.setFlags(pro.getFlags());
    }
    MEngine.conv = ttcconversion;
}
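This creates yet another class, NSProtocol, and its connect method establishes the connection. It is a little dizzying. The reason is that Oracle treats some logic as shared across protocols and puts it in one class, which means some redundant parameters get set and some checks that were already made earlier are repeated: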
/**
 * s is the database address descriptor, for example 10.233.133.11:1521:orcl
 */
public void connect(String s, Properties properties) throws IOException, NetException {
    if(sAtts.connected)
        throw new NetException(201);
    if(s == null)
        throw new NetException(208);
    addrRes = new AddrResolution(s, properties); // address descriptor configuration
    if(addrRes.connection_revised) { // in thin mode we usually do not use TNS
        s = addrRes.getTNSAddress();
        properties = addrRes.getUp();
    }
    if(addrRes.jndi) // JNDI is usually not used
        sAtts.profile = new ClientProfile(properties, addrRes.getJndi());
    else
        sAtts.profile = new ClientProfile(properties); // the usual path: sets some client attributes, mostly defaults
    establishConnection(s);
    Object obj4 = null;
Yet another call. Tedious, but let's keep going down into establishConnection; if we are reading the source, we read it to the bottom:
private SessionAtts establishConnection(String s) throws NetException, IOException {
    sAtts.cOption = addrRes.resolveAndExecute(s); // after this call an InputStream and OutputStream are available
    sAtts.ntInputStream = sAtts.cOption.nt.getInputStream();
    sAtts.ntOutputStream = sAtts.cOption.nt.getOutputStream();
    sAtts.setTDU(sAtts.cOption.tdu);
    sAtts.setSDU(sAtts.cOption.sdu);
    sAtts.nsOutputStream = new NetOutputStream(sAtts, 255); // a 255-byte packet buffer
    sAtts.nsInputStream = new NetInputStream(sAtts);
    return sAtts;
}
At this point we have the input and output streams used to talk to the database. The key method is resolveAndExecute:
public ConnOption resolveAndExecute(String s) throws NetException, IOException {
    cs = new ConnStrategy();
    if(s.indexOf("//") != -1)
        resolveUrl(s);
    else if(s.indexOf(':') != -1 && s.indexOf(')') == -1)
        // We go in here: the simple form, detected as "has a colon but no parenthesis".
        // The other methods parse other kinds of database descriptors, so a JDBC
        // descriptor can be written in many ways, not just one.
        resolveSimple(s);
    else if(newSyntax)
        resolveAddrTree(s);
    else
        resolveAddr(s);
    if(!cs.optAvailable())
        return cs.execute(); // needed the first time an option is obtained
    else
        return cs.getOption(); // afterwards the existing option is returned directly
}
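The dispatch at the top of resolveAndExecute can be sketched as a small classifier. This is a simplified illustration: the class DescriptorKind is hypothetical, and the newSyntax flag (whose origin is not shown in the excerpt) is folded into a single ADDRESS case.

```java
// Hypothetical classifier mirroring the branch order in resolveAndExecute.
class DescriptorKind {
    enum Kind { URL, SIMPLE, ADDRESS }

    static Kind classify(String s) {
        if (s.indexOf("//") != -1) {
            return Kind.URL;            // e.g. //host:port/service
        }
        if (s.indexOf(':') != -1 && s.indexOf(')') == -1) {
            return Kind.SIMPLE;         // e.g. host:port:sid
        }
        return Kind.ADDRESS;            // parenthesised descriptor, e.g. (DESCRIPTION=...)
    }
}
```

The order of the checks matters: a string containing "//" also contains colons, so the URL test has to come first.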
Now look at the implementation of resolveSimple, mainly to see how it parses the URL:
private void resolveSimple(String s) throws NetException { ConnOption connoption = new ConnOption(); int i = 0; int j = 0; int k = 0; if((i = s.indexOf(':')) == -1 || (j = s.indexOf(':', i + 1)) == -1) throw new NetException(115); if((k = s.indexOf(':', j + 1)) != -1) throw new NetException(115); try { connoption.host = s.substring(0, i); connoption.port = Integer.parseInt(s.substring(i + 1, j)); connoption.addr = "(ADDRESS=(PROTOCOL=tcp)(HOST=" + connoption.host + ")(PORT=" + connoption.port + "))"; connoption.sid = s.substring(j + 1, s.length()); String s1 = "(DESCRIPTION=(CONNECT_DATA=(SID=" + connoption.sid + ")(CID=(PROGRAM=)(HOST=__jdbc__)(USER=)))" + "(ADDRESS=" + "(PROTOCOL=tcp)(HOST=" + connoption.host + ")(PORT=" + connoption.port + ")))"; connoption.protocol = "TCP"; connoption.conn_data = new StringBuffer(s1); cs.addOption(connoption); } catch(NumberFormatException _ex) { throw new NetException(116); } }
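Before the connection is created, resolveSimple rewrites the address into a TNS-style connect descriptor, which means you can also write such a descriptor yourself at the end of the JDBC URL. The protocol is also resolved to the real transport, TCP, rather than thin, because this is the point where actual network interaction begins.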
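The rewrite performed by resolveSimple can be reproduced in a few lines. The sketch below uses a hypothetical class SimpleAddress and throws IllegalArgumentException where the driver throws NetException; the descriptor text itself is taken from the resolveSimple source above.

```java
// Hypothetical re-implementation of the host:port:sid rewrite, for illustration.
class SimpleAddress {
    /** Converts "host:port:sid" into the TNS-style descriptor built by resolveSimple. */
    static String toDescriptor(String s) {
        String[] parts = s.split(":", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected host:port:sid but got " + s);
        }
        String host = parts[0];
        int port = Integer.parseInt(parts[1]); // NumberFormatException is an IllegalArgumentException
        String sid = parts[2];
        return "(DESCRIPTION=(CONNECT_DATA=(SID=" + sid + ")(CID=(PROGRAM=)(HOST=__jdbc__)(USER=)))"
                + "(ADDRESS=(PROTOCOL=tcp)(HOST=" + host + ")(PORT=" + port + ")))";
    }
}
```

For 10.233.133.11:1521:orcl this yields a descriptor whose ADDRESS part is (ADDRESS=(PROTOCOL=tcp)(HOST=10.233.133.11)(PORT=1521)), the same form the driver stores in connoption.addr.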
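With the address parsed, go back to the calling method. We now have a ConnOption object (so far we only know that it holds the attributes describing the physical protocol; we have not seen a real connection yet), and back in resolveAndExecute we see: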
if(!cs.optAvailable()) return cs.execute(); else return cs.getOption();
If no option is available yet, execute is called. What is cs? It is the class that wraps ConnOption, or more precisely, it can hold several ConnOption objects. Its execute method is:
public ConnOption execute() throws NetException { for(int i = 0; i <= cOpts.size() - 1;) try { copt = (ConnOption)cOpts.elementAt(i); copt.connect(); optFound = true; return copt; } catch(IOException _ex) { i++; } throw new NetException(20); }
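The execute method is a "try each candidate in order until one connects" loop. A minimal sketch of that pattern follows, with hypothetical names (FirstReachable, Option) and an IllegalStateException in place of the driver's NetException(20):

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch of the try-in-order strategy; not the driver's ConnStrategy.
class FirstReachable {
    interface Option {
        void connect() throws IOException;
    }

    /** Returns the index of the first option whose connect() succeeds. */
    static int connectFirst(List<Option> options) {
        for (int i = 0; i < options.size(); i++) {
            try {
                options.get(i).connect();
                return i;
            } catch (IOException e) {
                // This option is unreachable; move on to the next one.
            }
        }
        throw new IllegalStateException("no option could be connected");
    }
}
```

Only IOException moves the loop to the next candidate, as in the original; any other failure propagates to the caller immediately.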
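execute calls the connect method of the ConnOption that recorded the connection-string information earlier. It is convoluted, but the problem really is complex and the design has many layers. Keep going: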
public void connect() throws IOException { nt = getNT(); nt.connect(); } private NTAdapter getNT() throws NetException { try { nt = new TcpNTAdapter(addr); } catch(NLException _ex) { throw new NetException(501); } return nt; }
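This does two things: it calls getNT(), and then connect() on the returned object. What is nt? As getNT shows, it is a TcpNTAdapter. The earlier "protocol parsing" only translated the protocol into its TCP form; the real work happens here: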
public class TcpNTAdapter implements NTAdapter {
    // The constructor parses the protocol and related parameters; only PROTOCOL, HOST and PORT matter here.
    // Note that the JDBC URL string has already been rewritten at this point into a TNS-style descriptor.
    public TcpNTAdapter(String s) throws NLException {
        NVNavigator nvnavigator = new NVNavigator();
        NVPair nvpair = (new NVFactory()).createNVPair(s);
        if(nvpair == null)
            throw new NLException((short)100);
        NVPair nvpair1 = nvnavigator.findNVPair(nvpair, "PROTOCOL");
        NVPair nvpair2 = nvnavigator.findNVPair(nvpair, "HOST");
        NVPair nvpair3 = nvnavigator.findNVPair(nvpair, "PORT");
        if(nvpair1 == null || nvpair2 == null || nvpair3 == null)
            throw new NLException((short)100);
        prot = nvpair1.getAtom();
        host = nvpair2.getAtom();
        port = Integer.parseInt(nvpair3.getAtom());
        if(!prot.equals("TCP") && !prot.equals("tcp"))
            throw new NLException((short)100);
        else
            return;
    }

    public void connect() throws IOException {
        // Seeing a socket here is a relief: this is the most basic socket call, and it is how the communication happens.
        socket = new Socket(host, port);
    }
}
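What the adapter needs from the descriptor is just three name-value pairs. The sketch below pulls them out with a regular expression instead of Oracle's NVFactory/NVNavigator classes; AddressFields and its methods are hypothetical, and the regex approach is a substitute technique, not how the driver does it.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; uses a regex instead of the driver's NVPair parser.
class AddressFields {
    /** Returns the value of a leaf pair such as (HOST=...) or null if it is absent. */
    static String find(String descriptor, String key) {
        Pattern p = Pattern.compile(
                "\\(\\s*" + Pattern.quote(key) + "\\s*=\\s*([^()]+?)\\s*\\)",
                Pattern.CASE_INSENSITIVE);
        Matcher m = p.matcher(descriptor);
        return m.find() ? m.group(1) : null;
    }

    /** Opens a plain TCP socket to the HOST and PORT named in the address. */
    static Socket open(String address) throws IOException {
        String host = find(address, "HOST");
        String port = find(address, "PORT");
        if (host == null || port == null) {
            throw new IOException("HOST or PORT missing in " + address);
        }
        return new Socket(host, Integer.parseInt(port));
    }
}
```

The input is assumed to be the ADDRESS part only, such as (ADDRESS=(PROTOCOL=tcp)(HOST=10.233.133.11)(PORT=1521)). On a full descriptor the first HOST match would be the __jdbc__ entry inside CID, so this sketch is not suitable for that case.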
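The interaction with the server will be covered in the next article, together with prepareStatement, because extracting each piece of information is fairly involved.
Next, how MySQL's JDBC driver obtains a Connection. MySQL works similarly, and with Oracle as background we can go through it briefly.
The MySQL driver is com.mysql.jdbc.Driver, whose parent class is com.mysql.jdbc.NonRegisteringDriver. As before, start with its connect method: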
public java.sql.Connection connect(String url, Properties info) throws SQLException { if (url != null) { if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) { return connectLoadBalanced(url, info); } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) { return connectReplicationConnection(url, info); } } Properties props = null; if ((props = parseURL(url, info)) == null) { return null; } try { Connection newConn = new com.mysql.jdbc.Connection(host(props), port(props), props, database(props), url); return newConn; } catch (SQLException sqlEx) { // Don't wrap SQLExceptions, throw // them un-changed. throw sqlEx; } catch (Exception ex) { throw SQLError.createSQLException(Messages .getString("NonRegisteringDriver.17") //$NON-NLS-1$ + ex.toString() + Messages.getString("NonRegisteringDriver.18"), //$NON-NLS-1$ SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } }
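The first thing connect does is route on the URL prefix. The sketch below illustrates that routing with a hypothetical class MysqlUrlKind. The two prefix values are assumptions about what LOADBALANCE_URL_PREFIX and REPLICATION_URL_PREFIX contain; the excerpt does not show them, so check them against your Connector/J version.

```java
// Hypothetical routing sketch; the prefix values are assumed, not taken from the excerpt.
class MysqlUrlKind {
    static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://"; // assumed value
    static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://"; // assumed value

    private static boolean startsWithIgnoreCase(String s, String prefix) {
        return s.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /** Returns which branch of connect() a URL would take. */
    static String route(String url) {
        if (url != null) {
            if (startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
                return "loadbalance";
            }
            if (startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
                return "replication";
            }
        }
        return "single"; // falls through to parseURL and new Connection(...)
    }
}
```

Only the "single" branch is followed in the rest of this article.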
URL parsing was covered in the previous article. The part to focus on in connect is the new com.mysql.jdbc.Connection(...) call; step into it:
Connection(String hostToConnectTo, int portToConnectTo, Properties info, String databaseToConnectTo, String url) throws SQLException { this.charsetToNumBytesMap = new HashMap(); this.connectionCreationTimeMillis = System.currentTimeMillis(); this.pointOfOrigin = new Throwable(); // Stash away for later, used to clone this connection for Statement.cancel // and Statement.setQueryTimeout(). // this.origHostToConnectTo = hostToConnectTo; this.origPortToConnectTo = portToConnectTo; this.origDatabaseToConnectTo = databaseToConnectTo; try { Blob.class.getMethod("truncate", new Class[] {Long.TYPE}); this.isRunningOnJDK13 = false; } catch (NoSuchMethodException nsme) { this.isRunningOnJDK13 = true; } this.sessionCalendar = new GregorianCalendar(); this.utcCalendar = new GregorianCalendar(); this.utcCalendar.setTimeZone(TimeZone.getTimeZone("GMT")); // // Normally, this code would be in initializeDriverProperties, // but we need to do this as early as possible, so we can start // logging to the 'correct' place as early as possible...this.log // points to 'NullLogger' for every connection at startup to avoid // NPEs and the overhead of checking for NULL at every logging call. // // We will reset this to the configured logger during properties // initialization. // this.log = LogFactory.getLogger(getLogger(), LOGGER_INSTANCE_NAME); // We store this per-connection, due to static synchronization // issues in Java's built-in TimeZone class... 
this.defaultTimeZone = Util.getDefaultTimeZone(); if ("GMT".equalsIgnoreCase(this.defaultTimeZone.getID())) { this.isClientTzUTC = true; } else { this.isClientTzUTC = false; } this.openStatements = new HashMap(); this.serverVariables = new HashMap(); this.hostList = new ArrayList(); //设置主机 if (hostToConnectTo == null) {//默认主机 this.host = "localhost"; this.hostList.add(this.host); } else if (hostToConnectTo.indexOf(",") != -1) {//多个主机 // multiple hosts separated by commas (failover) StringTokenizer hostTokenizer = new StringTokenizer( hostToConnectTo, ",", false); while (hostTokenizer.hasMoreTokens()) { this.hostList.add(hostTokenizer.nextToken().trim()); } } else {//一个主机,我们通常认为就一个主机 this.host = hostToConnectTo; this.hostList.add(this.host); } this.hostListSize = this.hostList.size(); this.port = portToConnectTo; if (databaseToConnectTo == null) { databaseToConnectTo = ""; } this.database = databaseToConnectTo; this.myURL = url; this.user = info.getProperty(NonRegisteringDriver.USER_PROPERTY_KEY); this.password = info .getProperty(NonRegisteringDriver.PASSWORD_PROPERTY_KEY); if ((this.user == null) || this.user.equals("")) { this.user = ""; } if (this.password == null) { this.password = ""; } this.props = info; initializeDriverProperties(info); try { createNewIO(false); this.dbmd = new DatabaseMetaData(this, this.database); } catch (SQLException ex) { cleanup(ex); // don't clobber SQL exceptions throw ex; } catch (Exception ex) { cleanup(ex); StringBuffer mesg = new StringBuffer(); if (getParanoid()) { mesg.append("Cannot connect to MySQL server on "); mesg.append(this.host); mesg.append(":"); mesg.append(this.port); mesg.append(".\n\n"); mesg.append("Make sure that there is a MySQL server "); mesg.append("running on the machine/port you are trying "); mesg .append("to connect to and that the machine this software is " + "running on "); mesg.append("is able to connect to this host/port " + "(i.e. not firewalled). 
"); mesg .append("Also make sure that the server has not been started " + "with the --skip-networking "); mesg.append("flag.\n\n"); } else { mesg.append("Unable to connect to database."); } mesg.append("Underlying exception: \n\n"); mesg.append(ex.getClass().getName()); if (!getParanoid()) { mesg.append(Util.stackTraceToString(ex)); } throw SQLError.createSQLException(mesg.toString(), SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE); } }
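What we ultimately care about is the createNewIO method, which talks to the database; everything else supports setting up that communication. The method is long, so look for the lines that create the MysqlIO object and call doHandshake: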
protected com.mysql.jdbc.MysqlIO createNewIO(boolean isForReconnect) throws SQLException { MysqlIO newIo = null; Properties mergedProps = new Properties(); mergedProps = exposeAsProperties(this.props); long queriesIssuedFailedOverCopy = this.queriesIssuedFailedOver; this.queriesIssuedFailedOver = 0; try { if (!getHighAvailability() && !this.failedOver) {//如果不是高可用,且不是failover(这里指通过连接池自己做,这样会有多个host) boolean connectionGood = false; Exception connectionNotEstablishedBecause = null; int hostIndex = 0; // // TODO: Eventually, when there's enough metadata // on the server to support it, we should come up // with a smarter way to pick what server to connect // to...perhaps even making it 'pluggable' // if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList); } for (; hostIndex < this.hostListSize; hostIndex++) { if (hostIndex == 0) { this.hasTriedMasterFlag = true; } try { String newHostPortPair = (String) this.hostList .get(hostIndex); int newPort = 3306; String[] hostPortPair = NonRegisteringDriver .parseHostPortPair(newHostPortPair); String newHost = hostPortPair[NonRegisteringDriver.HOST_NAME_INDEX]; if (newHost == null || newHost.trim().length() == 0) { newHost = "localhost"; } if (hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) { try { newPort = Integer .parseInt(hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( "Illegal connection port value '" + hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] + "'", SQLError.SQL_STATE_INVALID_CONNECTION_ATTRIBUTE); } } this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout());//获取IO链接 this.io.doHandshake(this.user, this.password, this.database);//登陆 this.connectionId = this.io.getThreadId();//mysql端的线程ID this.isClosed = false; // save state from old connection boolean oldAutoCommit = getAutoCommit(); int oldIsolationLevel = this.isolationLevel; 
boolean oldReadOnly = isReadOnly(); String oldCatalog = getCatalog(); // Server properties might be different // from previous connection, so initialize // again... initializePropsFromServer(); if (isForReconnect) { // Restore state from old connection setAutoCommit(oldAutoCommit); if (this.hasIsolationLevels) { setTransactionIsolation(oldIsolationLevel); } setCatalog(oldCatalog); } if (hostIndex != 0) { setFailedOverState(); queriesIssuedFailedOverCopy = 0; } else { this.failedOver = false; queriesIssuedFailedOverCopy = 0; if (this.hostListSize > 1) { setReadOnlyInternal(false); } else { setReadOnlyInternal(oldReadOnly); } } connectionGood = true; break; // low-level connection succeeded } catch (Exception EEE) { if (this.io != null) { this.io.forceClose(); } connectionNotEstablishedBecause = EEE; connectionGood = false; if (EEE instanceof SQLException) { SQLException sqlEx = (SQLException)EEE; String sqlState = sqlEx.getSQLState(); // If this isn't a communications failure, it will probably never succeed, so // give up right here and now .... if ((sqlState == null) || !sqlState .equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) { throw sqlEx; } } // Check next host, it might be up... if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList) - 1 /* incremented by for loop next time around */; } else if ((this.hostListSize - 1) == hostIndex) { throw new CommunicationsException(this, (this.io != null) ? this.io .getLastPacketSentTimeMs() : 0, EEE); } } } if (!connectionGood) { // We've really failed! throw SQLError.createSQLException( "Could not create connection to database server due to underlying exception: '" + connectionNotEstablishedBecause + "'." + (getParanoid() ? 
"" : Util .stackTraceToString(connectionNotEstablishedBecause)), SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } } else { double timeout = getInitialTimeout();//启动延迟,主要是为了保证通信 boolean connectionGood = false; Exception connectionException = null; int hostIndex = 0; if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList); } for (; (hostIndex < this.hostListSize) && !connectionGood; hostIndex++) { if (hostIndex == 0) { this.hasTriedMasterFlag = true; } if (this.preferSlaveDuringFailover && hostIndex == 0) { hostIndex++; } for (int attemptCount = 0; (attemptCount < getMaxReconnects()) && !connectionGood; attemptCount++) { try { if (this.io != null) { this.io.forceClose(); } String newHostPortPair = (String) this.hostList .get(hostIndex); int newPort = 3306; String[] hostPortPair = NonRegisteringDriver .parseHostPortPair(newHostPortPair); String newHost = hostPortPair[NonRegisteringDriver.HOST_NAME_INDEX]; if (newHost == null || newHost.trim().length() == 0) { newHost = "localhost"; } if (hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] != null) { try { newPort = Integer .parseInt(hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX]); } catch (NumberFormatException nfe) { throw SQLError.createSQLException( "Illegal connection port value '" + hostPortPair[NonRegisteringDriver.PORT_NUMBER_INDEX] + "'", SQLError.SQL_STATE_INVALID_CONNECTION_ATTRIBUTE); } } this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout()); this.io.doHandshake(this.user, this.password, this.database); pingInternal(false); this.connectionId = this.io.getThreadId(); this.isClosed = false; // save state from old connection boolean oldAutoCommit = getAutoCommit(); int oldIsolationLevel = this.isolationLevel; boolean oldReadOnly = isReadOnly(); String oldCatalog = getCatalog(); // Server properties might be different // from previous connection, so initialize // again... 
initializePropsFromServer(); if (isForReconnect) {//重新链接,设置老的connection参数 // Restore state from old connection setAutoCommit(oldAutoCommit); if (this.hasIsolationLevels) { setTransactionIsolation(oldIsolationLevel); } setCatalog(oldCatalog); } connectionGood = true; if (hostIndex != 0) { setFailedOverState(); queriesIssuedFailedOverCopy = 0; } else { this.failedOver = false; queriesIssuedFailedOverCopy = 0; if (this.hostListSize > 1) { setReadOnlyInternal(false); } else { setReadOnlyInternal(oldReadOnly); } } break; } catch (Exception EEE) { connectionException = EEE; connectionGood = false; // Check next host, it might be up... if (getRoundRobinLoadBalance()) { hostIndex = getNextRoundRobinHostIndex(getURL(), this.hostList) - 1 /* incremented by for loop next time around */; } } if (connectionGood) { break; } if (attemptCount > 0) { try { Thread.sleep((long) timeout * 1000); } catch (InterruptedException IE) { ; } } } // end attempts for a single host } // end iterator for list of hosts if (!connectionGood) { // We've really failed! throw SQLError.createSQLException( "Server connection failure during transaction. Due to underlying exception: '" + connectionException + "'." + (getParanoid() ? "" : Util .stackTraceToString(connectionException)) + "\nAttempted reconnect " + getMaxReconnects() + " times. 
Giving up.", SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE); } } if (getParanoid() && !getHighAvailability() && (this.hostListSize <= 1)) { this.password = null; this.user = null; } if (isForReconnect) {//是否为重新链接,如果是,将拷贝原有的statements到这个链接上 // // Retrieve any 'lost' prepared statements if re-connecting // Iterator statementIter = this.openStatements.values() .iterator(); // // We build a list of these outside the map of open statements, // because // in the process of re-preparing, we might end up having to // close // a prepared statement, thus removing it from the map, and // generating // a ConcurrentModificationException // Stack serverPreparedStatements = null; while (statementIter.hasNext()) { Object statementObj = statementIter.next(); if (statementObj instanceof ServerPreparedStatement) { if (serverPreparedStatements == null) { serverPreparedStatements = new Stack(); } serverPreparedStatements.add(statementObj); } } if (serverPreparedStatements != null) { while (!serverPreparedStatements.isEmpty()) { ((ServerPreparedStatement) serverPreparedStatements .pop()).rePrepare(); } } } return newIo; } finally { this.queriesIssuedFailedOver = queriesIssuedFailedOverCopy; } }
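The host handling spread across the constructor and createNewIO boils down to two steps: split the host string on commas (defaulting to localhost), then split each entry into host and port (defaulting to 3306). A minimal sketch, assuming a simple host:port format; MysqlHosts is a hypothetical class and its parsing may differ from the driver's own parseHostPortPair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical sketch of the host-list handling; not Connector/J's implementation.
class MysqlHosts {
    static final int DEFAULT_PORT = 3306;

    /** Builds the host list the way the Connection constructor does. */
    static List<String> split(String hostToConnectTo) {
        List<String> hosts = new ArrayList<>();
        if (hostToConnectTo == null) {
            hosts.add("localhost");                       // default host
        } else if (hostToConnectTo.indexOf(",") != -1) {
            StringTokenizer t = new StringTokenizer(hostToConnectTo, ",", false);
            while (t.hasMoreTokens()) {
                hosts.add(t.nextToken().trim());          // several hosts (failover)
            }
        } else {
            hosts.add(hostToConnectTo);                   // the usual single host
        }
        return hosts;
    }

    /** Host part of "host:port"; an empty host becomes localhost. */
    static String hostOf(String pair) {
        int colon = pair.indexOf(':');
        String host = colon == -1 ? pair : pair.substring(0, colon);
        return host.trim().length() == 0 ? "localhost" : host;
    }

    /** Port part of "host:port"; a missing port becomes 3306. */
    static int portOf(String pair) {
        int colon = pair.indexOf(':');
        if (colon == -1) {
            return DEFAULT_PORT;
        }
        String text = pair.substring(colon + 1);
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Illegal connection port value '" + text + "'");
        }
    }
}
```

In createNewIO each entry of the list is tried in turn, and the first host that completes new MysqlIO(...) plus doHandshake(...) becomes the active connection.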
For our purposes, the two most important lines, and the ones we most want to see, are:
this.io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout()); this.io.doHandshake(this.user, this.password, this.database);
Start with the first one, new MysqlIO. The class is com.mysql.jdbc.MysqlIO, and the constructor is:
public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, com.mysql.jdbc.Connection conn, int socketTimeout) throws IOException, SQLException { this.connection = conn; if (this.connection.getEnablePacketDebug()) { this.packetDebugRingBuffer = new LinkedList(); } this.logSlowQueries = this.connection.getLogSlowQueries(); this.reusablePacket = new Buffer(INITIAL_PACKET_SIZE); this.sendPacket = new Buffer(INITIAL_PACKET_SIZE); this.port = port; this.host = host; this.socketFactoryClassName = socketFactoryClassName; this.socketFactory = createSocketFactory(); this.mysqlConnection = this.socketFactory.connect(this.host, this.port, props); if (socketTimeout != 0) { try {//设置socket超时 this.mysqlConnection.setSoTimeout(socketTimeout); } catch (Exception ex) { /* Ignore if the platform does not support it */ ; } } this.mysqlConnection = this.socketFactory.beforeHandshake(); if (this.connection.getUseReadAheadInput()) { this.mysqlInput = new ReadAheadInputStream(this.mysqlConnection .getInputStream(), 16384, this.connection .getTraceProtocol(), this.connection.getLog()); } else if (this.connection.useUnbufferedInput()) { this.mysqlInput = this.mysqlConnection.getInputStream(); } else { this.mysqlInput = new BufferedInputStream(this.mysqlConnection .getInputStream(), 16384); } this.mysqlOutput = new BufferedOutputStream(this.mysqlConnection .getOutputStream(), 16384); this.isInteractiveClient = this.connection.getInteractiveClient(); this.profileSql = this.connection.getProfileSql(); this.sessionCalendar = Calendar.getInstance(); this.autoGenerateTestcaseScript = this.connection .getAutoGenerateTestcaseScript(); this.needToGrabQueryFromPacket = (this.profileSql || this.logSlowQueries || this.autoGenerateTestcaseScript); if (this.connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable()) { this.useNanosForElapsedTime = true; this.queryTimingUnits = Messages.getString("Nanoseconds"); } else { this.queryTimingUnits = 
Messages.getString("Milliseconds"); } if (this.connection.getLogSlowQueries()) { calculateSlowQueryThreshold(); } }
The socketFactory in the code above is what creates the socket, and the resulting mysqlConnection is of type Socket. Familiar again:
protected Socket mysqlConnection = null; private SocketFactory socketFactory = null;
SocketFactory is an interface, and the instance is created by createSocketFactory(), which uses socketFactoryClassName, the name of the class to instantiate. As the code above shows, this class name is passed in from com.mysql.jdbc.Connection when it calls new MysqlIO, and it is obtained through getSocketFactoryClassName. That method is defined in com.mysql.jdbc.ConnectionProperties, the parent class of com.mysql.jdbc.Connection:
public String getSocketFactoryClassName() {
    return this.socketFactoryClassName.getValueAsString();
}
It turns out to be a property:
private StringConnectionProperty socketFactoryClassName = new StringConnectionProperty(
        "socketFactory",
        StandardSocketFactory.class.getName(),
        "The name of the class that the driver should use for creating socket connections to the server. This class must implement the interface 'com.mysql.jdbc.SocketFactory' and have public no-args constructor.",
        "3.0.3", CONNECTION_AND_AUTH_CATEGORY, 4);
Tracing further, you will find that getValueAsString returns StandardSocketFactory.class.getName() unless the property has been overridden, so by default the socket is created by the class com.mysql.jdbc.StandardSocketFactory.
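The pattern here, a class name held in a property with a default value and then instantiated through its public no-args constructor, can be sketched roughly as follows. This is not the driver's code: DemoSocketFactory, DemoStandardSocketFactory and SocketFactoryLoader are hypothetical names made up for illustration, and only the "socketFactory" property key comes from the source above.

```java
import java.util.Properties;

// Hypothetical stand-in for com.mysql.jdbc.SocketFactory; not the driver's real interface.
interface DemoSocketFactory {
    String describe();
}

// Hypothetical default implementation, playing the role of StandardSocketFactory.
class DemoStandardSocketFactory implements DemoSocketFactory {
    public String describe() {
        return "standard";
    }
}

class SocketFactoryLoader {
    // Read the class name from the "socketFactory" property, fall back to the
    // default class, then instantiate it through its no-args constructor.
    static DemoSocketFactory load(Properties props) {
        String className = props.getProperty("socketFactory",
                DemoStandardSocketFactory.class.getName());
        try {
            Class<?> cls = Class.forName(className);
            return (DemoSocketFactory) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot create socket factory " + className, e);
        }
    }

    public static void main(String[] args) {
        // No property set, so the default implementation is used.
        System.out.println(load(new Properties()).describe());
    }
}
```

The benefit of this design is that a user can swap in their own socket implementation purely through a connection property, without touching the driver.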
Here is its connect method:
public Socket connect(String hostname, int portNumber, Properties props)
        throws SocketException, IOException {
    if (props != null) {
        this.host = hostname;
        this.port = portNumber;
        Method connectWithTimeoutMethod = null;
        Method socketBindMethod = null;
        Class socketAddressClass = null;
        String localSocketHostname = props.getProperty("localSocketAddress");
        String connectTimeoutStr = props.getProperty("connectTimeout"); // timeout setting
        int connectTimeout = 0;
        boolean wantsTimeout = (connectTimeoutStr != null
                && connectTimeoutStr.length() > 0
                && !connectTimeoutStr.equals("0"));
        boolean wantsLocalBind = (localSocketHostname != null
                && localSocketHostname.length() > 0);
        boolean needsConfigurationBeforeConnect = socketNeedsConfigurationBeforeConnect(props);
        if (wantsTimeout || wantsLocalBind || needsConfigurationBeforeConnect) {
            if (connectTimeoutStr != null) {
                try {
                    connectTimeout = Integer.parseInt(connectTimeoutStr);
                } catch (NumberFormatException nfe) {
                    throw new SocketException("Illegal value '"
                            + connectTimeoutStr + "' for connectTimeout");
                }
            }
            try {
                // Have to do this with reflection, otherwise older JVMs
                // croak
                socketAddressClass = Class.forName("java.net.SocketAddress");
                connectWithTimeoutMethod = Socket.class.getMethod(
                        "connect", new Class[] { socketAddressClass, Integer.TYPE });
                socketBindMethod = Socket.class.getMethod("bind",
                        new Class[] { socketAddressClass });
            } catch (NoClassDefFoundError noClassDefFound) {
                // ignore, we give a better error below if needed
            } catch (NoSuchMethodException noSuchMethodEx) {
                // ignore, we give a better error below if needed
            } catch (Throwable catchAll) {
                // ignore, we give a better error below if needed
            }
            if (wantsLocalBind && socketBindMethod == null) {
                throw new SocketException(
                        "Can't specify \"localSocketAddress\" on JVMs older than 1.4");
            }
            if (wantsTimeout && connectWithTimeoutMethod == null) {
                throw new SocketException(
                        "Can't specify \"connectTimeout\" on JVMs older than 1.4");
            }
        }
        if (this.host != null) {
            if (!(wantsLocalBind || wantsTimeout || needsConfigurationBeforeConnect)) {
                InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);
                Throwable caughtWhileConnecting = null;
                // Need to loop through all possible addresses, in case
                // someone has IPV6 configured (SuSE, for example...)
                for (int i = 0; i < possibleAddresses.length; i++) {
                    try {
                        this.rawSocket = new Socket(possibleAddresses[i], port);
                        configureSocket(this.rawSocket, props);
                        break;
                    } catch (Exception ex) {
                        caughtWhileConnecting = ex;
                    }
                }
                if (rawSocket == null) {
                    unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                }
            } else {
                // must explicitly state this due to classloader issues
                // when running on older JVMs :(
                try {
                    InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);
                    Throwable caughtWhileConnecting = null;
                    Object localSockAddr = null;
                    Class inetSocketAddressClass = null;
                    Constructor addrConstructor = null;
                    try {
                        inetSocketAddressClass = Class.forName("java.net.InetSocketAddress");
                        addrConstructor = inetSocketAddressClass.getConstructor(
                                new Class[] { InetAddress.class, Integer.TYPE });
                        if (wantsLocalBind) {
                            localSockAddr = addrConstructor.newInstance(new Object[] {
                                    InetAddress.getByName(localSocketHostname),
                                    new Integer(0 /* use ephemeral port */) });
                        }
                    } catch (Throwable ex) {
                        unwrapExceptionToProperClassAndThrowIt(ex);
                    }
                    // Need to loop through all possible addresses, in case
                    // someone has IPV6 configured (SuSE, for example...)
                    for (int i = 0; i < possibleAddresses.length; i++) {
                        try {
                            this.rawSocket = new Socket(); // create the (still unconnected) socket
                            configureSocket(this.rawSocket, props); // apply additional socket options
                            Object sockAddr = addrConstructor.newInstance(new Object[] {
                                    possibleAddresses[i], new Integer(port) });
                            // bind to the local port, null is 'ok', it
                            // means
                            // use the ephemeral port
                            socketBindMethod.invoke(rawSocket, new Object[] { localSockAddr });
                            connectWithTimeoutMethod.invoke(rawSocket,
                                    new Object[] { sockAddr, new Integer(connectTimeout) });
                            break;
                        } catch (Exception ex) {
                            this.rawSocket = null;
                            caughtWhileConnecting = ex;
                        }
                    }
                    if (this.rawSocket == null) {
                        unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                    }
                } catch (Throwable t) {
                    unwrapExceptionToProperClassAndThrowIt(t);
                }
            }
            return this.rawSocket;
        }
    }
    throw new SocketException("Unable to create socket");
}
At this point the Socket has been created. The connectTimeout is applied when the socket connects, and configureSocket sets options such as tcpNoDelay, keepAlive, sendBufferSize and receiveBufferSize.
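On a modern JVM the reflection is no longer needed, and the same idea (create an unconnected socket, configure it, then connect with a timeout, trying each resolved address in turn) can be written directly against java.net. The following is only a sketch of that idea, not the driver's code; the class name TimedConnect, the fixed option values and the 2000 ms timeout in main are assumptions chosen for the demo.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class TimedConnect {
    // Try every address the host name resolves to; options are set before connecting.
    static Socket open(String host, int port, int connectTimeoutMillis) throws IOException {
        IOException last = null;
        for (InetAddress addr : InetAddress.getAllByName(host)) {
            Socket s = new Socket(); // unconnected, so it can be configured first
            try {
                s.setTcpNoDelay(true);
                s.setKeepAlive(true);
                s.connect(new InetSocketAddress(addr, port), connectTimeoutMillis);
                return s;
            } catch (IOException e) {
                last = e; // remember the failure and try the next address
                try {
                    s.close();
                } catch (IOException ignored) {
                    // nothing useful to do if close fails
                }
            }
        }
        throw last != null ? last : new IOException("No address found for " + host);
    }

    public static void main(String[] args) throws IOException {
        // A throwaway local listener stands in for the database server.
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             Socket client = open(server.getInetAddress().getHostAddress(),
                     server.getLocalPort(), 2000)) {
            System.out.println("connected=" + client.isConnected()
                    + " tcpNoDelay=" + client.getTcpNoDelay());
        }
    }
}
```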
Finally, look at the createNewIO method in the Connection class. Once it has obtained the MysqlIO, it verifies the user name and password:
this.io.doHandshake(this.user, this.password , this.database);
void doHandshake(String user, String password, String database)
        throws SQLException {
    // Read the first packet
    this.checkPacketSequence = false;
    this.readPacketSequence = 0;
    Buffer buf = readPacket();
    // Get the protocol version
    this.protocolVersion = buf.readByte();
    if (this.protocolVersion == -1) { // protocol version -1: the server reported an error
        try {
            this.mysqlConnection.close();
        } catch (Exception e) {
            ; // ignore
        }
        int errno = 2000;
        errno = buf.readInt();
        String serverErrorMessage = buf.readString();
        StringBuffer errorBuf = new StringBuffer(Messages.getString("MysqlIO.10")); //$NON-NLS-1$
        errorBuf.append(serverErrorMessage);
        errorBuf.append("\""); //$NON-NLS-1$
        String xOpen = SQLError.mysqlToSqlState(errno,
                this.connection.getUseSqlStateCodes());
        throw SQLError.createSQLException(SQLError.get(xOpen) + ", " //$NON-NLS-1$
                + errorBuf.toString(), xOpen, errno);
    }
    this.serverVersion = buf.readString();
    // Parse the server version into major/minor/subminor
    int point = this.serverVersion.indexOf("."); //$NON-NLS-1$
    if (point != -1) {
        try {
            int n = Integer.parseInt(this.serverVersion.substring(0, point));
            this.serverMajorVersion = n;
        } catch (NumberFormatException NFE1) {
            ;
        }
        String remaining = this.serverVersion.substring(point + 1,
                this.serverVersion.length());
        point = remaining.indexOf("."); //$NON-NLS-1$
        if (point != -1) {
            try {
                int n = Integer.parseInt(remaining.substring(0, point));
                this.serverMinorVersion = n;
            } catch (NumberFormatException nfe) {
                ;
            }
            remaining = remaining.substring(point + 1, remaining.length());
            int pos = 0;
            while (pos < remaining.length()) {
                if ((remaining.charAt(pos) < '0') || (remaining.charAt(pos) > '9')) {
                    break;
                }
                pos++;
            }
            try {
                int n = Integer.parseInt(remaining.substring(0, pos));
                this.serverSubMinorVersion = n;
            } catch (NumberFormatException nfe) {
                ;
            }
        }
    }
    if (versionMeetsMinimum(4, 0, 8)) {
        this.maxThreeBytes = (256 * 256 * 256) - 1;
        this.useNewLargePackets = true;
    } else {
        this.maxThreeBytes = 255 * 255 * 255;
        this.useNewLargePackets = false;
    }
    this.colDecimalNeedsBump = versionMeetsMinimum(3, 23, 0);
    this.colDecimalNeedsBump = !versionMeetsMinimum(3, 23, 15); // guess? Not noted in changelog
    this.useNewUpdateCounts = versionMeetsMinimum(3, 22, 5);
    threadId = buf.readLong(); // thread ID
    this.seed = buf.readString();
    this.serverCapabilities = 0;
    if (buf.getPosition() < buf.getBufLength()) {
        this.serverCapabilities = buf.readInt();
    }
    if (versionMeetsMinimum(4, 1, 1)) {
        int position = buf.getPosition();
        /* New protocol with 16 bytes to describe server characteristics */
        this.serverCharsetIndex = buf.readByte() & 0xff;
        this.serverStatus = buf.readInt();
        buf.setPosition(position + 16);
        String seedPart2 = buf.readString();
        StringBuffer newSeed = new StringBuffer(20);
        newSeed.append(this.seed);
        newSeed.append(seedPart2);
        this.seed = newSeed.toString();
    }
    if (((this.serverCapabilities & CLIENT_COMPRESS) != 0)
            && this.connection.getUseCompression()) {
        this.clientParam |= CLIENT_COMPRESS;
    }
    this.useConnectWithDb = (database != null) && (database.length() > 0)
            && !this.connection.getCreateDatabaseIfNotExist();
    if (this.useConnectWithDb) {
        this.clientParam |= CLIENT_CONNECT_WITH_DB;
    }
    if (((this.serverCapabilities & CLIENT_SSL) == 0)
            && this.connection.getUseSSL()) {
        if (this.connection.getRequireSSL()) {
            this.connection.close();
            forceClose();
            throw SQLError.createSQLException(Messages.getString("MysqlIO.15"), //$NON-NLS-1$
                    SQLError.SQL_STATE_UNABLE_TO_CONNECT_TO_DATASOURCE);
        }
        this.connection.setUseSSL(false); // server has no SSL support: fall back to a non-SSL connection
    }
    if ((this.serverCapabilities & CLIENT_LONG_FLAG) != 0) {
        // We understand other column flags, as well
        this.clientParam |= CLIENT_LONG_FLAG;
        this.hasLongColumnInfo = true;
    }
    // return FOUND rows
    this.clientParam |= CLIENT_FOUND_ROWS;
    if (this.connection.getAllowLoadLocalInfile()) {
        this.clientParam |= CLIENT_LOCAL_FILES;
    }
    if (this.isInteractiveClient) {
        this.clientParam |= CLIENT_INTERACTIVE;
    }
    // Authenticate
    if (this.protocolVersion > 9) {
        this.clientParam |= CLIENT_LONG_PASSWORD; // for long passwords
    } else {
        this.clientParam &= ~CLIENT_LONG_PASSWORD;
    }
    //
    // 4.1 has some differences in the protocol
    //
    if (versionMeetsMinimum(4, 1, 0)) {
        if (versionMeetsMinimum(4, 1, 1)) {
            this.clientParam |= CLIENT_PROTOCOL_41;
            this.has41NewNewProt = true;
            // Need this to get server status values
            this.clientParam |= CLIENT_TRANSACTIONS;
            // We always allow multiple result sets
            this.clientParam |= CLIENT_MULTI_RESULTS;
            // We allow the user to configure whether
            // or not they want to support multiple queries
            // (by default, this is disabled).
            if (this.connection.getAllowMultiQueries()) {
                this.clientParam |= CLIENT_MULTI_QUERIES;
            }
        } else {
            this.clientParam |= CLIENT_RESERVED;
            this.has41NewNewProt = false;
        }
        this.use41Extensions = true;
    }
    int passwordLength = 16;
    int userLength = (user != null) ? user.length() : 0;
    int databaseLength = (database != null) ? database.length() : 0;
    int packLength = ((userLength + passwordLength + databaseLength) * 2) + 7
            + HEADER_LENGTH + AUTH_411_OVERHEAD;
    Buffer packet = null;
    if (!this.connection.getUseSSL()) {
        if ((this.serverCapabilities & CLIENT_SECURE_CONNECTION) != 0) {
            this.clientParam |= CLIENT_SECURE_CONNECTION; // SSL is not in use on this path
            if (versionMeetsMinimum(4, 1, 1)) {
                secureAuth411(null, packLength, user, password, database, true);
            } else {
                secureAuth(null, packLength, user, password, database, true);
            }
        } else {
            // Passwords can be 16 chars long; this packet serves as the send buffer
            packet = new Buffer(packLength);
            if ((this.clientParam & CLIENT_RESERVED) != 0) {
                if (versionMeetsMinimum(4, 1, 1)) {
                    packet.writeLong(this.clientParam); // client capability flags
                    packet.writeLong(this.maxThreeBytes); // maximum packet size
                    // charset, JDBC will connect as 'latin1',
                    // and use 'SET NAMES' to change to the desired
                    // charset after the connection is established.
                    packet.writeByte((byte) 8);
                    // Set of bytes reserved for future use.
                    packet.writeBytesNoNull(new byte[23]);
                } else {
                    packet.writeLong(this.clientParam);
                    packet.writeLong(this.maxThreeBytes);
                }
            } else {
                packet.writeInt((int) this.clientParam);
                packet.writeLongInt(this.maxThreeBytes);
            }
            // User/Password data
            packet.writeString(user, "Cp1252", this.connection);
            // write the (scrambled) password
            if (this.protocolVersion > 9) {
                packet.writeString(Util.newCrypt(password, this.seed), "Cp1252", this.connection);
            } else {
                packet.writeString(Util.oldCrypt(password, this.seed), "Cp1252", this.connection);
            }
            // write the database name
            if (this.useConnectWithDb) {
                packet.writeString(database, "Cp1252", this.connection);
            }
            // send the contents of the packet out over the socket
            send(packet, packet.getPosition());
        }
    } else {
        negotiateSSLConnection(user, password, database, packLength);
    }
    // Check for errors, not for 4.1.1 or newer,
    // as the new auth protocol doesn't work that way
    // (see secureAuth411() for more details...)
    if (!versionMeetsMinimum(4, 1, 1)) {
        checkErrorPacket();
    }
    //
    // Can't enable compression until after handshake
    //
    if (((this.serverCapabilities & CLIENT_COMPRESS) != 0)
            && this.connection.getUseCompression()) {
        // The following matches with ZLIB's
        // compress()
        this.deflater = new Deflater();
        this.useCompression = true;
        this.mysqlInput = new CompressedInputStream(this.connection, this.mysqlInput);
    }
    if (!this.useConnectWithDb) {
        changeDatabaseTo(database);
    }
}
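Much of doHandshake branches on the server version, so it may help to see the version handling in isolation. The sketch below parses a version string into major/minor/subminor and compares it against a minimum, in the spirit of versionMeetsMinimum. The class ServerVersion and its methods are hypothetical and written for this article, not taken from the driver, and the sample string "5.0.45-community-log" is just an example value.

```java
class ServerVersion {
    final int major;
    final int minor;
    final int subMinor;

    ServerVersion(int major, int minor, int subMinor) {
        this.major = major;
        this.minor = minor;
        this.subMinor = subMinor;
    }

    // Parse strings such as "5.0.45-community-log"; anything after the
    // leading digits of the third component is ignored.
    static ServerVersion parse(String version) {
        String[] parts = version.split("\\.", 3);
        int major = leadingInt(parts[0]);
        int minor = parts.length > 1 ? leadingInt(parts[1]) : 0;
        int subMinor = parts.length > 2 ? leadingInt(parts[2]) : 0;
        return new ServerVersion(major, minor, subMinor);
    }

    // Value of the leading run of ASCII digits, or 0 if there is none.
    private static int leadingInt(String s) {
        int pos = 0;
        while (pos < s.length() && s.charAt(pos) >= '0' && s.charAt(pos) <= '9') {
            pos++;
        }
        return pos == 0 ? 0 : Integer.parseInt(s.substring(0, pos));
    }

    // True if this version is at least major.minor.subMinor.
    boolean meetsMinimum(int major, int minor, int subMinor) {
        if (this.major != major) {
            return this.major > major;
        }
        if (this.minor != minor) {
            return this.minor > minor;
        }
        return this.subMinor >= subMinor;
    }

    public static void main(String[] args) {
        ServerVersion v = parse("5.0.45-community-log");
        System.out.println(v.major + "." + v.minor + "." + v.subMinor
                + " meets 4.1.1: " + v.meetsMinimum(4, 1, 1));
    }
}
```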
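The send method called near the end is not reproduced here; trace into it and you will see that it uses the following field of MysqlIO, which was set up in the constructor:
protected BufferedOutputStream mysqlOutput = null;
The data is written to this stream and then flushed. After that, the checkErrorPacket method reads the data returned by MySQL: if the first byte of the response is 0xff, the packet is treated as an error message and its content is decoded.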
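The 0xff check can be illustrated with a few lines that operate on a raw packet payload. This is a simplified sketch, not the driver's checkErrorPacket: the class ErrorPacketCheck and its method are hypothetical, the payload is assumed to have the packet header already stripped, and the two bytes after the marker are read as a little-endian error number. With 4.1 and later protocols the message text begins with a '#' and a five-character SQL state, which this sketch simply leaves in the message.

```java
import java.nio.charset.StandardCharsets;

class ErrorPacketCheck {
    // Returns null when the payload is not an error packet,
    // otherwise a string of the form "errno: message".
    static String describeError(byte[] payload) {
        if (payload.length < 3 || (payload[0] & 0xff) != 0xff) {
            return null;
        }
        // Two-byte error number, low byte first.
        int errno = (payload[1] & 0xff) | ((payload[2] & 0xff) << 8);
        String message = new String(payload, 3, payload.length - 3,
                StandardCharsets.ISO_8859_1);
        return errno + ": " + message;
    }

    public static void main(String[] args) {
        byte[] error = { (byte) 0xff, 0x15, 0x04, 'd', 'e', 'n', 'i', 'e', 'd' };
        byte[] ok = { 0x00, 0x00, 0x00 };
        System.out.println(describeError(error));
        System.out.println(describeError(ok));
    }
}
```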
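Finally, a word about MySQL's other Driver, com.mysql.jdbc.ReplicationDriver, which is meant for master/slave replication setups. When you use it, each call to setReadOnly(true|false) on the Connection switches between the master and the slaves. The Connection it creates is a com.mysql.jdbc.ReplicationConnection; here is a quick look at some of its code: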
public class ReplicationConnection implements java.sql.Connection, PingTarget {
    private Connection currentConnection; // the connection currently in use
    private Connection masterConnection;  // connection to the master
    private Connection slavesConnection;  // connection to the slaves
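This shows that it wraps two connections and switches back and forth between them, with currentConnection pointing at whichever one is in use. A few of its methods, picked more or less at random: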
public synchronized void close() throws SQLException {
    this.masterConnection.close();
    this.slavesConnection.close();
}

public synchronized void commit() throws SQLException {
    this.currentConnection.commit();
}

public Statement createStatement() throws SQLException {
    Statement stmt = this.currentConnection.createStatement();
    ((com.mysql.jdbc.Statement) stmt).setPingTarget(this);
    return stmt;
}

public synchronized boolean isReadOnly() throws SQLException {
    return this.currentConnection == this.slavesConnection;
}

public synchronized void setReadOnly(boolean readOnly) throws SQLException {
    if (readOnly) {
        if (currentConnection != slavesConnection) {
            switchToSlavesConnection();
        }
    } else {
        if (currentConnection != masterConnection) {
            switchToMasterConnection();
        }
    }
}

public synchronized void doPing() throws SQLException {
    if (this.masterConnection != null) {
        this.masterConnection.ping(); // sends command number 14 (COM_PING), much like a ping
    }
    if (this.slavesConnection != null) {
        this.slavesConnection.ping();
    }
}
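Stripped of JDBC details, the switching logic comes down to one reference that points at either of two wrapped objects. The toy model below shows just that; DemoConn and SwitchingConn are hypothetical classes invented for this illustration, and starting on the master is an assumption of the sketch rather than something shown in the excerpt above.

```java
// Hypothetical stand-in for a real connection; it only carries a label.
class DemoConn {
    final String name;

    DemoConn(String name) {
        this.name = name;
    }
}

class SwitchingConn {
    private final DemoConn master;
    private final DemoConn slaves;
    private DemoConn current;

    SwitchingConn(DemoConn master, DemoConn slaves) {
        this.master = master;
        this.slaves = slaves;
        this.current = master; // assumption: start on the master
    }

    // Read-only work goes to the slaves, everything else to the master.
    synchronized void setReadOnly(boolean readOnly) {
        current = readOnly ? slaves : master;
    }

    // Read-only state is derived from which connection is current.
    synchronized boolean isReadOnly() {
        return current == slaves;
    }

    synchronized String currentName() {
        return current.name;
    }

    public static void main(String[] args) {
        SwitchingConn conn = new SwitchingConn(new DemoConn("master"), new DemoConn("slaves"));
        conn.setReadOnly(true);
        System.out.println(conn.currentName());
        conn.setReadOnly(false);
        System.out.println(conn.currentName());
    }
}
```

Note that, as in the original, there is no separate read-only flag: the state is simply whether the current reference is the slaves connection.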
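All of this may make your head spin at first, but it becomes clear after a few readings. What I have laid out here is only an outline. The point is to get the overall flow straight, so that when you are troubleshooting you can quickly locate the relevant source code and find solid grounds for your diagnosis.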