python-kafka客户端封装

简介: python-kafka客户端封装

前言

本文对python的kafka包做简单封装,方便kafka初学者使用。包安装:

pip install kafka-python

封装代码

kafka_helper.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import List
class KProducer:
    def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,
                batch_size=1048576,
                max_request_size=1048576,
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type  # 压缩消息发送 gzip lz4 snappy zstd
            )
            print("connect success, kafka producer info {0}".format(bootstrap_servers))
        except Exception as e:
            raise Exception("connect kafka failed, {}.".format(e))
    def sync_send(self, topic: str, data):
        """
        同步发送数据
        :param data:  发送数据
        :param topic: 主题
        :return: partition, offset
        """
        try:
            future = self.producer.send(topic, data)
            record_metadata = future.get(timeout=10)  # 同步确认消费
            partition = record_metadata.partition  # 数据所在的分区
            offset = record_metadata.offset  # 数据所在分区的位置
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e))
    def async_send(self, topic: str, data):
        """
        异步发送数据
        :param data:  发送数据
        :param topic: 主题
        :return: None
        """
        try:
            self.producer.send(topic, data)
            print("send data:{}".format(data))
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e))
    def async_callback(self, topic: str, data):
        """
        异步发送数据 + 发送状态处理
        :param data:发送数据
        :param topic: 主题
        :return: None
        """
        try:
            for item in data:
                self.producer.send(topic, item).add_callback(self.__send_success).add_errback(self.__send_error)
                self.producer.flush()  # 批量提交
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e))
    @staticmethod
    def __send_success():
        """异步发送成功回调函数"""
        print("save success")
        return
    @staticmethod
    def __send_error():
        """异步发送错误回调函数"""
        print("save error")
        return
    def close(self):
        self.producer.close()
class KConsumer:
    def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                 value_deserializer=None, auto_offset_reset="latest"):
        self.topic = topic
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset
            )
            self.consumer.subscribe(topics=[self.topic])
            print("connect to kafka and subscribe topic success")
        except Exception as e:
            raise Exception("Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.print_exc()))
    def get_consumer(self):
        """
        返会可迭代consumer
        :return: consumer
        """
        return self.consumer
    def set_topic(self, topic: str):
        """
        订阅主题
        :param topic: 主题
        :return: None
        """
        self.topic = topic
        self.consumer.subscribe(topics=[self.topic])
    def get_message_by_partition_offset(self, partition, offset):
        """
        通过partition、offset获取一个消息
        :param partition: 分区
        :param offset: 游标、下标、序号
        :return: message,消息
        """
        self.consumer.unsubscribe()
        partition = TopicPartition(self.topic, partition)
        self.consumer.assign([partition])
        self.consumer.seek(partition, offset=offset)
        for message in self.consumer:
            return message

测试代码

kafka_test.py

from kafka_helper import KProducer,KConsumer
import json
def sync_send_test(bootstrap_servers,topic,json_format=True):
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        p.sync_send(value,topic)
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)
        v = bytes('{}'.format(json.dumps(value)), 'utf-8')
        p.sync_send(v,topic)
    p.close()
def async_send_test(bootstrap_servers,topic,json_format=True):
    value = {
        "send_type": "async_send",
        "name":"lady_killer",
        "age":18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        p.asyn_send(value,topic)
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)
        v = bytes('{}'.format(json.dumps(value)), 'utf-8')
        p.asyn_send(v,topic)
    p.close()
def consumer_test(bootstrap_servers,topic):
    c = KConsumer(bootstrap_servers=bootstrap_servers,topic=topic,group_id='test',auto_offset_reset="earliest")
    for data in c.get_consumer():
        print(type(data.value),data.value)
        print(json.loads(data.value))
def get_one_msg(bootstrap_servers,topic,partition,offset):
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test', auto_offset_reset="earliest")
    msg = c.get_message_by_partition_offset(partition,offset)
    print(msg)
if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # 测试生产
    sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic)
    async_send_test(bootstrap_servers=bootstrap_servers,topic=topic)
    sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)
    async_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)
    # 测试消费
    consumer_test(bootstrap_servers=bootstrap_servers,topic=topic)
    # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)

参考

Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

相关文章
|
2月前
|
Python
用python转移小文件到指定目录并压缩,脚本封装
这篇文章介绍了如何使用Python脚本将大量小文件转移到指定目录,并在达到大约250MB时进行压缩。
38 2
|
28天前
|
Python
Socket学习笔记(二):python通过socket实现客户端到服务器端的图片传输
使用Python的socket库实现客户端到服务器端的图片传输,包括客户端和服务器端的代码实现,以及传输结果的展示。
108 3
Socket学习笔记(二):python通过socket实现客户端到服务器端的图片传输
|
28天前
|
JSON 数据格式 Python
Socket学习笔记(一):python通过socket实现客户端到服务器端的文件传输
本文介绍了如何使用Python的socket模块实现客户端到服务器端的文件传输,包括客户端发送文件信息和内容,服务器端接收并保存文件的完整过程。
110 1
Socket学习笔记(一):python通过socket实现客户端到服务器端的文件传输
|
2月前
|
关系型数据库 MySQL Python
mysql之python客户端封装类
mysql之python客户端封装类
|
3月前
|
数据安全/隐私保护 开发者 Python
在 Python 中定义封装?
【8月更文挑战第29天】
31 9
|
2月前
|
Linux Python
linux 封装 python
linux 封装 python
|
2月前
|
Linux Python
Linux 下封装 Python
Linux 下封装 Python
|
3月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
73 4
|
3月前
|
网络协议 安全 Unix
6! 用Python脚本演示TCP 服务器与客户端通信过程!
6! 用Python脚本演示TCP 服务器与客户端通信过程!
|
3月前
|
传感器 数据采集 算法
python实现ModBusRTU客户端方式
python实现基于串口通信的ModBusRTU客户端是一件简单的事情,只要通过pymodbus模块就可以实现。