SSM(十六) 曲线救国-Kafka消费异常

简介: 最近线上遇到一个问题:在消费kafka消息的时候如果长时间(大概半天到一天的时间)队列里没有消息就可能再也消费不了。针对这个问题我们反复调试多次。线下模拟,调整代码,但貌似还是没有找到原因。但是只要重启消费进程就又可以继续消费。

解决方案


由于线上业务非常依赖kafka的消费,但一时半会也没有找到原因,所以最后只能想一个临时的替换方案:


基于重启就可以消费这个特点,我们在每次消费的时候都记下当前的时间点,当这个时间点在十分钟之内都没有更新我们就认为当前队列中没有消息了,就需要重启下消费进程。


既然是需要重启,由于目前还没有上分布式调度中心所以需要crontab来配合调度:每隔一分钟会调用一个shell脚本,该脚本会判断当前进程是否存在,如果存在则什么都不作,不存在则启动消费进程。


具体实现


消费程序:


/**
 * kafka消费
 *
 * @author crossoverJie
 * @date 2017年6月19日 下午3:15:16
 */
public class KafkaMsgConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMsgConsumer.class);
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAXIMUM_POOL_SIZE = 4;
    private static final int BLOCKING_QUEUE_CAPACITY = 4000;
    private static final String KAFKA_CONFIG = "kafkaConfig";
    private static final ExecutorService fixedThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY));
    //最后更新时间
    private static AtomicLong LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());
    private static MsgIterator iter = null;
    private static String topic;//主题名称
    static {
        Properties properties = new Properties();
        String path = System.getProperty(KAFKA_CONFIG);
        checkArguments(!StringUtils.isBlank(path), "启动参数中没有配置kafka_easyframe_msg参数来指定kafka启动参数,请使用-DkafkaConfig=/path/fileName/easyframe-msg.properties");
        try {
            properties.load(new FileInputStream(new File(path)));
        } catch (IOException e) {
            LOGGER.error("IOException" ,e);
        }
        EasyMsgConfig.setProperties(properties);
    }
    private static void iteratorTopic() {
        if (iter == null) {
            iter = MsgUtil.consume(topic);
        }
        long i = 0L;
        while (iter.hasNext()) {
            i++;
            if (i % 10000 == 0) {
                LOGGER.info("consume i:" + i);
            }
            try {
                String message = iter.next();
                if (StringUtils.isEmpty(message)) {
                    continue;
                }
                LAST_MESSAGE_TIME = new AtomicLong(DateUtil.getLongTime());
                //处理消息
                LOGGER.debug("msg = " + JSON.toJSONString(message));
            } catch (Exception e) {
                LOGGER.error("KafkaMsgConsumer err:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    LOGGER.error("Thread InterruptedException", e1);
                }
                break;
            }
        }
    }
    public static void main(String[] args) {
        topic = System.getProperty("topic");
        checkArguments(!StringUtils.isBlank(topic), "system property topic or log_path is must!");
        while (true) {
            try {
                iteratorTopic();
            } catch (Exception e) {
                MsgUtil.shutdownConsummer();
                iter = null;
                LOGGER.error("KafkaMsgConsumer err:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    LOGGER.error("Thread InterruptedException", e1);
                }
            } finally {
                //此处关闭之后,由crontab每分钟检查一次,挂掉的话会重新拉起来
                if (DateUtil.getLongTime() - LAST_MESSAGE_TIME.get() > 10 * 60) { //10分钟
                    fixedThreadPool.shutdown();
                    LOGGER.info("线程池是否关闭:" + fixedThreadPool.isShutdown());
                    try {
                        //当前线程阻塞10ms后,去检测线程池是否终止,终止则返回true
                        while (!fixedThreadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                            LOGGER.info("检测线程池是否终止:" + fixedThreadPool.isTerminated());
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("等待线程池关闭错误", e);
                    }
                    LOGGER.info("线程池是否终止:" + fixedThreadPool.isTerminated());
                    LOGGER.info("in 10 min dont have data break");
                    break;
                }
            }
        }
        LOGGER.info("app shutdown");
        System.exit(0);
    }
}


在线代码


需要配合以下这个shell脚本运行:


#!/bin/sh
#crontab
# * * * * * sh /data/schedule/kafka/run-kafka-consumer.sh >>/data/schedule/kafka/run-sms-log.log
# 如果进程存在就不启动
a1=`ps -ef|grep 'KafkaMsgConsumer'|grep -v grep|wc -l`
if [ $a1 -gt 0  ];then
        echo "=======     `date +'%Y-%m-%d %H:%M:%S'` KafkaMsgConsumer  is EXIT...=======     "
        exit
fi
LANG="zh_CN.UTF-8"
nohup /opt/java/jdk1.7.0_80/bin/java -d64 -Djava.security.egd=file:/dev/./urandom
-Djava.ext.dirs=/opt/tomcat/webapps/ROOT/WEB-INF/lib
-Dtopic=TOPIC_A
-Dlogback.configurationFile=/data/schedule/kafka/logback.xml
-DkafkaConfig=/opt/tomcat/iopconf/easyframe-msg.properties
-classpath /opt/tomcat/webapps/ROOT/WEB-INF/classes com.crossoverJie.kafka.SMSMsgConsumer >> /data/schedule/kafka/smslog/kafka.log 2>&1 &
echo "`date +'%Y-%m-%d %H:%M:%S'`  KafkaMsgConsumer running...."


在线代码


再配合crontab的调度:


* * * * * sh /data/schedule/kafka/run-kafka-consumer.sh >>/data/schedule/kafka/run-sms-log.log


即可。


总结


虽说处理起来很简单,但依然是治标不治本,依赖的东西比较多(shell脚本,调度)。

所以也问问各位有没有什么思路:



生产配置:


  • 三台kafka、ZK组成的集群。


其中也有其他团队的消费程序在正常运行,应该和kafka的配置没有关系。


项目地址:github.com/crossoverJi…

个人博客:crossoverjie.top


相关文章
|
消息中间件 缓存 人工智能
Kafka生产者客户端几种异常Case详解
1生产者UserCallBack异常 异常日志 ERROR Error executing user-provided callback on message for topic-partition &#39;Topic1-0&#39; (org.apache.kafka.clients.producer.internals.ProducerBatch) 通常还会有具体的异常栈信息 异常源码 ProducerBatch#completeFutureAndFireCallbacks
Kafka生产者客户端几种异常Case详解
|
6月前
|
消息中间件 搜索推荐 关系型数据库
淘东电商项目(51) -全局异常日志采集(ELK+Kafka)
淘东电商项目(51) -全局异常日志采集(ELK+Kafka)
55 0
|
消息中间件 JSON NoSQL
记一次Flink 消费Kafka数据积压排查解决
记一次Flink 消费Kafka数据积压排查解决
记一次Flink 消费Kafka数据积压排查解决
|
消息中间件 存储 数据采集
Streaming 消费 kafka 数据的两种方式|学习笔记
快速学习 Streaming 消费 kafka 数据的两种方式
178 0
|
消息中间件 NoSQL Kafka
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
302 0
【Flink-FlinkUtils】高级自定义封装工具类实现消费kafka数据保存数据到Redis
|
消息中间件 弹性计算 Java
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
246 0
【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境
|
消息中间件 Java Kafka
Java实现Flink集成Kafka消费数据
Java实现Flink集成Kafka消费数据
371 0
|
消息中间件 Java Kafka
Java模拟读取本地数据到Flink集成的Kafka并消费数据
Java模拟读取本地数据到Flink集成的Kafka并消费数据
184 0
|
消息中间件 运维 网络协议
【kafka运维】Topic的生产和消费运维脚本
1.Topic的发送kafka-console-producer.sh 1.1 生产无key消息 ## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties 1.2 生产有key消息加上属性--property parse.key=true
|
消息中间件 Kafka
kafka查看组消费情况
kafka查看组消费情况
207 0

热门文章

最新文章