一、工作环境准备
erlang kafka客户端库使用的是https://github.com/helpshift/ekaf
emq使用的是v2.3.5版本,https://github.com/emqtt/emq-relx
kafka的运行环境准备,http://blog.csdn.net/libaineu2004/article/details/79202408
我们以插件的形式来实现,我的插件路径是/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps
copy一份emq_plugin_template,并更名为emq_plugin_kafka_ekaf,注意相关配置文件和源文件都要更名。
进入目录/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf
Makefile文件,新增红色文字
BUILD_DEPS = emqttd cuttlefish ekaf
dep_ekaf = git https://github.com/helpshift/ekaf.git master
二、工程文件修改
1、进入目录/home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile 文件
Makefile 增加
DEPS += emq_plugin_kafka_ekaf
2、relx.config 中 release 段落添加这三行:
kafkamocker,
ekaf,
{emq_plugin_kafka_ekaf, load}
3、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins设置自启动插件
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka_ekaf.
三、源码实现
1、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/etc/emq_plugin_kafka_ekaf.config
[ {emq_plugin_kafka_ekaf, [ {kafka, [ {bootstrap_broker, {"127.0.0.1", 9092} }, %%不能使用"localhost" {partition_strategy, strict_round_robin} ]} ]} ].
2、/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_ekaf/src/emq_plugin_kafka_ekaf.erl
%%-------------------------------------------------------------------- %% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emq_plugin_kafka_ekaf). -include_lib("emqttd/include/emqttd.hrl"). -define(TEST_TOPIC,<<"emq_broker_message">>). -export([load/1, unload/0]). %% Hooks functions -export([on_client_connected/3, on_client_disconnected/3]). -export([on_client_subscribe/4, on_client_unsubscribe/4]). -export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]). -export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]). %% Called when the plugin application start load(Env) -> ekaf_init([Env]), emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]), emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]), emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]), emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]), emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]), emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]), emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]), emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]), emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]). on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) -> io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]), Json = mochijson2:encode([ {type, <<"connected">>}, {client_id, ClientId}, {cluster_node, node()}, {ts, emqttd_time:now_ms()} ]), %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后 ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)), ekaf:produce_async(?TEST_TOPIC, {<<"mykey_1">>, list_to_binary(Json)}), {ok, Client}. on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) -> io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]), Json = mochijson2:encode([ {type, <<"disconnected">>}, {client_id, ClientId}, {reason, Reason}, {cluster_node, node()}, {ts, emqttd_time:now_ms()} ]), %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后 ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)), ekaf:produce_async(?TEST_TOPIC, {<<"mykey_2">>, list_to_binary(Json)}), ok. on_client_subscribe(ClientId, Username, TopicTable, _Env) -> io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]), {ok, TopicTable}. on_client_unsubscribe(ClientId, Username, TopicTable, _Env) -> io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]), {ok, TopicTable}. on_session_created(ClientId, Username, _Env) -> io:format("session(~s/~s) created.", [ClientId, Username]). on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) -> io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]), {ok, {Topic, Opts}}. on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) -> io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]), ok. on_session_terminated(ClientId, Username, Reason, _Env) -> io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]). %% transform message and return on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) -> {ok, Message}; on_message_publish(Message, _Env) -> io:format("publish ~s~n", [emqttd_message:format(Message)]), Id = Message#mqtt_message.id, From = Message#mqtt_message.from, %需要登录和不需要登录这里的返回值是不一样的 Topic = Message#mqtt_message.topic, Payload = Message#mqtt_message.payload, Qos = Message#mqtt_message.qos, Dup = Message#mqtt_message.dup, Retain = Message#mqtt_message.retain, Timestamp = Message#mqtt_message.timestamp, ClientId = c(From), Username = u(From), Json = mochijson2:encode([ {type, <<"publish">>}, {client_id, ClientId}, {message, [ {username, Username}, {topic, Topic}, {payload, Payload}, {qos, i(Qos)}, {dup, i(Dup)}, {retain, i(Retain)} ]}, {cluster_node, node()}, {ts, emqttd_time:now_ms()} ]), %%ekaf:produce_async_batched(?TEST_TOPIC, list_to_binary(Json)),批量发送,不建议使用,我发现kafka接收会滞后 ekaf:produce_async(?TEST_TOPIC, list_to_binary(Json)), {ok, Message}. on_message_delivered(ClientId, Username, Message, _Env) -> io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]), {ok, Message}. on_message_acked(ClientId, Username, Message, _Env) -> io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]), {ok, Message}. %% Called when the plugin application stop unload() -> emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3), emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4), emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4), emqttd:unhook('session.created', fun ?MODULE:on_session_created/3), emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4), emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4), emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4), emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2), emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4), emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4). %% =================================================================== %% ekaf_init https://github.com/helpshift/ekaf %% =================================================================== ekaf_init(_Env) -> %% Start application:load(ekaf), %% Get parameters {ok, Kafka} = application:get_env(?MODULE, kafka), BootstrapBroker = proplists:get_value(bootstrap_broker, Kafka), PartitionStrategy= proplists:get_value(partition_strategy, Kafka), %% Set partition strategy, like application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin), application:set_env(ekaf, ekaf_partition_strategy, PartitionStrategy), %% Set broker url and port, like application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}), application:set_env(ekaf, ekaf_bootstrap_broker, BootstrapBroker), %% Set topic application:set_env(ekaf, ekaf_bootstrap_topics, ?TEST_TOPIC), {ok, _} = application:ensure_all_started(kafkamocker), {ok, _} = application:ensure_all_started(gproc), {ok, _} = application:ensure_all_started(ekaf), io:format("Init ekaf with ~p~n", [BootstrapBroker]). i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. c({ClientId, Username}) -> ClientId; c(From) -> From. u({ClientId, Username}) -> Username; u(From) -> From.
完整的源码下载地址:https://download.csdn.net/download/libaineu2004/10281784
---
参考文章:http://www.cnblogs.com/wunaozai/p/8249657.html
https://github.com/msdevanms/emqttd_plugin_kafka_bridge