我的mqtt协议和emqttd开源项目个人理解(12) - Hook使用和连接Kafka发送消息,使用ekaf库

简介: 我的mqtt协议和emqttd开源项目个人理解(12) - Hook使用和连接Kafka发送消息,使用ekaf库

一、工作环境准备


erlang kafka客户端库使用的是https://github.com/helpshift/ekaf


emq使用的是v2.3.5版本,https://github.com/emqtt/emq-relx


kafka的运行环境准备,http://blog.csdn.net/libaineu2004/article/details/79202408


我们以插件的形式来实现,我的插件路径是/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps


copy一份emq_plugin_template,并更名为emq_plugin_kafka_ekaf,注意相关配置文件和源文件都要更名。


进入目录/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf


Makefile文件,新增红色文字


BUILD_DEPS = emqttd cuttlefish ekaf

dep_ekaf = git https://github.com/helpshift/ekaf.git master




二、工程文件修改


1、进入目录/home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile 文件


Makefile 增加


DEPS += emq_plugin_kafka_ekaf


2、relx.config 中 release 段落添加这三行:


kafkamocker,

ekaf,

{emq_plugin_kafka_ekaf, load}


3、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins设置自启动插件


emq_recon.

emq_modules.

emq_retainer.

emq_dashboard.

emq_plugin_kafka_ekaf.



三、源码实现


1、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/etc/emq_plugin_kafka_ekaf.config


[
  {emq_plugin_kafka_ekaf, [
    {kafka, [
      {bootstrap_broker, {"127.0.0.1", 9092} }, %%不能使用"localhost"
      {partition_strategy, strict_round_robin}
    ]}
  ]}
].


2、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/src/emq_plugin_kafka_ekaf.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emq_plugin_kafka_ekaf).
-include_lib("emqttd/include/emqttd.hrl").
-define(TEST_TOPIC,<<"emq_broker_message">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
    ekaf_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
    emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
    emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
    emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_1">>, list_to_binary(Json)}),
    {ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    ekaf:produce_async(?TEST_TOPIC, {<<"mykey_2">>, list_to_binary(Json)}),
    ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
    {ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
    {ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    {ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
    ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};
on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    Id = Message#mqtt_message.id,
    From = Message#mqtt_message.from, %需要登录和不需要登录这里的返回值是不一样的
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload,
    Qos = Message#mqtt_message.qos,
    Dup = Message#mqtt_message.dup,
    Retain = Message#mqtt_message.retain,
    Timestamp = Message#mqtt_message.timestamp,
    ClientId = c(From),
    Username = u(From),
    Json = mochijson2:encode([
            {type, <<"publish">>},
            {client_id, ClientId},
            {message, [
           {username, Username},
           {topic, Topic},
           {payload, Payload},
           {qos, i(Qos)},
           {dup, i(Dup)},
           {retain, i(Retain)}
          ]},
            {cluster_node, node()},
            {ts, emqttd_time:now_ms()}
           ]),
    %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后
    ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)),
    {ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
    {ok, Message}.
%% Called when the plugin application stop
unload() ->
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
    emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
    emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
    emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
    emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
%% ===================================================================
%% ekaf_init https://github.com/helpshift/ekaf
%% ===================================================================
ekaf_init(_Env) ->
    %% Start
    application:load(ekaf),
    %% Get parameters
    {ok, Kafka} = application:get_env(?MODULE, kafka),
    BootstrapBroker = proplists:get_value(bootstrap_broker, Kafka),
    PartitionStrategy= proplists:get_value(partition_strategy, Kafka),
    %% Set partition strategy, like application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_partition_strategy, PartitionStrategy),
    %% Set broker url and port, like application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}),
    application:set_env(ekaf, ekaf_bootstrap_broker, BootstrapBroker),
    %% Set topic
    application:set_env(ekaf, ekaf_bootstrap_topics, ?TEST_TOPIC),
    {ok, _} = application:ensure_all_started(kafkamocker),
    {ok, _} = application:ensure_all_started(gproc),
    {ok, _} = application:ensure_all_started(ekaf),
    io:format("Init ekaf with ~p~n", [BootstrapBroker]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, Username}) -> ClientId;
c(From) -> From.
u({ClientId, Username}) -> Username;
u(From) -> From.

完整的源码下载地址:https://download.csdn.net/download/libaineu2004/10281784




---


参考文章:http://www.cnblogs.com/wunaozai/p/8249657.html


https://github.com/msdevanms/emqttd_plugin_kafka_bridge


相关文章
|
16天前
|
物联网
一个绿色的免费MQTT调试工具,实测连接移动OneNET
一个绿色的免费MQTT调试工具,实测连接移动OneNET,主题发布 ,主题订阅。
108 16
|
1月前
|
消息中间件 运维 Java
招行面试:RocketMQ、Kafka、RabbitMQ,如何选型?
45岁资深架构师尼恩针对一线互联网企业面试题,特别是招商银行的高阶Java后端面试题,进行了系统化梳理。本文重点讲解如何根据应用场景选择合适的消息中间件(如RabbitMQ、RocketMQ和Kafka),并对比三者的性能、功能、可靠性和运维复杂度,帮助求职者在面试中充分展示技术实力,实现“offer直提”。此外,尼恩还提供了《尼恩Java面试宝典PDF》等资源,助力求职者提升架构、设计、开发水平,应对高并发、分布式系统的挑战。更多内容及技术圣经系列PDF,请关注【技术自由圈】获取。
|
1月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
|
3月前
|
存储 算法 安全
FreeMQTT:一款Python语言实现的开源MQTT Server
FreeMQTT 是一款用 Python 语言并基于 Tornado 开发的开源 MQTT 服务器,支持 MQTT3.1.1 和 MQTT5.0 协议,提供多租户安全隔离、高效 Topic 匹配算法及实时上下线通知等功能,适用于 IoT 场景。快速启动仅需克隆仓库、安装依赖并运行服务。
|
3月前
|
消息中间件 运维 Serverless
商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源。云消息队列 RocketMQ 版 Serverless 实例通过资源快速伸缩,实现资源使用量与实际业务负载贴近,并按实际使用量计费,有效降低企业的运维压力和使用成本。
230 15
|
3月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
3月前
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
|
3月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
159 1
|
4月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
153 1
|
4月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
81 0