After quite a bit of work, the hive2solr job finally runs stably. It loads data from Hive into Solr, mainly by implementing the RecordWriter interface and overriding the write and close methods. The problems encountered are listed below one by one:
1. Data overwriting: use atomic updates.
Reference: http://caiguangguang.blog.51cto.com/1652935/1599137
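For reference, a minimal SolrJ sketch of an atomic update (the server URL and field names below are illustrative, not taken from the job): wrapping the new value in a map with a "set" key updates only that field instead of overwriting the whole document.

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

// Minimal atomic-update sketch with SolrJ 4.x; URL and field names are illustrative.
public class AtomicUpdateExample {
    public static void main(String[] args) throws Exception {
        HttpSolrServer server = new HttpSolrServer("http://localhost:8080/solr/userinfo");
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", 1);
        Map<String, Object> setOper = new LinkedHashMap<String, Object>();
        setOper.put("set", "new-value");        // "set" replaces only this field's value
        doc.addField("userinfo.name", setOper); // hypothetical column name
        server.add(doc);
        server.commit();
    }
}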
2. Repeatedly constructing SolrServer and SolrTable objects: build them once in a static initializer and simply reuse them afterwards.
Construction:
public static Map<Integer, SolrServer> solrServers = new HashMap<Integer, SolrServer>();
public static Map<Integer, SolrTable> solrTables = new HashMap<Integer, SolrTable>();
public static String[] iparray;
public static String ipstring;
public static String collec;

static {
    LOG.warn("in SolrServerCustom start initialize ip maps");
    ipstring = "xxxx,xxxxxx";
    collec = "userinfo";
    LOG.warn("in SolrServerCustom ipstring and collec: " + ipstring + "," + collec);
    iparray = ipstring.split(",");
    Arrays.sort(iparray);
    for (int i = 0; i < iparray.length; i++) {
        String urlx = "http://" + iparray[i] + "/solr/" + collec;
        solrServers.put(i, new HttpSolrServer(urlx));
        solrTables.put(i, new SolrTable(String.valueOf(i)));
    }
    LOG.warn("in SolrServerCustom end initialize ip maps,maps size " + solrServers.size());
    LOG.warn("in SolrServerCustom end initialize ip mapsx,mapsx size " + solrTables.size());
}
Usage:
public void write(Writable w) throws IOException {
    MapWritable map = (MapWritable) w;
    SolrInputDocument doc = new SolrInputDocument();
    String key;
    String value;
    String newkey;
    int idx;
    for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {
        key = entry.getKey().toString();
        newkey = this.tableName + "." + entry.getKey().toString();
        value = entry.getValue().toString();
        if (key.equals("id")) {
            idx = SolrUtil.getIntServer(value, SolrServerCustom.solrServers); // use the static field SolrServerCustom.solrServers
            table = SolrServerCustom.solrTables.get(idx);                     // use the static field SolrServerCustom.solrTables
            table.setNumInputBufferRows(this.numInputBufferRows);
        }
        if (key.equals("id")) {
            doc.addField("id", Integer.valueOf(value));
        } else {
            if (value.equals("(null)")) {
                value = "";
            }
            setOper = new LinkedHashMap<String, Object>();
            setOper.put("set", value);
            if (!doc.keySet().contains(newkey)) {
                doc.addField(newkey, setOper);
            }
        }
    }
    table.save(doc);
}
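SolrUtil.getIntServer is not shown in the post. As an assumption for illustration only, a minimal version could hash the id into one of the server buckets so that the same id always routes to the same server:

import java.util.Map;
import org.apache.solr.client.solrj.SolrServer;

// Hypothetical sketch of SolrUtil.getIntServer (the real implementation is not shown in the post):
// map an id to one of the configured servers so the same id always goes to the same server.
public class SolrUtil {
    public static int getIntServer(String id, Map<Integer, SolrServer> servers) {
        int n = servers.size();
        if (n == 0) {
            throw new IllegalStateException("no solr servers configured");
        }
        return (id.hashCode() & Integer.MAX_VALUE) % n; // non-negative bucket in [0, n)
    }
}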
3. Memory leaks in the code
1) Move object declarations outside the loop and adjust the output buffer size (a sketch follows the error log below).
Symptom: the YARN map/reduce Java heap fills up and the job hangs.
Error log:
2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
    at java.lang.StringBuilder.<init>(StringBuilder.java:68)
    at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:621)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:87)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:92)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:540)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:177)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)
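As a rough sketch of item 1) above (not the original SolrWriter code), the idea is simply to declare reusable objects once, outside the per-row loop, instead of allocating a new one for every row, which is what drives the GC overhead seen in the trace:

import java.util.Arrays;
import java.util.List;

// Sketch only: reuse one object across rows instead of allocating per row.
public class ReuseOutsideLoop {
    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c"); // placeholder input rows
        StringBuilder sb = new StringBuilder();           // declared once, outside the loop
        for (String row : rows) {
            sb.setLength(0);                              // reset instead of new StringBuilder() per row
            sb.append("userinfo.").append(row);           // hypothetical per-row work
            System.out.println(sb);
        }
    }
}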
2) Use try...catch...finally, and clear the buffer in the finally block (see the flush() method under item 4 below).
At first there was no finally block, so whenever an exception occurred the buffer grew past its configured size and eventually filled the job's memory, hanging it.
4. Exception handling
When one SolrServer fails or Solr is temporarily unresponsive, the program must not exit. By default the exception propagates upward and eventually fails the job.
For example:
Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content type application/octet-stream but got text/html. <html>
<head><title>504 Gateway Time-out</title></head>
<body bgcolor="white">
<center><h1>504 Gateway Time-out</h1></center>
<hr><center>nginx/1.6.2</center>
</body>
</html>
To keep the thrown exception from becoming a runtime error that fails the job, catch it and do nothing:
public void flush() {
    try {
        if (!outputBuffer.isEmpty()) {
            server.add(outputBuffer);
        }
    } catch (Exception e) {
        LOG.warn("solrtable add error,Exception log is " + e);
    } finally {
        outputBuffer.clear(); // clear the buffer in finally; otherwise it keeps growing whenever an exception is thrown and eventually the JVM runs out of memory
    }
}
5. Commits: when close is called, only the last SolrTable is closed. Initially we committed after every inserted row, but performance was poor (roughly a 50% slowdown), so commits were later controlled on the Solr server side.
solrconfig.xml:
<autoCommit>
    <!--<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>-->
    <maxDocs>15000</maxDocs>          <!-- when the in-memory index reaches this many documents, dump it to disk and notify the searcher to load the new index -->
    <maxTime>1000</maxTime>           <!-- automatically commit the in-memory index data after this interval and notify the searcher to load the new index; whichever condition is met first triggers the commit -->
    <openSearcher>true</openSearcher> <!-- when set to false, the commit still flushes index changes to disk, but clients will not see the updates -->
</autoCommit>
<autoSoftCommit>
    <maxTime>${solr.autoSoftCommit.maxTime:10000}</maxTime>
</autoSoftCommit>
Here autoCommit means a hard commit. If autoCommit is not used, a commitWithin parameter can instead be passed when adding documents. autoSoftCommit is similar to autoCommit, but it is a soft commit: it makes the data visible without flushing it to disk, so a machine crash would lose that data.
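A minimal SolrJ sketch of the commitWithin alternative (URL and interval are illustrative): the add call carries a commit-within deadline, so no explicit commit is issued per row.

import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

// Sketch of add with commitWithin in SolrJ 4.x; URL and values are illustrative.
public class CommitWithinExample {
    public static void main(String[] args) throws Exception {
        HttpSolrServer server = new HttpSolrServer("http://localhost:8080/solr/userinfo");
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", 1);
        server.add(doc, 10000); // ask Solr to commit this update within 10 s; no per-row commit()
    }
}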
save also costs performance: each save takes about 6 ms, so documents need to be collected in a list and saved in batches.
6. Output buffer
In the initial code there is only one entry point for SolrTable (the same is true with SolrCloud), so there was a single SolrTable instance and the buffer was a static variable, which meant each SolrTable could not manage its own buffer.
Change the buffer to a non-static field and initialize the tables and servers in a static block, storing them in a HashMap and fetching them when needed, so that only a few instances exist. Otherwise, if they are instantiated at use time, a new object is created on every call and the buffer never grows beyond size 1.
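A minimal sketch of the resulting SolrTable shape (assumed, since the full class is not shown in the post): the output buffer is an instance field, while the small, fixed set of SolrTable instances lives in the static SolrServerCustom.solrTables map.

import java.util.ArrayList;
import java.util.List;
import org.apache.solr.common.SolrInputDocument;

// Assumed shape of SolrTable after the change; field names follow the snippets in this post.
public class SolrTable {
    private final List<SolrInputDocument> outputBuffer = new ArrayList<SolrInputDocument>(); // non-static, one buffer per instance
    private int numOutputBufferRows = 1000; // illustrative default, overridden via setNumInputBufferRows

    public void setNumInputBufferRows(int rows) {
        this.numOutputBufferRows = rows;
    }
}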
7. Close
If a buffer is configured, the remaining documents may never be flushed:
public void save(SolrInputDocument doc) {
    outputBuffer.add(doc);                            // save just appends the document to the buffer list
    if (outputBuffer.size() >= numOutputBufferRows) { // flush is only triggered once the list reaches the configured buffer size
        flush();
    }
}
flush in turn calls server.add(outputBuffer). When the file sink is closed it calls SolrWriter.close, which calls SolrTable's commit (commit calls flush and server.commit), but it turned out that only the last table instance ever had commit called on it.
Solution: in SolrWriter.close, loop over all SolrTable instances and call commit on each:
public void close(boolean abort) throws IOException {
    if (!abort) {
        Map<Integer, SolrTable> maps = SolrServerCustom.solrTables;
        for (Map.Entry<Integer, SolrTable> entry : maps.entrySet()) {
            entry.getValue().commit();
        }
    } else {
        table.rollback();
    }
}
8. Locking: nginx showed a large number of 302 responses, and the Solr logs showed lock errors. Adjust the configuration so that Solr releases the lock at startup.
Solr log:
userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked for write for core userinfo
Solution: set the following in solrconfig.xml:
<unlockOnStartup>true</unlockOnStartup>
Cause:
When org.apache.solr.core.SolrCore is initialized, it calls IndexWriter.isLocked(dir) to check whether the index is locked. If it is, there are two cases: if unlockOnStartup is configured in solrconfig.xml, Solr tries to unlock the index; if unlockOnStartup is not configured, it throws the "Index locked for write for core" exception.
The corresponding code can be located from the stack trace:
the org.apache.solr.core.SolrCore constructor calls the initIndex method:
void initIndex(boolean reload) throws IOException {
    String indexDir = getNewIndexDir();
    boolean indexExists = getDirectoryFactory().exists(indexDir);
    boolean firstTime;
    synchronized (SolrCore.class) {
        firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
    }
    boolean removeLocks = solrConfig.unlockOnStartup;
    // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); defaults to false
    initIndexReaderFactory();
    if (indexExists && firstTime && !reload) {
        Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
                getSolrConfig().indexConfig.lockType);
        try {
            if (IndexWriter.isLocked(dir)) {
                if (removeLocks) {
                    log.warn(logid + "WARNING: Solr index directory '{}' is locked. Unlocking...",
                            indexDir);
                    IndexWriter.unlock(dir); // unlock the index
                } else {
                    log.error(logid + "Solr index directory '{}' is locked. Throwing exception",
                            indexDir);
                    throw new LockObtainFailedException("Index locked for write for core " + name);
                }
            }
        } finally {
            directoryFactory.release(dir);
        }
    }
    // Create the index if it doesn't exist.
    if (!indexExists) {
        log.warn(logid + "Solr index directory '" + new File(indexDir) + "' doesn't exist."
                + " Creating new index...");
        SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
                getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
        writer.close();
    }
}
9. A problem caused by the Tomcat configuration: with two Solr instances per machine, one of them could never start (instantiating the core tries to acquire the index lock and fails here; write.lock can be deleted manually).
It turned out that the two Tomcats were writing into the same Solr directory.
Error log:
Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write.lock
    at org.apache.lucene.store.Lock.obtain(Lock.java:89)
    at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)
    at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)
    at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)
    at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)
    at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)
    at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)
    ... 12 more
10. Some jobs ran slowly; one job ran for 11 hours.
Cause:
The writes happen inside the MapOperator or ReduceOperator, so the number of maps or reduces equals the number of concurrent writer threads. The job had only one reduce, which made writing slow. After raising the number of reduces to 100 (set mapreduce.job.reduces=100), performance improved dramatically: importing 30 million rows dropped from 40916 s to 993 s.
This post was reposted from 菜菜光's 51CTO blog; original link: http://blog.51cto.com/caiguangguang/1612601. To reprint, please contact the original author.