hive2solr问题小结

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

  搞了一段时间,hive2solr的job终于可以稳定的跑了,实现使用hive向solr插数据,主要是实现RecordWriter接口,重写write方法和close方法。下面对遇到的问题一一列出:

1.数据覆盖问题,使用原子更新
参考:http://caiguangguang.blog.51cto.com/1652935/1599137
2.重复构建solrserver和solrtable对象问题,使用static在初始化的时候构建,后面直接调用
构建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
         public  static  Map<Integer,SolrServer> solrServers =  new  HashMap<Integer,SolrServer>();
         public  static  Map<Integer,SolrTable> solrTables =  new  HashMap<Integer,SolrTable>();
         public  static  String[] iparray;
         public  static  String ipstring;
         public  static  String collec;
         static  {
                LOG .warn( "in SolrServerCustom start initialize ip maps"  );
                ipstring =  "xxxx,xxxxxx" ;
                collec =  "userinfo"  ;
                LOG .warn( "in SolrServerCustom  ipstring and collec: "  + ipstring +  ","  + collec );
                iparray = ipstring .split( ","  );
               Arrays. sort( iparray);
                for  ( int  i= 0 ;i< iparray. length;i++){
                      String urlx =  "http://"  +iparray [i]+ "/solr/"  + collec;
                       solrServers.put(i,  new  HttpSolrServer(urlx));
                       solrTables.put(i,  new  SolrTable(String.valueOf(i)));
               }
                LOG .warn( "in SolrServerCustom end initialize ip maps,maps size "  + solrServers .size());
                LOG .warn( "in SolrServerCustom end initialize ip mapsx,mapsx size "  + solrTables .size()); 
        }

引用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  public  void  write(Writable w)  throws  IOException {
           MapWritable map = (MapWritable) w;
           SolrInputDocument doc =  new  SolrInputDocument();
           String key;
           String value;
           String newkey;
           int  idx;
           for  ( final  Map.Entry<Writable, Writable> entry : map.entrySet()) {
                key = entry.getKey().toString();
                newkey =  this .tableName +  "."  + entry.getKey().toString();
                value = entry.getValue().toString();
                if (key.equals( "id" )){
                     idx = SolrUtil.getIntServer(value,SolrServerCustom.solrServers);  //引用静态属性SolrServerCustom.solrServers
                     table = SolrServerCustom.solrTables.get(idx);  //引用静态属性SolrServerCustom.solrTables
                     table.setNumInputBufferRows( this .numInputBufferRows);
                }
                if (key.equals( "id" )){
                     doc.addField( "id" ,Integer.valueOf(value));
                } else {
                     if  (value.equals( "(null)" )){
                          value =  "" ;
                     }
                     setOper =  new  LinkedHashMap<String,Object>();
                     setOper.put( "set" ,value);
                     if (!doc.keySet().contains(newkey)){
                          doc.addField(newkey, setOper);
                     }    
                }
           }
           table.save(doc);
      }

3.代码存在内存泄露问题
1)对象的声明,放在循环外,并调整outbuffer的大小
现象:yarn map/reduce  java heap满导致job hang

错误日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
         at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
         at java.lang.StringBuilder.<init>(StringBuilder.java:68)
         at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)
         at org.apache.hadoop.hive.ql. exec .FileSinkOperator.processOp(FileSinkOperator.java:621)
         at org.apache.hadoop.hive.ql. exec .Operator.forward(Operator.java:793)
         at org.apache.hadoop.hive.ql. exec .SelectOperator.processOp(SelectOperator.java:87)
         at org.apache.hadoop.hive.ql. exec .Operator.forward(Operator.java:793)
         at org.apache.hadoop.hive.ql. exec .TableScanOperator.processOp(TableScanOperator.java:92)
         at org.apache.hadoop.hive.ql. exec .Operator.forward(Operator.java:793)
         at org.apache.hadoop.hive.ql. exec .MapOperator.process(MapOperator.java:540)
         at org.apache.hadoop.hive.ql. exec .mr.ExecMapper.map(ExecMapper.java:177)
         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)
         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
               at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)

2)try...catch....finally的使用(在finally中 clear buffer)
一开始没有增加finally,导致在异常发生时buffer会大于设置,最终导致job内存用满,hang住。

4.异常的处理
要求一个solrserver出错,或者solr暂时不响应时程序不能退出,默认情况下异常向上抛出,最终导致job失败
比如:

1
2
3
4
5
6
7
Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content  type  application /octet-stream  but got text /html . <html>
< head ><title>504 Gateway Time-out< /title >< /head >
<body bgcolor= "white" >
<center><h1>504 Gateway Time-out< /h1 >< /center >
<hr><center>nginx /1 .6.2< /center >
< /body >
< /html >

 防止异常的抛出会造成runtime error导致job失败,catch异常后不做处理

1
2
3
4
5
6
7
8
9
10
11
      public void flush(){
           try {
                if  (!outputBuffer.isEmpty()) {
                     server.add(outputBuffer);
                }
           } catch(Exception e){
                LOG.warn( "solrtable add error,Exception log is "  + e);
           }finally{
                outputBuffer. clear ();  // 在finally中清除buffer,否则会导致buffer在异常抛出时一直递增导致jvm oom的问题
           }
      }

5.commit问题,调用close方法时,只有最后一个solrtable会close,开始时使用每插入一行就commit的方式,但是这种性能很差(大约50%的降低),后来在solrserver端控制commit
solrconfig.xml:

1
2
3
4
5
6
7
8
9
10
      <autoCommit>
        <!--<maxTime>${solr.autoCommit.maxTime:15000}< /maxTime >-->
          <maxDocs>15000< /maxDocs // 当内存索引数量达到指定值的时候,将内存的索引DUMP到硬盘中,并通知searcher类加载新的索引
         <maxTime>1000< /maxTime // 每隔指定的时间段,自动的COMMIT内存中的索引数据,并通知Searcher类加载新的索引,以最先达到条件执行为准
        <openSearcher> true < /openSearcher >   // 设置为 false 时,虽然commit会导致index的变更flush到磁盘上,但是客户端不会看到更新
      < /autoCommit >
    
      <autoSoftCommit>
        <maxTime>${solr.autoSoftCommit.maxTime:10000}< /maxTime >
      < /autoSoftCommit >

这里autoCommit是指hard commit,如果不使用autoCommit也可以在add document时带上commitWithin的参数autoSoftCommit和autoCommit类似,但是它是一个solf类型的commit,可以确保数据可见但是没有把数据flush到磁盘,机器crash会导致数据丢失。
save也导致性能损耗,save会消耗6ms左右的时间,需要放到一个list中进行save操作(batch操作)


6.outbuffer的问题
初始的代码,因为对用solrtable来说只有一个入口(solrcloud时也一样),这样solrtable只有一个实例,这里用到了静态变量,每个solrtable不能按自己的buffer进行操作
改成非静态变量,并且使用静态代码块初始化table和server,放到一个hashmap中,用的时候去取,保证只有几个的实例。否则如果在使用时进行实例化,每次的对象都不同,导致buffer一直为1。

7.close的问题
如果设置了buffer,可能会导致不能flush

1
2
3
4
5
6
public  void  save(SolrInputDocument doc) {
      outputBuffer.add(doc);  //使用save放到buffer list中
      if  (outputBuffer.size() >= numOutputBufferRows) {  //只有list的大小>=设置的buffer大小时才会触发flush的操作
          flush();
      }
}

而flush中会调用server.add(outputBuffer)操作。filesink关闭时调用SolrWriter.close
调用SolrTable的commit(commit中调用flush和server.commit),发现只有最后一个table实例会调用commit.
解决方法,在SolrWriter.close中循环调用SolrTable.commit方法:

1
2
3
4
5
6
7
8
9
10
public  void  close( boolean  abort)  throws  IOException {
      if  (!abort) {
          Map<Integer,SolrTable> maps =  new  SolrServerCustom().solrTable;
          for (Map.Entry<Integer, SolrTable> entry:maps.entrySet()){
              entry.getValue().commit();
          }
      else  {
          table.rollback();
      }
}

8.锁的问题,从nginx端看到大量的302 ,solr日志看到有锁的问题,调整参数,在solr启动时释放锁
solr端日志:

1
userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked  for  write  for  core userinfo

解决:solrconfig.xml中设置

1
<unlockOnStartup> true < /unlockOnStartup >

原因:

org.apache.solr.core.SolrCore初始化时使用IndexWriter.isLocked(dir)判断是否加锁,如果已经加了锁,则分为两种情况,一种是在solrconfig.xml中配置了unlockOnStartup,会尝试unlock,如果没有配置unlockStartup,则会抛出Index locked for write for core异常

根据堆栈可以看对应代码:
org.apache.solr.core.SolrCore 构造函数中会调用initIndex方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
   void  initIndex( boolean  reload)  throws  IOException {
       String indexDir = getNewIndexDir();
       boolean  indexExists = getDirectoryFactory().exists(indexDir);
       boolean  firstTime;
       synchronized  (SolrCore. class ) {
         firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
       }
       boolean  removeLocks = solrConfig.unlockOnStartup;  // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); 默认为false
       initIndexReaderFactory();
       if  (indexExists && firstTime && !reload) {
        
         Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
             getSolrConfig().indexConfig.lockType);
         try  {
           if  (IndexWriter.isLocked(dir)) {
             if  (removeLocks) {
               log.warn(
                   logid
                       "WARNING: Solr index directory '{}' is locked.  Unlocking..." ,
                   indexDir);
               IndexWriter.unlock(dir);  //解锁
             else  {
               log.error(logid
                   "Solr index directory '{}' is locked.  Throwing exception" ,
                   indexDir);
               throw  new  LockObtainFailedException(
                   "Index locked for write for core "  + name);
             }
            
           }
         finally  {
           directoryFactory.release(dir);
         }
       }
       // Create the index if it doesn't exist.
       if (!indexExists) {
         log.warn(logid+ "Solr index directory '"  new  File(indexDir) +  "' doesn't exist."
                 " Creating new index..." );
         SolrIndexWriter writer = SolrIndexWriter.create( "SolrCore.initIndex" , indexDir, getDirectoryFactory(),  true ,
                                                         getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
         writer.close();
       }
   }

9.tomcat的配置导致的问题,每台机器两个solr实例,其中一个一直不能启动(在实例化core时会尝试获取锁,这里获取锁失败,可以手动删除write.lock)
最终发现是两个tomcat写到了一个solr目录里面

错误日志:

1
2
3
4
5
6
7
8
9
Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@ /apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write .lock
      at org.apache.lucene.store.Lock.obtain(Lock.java:89)
      at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)
      at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)
      at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)
      at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)
      at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)
      at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)
      ... 12  more

10.部分job运行缓慢,其中一个job运行了11个小时。。
原因:
数据写入时发生在mapoperator或者reduceoperator中,多少个map或者reduce就是多少个并发线程写入。job只有一个reduce,导致写入缓慢,调整reduce的数量到100(set mapreduce.job.reduces=100)后,性能大幅度提升,3kw数据导入时间由40916s下降到993s。



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1612601,如需转载请自行联系原作者

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
4天前
|
搜索推荐 编译器 Linux
一个可用于企业开发及通用跨平台的Makefile文件
一款适用于企业级开发的通用跨平台Makefile,支持C/C++混合编译、多目标输出(可执行文件、静态/动态库)、Release/Debug版本管理。配置简洁,仅需修改带`MF_CONFIGURE_`前缀的变量,支持脚本化配置与子Makefile管理,具备完善日志、错误提示和跨平台兼容性,附详细文档与示例,便于学习与集成。
296 116
|
19天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
7天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
461 44
Meta SAM3开源:让图像分割,听懂你的话
|
13天前
|
安全 Java Android开发
深度解析 Android 崩溃捕获原理及从崩溃到归因的闭环实践
崩溃堆栈全是 a.b.c?Native 错误查不到行号?本文详解 Android 崩溃采集全链路原理,教你如何把“天书”变“说明书”。RUM SDK 已支持一键接入。
685 222
|
1天前
|
Windows
dll错误修复 ,可指定下载dll,regsvr32等
dll错误修复 ,可指定下载dll,regsvr32等
134 95
|
11天前
|
人工智能 移动开发 自然语言处理
2025最新HTML静态网页制作工具推荐:10款免费在线生成器小白也能5分钟上手
晓猛团队精选2025年10款真正免费、无需编程的在线HTML建站工具,涵盖AI生成、拖拽编辑、设计稿转代码等多种类型,均支持浏览器直接使用、快速出图与文件导出,特别适合零基础用户快速搭建个人网站、落地页或企业官网。
1679 158
|
存储 人工智能 监控
从代码生成到自主决策:打造一个Coding驱动的“自我编程”Agent
本文介绍了一种基于LLM的“自我编程”Agent系统,通过代码驱动实现复杂逻辑。该Agent以Python为执行引擎,结合Py4j实现Java与Python交互,支持多工具调用、记忆分层与上下文工程,具备感知、认知、表达、自我评估等能力模块,目标是打造可进化的“1.5线”智能助手。
931 61