本来想用ORACLE的外部表导入XML文件的数据,但是太麻烦,而且XMLTYPE对于大数据
量似乎性能很差。GOOLE上找到一个上有一个分段XML入库的列子
( http://forums.oracle.com/forums/thread.jspa?threadID=461009&tstart=0),
即把XML文件,分解成N个DOM树写入数据库,作都思路很好,(就是不太爱写注释)。
我在他的基础上写了一个解析SAX文件,把数据入库(不是DOM树)的方法,500M的数据在我的PC上10多分钟。
整个思路是这样:一个线程负责解析XML文件,当然采用SAX方式,DOM方式会让内存吃不消。
另外一个线程池负责向数据库写入。解析线程把解析出来的数据暂存在workQueue里,写入线程不断读取,
并写入数据库。当workQueue队列里的数据超过一定阀值,则解析线程等待,这是因为如果都解析出来,封装成
对象也很大,可能导致内存不足。基本上类似于生产者与消费者模式。
刚写完,测试了几例,还算稳定,欢迎大家补充完善。
OK,先贴代码。
量似乎性能很差。GOOLE上找到一个上有一个分段XML入库的列子
( http://forums.oracle.com/forums/thread.jspa?threadID=461009&tstart=0),
即把XML文件,分解成N个DOM树写入数据库,作都思路很好,(就是不太爱写注释)。
我在他的基础上写了一个解析SAX文件,把数据入库(不是DOM树)的方法,500M的数据在我的PC上10多分钟。
整个思路是这样:一个线程负责解析XML文件,当然采用SAX方式,DOM方式会让内存吃不消。
另外一个线程池负责向数据库写入。解析线程把解析出来的数据暂存在workQueue里,写入线程不断读取,
并写入数据库。当workQueue队列里的数据超过一定阀值,则解析线程等待,这是因为如果都解析出来,封装成
对象也很大,可能导致内存不足。基本上类似于生产者与消费者模式。
刚写完,测试了几例,还算稳定,欢迎大家补充完善。
OK,先贴代码。
SourceProcessor.java:
/*XML文件解析类 我用的是ORACLE的解析包xmlparserv2.jar
* SAX在解析的过程中,在一个元素结尾与另一个元素开始处,解析器会把他当成一个文本结点。
* characters方法会多出很多空格,最后用一个笨的方法解决了他,还请批评指正。
*/
public class SourceProcessor extends Thread implements ContentHandler {
private List<List<String>> cacheList = new ArrayList<List<String>>();
private SaxProcessor saxProcessor;
private String targetFilename = null;
private boolean recordStart = false;
private boolean useable = false;
private List<String> curDatas = new ArrayList<String>();
private StringBuffer curData = new StringBuffer();
public boolean isRecordStart() {
return recordStart;
}
public void setRecordStart( boolean recordStart) {
this.recordStart = recordStart;
}
public SourceProcessor(String threadName) {
super(threadName);
}
public String getTargetFilename() {
return targetFilename;
}
public void setTargetFilename(String targetFilename) {
this.targetFilename = targetFilename;
}
public void characters( char[] ch, int start, int length)
throws SAXException {
if ( this.useable == true) {
curData.append( new String(ch, start, length));
}
}
public void endDocument() throws SAXException {
if (cacheList.size() > 0) {
this.saxProcessor.addToQueue(cacheList);
}
this.saxProcessor.setParsingComplete();
System.out.println( "over");
}
public void endElement(String uri, String localName, String name)
throws SAXException {
/* 一条记录完成 */
if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) {
cacheList.add(curDatas);
this.curDatas = new ArrayList<String>();
if (cacheList.size() >= 100) {
this.saxProcessor.addToQueue(cacheList);
cacheList = new ArrayList<List<String>>();
}
} else if ( this.recordStart == true) {
curDatas.add(curData.toString().trim());
curData = new StringBuffer();
this.useable = false;
}
}
public void endPrefixMapping(String prefix) throws SAXException {
}
public void ignorableWhitespace( char[] ch, int start, int length)
throws SAXException {
}
public void processingInstruction(String target, String data)
throws SAXException {
}
public void setDocumentLocator(Locator locator) {
}
public void skippedEntity(String name) throws SAXException {
// TODO Auto-generated method stub
}
public void startDocument() throws SAXException {
}
public void startElement(String uri, String localName, String name,
Attributes atts) throws SAXException {
if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) {
this.recordStart = true;
} else if (recordStart == true) {
/* 可以收集 */
this.useable = true;
}
}
public void startPrefixMapping(String prefix, String uri)
throws SAXException {
}
public SaxProcessor getSaxProcessor() {
return saxProcessor;
}
public void setSaxProcessor(SaxProcessor saxProcessor) {
this.saxProcessor = saxProcessor;
}
public void run() {
try {
SAXParser parser = new SAXParser();
parser.setAttribute(SAXParser.STANDALONE, Boolean.valueOf( true));
parser.setValidationMode(SAXParser.NONVALIDATING);
parser.setContentHandler( this);
this.saxProcessor.setParserActive();
parser.parse( new FileInputStream( this.targetFilename));
} catch (ProcessingCompleteException pce) {
pce.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean isUseable() {
return useable;
}
public void setUseable( boolean useable) {
this.useable = useable;
}
}
* SAX在解析的过程中,在一个元素结尾与另一个元素开始处,解析器会把他当成一个文本结点。
* characters方法会多出很多空格,最后用一个笨的方法解决了他,还请批评指正。
*/
public class SourceProcessor extends Thread implements ContentHandler {
private List<List<String>> cacheList = new ArrayList<List<String>>();
private SaxProcessor saxProcessor;
private String targetFilename = null;
private boolean recordStart = false;
private boolean useable = false;
private List<String> curDatas = new ArrayList<String>();
private StringBuffer curData = new StringBuffer();
public boolean isRecordStart() {
return recordStart;
}
public void setRecordStart( boolean recordStart) {
this.recordStart = recordStart;
}
public SourceProcessor(String threadName) {
super(threadName);
}
public String getTargetFilename() {
return targetFilename;
}
public void setTargetFilename(String targetFilename) {
this.targetFilename = targetFilename;
}
public void characters( char[] ch, int start, int length)
throws SAXException {
if ( this.useable == true) {
curData.append( new String(ch, start, length));
}
}
public void endDocument() throws SAXException {
if (cacheList.size() > 0) {
this.saxProcessor.addToQueue(cacheList);
}
this.saxProcessor.setParsingComplete();
System.out.println( "over");
}
public void endElement(String uri, String localName, String name)
throws SAXException {
/* 一条记录完成 */
if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) {
cacheList.add(curDatas);
this.curDatas = new ArrayList<String>();
if (cacheList.size() >= 100) {
this.saxProcessor.addToQueue(cacheList);
cacheList = new ArrayList<List<String>>();
}
} else if ( this.recordStart == true) {
curDatas.add(curData.toString().trim());
curData = new StringBuffer();
this.useable = false;
}
}
public void endPrefixMapping(String prefix) throws SAXException {
}
public void ignorableWhitespace( char[] ch, int start, int length)
throws SAXException {
}
public void processingInstruction(String target, String data)
throws SAXException {
}
public void setDocumentLocator(Locator locator) {
}
public void skippedEntity(String name) throws SAXException {
// TODO Auto-generated method stub
}
public void startDocument() throws SAXException {
}
public void startElement(String uri, String localName, String name,
Attributes atts) throws SAXException {
if (localName.equals(this.saxProcessor.getSetting("Element", "Record"))) {
this.recordStart = true;
} else if (recordStart == true) {
/* 可以收集 */
this.useable = true;
}
}
public void startPrefixMapping(String prefix, String uri)
throws SAXException {
}
public SaxProcessor getSaxProcessor() {
return saxProcessor;
}
public void setSaxProcessor(SaxProcessor saxProcessor) {
this.saxProcessor = saxProcessor;
}
public void run() {
try {
SAXParser parser = new SAXParser();
parser.setAttribute(SAXParser.STANDALONE, Boolean.valueOf( true));
parser.setValidationMode(SAXParser.NONVALIDATING);
parser.setContentHandler( this);
this.saxProcessor.setParserActive();
parser.parse( new FileInputStream( this.targetFilename));
} catch (ProcessingCompleteException pce) {
pce.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean isUseable() {
return useable;
}
public void setUseable( boolean useable) {
this.useable = useable;
}
}
/*数据库写入线程类,记录数达到commitCharge条则提交,最后提交剩余记录*/
public class DatabaseWriter extends Thread {
private Connection connection;
private SaxProcessor processor;
private String threadName;
private int commitCharge;
private int recordCount = 0;
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public SaxProcessor getProcessor() {
return processor;
}
public void setProcessor(SaxProcessor processor) {
this.processor = processor;
}
DatabaseWriter(SaxProcessor processor, String threadName,
Connection connection) {
this.connection = connection;
this.processor = processor;
this.threadName = threadName;
}
public void setParameters( int commitCharge) {
this.commitCharge = commitCharge;
}
public void run() {
PreparedStatement stat = null;
try {
connection.setAutoCommit( false);
stat = connection
.prepareStatement( "insert /*+ append*/ into testMT nologging values(?,?,?,?)");
} catch (SQLException e1) {
e1.printStackTrace();
}
while (! this.processor.processingComplete()) {
List datas = this.processor.getNextData( this.threadName);
System.out.println( this.threadName + " run!!");
if (datas != null && stat != null) {
for ( int i = 0; i < datas.size(); i++) {
try {
List record = (List) datas.get(i);
stat.setString(1, (String) record.get(0));
stat.setString(2, (String) record.get(1));
stat.setString(3, (String) record.get(2));
stat.setString(4, (String) record.get(3));
stat.execute();
this.recordCount++;
} catch (SQLException e) {
e.printStackTrace();
}
}
}
if (recordCount >= commitCharge) {
try {
connection.commit();
public class DatabaseWriter extends Thread {
private Connection connection;
private SaxProcessor processor;
private String threadName;
private int commitCharge;
private int recordCount = 0;
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public SaxProcessor getProcessor() {
return processor;
}
public void setProcessor(SaxProcessor processor) {
this.processor = processor;
}
DatabaseWriter(SaxProcessor processor, String threadName,
Connection connection) {
this.connection = connection;
this.processor = processor;
this.threadName = threadName;
}
public void setParameters( int commitCharge) {
this.commitCharge = commitCharge;
}
public void run() {
PreparedStatement stat = null;
try {
connection.setAutoCommit( false);
stat = connection
.prepareStatement( "insert /*+ append*/ into testMT nologging values(?,?,?,?)");
} catch (SQLException e1) {
e1.printStackTrace();
}
while (! this.processor.processingComplete()) {
List datas = this.processor.getNextData( this.threadName);
System.out.println( this.threadName + " run!!");
if (datas != null && stat != null) {
for ( int i = 0; i < datas.size(); i++) {
try {
List record = (List) datas.get(i);
stat.setString(1, (String) record.get(0));
stat.setString(2, (String) record.get(1));
stat.setString(3, (String) record.get(2));
stat.setString(4, (String) record.get(3));
stat.execute();
this.recordCount++;
} catch (SQLException e) {
e.printStackTrace();
}
}
}
if (recordCount >= commitCharge) {
try {
connection.commit();
this.recordCount = 0;
} catch (SQLException e) {
e.printStackTrace();
}
}
}
try {
if (!connection.isClosed()) {
connection.commit();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
try {
if (!connection.isClosed()) {
connection.commit();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
SaxProcessor.java
/*
* 解析控制程序
*/
public class SaxProcessor extends ConnectionProvider {
/*工作队列,暂存解析的数据*/
private Vector workQueue = new Vector();
/*线程池,源作者用Hashtable实现的线程池,我觉得JDK1.5已经有现成的了,何必再造轮子呢*/
BlockingDeque<Runnable> queue = new LinkedBlockingDeque<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
TimeUnit.MINUTES, queue);
private int threadCount;
Thread saxReader;
private boolean parserActive = false;
private void setWriterCount( int count) {
this.threadCount = count;
}
private int getWriterCount() {
return this.threadCount;
}
protected synchronized void setParserActive() {
this.parserActive = true;
}
protected synchronized void setParsingComplete() {
this.parserActive = false;
notifyAll();
}
public synchronized boolean parsingComplete() {
return ! this.parserActive;
}
public synchronized boolean processingComplete() {
boolean result = (parsingComplete()) && ( this.workQueue.size() == 0);
return result;
}
private boolean listQueueFull() {
return ( this.workQueue.size() >= (2 * getWriterCount()));
}
/* 向工作队列添加一个任务,并通知所有受阻线程 */
protected synchronized void addToQueue(List data) throws SAXException {
if (listQueueFull()) {
try {
wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
this.workQueue.addElement(data);
notifyAll();
}
public synchronized List getNextData(String thread) {
List data = null;
while (!parsingComplete() && ( this.workQueue.size() == 0)) {
try {
wait();
} catch (InterruptedException ioe) {
}
}
if ( this.workQueue.size() > 0) {
data = (List) this.workQueue.remove(0);
notifyAll();
}
return data;
}
public SaxProcessor() throws SQLException, IOException, SAXException {
super();
}
public void doSomething(String[] args) {
try {
setWriterCount(Integer.parseInt(getSetting( "ThreadCount", "4")));
this.saxReader = createSourceProcessor();
this.setParserActive();
this.saxReader.start();
createDatabaseWriters();
waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
this.setParsingComplete();
}
}
private synchronized void waitForCompletion() {
while (!parsingComplete()) {
try {
wait();
} catch (InterruptedException ioe) {
}
}
this.executor.shutdown();
}
private void createDatabaseWriters() throws SQLException {
DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
df.applyPattern( "000000");
int commitCharge = Integer.parseInt(getSetting( "CommitCharge", "50"));
for ( int i = 0; i < getWriterCount(); i++) {
System.out.println(getWriterCount());
String threadName = "Writer_" + df.format(i + 1);
Connection conn = getNewConnection();
conn.setAutoCommit( false);
DatabaseWriter writer = new DatabaseWriter( this, threadName, conn);
writer.setParameters(commitCharge);
this.executor.execute(writer);
}
}
private Thread createSourceProcessor() throws SQLException {
String threadName = "SaxReader";
SourceProcessor saxReader = new SourceProcessor(threadName);
saxReader.setSaxProcessor( this);
saxReader.setTargetFilename(getSetting( "SourceXML", "DIR"));
return saxReader;
}
protected synchronized void printXML(Document xml, PrintWriter pw)
throws IOException {
((XMLDocument) xml).print(pw);
}
/*主函数*/
public static void main(String[] args) {
try {
SaxProcessor app = new SaxProcessor();
app.initializeConnection();
app.doSomething(args);
} catch (Exception e) {
e.printStackTrace();
}
}
public Vector getWorkQueue() {
return workQueue;
}
public void setWorkQueue(Vector workQueue) {
this.workQueue = workQueue;
}
}
* 解析控制程序
*/
public class SaxProcessor extends ConnectionProvider {
/*工作队列,暂存解析的数据*/
private Vector workQueue = new Vector();
/*线程池,源作者用Hashtable实现的线程池,我觉得JDK1.5已经有现成的了,何必再造轮子呢*/
BlockingDeque<Runnable> queue = new LinkedBlockingDeque<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1,
TimeUnit.MINUTES, queue);
private int threadCount;
Thread saxReader;
private boolean parserActive = false;
private void setWriterCount( int count) {
this.threadCount = count;
}
private int getWriterCount() {
return this.threadCount;
}
protected synchronized void setParserActive() {
this.parserActive = true;
}
protected synchronized void setParsingComplete() {
this.parserActive = false;
notifyAll();
}
public synchronized boolean parsingComplete() {
return ! this.parserActive;
}
public synchronized boolean processingComplete() {
boolean result = (parsingComplete()) && ( this.workQueue.size() == 0);
return result;
}
private boolean listQueueFull() {
return ( this.workQueue.size() >= (2 * getWriterCount()));
}
/* 向工作队列添加一个任务,并通知所有受阻线程 */
protected synchronized void addToQueue(List data) throws SAXException {
if (listQueueFull()) {
try {
wait();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
this.workQueue.addElement(data);
notifyAll();
}
public synchronized List getNextData(String thread) {
List data = null;
while (!parsingComplete() && ( this.workQueue.size() == 0)) {
try {
wait();
} catch (InterruptedException ioe) {
}
}
if ( this.workQueue.size() > 0) {
data = (List) this.workQueue.remove(0);
notifyAll();
}
return data;
}
public SaxProcessor() throws SQLException, IOException, SAXException {
super();
}
public void doSomething(String[] args) {
try {
setWriterCount(Integer.parseInt(getSetting( "ThreadCount", "4")));
this.saxReader = createSourceProcessor();
this.setParserActive();
this.saxReader.start();
createDatabaseWriters();
waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
this.setParsingComplete();
}
}
private synchronized void waitForCompletion() {
while (!parsingComplete()) {
try {
wait();
} catch (InterruptedException ioe) {
}
}
this.executor.shutdown();
}
private void createDatabaseWriters() throws SQLException {
DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
df.applyPattern( "000000");
int commitCharge = Integer.parseInt(getSetting( "CommitCharge", "50"));
for ( int i = 0; i < getWriterCount(); i++) {
System.out.println(getWriterCount());
String threadName = "Writer_" + df.format(i + 1);
Connection conn = getNewConnection();
conn.setAutoCommit( false);
DatabaseWriter writer = new DatabaseWriter( this, threadName, conn);
writer.setParameters(commitCharge);
this.executor.execute(writer);
}
}
private Thread createSourceProcessor() throws SQLException {
String threadName = "SaxReader";
SourceProcessor saxReader = new SourceProcessor(threadName);
saxReader.setSaxProcessor( this);
saxReader.setTargetFilename(getSetting( "SourceXML", "DIR"));
return saxReader;
}
protected synchronized void printXML(Document xml, PrintWriter pw)
throws IOException {
((XMLDocument) xml).print(pw);
}
/*主函数*/
public static void main(String[] args) {
try {
SaxProcessor app = new SaxProcessor();
app.initializeConnection();
app.doSomething(args);
} catch (Exception e) {
e.printStackTrace();
}
}
public Vector getWorkQueue() {
return workQueue;
}
public void setWorkQueue(Vector workQueue) {
this.workQueue = workQueue;
}
}
/*配置文件解析类,很简单,不说了*/
public class ConnectionProvider extends Object {
public static final boolean DEBUG = true;
protected OracleConnection connection;
protected XMLDocument connectionDefinition;
public static final String CONNECTION = "Connection";
public static final String DRIVER = "Driver";
public static final String HOSTNAME = "Hostname";
public static final String PORT = "Port";
public static final String SID = "SID";
public static final String SERVICENAME = "ServiceName";
public static final String SERVERMODE = "Server";
public static final String SCHEMA = "Schema";
public static final String PASSWORD = "Password";
public static final String POOL = "Pool";
public static final String THIN_DRIVER = "thin";
// public static final String OCI_DRIVER = "oci8";
public static final String DEFAULT_CONNECTION_DEFINITION = "c:\\temp\\connection.xml";
public static final String DEFAULT_DRIVER = THIN_DRIVER;
public static final String DEFAULT_HOSTNAME = "localhost";
public static final String DEFAULT_PORT = "1521";
public static final String DEFAULT_SERVERMODE = "DEDICATED";
public static final String TARGET_DIRECTORY = "targetDirectory";
protected PrintStream log;
public ConnectionProvider() {
}
public void initializeConnection() throws SAXException, IOException,
SQLException {
this.initializeConnection(System.out);
}
public void initializeConnection(PrintStream log) throws SAXException,
IOException, SQLException {
DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver());
this.log = log;
loadConnectionSettings();
this.connection = openConnection();
}
public ConnectionProvider getConnectionProvider() {
return this;
}
public void initalizeConnection(String connectionLocation, PrintStream log)
throws SAXException, IOException, SQLException {
DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver());
this.log = log;
loadConnectionSettings(connectionLocation);
this.connection = openConnection();
}
public void setLogger(PrintStream log) {
this.log = log;
}
private void setConnectionSettings(XMLDocument doc) {
this.connectionDefinition = doc;
}
private void dumpConnectionSettings() throws IOException {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
this.connectionDefinition.print(pw);
pw.close();
sw.close();
}
public OracleConnection getConnection() throws SQLException {
return this.connection;
}
public void closeConnection(Connection conn) throws Exception {
if (isPooled()) {
conn.close();
}
}
public Connection getConnection(String schema, String passwd)
throws Exception {
if (isPooled()) {
return (OracleOCIConnection) this.getConnection(schema, passwd);
} else {
return this.connection;
}
}
public String getSetting(String nodeName) {
return getSetting(nodeName, null);
}
public String getSetting(String nodeName, String defaultValue) {
XMLElement root = (XMLElement) this.connectionDefinition
.getDocumentElement();
NodeList children = root.getChildrenByTagName(nodeName);
if (children.getLength() != 0) {
Element element = (Element) children.item(0);
Text text = (Text) element.getFirstChild();
if (text != null) {
return text.getData();
}
}
return defaultValue;
}
protected String getDriver() {
return getSetting(DRIVER, DEFAULT_DRIVER);
}
protected String getHostname() {
return getSetting(HOSTNAME, DEFAULT_HOSTNAME);
}
protected String getPort() {
return getSetting(PORT, DEFAULT_PORT);
}
protected String getServerMode() {
return getSetting(SERVERMODE, DEFAULT_SERVERMODE);
}
protected String getServiceName() {
return getSetting(SERVICENAME);
}
protected String getSID() {
return getSetting(SID);
}
protected boolean isPooled() {
String usePool = getSetting(POOL, Boolean.FALSE.toString());
return !usePool.equalsIgnoreCase(Boolean.FALSE.toString());
}
protected String getSchema() {
return getSetting(SCHEMA);
}
protected String getPassword() {
return getSetting(PASSWORD);
}
public void loadConnectionSettings() throws IOException, SAXException {
String filename = System.getProperty(
"com.oracle.st.xmldb.pm.ConnectionParameters",
this.DEFAULT_CONNECTION_DEFINITION);
loadConnectionSettings(filename);
}
public void loadConnectionSettings(String filename) throws IOException,
SAXException {
if (DEBUG) {
System.out
.println( "Using connection Parameters from : " + filename);
}
Reader reader = new FileReader( new File(filename));
DOMParser parser = new DOMParser();
parser.parse(reader);
XMLDocument doc = parser.getDocument();
setConnectionSettings(doc);
if (DEBUG) {
dumpConnectionSettings();
}
}
protected String getDatabaseURL() {
if (getDriver() != null) {
if (getDriver().equalsIgnoreCase(THIN_DRIVER)) {
return "jdbc:oracle:thin:@" + getHostname() + ":" + getPort()
+ ":" + getSID();
} else {
return "jdbc:oracle:oci8:@(description=(address=(host="
+ getHostname() + ")(protocol=tcp)(port=" + getPort()
+ "))(connect_data=(service_name=" + getServiceName()
+ ")(server=" + getServerMode() + ")))";
}
} else {
return null;
}
}
private OracleConnection openConnection() throws SQLException {
String user = getSchema();
String password = getPassword();
String connectionString = user + "/" + password + "@"
+ getDatabaseURL();
OracleConnection conn = null;
if (DEBUG) {
this.log
.println( "ConnectionProvider.establishConnection(): Connecting as "
+ connectionString);
}
try {
conn = (OracleConnection) DriverManager.getConnection(
getDatabaseURL(), user, password);
if (DEBUG) {
this.log
.println( "ConnectionProvider.establishConnection(): Database Connection Established");
}
} catch (SQLException sqle) {
int err = sqle.getErrorCode();
this.log
.println( "ConnectionProvider.establishConnection(): Failed to connect using "
+ connectionString);
sqle.printStackTrace( this.log);
throw sqle;
}
return conn;
}
public OracleConnection getNewConnection() throws SQLException {
return openConnection();
}
public XMLDocument getConnectionSettings() {
return this.connectionDefinition;
}
}
public class ConnectionProvider extends Object {
public static final boolean DEBUG = true;
protected OracleConnection connection;
protected XMLDocument connectionDefinition;
public static final String CONNECTION = "Connection";
public static final String DRIVER = "Driver";
public static final String HOSTNAME = "Hostname";
public static final String PORT = "Port";
public static final String SID = "SID";
public static final String SERVICENAME = "ServiceName";
public static final String SERVERMODE = "Server";
public static final String SCHEMA = "Schema";
public static final String PASSWORD = "Password";
public static final String POOL = "Pool";
public static final String THIN_DRIVER = "thin";
// public static final String OCI_DRIVER = "oci8";
public static final String DEFAULT_CONNECTION_DEFINITION = "c:\\temp\\connection.xml";
public static final String DEFAULT_DRIVER = THIN_DRIVER;
public static final String DEFAULT_HOSTNAME = "localhost";
public static final String DEFAULT_PORT = "1521";
public static final String DEFAULT_SERVERMODE = "DEDICATED";
public static final String TARGET_DIRECTORY = "targetDirectory";
protected PrintStream log;
public ConnectionProvider() {
}
public void initializeConnection() throws SAXException, IOException,
SQLException {
this.initializeConnection(System.out);
}
public void initializeConnection(PrintStream log) throws SAXException,
IOException, SQLException {
DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver());
this.log = log;
loadConnectionSettings();
this.connection = openConnection();
}
public ConnectionProvider getConnectionProvider() {
return this;
}
public void initalizeConnection(String connectionLocation, PrintStream log)
throws SAXException, IOException, SQLException {
DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver());
this.log = log;
loadConnectionSettings(connectionLocation);
this.connection = openConnection();
}
public void setLogger(PrintStream log) {
this.log = log;
}
private void setConnectionSettings(XMLDocument doc) {
this.connectionDefinition = doc;
}
private void dumpConnectionSettings() throws IOException {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
this.connectionDefinition.print(pw);
pw.close();
sw.close();
}
public OracleConnection getConnection() throws SQLException {
return this.connection;
}
public void closeConnection(Connection conn) throws Exception {
if (isPooled()) {
conn.close();
}
}
public Connection getConnection(String schema, String passwd)
throws Exception {
if (isPooled()) {
return (OracleOCIConnection) this.getConnection(schema, passwd);
} else {
return this.connection;
}
}
public String getSetting(String nodeName) {
return getSetting(nodeName, null);
}
public String getSetting(String nodeName, String defaultValue) {
XMLElement root = (XMLElement) this.connectionDefinition
.getDocumentElement();
NodeList children = root.getChildrenByTagName(nodeName);
if (children.getLength() != 0) {
Element element = (Element) children.item(0);
Text text = (Text) element.getFirstChild();
if (text != null) {
return text.getData();
}
}
return defaultValue;
}
protected String getDriver() {
return getSetting(DRIVER, DEFAULT_DRIVER);
}
protected String getHostname() {
return getSetting(HOSTNAME, DEFAULT_HOSTNAME);
}
protected String getPort() {
return getSetting(PORT, DEFAULT_PORT);
}
protected String getServerMode() {
return getSetting(SERVERMODE, DEFAULT_SERVERMODE);
}
protected String getServiceName() {
return getSetting(SERVICENAME);
}
protected String getSID() {
return getSetting(SID);
}
protected boolean isPooled() {
String usePool = getSetting(POOL, Boolean.FALSE.toString());
return !usePool.equalsIgnoreCase(Boolean.FALSE.toString());
}
protected String getSchema() {
return getSetting(SCHEMA);
}
protected String getPassword() {
return getSetting(PASSWORD);
}
public void loadConnectionSettings() throws IOException, SAXException {
String filename = System.getProperty(
"com.oracle.st.xmldb.pm.ConnectionParameters",
this.DEFAULT_CONNECTION_DEFINITION);
loadConnectionSettings(filename);
}
public void loadConnectionSettings(String filename) throws IOException,
SAXException {
if (DEBUG) {
System.out
.println( "Using connection Parameters from : " + filename);
}
Reader reader = new FileReader( new File(filename));
DOMParser parser = new DOMParser();
parser.parse(reader);
XMLDocument doc = parser.getDocument();
setConnectionSettings(doc);
if (DEBUG) {
dumpConnectionSettings();
}
}
protected String getDatabaseURL() {
if (getDriver() != null) {
if (getDriver().equalsIgnoreCase(THIN_DRIVER)) {
return "jdbc:oracle:thin:@" + getHostname() + ":" + getPort()
+ ":" + getSID();
} else {
return "jdbc:oracle:oci8:@(description=(address=(host="
+ getHostname() + ")(protocol=tcp)(port=" + getPort()
+ "))(connect_data=(service_name=" + getServiceName()
+ ")(server=" + getServerMode() + ")))";
}
} else {
return null;
}
}
private OracleConnection openConnection() throws SQLException {
String user = getSchema();
String password = getPassword();
String connectionString = user + "/" + password + "@"
+ getDatabaseURL();
OracleConnection conn = null;
if (DEBUG) {
this.log
.println( "ConnectionProvider.establishConnection(): Connecting as "
+ connectionString);
}
try {
conn = (OracleConnection) DriverManager.getConnection(
getDatabaseURL(), user, password);
if (DEBUG) {
this.log
.println( "ConnectionProvider.establishConnection(): Database Connection Established");
}
} catch (SQLException sqle) {
int err = sqle.getErrorCode();
this.log
.println( "ConnectionProvider.establishConnection(): Failed to connect using "
+ connectionString);
sqle.printStackTrace( this.log);
throw sqle;
}
return conn;
}
public OracleConnection getNewConnection() throws SQLException {
return openConnection();
}
public XMLDocument getConnectionSettings() {
return this.connectionDefinition;
}
}
ProcessingCompleteException.java
public
class ProcessingCompleteException
extends SAXException {
public ProcessingCompleteException() {
super( "Processing Complete");
}
}
public ProcessingCompleteException() {
super( "Processing Complete");
}
}
配置文件例子(connection.xml):
<?
xml
version
="1.0"
encoding
="UTF-8"
?>
< Connection >
< Driver >Thin </ Driver >
< Hostname >10.1.199.250 </ Hostname >
< Port >1521 </ Port >
< ServiceName >orcl.xp.mark.drake.oracle.com </ ServiceName >
< SID >idm </ SID >
< ServerMode >DEDICATED </ ServerMode >
< Schema >dcaudit </ Schema >
< Password >dcaudit </ Password >
< SourceXML >C:\temp\AuditDemo2.xml </ SourceXML >
< Element >Record </ Element >
< Table >RDF_DOCUMENT_TABLE </ Table >
< ErrorTable >RDF_ERROR_TABLE </ ErrorTable >
< schemaInstancePrefix >xsi </ schemaInstancePrefix >
< schemaLocation />
< noNamespaceSchemaLocation />
< CommitCharge >10 </ CommitCharge >
< ThreadCount >2 </ ThreadCount >
</ Connection >
< Connection >
< Driver >Thin </ Driver >
< Hostname >10.1.199.250 </ Hostname >
< Port >1521 </ Port >
< ServiceName >orcl.xp.mark.drake.oracle.com </ ServiceName >
< SID >idm </ SID >
< ServerMode >DEDICATED </ ServerMode >
< Schema >dcaudit </ Schema >
< Password >dcaudit </ Password >
< SourceXML >C:\temp\AuditDemo2.xml </ SourceXML >
< Element >Record </ Element >
< Table >RDF_DOCUMENT_TABLE </ Table >
< ErrorTable >RDF_ERROR_TABLE </ ErrorTable >
< schemaInstancePrefix >xsi </ schemaInstancePrefix >
< schemaLocation />
< noNamespaceSchemaLocation />
< CommitCharge >10 </ CommitCharge >
< ThreadCount >2 </ ThreadCount >
</ Connection >
下班了,不多说了,88 。
本文转自 anranran 51CTO博客,原文链接:http://blog.51cto.com/guojuanjun/322318