前言
本文对 Python 的 kafka-python 包做了简单封装,方便 Kafka 初学者使用。先安装依赖包:
pip install kafka-python
封装代码
kafka_helper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Thin convenience wrappers around kafka-python's producer and consumer."""
import json
import traceback
from typing import List

from kafka import KafkaConsumer, KafkaProducer, TopicPartition


class KProducer:
    """Wrapper around ``KafkaProducer`` offering sync, async and callback sends."""

    def __init__(self, bootstrap_servers: List,
                 key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"),
                 compression_type=None):
        """
        Connect a producer to the given brokers.

        :param bootstrap_servers: list of "host:port" broker addresses
        :param key_serializer: callable turning a key into bytes, or None to
            send keys unchanged (defaults to ASCII-encoded JSON)
        :param value_serializer: callable turning a value into bytes, or None
            to send values unchanged (defaults to ASCII-encoded JSON)
        :param compression_type: passed through to KafkaProducer
            (gzip, lz4, snappy, zstd or None)
        :raises Exception: if the producer cannot be created
        """
        # NOTE(review): the send methods below never pass a key, so the default
        # key_serializer presumably receives None and yields b"null" for every
        # record -- confirm against kafka-python's serializer handling.
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,    # 32 MiB
                batch_size=1048576,        # 1 MiB
                max_request_size=1048576,  # 1 MiB
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type,
            )
            print("connect success, kafka producer info {0}".format(bootstrap_servers))
        except Exception as e:
            raise Exception("connect kafka failed, {}.".format(e)) from e

    def sync_send(self, topic: str, data):
        """
        Send one record and wait for the result.

        :param topic: destination topic
        :param data: record value (run through the value serializer, if any)
        :return: (partition, offset) taken from the returned record metadata
        :raises Exception: if the send fails or is not confirmed within 10 seconds
        """
        try:
            future = self.producer.send(topic, data)
            # Block on the future so the caller gets the record's position.
            record_metadata = future.get(timeout=10)
            partition = record_metadata.partition
            offset = record_metadata.offset
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e)) from e

    def async_send(self, topic: str, data):
        """
        Queue one record for sending without waiting for the result.

        :param topic: destination topic
        :param data: record value (run through the value serializer, if any)
        :return: None
        :raises Exception: if the record cannot be queued
        """
        try:
            self.producer.send(topic, data)
            print("send data:{}".format(data))
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e)) from e

    def async_callback(self, topic: str, data):
        """
        Queue a batch of records with success/error callbacks, then flush.

        :param topic: destination topic
        :param data: iterable of record values, each sent as its own record
        :return: None
        :raises Exception: if queuing or flushing fails
        """
        try:
            for item in data:
                future = self.producer.send(topic, item)
                future.add_callback(self.__send_success)
                future.add_errback(self.__send_error)
            # Flush once after queuing everything so the batch goes out together.
            self.producer.flush()
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e)) from e

    @staticmethod
    def __send_success(record_metadata=None):
        """Success callback; kafka-python passes the record metadata as the only argument."""
        print("save success")

    @staticmethod
    def __send_error(exception=None):
        """Error callback; kafka-python passes the raised exception as the only argument."""
        print("save error")

    def close(self):
        """Close the underlying producer."""
        self.producer.close()


class KConsumer:
    """Wrapper around ``KafkaConsumer`` bound to a single topic."""

    def __init__(self, bootstrap_servers: List, topic: str, group_id: str,
                 key_deserializer=None, value_deserializer=None,
                 auto_offset_reset="latest"):
        """
        Connect a consumer and subscribe it to ``topic``.

        :param bootstrap_servers: list of "host:port" broker addresses
        :param topic: topic to subscribe to
        :param group_id: consumer group id
        :param key_deserializer: callable turning raw key bytes into an object, or None
        :param value_deserializer: callable turning raw value bytes into an object, or None
        :param auto_offset_reset: passed through to KafkaConsumer ("latest" or "earliest")
        :raises Exception: if the consumer cannot be created or subscribed
        """
        self.topic = topic
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                # Auto-commit is off and this class never commits; callers
                # commit through the object returned by get_consumer().
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset,
            )
            self.consumer.subscribe(topics=[self.topic])
            print("connect to kafka and subscribe topic success")
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() returns None
            # and would put the literal "None" into the message.
            raise Exception(
                "Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.format_exc())
            ) from e

    def get_consumer(self):
        """
        Return the underlying consumer, which can be iterated for messages.

        :return: the KafkaConsumer instance
        """
        return self.consumer

    def set_topic(self, topic: str):
        """
        Switch the subscription to a different topic.

        :param topic: topic to subscribe to
        :return: None
        """
        self.topic = topic
        self.consumer.subscribe(topics=[self.topic])

    def get_message_by_partition_offset(self, partition, offset):
        """
        Fetch the message at a given partition and offset of the current topic.

        Drops the topic subscription and assigns the partition manually, so
        the consumer stays in manual-assignment mode afterwards.

        :param partition: partition number
        :param offset: offset within the partition
        :return: the first message read from that position, or None if
            iteration ends without yielding one
        """
        self.consumer.unsubscribe()
        topic_partition = TopicPartition(self.topic, partition)
        self.consumer.assign([topic_partition])
        self.consumer.seek(topic_partition, offset=offset)
        for message in self.consumer:
            return message
        return None
测试代码
kafka_test.py
"""Manual smoke tests for the kafka_helper wrappers; needs a reachable Kafka broker."""
import json

from kafka_helper import KConsumer, KProducer


def sync_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one record synchronously.

    :param bootstrap_servers: list of "host:port" broker addresses
    :param topic: destination topic
    :param json_format: True to use the producer's default JSON serializers,
        False to disable them and send pre-encoded bytes
    """
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,
                      key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # The signature is sync_send(topic, data): topic comes first.
        p.sync_send(topic, payload)
    finally:
        p.close()


def async_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one record asynchronously.

    :param bootstrap_servers: list of "host:port" broker addresses
    :param topic: destination topic
    :param json_format: True to use the producer's default JSON serializers,
        False to disable them and send pre-encoded bytes
    """
    value = {
        "send_type": "async_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,
                      key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode('utf-8')
    try:
        # The method is async_send (not asyn_send) and takes (topic, data).
        p.async_send(topic, payload)
    finally:
        p.close()


def consumer_test(bootstrap_servers, topic):
    """
    Consume the topic from the earliest offset and print each value.

    Prints each value's type, the raw value, and the value parsed as JSON.
    Iterates the consumer directly, so it runs until the iteration ends or
    the process is interrupted.
    """
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic,
                  group_id='test', auto_offset_reset="earliest")
    for data in c.get_consumer():
        print(type(data.value), data.value)
        print(json.loads(data.value))


def get_one_msg(bootstrap_servers, topic, partition, offset):
    """Fetch and print the single message at the given partition and offset."""
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic,
                  group_id='test', auto_offset_reset="earliest")
    msg = c.get_message_by_partition_offset(partition, offset)
    print(msg)


if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # Producer tests
    sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic)
    async_send_test(bootstrap_servers=bootstrap_servers, topic=topic)
    sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=False)
    async_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=False)
    # Consumer tests
    consumer_test(bootstrap_servers=bootstrap_servers, topic=topic)
    # Uncomment to fetch a single message by position instead:
    # get_one_msg(bootstrap_servers=bootstrap_servers, topic=topic, partition=0, offset=0)