处理搜狗实验室元数据.xml----->txt
package cn.yusys.hotnews.datasource;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
/**
 * Parses the Sogou Labs news XML file into a pipe-delimited text file that is
 * used as the project's data source.
 * Created on 2018-11-12
 */
public class MyXMLReader2JDOM {

    /** Layout of the {@code datetime} field written at the start of every record. */
    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Reads the XML source, copies the {@code url}, {@code docno},
     * {@code contenttitle} and {@code content} children of every {@code <doc>}
     * element into a {@link News} object and writes all of them out through
     * {@link #writwToFile(List)}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SAXReader reader = new SAXReader();
        // Character encoding the reader is asked to use for the input.
        reader.setEncoding("utf-8");
        // Collected records, written to the output file once parsing is complete.
        List<News> list = new ArrayList<News>();
        try {
            Document document = reader.read(new File("D:\\Downloads\\大数据数据源\\news_tensite_xml.smarty.dat"));
            // Root element <docs>...</docs>
            Element rootElement = document.getRootElement();
            // Every <doc>...</doc> child becomes one News record.
            List<Element> docList = rootElement.elements("doc");
            for (Element e : docList) {
                News news = new News();
                news.setUrl(childText(e, "url"));
                news.setDocno(childText(e, "docno"));
                news.setContenttitle(childText(e, "contenttitle"));
                news.setContent(childText(e, "content"));
                list.add(news);
            }
            writwToFile(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the trimmed string value of the named child element, or
     * {@code null} when the child is absent (the corresponding {@link News}
     * field then stays {@code null}, exactly as before).
     *
     * The earlier code additionally compared the {@code Element} object with
     * the string {@code " "}; that comparison could never match and has been
     * removed without changing the result.
     */
    private static String childText(Element parent, String name) {
        Element child = parent.element(name);
        return child == null ? null : child.getStringValue().trim();
    }

    /**
     * Writes one line per record to the output text file in the form
     * {@code datetime=...|url=...|docno=...|contenttitle=...|content=...}.
     * TODO (from the original author): split the output into shards once the
     * source file becomes too large.
     *
     * @param list records to write
     * @throws IOException if the output file cannot be opened or written
     */
    public static void writwToFile(List<News> list) throws IOException {
        File file = new File("D:\\Downloads\\大数据数据源\\news2.log");
        SimpleDateFormat sdf = new SimpleDateFormat(DATETIME_PATTERN);
        // Opening the stream creates (or truncates) the file, so no separate
        // exists()/createNewFile() step is needed. UTF-8 is set explicitly so the
        // output does not depend on the platform default charset, and
        // try-with-resources guarantees the writer is flushed and closed.
        try (BufferedWriter bw = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8))) {
            for (News news : list) {
                // The timestamp is taken per record, at the moment it is written.
                bw.write("datetime=" + sdf.format(new Date()) + "|");
                bw.write("url=" + news.getUrl() + "|");
                bw.write("docno=" + news.getDocno() + "|");
                bw.write("contenttitle=" + news.getContenttitle() + "|");
                bw.write("content=" + news.getContent());
                bw.write("\n");
            }
        }
    }
}
----------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------
package cn.yusys.hotnews.datasource;
/**
 * News entity populated while the XML source is parsed.
 *
 * Implements {@code Serializable}; per the original author's note this is so
 * that several machines can take part in parsing. The interface is referenced
 * by its fully-qualified name because this listing has no import for it.
 */
public class News implements java.io.Serializable {

    /** Explicit stream version so the serialized form does not depend on compiler-generated IDs. */
    private static final long serialVersionUID = 1L;

    // Fields are left package-private to preserve the access level existing code may rely on.
    String url;
    String docno;
    String contenttitle;
    String content;

    /** Creates an empty record; every field starts as {@code null}. */
    public News() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param url          value stored in the {@code url} field
     * @param docno        value stored in the {@code docno} field
     * @param contenttitle value stored in the {@code contenttitle} field
     * @param content      value stored in the {@code content} field
     */
    public News(String url, String docno, String contenttitle, String content) {
        this.url = url;
        this.docno = docno;
        this.contenttitle = contenttitle;
        this.content = content;
    }

    /** @return the stored url, possibly {@code null} */
    public String getUrl() {
        return url;
    }

    /** @param url new value for the url field */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the stored docno, possibly {@code null} */
    public String getDocno() {
        return docno;
    }

    /** @param docno new value for the docno field */
    public void setDocno(String docno) {
        this.docno = docno;
    }

    /** @return the stored contenttitle, possibly {@code null} */
    public String getContenttitle() {
        return contenttitle;
    }

    /** @param contenttitle new value for the contenttitle field */
    public void setContenttitle(String contenttitle) {
        this.contenttitle = contenttitle;
    }

    /** @return the stored content, possibly {@code null} */
    public String getContent() {
        return content;
    }

    /** @param content new value for the content field */
    public void setContent(String content) {
        this.content = content;
    }
}
-----------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------
运行在Linux上模拟日志产生并通过Flume采集
package cn.yusys.hotnews.datasource;
/**
 * Simulates a log server: replays the lines of a source log file one by one,
 * with a pause between lines, appending each to an output file. Per the
 * original author's note the output file is meant to be collected by Flume.
 *
 * java.io / java.nio types are referenced by fully-qualified name because this
 * listing carries no import section for the class.
 *
 * @date 2018-11-12
 */
public class ReadWebLog {

    /** Pause between two replayed lines used by {@link #readFile(String)}, in milliseconds. */
    private static final long DEFAULT_INTERVAL_MILLIS = 1000L;

    /** Path of the file to replay; set from the first command-line argument. */
    public static String readFileName;

    /** Path of the file that receives the replayed lines; set from the second command-line argument. */
    public static String writeFileName;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the file to read, {@code args[1]} the file to append to
     */
    public static void main(String[] args) {
        if (args == null || args.length < 2) {
            // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
            System.err.println("Usage: ReadWebLog <readFileName> <writeFileName>");
            System.exit(1);
            return;
        }
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);
    }

    /**
     * Replays every line of {@code fileName} into {@link #writeFileName},
     * pausing one second before each line.
     *
     * @param fileName path of the UTF-8 encoded file to replay
     */
    public static void readFile(String fileName) {
        readFile(fileName, DEFAULT_INTERVAL_MILLIS);
    }

    /**
     * Replays every line of {@code fileName} into {@link #writeFileName}.
     * Each line is echoed to standard output with its 1-based row number and
     * then appended to the output file. I/O failures are reported via
     * {@code printStackTrace()} and end the replay.
     *
     * @param fileName       path of the UTF-8 encoded file to replay
     * @param intervalMillis pause before each line in milliseconds; values
     *                       {@code <= 0} disable the pause
     */
    public static void readFile(String fileName, long intervalMillis) {
        try (java.io.BufferedReader br = new java.io.BufferedReader(
                new java.io.InputStreamReader(
                        new java.io.FileInputStream(fileName),
                        java.nio.charset.StandardCharsets.UTF_8))) {
            int count = 0;
            String line;
            // Read exactly once per iteration; calling readLine() in both the loop
            // condition and the body would silently drop every other line.
            while ((line = br.readLine()) != null) {
                count++;
                if (intervalMillis > 0) {
                    Thread.sleep(intervalMillis);
                }
                System.out.println("row:" + count + ">>>>>>>>" + line);
                // Append to the target file (the one the Flume configuration points at).
                writeFile(writeFileName, line);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Appends {@code line} followed by {@code "\n"} to {@code fileName},
     * encoded as UTF-8. A {@code null} line is ignored. I/O failures are
     * reported via {@code printStackTrace()}.
     *
     * @param fileName path of the file to append to
     * @param line     text to append, without a trailing newline
     */
    public static void writeFile(String fileName, String line) {
        if (line == null) {
            return;
        }
        // try-with-resources closes the stream even when write() fails.
        try (java.io.BufferedWriter bw = new java.io.BufferedWriter(
                new java.io.OutputStreamWriter(
                        new java.io.FileOutputStream(fileName, true),
                        java.nio.charset.StandardCharsets.UTF_8))) {
            bw.write(line);
            bw.write("\n");
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
    }
}