1、kafka是什么

  • Apache Kafka是一个开源消息系统,由Scala写成

  • Kafka最初是由LinkedIn开发,并于2011年初开源

  • Kafka是一个分布式消息队列:生产者消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现

  • Kafka对消息保存时根据Topic进行分类,发送消息者称为producer,消息接受者称为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例称为broker

  • 无论是Kafka集群还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统的可用性


JMS:jms是Java提供的一套技术规范。

    可以用来异构系统集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活。


类JMS消息队列,结合JMS中的两种模式,可以有多个消费者主动拉取数据,在JMS中只有点对点模式才有消费者主动拉取数据。

kafka和JMS的区别.png


kafka是一个生产-消费模型。

    01.Producer:生产者

        只负责数据生产,生产者的代码可以集成到任务系统中。 数据的分发策略由producer决定,默认是defaultPartition  Utils.abs(key.hashCode) % numPartitions

    02.Broker

        当前服务器上的Kafka进程,俗称拉皮条。只管数据存储,不管是谁生产,不管是谁消费。在集群中每个broker都有一个唯一brokerid,不得重复。

    03.Topic:

        目标发送的目的地,这是一个逻辑上的概念,落到磁盘上是一个partition的目录。partition的目录中有多个segment组合(index,log)

        一个Topic对应多个partition[0,1,2,3],一个partition对应多个segment组合。一个segment有默认的大小是1G。

        每个partition可以设置多个副本(replication-factor 1),会从所有的副本中选取一个leader出来。所有读写操作都是通过leader来进行的

        特别强调,和mysql中主从有区别,mysql做主从是为了读写分离,在kafka中读写操作都是leader。

    04.ConsumerGroup:

        数据消费者组,ConsumerGroup可以有多个,每个ConsumerGroup消费的数据都是一样的。

        可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据,组员之间不能重复消费。

        (在下面代码配置文件中,可以设置groupID和读取的位置)

    05.zookeeper

        依赖集群保存meta信息(每次读取到哪的信息)。


    kafka集群.png

2、kafka生产数据时的分组策略

    默认是defaultPartition  Utils.abs(key.hashCode) % numPartitions

    上文中的key是producer在发送数据时传入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))


3、kafka如何保证数据的完全生产

    ack机制:broker表示发来的数据已确认接收无误,表示数据已经保存到磁盘。

    0:不等待broker返回确认消息

    1:等待topic中某个partition leader保存成功的状态反馈

    -1:等待topic中某个partition 所有副本都保存成功的状态反馈

4、broker如何保存数据

    在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。

    当前topic所属的broker,必定有一个该topic的partition,partition是一个磁盘目录。partition的目录中有多个segment组合(index,log)


5、partition如何分布在不同的broker上

    int i = 0

    list{kafka01,kafka02,kafka03}

    

    for(int i=0;i<5;i++){

        brIndex = i%broker;

        hostName = list.get(brIndex)

    }

6、consumerGroup的组员和partition之间如何做负载均衡

    最好是一一对应,一个partition对应一个consumer。

    如果consumer的数量过多,必然有空闲的consumer。

    

    算法:

        假如topic1,具有如下partitions: P0,P1,P2,P3

        加入group中,有如下consumer: C1,C2

        首先根据partition索引号对partitions排序: P0,P1,P2,P3

        根据consumer.id排序: C0,C1

        计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

        然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]


7、如何保证kafka消费者消费数据是全局有序的

    伪命题

    如果要全局有序的,必须保证生产有序,存储有序,消费有序。

    由于生产可以做集群,存储可以分片,消费可以设置为一个consumerGroup,要保证全局有序,就需要保证每个环节都有序。

    只有一个可能,就是一个生产者,一个partition,一个消费者。这种场景和大数据应用场景相悖。

8.kafka生产数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import  kafka.javaapi.producer.Producer;
import  kafka.producer.KeyedMessage;
import  kafka.producer.ProducerConfig;
 
import  java.util.Properties;
import  java.util.UUID;
 
/**
  * 这是一个简单的Kafka producer代码
  * 包含两个功能:
  * 1、数据发送
  * 2、数据按照自定义的partition策略进行发送
  * KafkaSpout的类
  */
public  class  KafkaProducerSimple {
     public  static  void  main(String[] args) {
 
         //1、指定当前kafka producer生产的数据的目的地
         //创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
         //bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
         String TOPIC =  "orderMq" ;
 
         //2、读取配置文件
         Properties props =  new  Properties();
 
         //key.serializer.class默认为serializer.class
         props.put( "serializer.class" "kafka.serializer.StringEncoder" );
 
         //kafka broker对应的主机,格式为host1:port1,host2:port2
         props.put( "metadata.broker.list" "kafka01:9092,kafka02:9092,kafka03:9092" );
         
//      request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
//      0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
//      1,意味着在leader replica已经接收到数据后,producer会得到一个ack。这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
//      -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
 
         props.put( "request.required.acks" "1" );
 
//      可选配置,如果不配置,则使用默认的partitioner partitioner.class
//      默认值:kafka.producer.DefaultPartitioner
//      用来把消息分到各个partition中,默认行为是对key进行hash。
         props.put( "partitioner.class" "cn.my.storm.kafka.MyLogPartitioner" );
//      props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
 
         //3、通过配置文件,创建生产者
         Producer<String, String> producer =  new  Producer<String, String>( new  ProducerConfig(props));
 
         //4、通过for循环生产数据
         for  ( int  messageNo =  1 ; messageNo <  100000 ; messageNo++) {
//            String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "用来配合自定义的MyLogPartitioner进行数据分发");
 
//            5、调用producer的send方法发送数据
//            注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
             producer.send( new  KeyedMessage<String, String>(TOPIC, messageNo +  "" "appid"  + UUID.randomUUID() +  "itcast" ));
         }
     }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import  kafka.producer.Partitioner;
import  kafka.utils.VerifiableProperties;
import  org.apache.log4j.Logger;
 
public  class  MyLogPartitioner  implements  Partitioner {
     private  static  Logger logger = Logger.getLogger(MyLogPartitioner. class );
 
     public  MyLogPartitioner(VerifiableProperties props) {
     }
 
     public  int  partition(Object obj,  int  numPartitions) {
         return  Integer.parseInt(obj.toString())%numPartitions;
//        return 1;
     }
 
}

9.kafka消费数据(低阶)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import  kafka.consumer.Consumer;
import  kafka.consumer.ConsumerConfig;
import  kafka.consumer.ConsumerIterator;
import  kafka.consumer.KafkaStream;
import  kafka.javaapi.consumer.ConsumerConnector;
import  kafka.message.MessageAndMetadata;
 
import  java.util.HashMap;
import  java.util.List;
import  java.util.Map;
import  java.util.Properties;
import  java.util.concurrent.ExecutorService;
import  java.util.concurrent.Executors;
 
public  class  KafkaConsumerSimple  implements  Runnable {
     public  String title;
     public  KafkaStream< byte [],  byte []> stream;
     public  KafkaConsumerSimple(String title, KafkaStream< byte [],  byte []> stream) {
         this .title = title;
         this .stream = stream;
     }
     @Override
     public  void  run() {
         System.out.println( "开始运行 "  + title);
         ConsumerIterator< byte [],  byte []> it = stream.iterator();
         /**
          * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
          * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
          * */
         while  (it.hasNext()) {
             MessageAndMetadata< byte [],  byte []> data = it.next();
             String topic = data.topic();
             int  partition = data.partition();
             long  offset = data.offset();
             String msg =  new  String(data.message());
             System.out.println(String.format(
                     "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]" ,
                     title, topic, partition, offset, msg));
         }
         System.out.println(String.format( "Consumer: [%s] exiting ..." , title));
     }
 
     public  static  void  main(String[] args)  throws  Exception{
         Properties props =  new  Properties();
         props.put( "group.id" "dashujujiagoushi" );
         props.put( "zookeeper.connect" "zk01:2181,zk02:2181,zk03:2181" );
         props.put( "auto.offset.reset" "largest" );
         props.put( "auto.commit.interval.ms" "1000" );
         props.put( "partition.assignment.strategy" "roundrobin" );
         ConsumerConfig config =  new  ConsumerConfig(props);
         String topic1 =  "orderMq" ;
         String topic2 =  "paymentMq" ;
         
         //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
         ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
         
         //定义一个map
         Map<String, Integer> topicCountMap =  new  HashMap<>();
         topicCountMap.put(topic1,  3 );
         
         //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
         Map<String, List<KafkaStream< byte [],  byte []>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
         
         //取出 `kafkaTest` 对应的 streams
         List<KafkaStream< byte [],  byte []>> streams = topicStreamsMap.get(topic1);
         
         //创建一个容量为4的线程池
         ExecutorService executor = Executors.newFixedThreadPool( 3 );
         //创建20个consumer threads
         for  ( int  i =  0 ; i < streams.size(); i++)
             executor.execute( new  KafkaConsumerSimple( "消费者"  + (i +  1 ), streams.get(i)));
     }
}


10.kafka和zookeeper使用JavaAPI能够拉取到数据(高阶消费)

properties配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
###zookeeper\u548ckafka\u914d\u7f6e\u5730\u5740
zk.connect=xxxxx
#zk.connect=xxxxx
###kafka\u6d88\u8d39\u7684group\u5fc5\u987b\u8c03\u6574\u4e3a\u72ec\u5360
adinfo.log.group.name=qinbin_ad_interfaceLog_20171218
###kafka\u7684topic.\u9700\u8981\u548cadstat\u6a21\u5757\u7684kafka topic\u4e00\u81f4
adinfo.log.topic.name=ad_interfaceLog
adinfo.log.queue.max= 10000
adinfo.log.list.size= 1
###\u4e2d\u95f4\u7ed3\u679c\u4fdd\u5b58\u65e5\u5fd7
adinfo.log.pathFile=E:/opt/realtime/avro/file/
 
  
###\u9ed8\u8ba4\u4e0d\u8981\u52a8
adinfo.statistics.time= 120000
adinfo.statistics.commitSize= 3000

kafka配置文件(注意groupID)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?xml version= "1.0"  encoding= "UTF-8" ?>
<beans xmlns= "http://www.springframework.org/schema/beans"
     xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"  xmlns: int = "http://www.springframework.org/schema/integration"
     xmlns: int -kafka= "http://www.springframework.org/schema/integration/kafka"
     xmlns:task= "http://www.springframework.org/schema/task"
     xsi:schemaLocation="http: //www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
         http: //www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http: //www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http: //www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
 
     < int :channel id= "inputFromAdinfo" >
         < int :queue/>
     </ int :channel>
     
     < int -kafka:inbound-channel-adapter
         id= "kafkaInboundChannelAdinfo"  kafka-consumer-context-ref= "consumerContextAdinfo"
         auto-startup= "true"  channel= "inputFromAdinfo"  >
         < int :poller fixed-delay= "10"  time-unit= "MILLISECONDS"   max-messages-per-poll= "5"  />
     </ int -kafka:inbound-channel-adapter>
 
     <bean id= "consumerPropertiesAdinfo"
         class = "org.springframework.beans.factory.config.PropertiesFactoryBean" >
         <property name= "properties" >
             <props>
                 <prop key= "auto.offset.reset" >smallest</prop>
                 <prop key= "socket.receive.buffer.bytes" > 314572 </prop> <!-- 5M -->
                 <prop key= "fetch.min.bytes" > 26214 </prop><!-- 256k -->
                 <prop key= "fetch.message.max.bytes" > 104857 </prop><!-- 3M -->
                 <prop key= "fetch.wait.max.ms" > 5000 </prop>
                 <prop key= "auto.commit.interval.ms" > 2000 </prop>
                 <prop key= "rebalance.backoff.ms" > 5000 </prop>
                 <prop key= "rebalance.max.retries" > 5 </prop>
             </props>
         </property>
     </bean>
 
     < int -kafka:consumer-context id= "consumerContextAdinfo"
         consumer-timeout= "4000"  zookeeper-connect= "zookeeperConnectAdinfo"  consumer-properties= "consumerPropertiesAdinfo" >
         < int -kafka:consumer-configurations>
         <!-- 需要注意如果两个线程同时互不相干去消费通一个topic,则需要注意group-id不能重复 -->
             < int -kafka:consumer-configuration group-id= "${adinfo.log.group.name}"  max-messages= "500" >
                 < int -kafka:topic id= "${adinfo.log.topic.name}"  streams= "1"  />
             </ int -kafka:consumer-configuration>
         </ int -kafka:consumer-configurations>
     </ int -kafka:consumer-context>
 
     < int -kafka:zookeeper-connect id= "zookeeperConnectAdinfo"
           zk-connect= "${zk.connect}"  zk-connection-timeout= "6000"
           zk-session-timeout= "6000"  zk-sync-time= "2000" />
</beans>

然后在spring配置文件中import kafka的配置文件


Java接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import  java.io.UnsupportedEncodingException;
import  java.util.Collection;
import  java.util.Iterator;
import  java.util.List;
import  java.util.Map;
import  java.util.Map.Entry;
import  java.util.Set;
import  java.util.concurrent.ConcurrentHashMap;
 
import  javax.annotation.Resource;
 
import  org.apache.avro.io.DatumReader;
import  org.apache.avro.specific.SpecificDatumReader;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  org.springframework.integration.channel.QueueChannel;
import  org.springframework.messaging.Message;
 
import  com.ElasticSearchServiceImpl;
import  com.IElasticSearchService;
import  com.AdInfoRealTimeThread;
import  com.ConfigUtil;
import  com.AdInfo;
 
public  class  AdInfoConsumer {
     // DatumReader<AdInfo> adInfoDatumReader = new
     // SpecificDatumReader<AdInfo>(AdInfoOld.getClassSchema(),AdInfo.getClassSchema());
     DatumReader<AdInfo> adInfoDatumReader =  new  SpecificDatumReader<AdInfo>(AdInfo. class );
     private  Logger logger = LoggerFactory.getLogger(AdInfoConsumer. class );
     @Resource (type = ElasticSearchServiceImpl. class )
     private  IElasticSearchService elasticSearchServiceImpl;
     @Resource (type = ConfigUtil. class )
     private  ConfigUtil configUtil;
 
     private  QueueChannel queueChannel;
     public  QueueChannel getQueueChannel() {
         return  queueChannel;
     }
     public  void  setQueueChannel(QueueChannel queueChannel) {
         this .queueChannel = queueChannel;
     }
 
     private  AdInfoRealTimeThread adInfoRealTimeThread;
     public  AdInfoRealTimeThread getAdInfoRealTimeThread() {
         return  adInfoRealTimeThread;
     }
     public  void  setAdInfoRealTimeThread(AdInfoRealTimeThread adInfoRealTimeThread) {
         this .adInfoRealTimeThread = adInfoRealTimeThread;
     }
 
     public  void  consumerLog()  throws  UnsupportedEncodingException {
 
         @SuppressWarnings ( "rawtypes" )
         Message msg;
         while  ((msg = queueChannel.receive()) !=  null ) {
             // msg = queueChannel.receive();
             try  {
                 Map<String, Object> map = (Map<String, Object>) msg.getPayload();
                 Set<Entry<String, Object>> set = map.entrySet();
                 for  (Map.Entry<String, Object> entry : set) {
                     String topic = entry.getKey();
                     ConcurrentHashMap<Integer, List< byte []>> messages = (ConcurrentHashMap<Integer, List< byte []>>) entry
                             .getValue();
                     Collection<List< byte []>> values = messages.values();
                     for  (Iterator<List< byte []>> iterator = values.iterator(); iterator.hasNext();) {
                         List< byte []> list = iterator.next();
                         for  ( byte [] object : list) {
                             String message =  new  String(object,  "UTF-8" );
                             StringBuilder megJson =  new  StringBuilder(message);
                             megJson.delete( 0 , megJson.indexOf( "=" ) +  1 );
                             // logger.info("json:"+megJson.toString());
                             // adinfoToSaveES.saveAdLogToEs(megJson.toString());
                             elasticSearchServiceImpl.executeSearch(configUtil.clusterName,megJson.toString());
                             //System.out.println(megJson.toString());
 
                         }
                     }
                 }
             catch  (Exception ex) {
                 logger.error( "===AdInfoConsumer consumer is exception" , ex);
             }
         }
 
         logger.error( "====AdInfoConsumer receive is interrupted====" );
     }
 
     /*
      * public void consumerLog() throws UnsupportedEncodingException {
     
      * @SuppressWarnings("rawtypes") Message msg; while ((msg =
      * queueChannel.receive()) != null) {
     
      * try {
     
      * Map<String, Object> map = (Map<String, Object>) msg.getPayload();
      * Set<Entry<String, Object>> set = map.entrySet(); for (Map.Entry<String,
      * Object> entry : set) { // String topic = entry.getKey();
      * ConcurrentHashMap<Integer, List<byte[]>> messages =
      * (ConcurrentHashMap<Integer, List<byte[]>>) entry .getValue();
      * Collection<List<byte[]>> values = messages.values(); for
      * (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
      * List<byte[]> list = iterator.next(); for (byte[] object : list) {
     
      * try { Decoder decoder = DecoderFactory.get().binaryDecoder(object, null);
     
      * AdInfo adInfo = adInfoDatumReader.read(null, decoder);
     
      * String json=adInfo.toString(); System.out.println("*************"+json);
      * //logger.info("json:"+json); //adInfoRealTimeThread.statistics(json);
     
      * } catch (Exception e) {
      * logger.error("===AdInfoConsumer consumer one is exception", e); }
     
     
      * } } } } catch (Exception ex) {
      * logger.error("===AdInfoConsumer consumer is exception", ex); } }
     
      * logger.error("====AdInfoConsumer receive is interrupted===="); }
      */
}