My Personal Understanding of the MQTT Protocol and the emqttd Open-Source Project (13) - Using Hooks and Sending Messages to Kafka with the brod Library


Part 1: Preparing the Working Environment


The Erlang Kafka client library used is brod: https://github.com/klarna/brod


EMQ is used at version v2.3.5: https://github.com/emqtt/emq-relx


For setting up the Kafka runtime environment, see http://blog.csdn.net/libaineu2004/article/details/79202408


We implement this in the form of a plugin. My plugin path is /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps


Copy emq_plugin_template and rename it to emq_plugin_kafka_brod. Note that the related configuration files and source files must be renamed as well.


Enter the directory /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod


In the plugin's Makefile, add the following lines:


BUILD_DEPS = emqttd cuttlefish brod

dep_brod = git https://github.com/klarna/brod.git 3.4.0




Part 2: Modifying the Project Files


1. Edit the top-level Makefile at /home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile


Add to this Makefile:


DEPS += emq_plugin_kafka_brod


2. In relx.config, add these two lines to the release section (a sketch of the resulting section follows below):


brod,

{emq_plugin_kafka_brod, load}
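
For reference, a hedged sketch of how the release tuple in relx.config might look after the change (the release name, version, and surrounding applications are only illustrative placeholders; the two added entries are what this step actually requires):

{release, {emqttd, "2.3.5"},
 [kernel,
  sasl,
  ...,
  brod,
  {emq_plugin_kafka_brod, load}
 ]}.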



3. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins, set the plugin to load automatically at startup:


emq_recon.

emq_modules.

emq_retainer.

emq_dashboard.

emq_plugin_kafka_brod.



4. The build will fail with the following error:


DEP    brod

make[2]: Entering directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'

../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'

../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'

../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'

../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'

DEP    supervisor3

Error: Unknown or invalid dependency: supervisor3.

make[2]: *** [/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/supervisor3] Error 78

make[2]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'

make[1]: *** [deps] Error 2

make[1]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod'

make: *** [deps] Error 2

The fix is as follows:


/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod


In its Makefile, lines 12 and 13 read:

dep_supervisor3_commit = 1.1.5

dep_kafka_protocol_commit = 1.1.2

Change them to:

dep_supervisor3 = git https://github.com/klarna/supervisor3.git 1.1.5


dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git 1.1.2




Part 3: Source Code Implementation (full source download: https://download.csdn.net/download/libaineu2004/10284403)


1. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/etc/emq_plugin_kafka_brod.config


[
  {emq_plugin_kafka_brod, [
    {kafka, [
      { bootstrap_broker, [{"127.0.0.1", 9092}] }, %% "localhost" can also be used here
      { query_api_versions, false },
      { reconnect_cool_down_seconds, 10}
    ]}
  ]}
].

When the Kafka brokers run as a cluster, it is recommended not to use localhost or 127.0.0.1, but real IP addresses instead. Multiple addresses can be listed, for example:


[{"172.16.6.170", 9092},{"172.16.6.170", 9093},{"172.16.6.170", 9094}]




2. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/src/emq_plugin_kafka_brod.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emq_plugin_kafka_brod).
-include_lib("emqttd/include/emqttd.hrl").
-include_lib("brod/include/brod_int.hrl").
-define(TEST_TOPIC, <<"emqtest">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
    brod_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{call_ref = CallRef,
                            result   = brod_produce_req_acked} ->
            io:format("brod_produce_reply:ok ~n"),
            ok
    after 5000 ->
            io:format("brod_produce_reply:exit ~n"),
            erlang:exit(timeout)
            %%ct:fail({?MODULE, ?LINE, timeout})
    end,
    {ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{call_ref = CallRef,
                            result   = brod_produce_req_acked} ->
            ok
    after 5000 ->
            erlang:exit(timeout)
            %%ct:fail({?MODULE, ?LINE, timeout})  %% ct is not available outside common_test
    end,
    ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    Id = Message#mqtt_message.id,
    From = Message#mqtt_message.from, %% the value here differs depending on whether the client connected with login credentials or not
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    Timestamp = Message#mqtt_message.timestamp,
    ClientId = c(From),
    Username = u(From),
    Json = mochijson2:encode([
            {type, <<"publish">>},
            {client_id, ClientId},
            {message, [
           {username, Username},
           {topic, Topic},
           {payload, Payload},
           {qos, i(Qos)},
           {dup, i(Dup)},
           {retain, i(Retain)}
          ]},
            {cluster_node, node()},
            {ts, emqttd_time:now_ms()}
           ]),
    %%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
    {ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
    receive
        #brod_produce_reply{call_ref = CallRef,
                            result   = brod_produce_req_acked} ->
            ok
    after 5000 ->
            erlang:exit(timeout)
            %%ct:fail({?MODULE, ?LINE, timeout})  %% ct is not available outside common_test
    end,
    {ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
%% Called when the plugin application stop
unload() ->
    %%application:stop(brod),
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
    {ok, _} = application:ensure_all_started(brod),
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
    %%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
    %%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
    %%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
    ClientConfig = [],%% socket error recovery
    Topic = ?TEST_TOPIC,
    Partition = 0,
    ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
    ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
    %%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
    %%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
    io:format("Init ekaf with ~p~n", [KafkaBootstrapEndpoints]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, _Username}) -> ClientId;
c(From) -> From.
u({_ClientId, Username}) -> Username;
u(From) -> From.

Please note:


(1) Before calling brod:start_producer, make sure the topic has already been created in Kafka; otherwise the call will throw an exception (a hedged existence check is sketched after the snippet below).


Topic = ?TEST_TOPIC,


ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
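
If you prefer to fail early with a clearer message, a minimal sketch of such a check, assuming the brod version in use exports brod:get_partitions_count/2 (ensure_topic_and_start_producer is a hypothetical helper, not part of the plugin above):

%% Hedged sketch: verify the topic is known to Kafka before starting the producer.
ensure_topic_and_start_producer(Client, Topic) ->
    case brod:get_partitions_count(Client, Topic) of
        {ok, Count} when Count > 0 ->
            ok = brod:start_producer(Client, Topic, _ProducerConfig = []);
        {error, Reason} ->
            io:format("topic ~s is not available in kafka: ~p~n", [Topic, Reason]),
            {error, Reason}
    end.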


(2) For brod:start_client, a client id can also be used like this:


-define(CLIENT_ID, ?MODULE).

ClientId       = ?CLIENT_ID,


%% standalone: BootstrapHosts = [{"localhost", 9092}],


BootstrapHosts = [{"172.16.6.170", 9092},{"172.16.6.170", 9093},{"172.16.6.170", 9094}], %% cluster


ClientConfig   = [],

ok = brod:start_client(BootstrapHosts, ClientId, ClientConfig),



3. Creating the Kafka Topic


Note:


(1) When the Kafka brokers and ZooKeeper run as a cluster, the topic must be created manually first, specifying the list of ZooKeeper nodes; it should also be created in a standalone setup. Otherwise the client will report errors.


(2) It is not recommended to use the "." or "_" characters in topic names.


(3) Create the topic:


./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest


(4) Start a console consumer:


./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2181 --topic emqtest --from-beginning




4. The examples above all hard-code the partition (Partition = 0). So how can the client choose the partition itself? Since the Erlang brod library does not do the partitioning for us automatically here, we compute a hash value ourselves.


-define(NUM_PARTITIONS, 3).
-define(EMPTY(S), (S == <<"">> orelse S == undefined)).

getPartition(ClientId) when ?EMPTY(ClientId) ->
    crypto:rand_uniform(0, ?NUM_PARTITIONS);
getPartition(ClientId) ->
    <<NodeD01, NodeD02, NodeD03, NodeD04, NodeD05,
      NodeD06, NodeD07, NodeD08, NodeD09, NodeD10,
      NodeD11, NodeD12, NodeD13, NodeD14, NodeD15,
      NodeD16>> = Val = crypto:hash(md5, ClientId), %% an MD5 digest is 16 bytes
    io:format("Value is ~w~n", [Val]),
    NodeD16 rem ?NUM_PARTITIONS. %% take the remainder as the partition

A concrete usage example:

ClientId = <<"123456">>,
Partition = getPartition(ClientId),
brod:produce(?KAFKA_CLIENT1, ?EMQ_TOPIC_ONLINE, Partition, ClientId, list_to_binary(Json)).

This scheme of mine (take the MD5 of the clientId, then the remainder) is a simplified, home-grown approach; it is not how the original Java hashCode is implemented. For the original, see the source of java.lang.String's hashCode function.
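
For comparison, a rough Erlang approximation of that Java-style scheme (a hedged sketch; java_hash_code/1 and hash_partition/2 are hypothetical helpers, and it assumes the clientId contains only single-byte characters):

%% Hedged sketch: h = 31*h + c over the bytes, truncated to 32 bits,
%% then abs(hash) rem NumPartitions, as in the default partitioner shown below.
java_hash_code(Bin) when is_binary(Bin) ->
    lists:foldl(fun(C, H) -> (31 * H + C) band 16#FFFFFFFF end,
                0, binary_to_list(Bin)).

hash_partition(ClientId, NumPartitions) ->
    (java_hash_code(ClientId) band 16#7FFFFFFF) rem NumPartitions.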




For reference, here is Kafka's official partition-assignment approach:


Message-to-partition assignment

By default, Kafka assigns partitions based on the key of the message being sent, i.e. hash(key) % numPartitions, as shown in the code below:

def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

This guarantees that messages with the same key are always routed to the same partition. But if you do not specify a key, how does Kafka decide which partition a message goes to?


if(key == null) {  // no key was specified
        val id = sendPartitionPerTopicCache.get(topic)  // check whether Kafka already has a cached partition id
        id match {
          case Some(partitionId) =>
            partitionId  // if so, just use that partition id
          case None => // otherwise,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  // find all available partitions whose leader broker is known
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // pick one of them at random
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // cache it for direct reuse next time
            partitionId
        }
      }

As you can see, for messages without a key Kafka essentially picks a partition more or less at random and then adds that partition number to a cache for direct reuse later. Of course, Kafka also clears this cache itself (by default every 10 minutes, or whenever topic metadata is requested).
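
If you want a similar keyless fallback on the brod side, a minimal sketch (again assuming brod:get_partitions_count/2 is available; random_partition/2 is a hypothetical helper and no metadata cache is kept here):

%% Hedged sketch: pick a random partition for messages without a meaningful key.
random_partition(Client, Topic) ->
    {ok, Count} = brod:get_partitions_count(Client, Topic),
    crypto:rand_uniform(0, Count). %% partitions are numbered 0..Count-1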



Full source code download: https://download.csdn.net/download/libaineu2004/10284403

