flink-sql(table api 编程)
Table API 标准结构EXP下面是一个标准的Table API 的例子public class FlinkTableStandardStructure {
public static void main(String[] args) {
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table: 1)读取外部表;2)从Table API或者SQL查询结果创建表
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "zhangsan"),
row(2L, "lisi")
).select($("id"), $("name"));
//注册表到catalog(可选的)
tEnv.createTemporaryView("sourceTable", projTable);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.DECIMAL(10, 2))
.column("name", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、Table API执行查询(可以执行多次查询,中间表可以注册到catalog也可以不注册)
Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
// Table resultTable = projTable.select($("id"), $("name"));
//5、输出(包括执行,不需要单独在调用tEnv.execute("job"))
resultTable.executeInsert("sinkTable");
}
}
两种TableEnvironment1、TableEnvironment 标准的做法是创建TableEnvironment,在创建 TableEnv 的时候,可以传⼊⼀个 EnvironmentSettings 或者 TableConfig 参数, ⽤来配置 TableEnvironment 的⼀些特性: 注意:Flink1.14开始删除了其他的执⾏器了,只保留了BlinkPlanner 功能:注册catalog(可以理解为数据系统实例,⽐如某个HBase集群,某个MySQL服务器)在catalog注册库和表 加载插件模块 执⾏SQL查询 注册UDF DataStream和Table互转(仅仅在StreamExecutionEnvironment下) EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执⾏器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);2、StreamTableEnvironment 如果要混⽤DataStream和Table API/SQL,可以使⽤StreamTableEnvironment: //1、获取Stream执⾏环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执⾏环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);TableEnvironment和StreamExecutionEnvironment⼆选⼀即可。 TableEnviromnet的配置// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");具体配置项请参考如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/创建表和注册表有了TableEnvironment,我们接下来就需要创建表并注册表,注册表是可选的。创建表的两种⽅式 创建表是为了在表上执⾏TableAPI或者SQL查询,可以通过如下两种⽅式创建表创建表的方式说明代码示例虚拟表从现有的table创建表,通常是从Table API&SQL查询结果创建 表,这种一般是中间表tEnv.createTemporaryView("sourceTable", projTable);常规表另外⼀种更实⽤的⽅式是通过外部数据创建表,例如⽂件, 数据 库表, 或者消息队列,这种⼀般是创建输⼊表或者输出表,采⽤ connector方式创建tableEnv.createTable("SourceTableA", sourceDescriptor); tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor)创建虚拟表Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "zhangsan"),
row(2L, "lisi")
).select($("id"), $("name"));
//createTemporaryView
tEnv.createTemporaryView("sourceTable", projTable);创建常规表final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build();
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// 也可使⽤SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")注意: 具体的Connector可以参考如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/临时表和持久表注册表是为了把计算的中间结果注册为⼀个表,供同⼀Flink session后续访问或者跨Flink session访问。只有常规 的表(TABLES)才分临时表和持久表表类型说明临时表同一个Flink session可访问,跨Flink session 不能访问持久表同一个Flink session和跨Flink session 都能访问EXPfinal TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100)
.build();
//持久表
tableEnv.createTable("SourceTableA", sourceDescriptor);
//临时表
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor)表的标识符注册表时可以指定唯⼀标识符,格式为: Catalog(⽬录).database(数据库).objectname(表名)
如果没有指定Catalog或database,就使⽤当前默认值,即tEnv.useCatalog("...")和tEnv.useDatabase("...")指定的 默认值,如果tEnv没有指定,则默认值default_catalog.default_databaseEXPTableEnvironment tEnv = ...;
//当前Flink会话切换到custom_catalog.custom_database
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("exampleView", table);
//在custom_catalog.other_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_database.exampleView", table);
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView.View的视图表,注意转义字
符
tableEnv.createTemporaryView("`example.View`", table);
//other_catalog.other_database,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);Table API 查询常规查询
/**
* Flink Table API执行聚合操作
*/
public class FlinkTableAggr {
public static void main(String[] args) throws Exception{
//1、获取Stream执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据
DataStream<ClickLogs> clickLogs = env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"
).map(event -> {
String[] props = event.split(",");
return ClickLogs
.builder()
.user(props[0])
.url(props[1])
.cTime(props[2])
.build();
});
//4、流转换为动态表
Table table = tEnv.fromDataStream(clickLogs);
//5、执行Table API查询/SQL查询
/**
* select
* user,
* count(url) as cnt
* from clicks
* group by user
*/
Table resultTable = table
.groupBy($("user"))
.aggregate($("url").count().as("cnt"))
.select($("user"),$("cnt"));
// System.out.println(resultTable.explain());
//6、将Table转换为DataStream
//聚合操作必须是撤回流
DataStream<Tuple2<Boolean,Row>> selectedClickLogs = tEnv.toRetractStream(resultTable,Row.class);
//7、处理结果:打印/输出
selectedClickLogs.print();
//8、执行
env.execute("FlinkTableAggr");
}
}
混⽤Table API和SQL查询public class MixTableAPIAndSQL {
public static void main(String[] args) {
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("url", DataTypes.STRING()),
DataTypes.FIELD("cTime", DataTypes.STRING())
),
row("Mary", "./home","2022-02-02 12:00:00"),
row("Bob", "./cart","2022-02-02 12:00:00"),
row("Mary", "./prod?id=1","2022-02-02 12:00:05"),
row("Liz", "./home","2022-02-02 12:01:00"),
row("Bob", "./prod?id=3","2022-02-02 12:01:30"),
row("Mary", "./prod?id=7","2022-02-02 12:01:45")
).select($("user"), $("url"),$("cTime"));
//注册表到catalog(可选的)
tEnv.createTemporaryView("sourceTable", projTable);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、Table API和SQL混用
Table resultTable = tEnv.from("sourceTable").select($("user"), $("url"));
tEnv.createTemporaryView("resultTableView", resultTable);
Table result = tEnv.sqlQuery("select * from resultTableView where user = 'Mary'");
//5、输出(包括执行,不需要单独在调用tEnv.execute("job"))
result.executeInsert("sinkTable");
result.printSchema();
}
}Table API 和DataStream混用package com.blink.sb;
import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class MixFlinkTableAndDataStream {
public static void main(String[] args) throws Exception {
//1、获取Stream执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//3、读取数据(source)
DataStream<ClickLogs> clickLogs = env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"
).map(event -> {
String[] props = event.split(",");
return ClickLogs
.builder()
.user(props[0])
.url(props[1])
.cTime(props[2])
.build();
});
//4、流转换为动态表
Table table = tEnv.fromDataStream(clickLogs);
//5、执行Table API查询/SQL查询
Table resultTable = table
.where($("user").isEqual("Mary"))
.select($("user"), $("url"), $("cTime"));
//6、将Table转换为DataStream
DataStream<ClickLogs> selectedClickLogs = tEnv.toDataStream(resultTable, ClickLogs.class);
//7、处理结果:打印/输出
selectedClickLogs.print();
//8、执行
env.execute("FlinkTableFirstExample");
}
}
Table api Connectors (内置连接器)Flink 的 Table API & SQL通过Connectors连接外部系统,并执⾏批/流⽅式的读写操作。Connectors提供了丰富的 外部系统连接器,根据source和sink的类型,它们⽀持不同的格式,例如 CSV、Avro、Parquet 或 ORC。 ⽬前⽀持的内置Connectors如下表所示(截⽌到Flink 1.14.3):注意:如果作为sink⼤家还要注意⽀持的输出模式(Append/Retract/Upsert)参考:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/Table API 输入(kafka)和输出(kafka)package com.blink.sb;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkTableAPIKafka2Kafka {
public static final String input_topic = "clicklog_input";
public static final String output_topic = "clicklog_output";
public static final String output_topic_change = "clicklog_output_change";
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、创建TableEnvironment
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
//2、创建kafka source table
final Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.column("cTime", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
.schema(schema)
.format("json")
.option("topic", input_topic)
.option("properties.bootstrap.servers", "node02:6667")
.option("properties.group.id", "testGroup")//每次都从最早的offsets开始
.option("scan.startup.mode", "latest-offset")
.build());
//3、创建kafka sink table
final Schema sinkSchema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.column("cTime", DataTypes.STRING())
// .column("cnt", DataTypes.BIGINT())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka")
.schema(sinkSchema)
.format("csv")
.option("topic", output_topic)
.option("properties.bootstrap.servers", "node02:6667")
.build());
//4、Table API从sourceTable读取数据并以csv格式写入sinkTable
/**
*
* 如果报错:Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
* 原因:缺省 flink-connector-kafka_${scala.binary.version} 这个库的支持。 这个库支持 upsert-kafka
* 请查看 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
*
* 这种模式 insert_only
*/
tEnv.from("sourceTable")
.select($("user"), $("url"), $("cTime"))
.executeInsert("sinkTable");
/**
* Table sink 'default_catalog.default_database.sinkTable' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user, url], select=[user, url, COUNT(url) AS EXPR$0])
* 向sinkTable中注入数据时, 不支持 分组聚合的table
*
* 解决方式: 将聚合类型的table 转换为 ChangelogStream类型的<>DataStream</>, 然后,DataStream.sink 到 kafka 中
*/
Table select = tEnv.from("sourceTable")
.groupBy($("user"), $("url"))
.select($("user"), $("url").count().as("cnt"));
DataStream<Row> rowDataStream = tEnv.toChangelogStream(select,
Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("cnt", DataTypes.BIGINT()).build(),
ChangelogMode.all());
DataStream<String> stringDataStream = rowDataStream.map(new MapFunction<Row, String>() {
@Override
public String map(Row row) throws Exception {
return "xxxx";
}
});
stringDataStream.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers("node02:6667")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(output_topic_change)
.setKafkaValueSerializer(StringSerializer.class)
.build())
.build());
}
}
kafka输入和输出高级特性(更多option)可以通过更多option来启⽤⾼级特性,kafka connector⽀持的option详⻅如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#connector-optionskey和value格式 Kafka消息的key和value均可指定format。 仅指定value format kafka消息中key是可以选的,可以只指定value formattEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
.schema(schema)
.format("json")
...
.build());指定key和value formattEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
.schema(schema)
// .format("json")
.option("topic",input_topic)
.option("key.format","json")
.option("value.format","json")
...
.build())注意:format("json")和option("value.format","json")⼆选⼀,⼆者等价Topic和Partition发现可以通过topic或者topic-pattern来配置主题。配置项sourcesinktopic ⽀持单个topic或者topic列表,topic列表⽤分号分割:topic-1;topic-2,topic和topic-pattern⼆选⼀ 注意:作为sink时不⽀ 持topic列表topic-pattern通过topic名字的正则表达式来指定多个topic,例如:test-topic-[0- 9],topic和topic-pattern⼆选⼀不能作⽤于sink注意:要允许在作业开始运⾏后发现动态创建的topic,请为 scan.topic-partition-discovery.interval 设置⼀个⾮负 值source读取位置作为source是,可以通过scan.startup.mode选项指定从哪个位置开始消费,可选的值如下选项值说明group-offsets(默认 值)使⽤当前消费组提交到ZK / Kafka的offset开始消费,Flink1.14有bug:https://issues.apach e.org/jira/browse/FLINK-2469earliest-offset 从earliest offset开始消费(每次)latest-offset 从latest offset开始消费(每次)specific-offsets从⽤户指定的每个分区的特定偏移量开始消费,必须跟scan.startup.specific-offsets配置搭 配使⽤:例如:partition:0,offset:42;partition:1,offset:300sink分区当kafka作为sink时,可以通过sink.partitioner指定partitioner。选项值说明default使用kafka默认的partitionerfixed每个Flink分区上的消息最多分区到一个kafka分区,可以降低网络开销round-robin只有当kafka消息没有key时,才有用自定义FlinkKafkaPartitioner的子类一致性保证默认情况下,如果启⽤checkpoint,Kafka sink使⽤at-least-once⼀致性语意。 在启⽤checkpoint的前题下,可通过sink.delivery-guarantee来调整⼀致性语意:选项值说明noneFlink不做任何保证,生成的记录可能会丢失或重复at-least-once(default)至少一次保证,但可能会重复exactly-once使用kafka事务提供精确一次语义⼀旦启⽤了事物来保证exactly-once语意,⼀定要注意下游消费者要配置isolation.level为read_committed(默认 是read_uncommitted)。
Kafka 3.0新特性 详解(二)
与维护Consumer offset的方式类似,脱离ZK之后的Kafka集群将元数据视为日志,保存在一个内置的Topic中,且该Topic只有一个Partition。元数据日志的消息格式与普通消息没有太大不同,但必须携带Leader的纪元值(即之前的Controller epoch):Record => Offset LeaderEpoch ControlType Key Value Timestamp这样,Follower以拉模式复制Leader日志,就相当于以Consumer角色消费元数据Topic,符合Kafka原生的语义。那么在KRaft协议中,是如何维护哪些元数据日志已经提交——即已经成功复制到多数的Follower节点上的呢?Kafka仍然借用了原生副本机制中的概念——high watermark(HW,高水位线)保证日志不会丢失,HW的示意图如下。状态机说明:要让所有节点达成一致性的状态,大部分都是基于复制状态机来实现的(Replicated state machine)简单来说就是:初始相同的状态+相同的输入过程=相同的结束状态,这个其实也好理解,就类似于一对双胞胎,出生时候就长得一样,然后吃的喝的用的穿的都一样,你自然很难分辨。其中最重要的就是一定要注意中间的相同输入过程,各个不同节点要以相同且确定性的函数来处理输入,而不要引入一个不确定的值。使用replicated log来实现每个节点都顺序的写入客户端请求,然后顺序的处理客户端请求,最终就一定能够达到最终一致性。状态机安全性保证:在安全性方面,KRaft与传统Raft的选举安全性、领导者只追加、日志匹配和领导者完全性保证都是几乎相同的。下面只简单看看状态机安全性是如何保证的,仍然举论文中的极端例子:在时刻a,节点S1是Leader,epoch=2的日志只复制给了S2就崩溃了。在时刻b,S5被选举为Leader,epoch=3的日志还没来得及复制,也崩溃了。在时刻c,S1又被选举为Leader,继续复制日志,将epoch=2的日志给了S3。此时该日志复制给了多数节点,但还未提交。在时刻d,S1又崩溃,并且S5重新被选举为领导者,将epoch=3的日志复制给S0~S4。此时日志与新Leader S5的日志发生了冲突,如果按上图中d1的方式处理,消息2就会丢失。传统Raft协议的处理方式是:在Leader任期开始时,立刻提交一条空的日志,所以上图中时刻c的情况不会发生,而是如同d2一样先提交epoch=4的日志,连带提交epoch=2的日志。与传统Raft不同,KRaft附加了一个较强的约束:当新的Leader被选举出来,但还没有成功提交属于它的epoch的日志时,不会向前推进HW。也就是说,即使上图中时刻c的情况发生了,消息2也被视为没有成功提交,所以按照d1方式处理是安全的。日志格式说明:所有节点持久化保存在本地的日志,大概就是类似于这个样子:上图显示,共有八条日志数据,其中已经提交了7条,提交的日志都将通过状态机持久化到本地磁盘当中,防止宕机。日志复制的保证机制如果两个节点不同的日志文件当中存储着相同的索引和任期号,那么他们所存储的命令是相同的。(原因:leader最多在一个任期里的一个日志索引位置创建一条日志条目,日志条目所在的日志位置从来不会改变)。如果不同日志中两个条目有着相同的索引和任期号,那么他们之前的所有条目都是一样的(原因:每次RPC发送附加日志时,leader会把这条日志前面的日志下标和任期号一起发送给follower,如果follower发现和自己的日志不匹配,那么就拒绝接受这条日志,这个称之为一致性检查)日志的不正常情况一般情况下,Leader和Followers的日志保持一致,因此Append Entries一致性检查通常不会失败。然而,Leader崩溃可能会导致日志不一致:旧的Leader可能没有完全复制完日志中的所有条目。下图阐述了一些Followers可能和新的Leader日志不同的情况。一个Follower可能会丢失掉Leader上的一些条目,也有可能包含一些Leader没有的条目,也有可能两者都会发生。丢失的或者多出来的条目可能会持续多个任期。如何保证日志的正常复制如果出现了上述leader宕机,导致follower与leader日志不一致的情况,那么就需要进行处理,保证follower上的日志与leader上的日志保持一致,leader通过强制follower复制它的日志来处理不一致的问题,follower与leader不一致的日志会被强制覆盖。leader为了最大程度的保证日志的一致性,且保证日志最大量,leader会寻找follower与他日志一致的地方,然后覆盖follower之后的所有日志条目,从而实现日志数据的一致性。具体的操作就是:leader会从后往前不断对比,每次Append Entries失败后尝试前一个日志条目,直到成功找到每个Follower的日志一致的位置点,然后向该Follower所在位置之后的条目进行覆盖。详细过程如下:Leader维护了每个Follower节点下一次要接收的日志的索引,即nextIndex。Leader选举成功后将所有Follower的nextIndex设置为自己的最后一个日志条目+1。Leader将数据推送给Follower,如果Follower验证失败(nextIndex不匹配),则在下一次推送日志时缩小nextIndex,直到nextIndex验证通过。总结一下就是:当leader和follower日志冲突的时候,leader将校验 follower最后一条日志是否和leader匹配,如果不匹配,将递减查询,直到匹配,匹配后,删除冲突的日志。这样就实现了主从日志的一致性。(二)Raft协议算法代码实现前面我们已经大致了解了Raft协议算法的实现原理,如果我们要自己实现一个Raft协议的算法,其实就是将我们讲到的理论知识给翻译成为代码的过程,具体的开发需要考虑的细节比较多,代码量肯定也比较大,好在有人已经实现了Raft协议的算法了,我们可以直接拿过来使用。创建maven工程并导入jar包地址如下:<dependencies>
<dependency> <groupId>com.github.wenweihu86.raft</groupId> <artifactId>raft-java-core</artifactId> <version>1.8.0</version> </dependency>
<dependency> <groupId>com.github.wenweihu86.rpc</groupId> <artifactId>rpc-java</artifactId> <version>1.8.0</version> </dependency>
<dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>5.1.4</version> </dependency>
</dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>定义Server端代码实现:public class Server1 { public static void main(String[] args) { // parse args // peers, format is "host:port:serverId,host2:port2:serverId2"
//localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1 String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3";
// local server RaftMessage.Server localServer = parseServer("localhost:16010:1");
String[] splitArray = servers.split(","); List<RaftMessage.Server> serverList = new ArrayList<>(); for (String serverString : splitArray) { RaftMessage.Server server = parseServer(serverString); serverList.add(server); }
// 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 设置Raft选项,比如: // just for test snapshot RaftOptions raftOptions = new RaftOptions(); /* raftOptions.setSnapshotMinLogSize(10 * 1024); raftOptions.setSnapshotPeriodSeconds(30); raftOptions.setMaxSegmentFileSize(1024 * 1024);*/ // 应用状态机 ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir()); // 初始化RaftNode RaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine); raftNode.getLeaderId(); // 注册Raft节点之间相互调用的服务 RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode); server.registerService(raftConsensusService); // 注册给Client调用的Raft服务 RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注册应用自己提供的服务 ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine); server.registerService(exampleService); // 启动RPCServer,初始化Raft节点 server.start(); raftNode.init(); }
private static RaftMessage.Server parseServer(String serverString) { String[] splitServer = serverString.split(":"); String host = splitServer[0]; Integer port = Integer.parseInt(splitServer[1]); Integer serverId = Integer.parseInt(splitServer[2]); RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder() .setHost(host).setPort(port).build(); RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder(); RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build(); return server; }}定义客户端代码实现如下:public class ClientMain { public static void main(String[] args) { // parse args String ipPorts = args[0]; String key = args[1]; String value = null; if (args.length > 2) { value = args[2]; } // init rpc client RPCClient rpcClient = new RPCClient(ipPorts); ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class); final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace(); // set if (value != null) { ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder() .setKey(key).setValue(value).build(); ExampleMessage.SetResponse setResponse = exampleService.set(setRequest); try { System.out.printf("set request, key=%s value=%s response=%s\n", key, value, printer.print(setResponse)); } catch (Exception ex) { ex.printStackTrace(); } } else { // get ExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build(); ExampleMessage.GetResponse getResponse = exampleService.get(getRequest); try { String value1 = getResponse.getValue(); System.out.println(value1); System.out.printf("get request, key=%s, response=%s\n", key, printer.print(getResponse)); } catch (Exception ex) { ex.printStackTrace(); } }
rpcClient.stop(); }}先启动服务端,然后启动客户端,就可以将实现客户端向服务端发送消息,并且服务端会向三台机器进行保存消息了。五、Kafka常见问题(一)消息队列模型知道吗?Kafka是怎么做到支持这两种模型的?对于传统的消息队列系统支持两个模型:点对点:也就是消息只能被一个消费者消费,消费完后消息删除。发布订阅:相当于广播模式,消息可以被所有消费者消费。kafka其实就是通过Consumer Group同时支持了这两个模型。如果说所有消费者都属于一个Group,消息只能被同一个Group内的一个消费者消费,那就是点对点模式。如果每个消费者都是一个单独的Group,那么就是发布订阅模式。(二)说说Kafka通信过程原理吗?首先kafka broker启动的时候,会去向Zookeeper注册自己的ID(创建临时节点),这个ID可以配置也可以自动生成,同时会去订阅Zookeeper的brokers/ids路径,当有新的broker加入或者退出时,可以得到当前所有broker信。生产者启动的时候会指定bootstrap.servers,通过指定的broker地址,Kafka就会和这些broker创建TCP连接(通常我们不用配置所有的broker服务器地址,否则kafka会和配置的所有broker都建立TCP连接)随便连接到任何一台broker之后,然后再发送请求获取元数据信息(包含有哪些主题、主题都有哪些分区、分区有哪些副本,分区的Leader副本等信息)接着就会创建和所有broker的TCP连接。之后就是发送消息的过程。消费者和生产者一样,也会指定bootstrap.servers属性,然后选择一台broker创建TCP连接,发送请求找到协调者所在的broker。然后再和协调者broker创建TCP连接,获取元数据。根据分区Leader节点所在的broker节点,和这些broker分别创建连接。最后开始消费消息。(三)发送消息时如何选择分区的?主要有两种方式:轮询,按照顺序消息依次发送到不同的分区。随机,随机发送到某个分区。如果消息指定key,那么会根据消息的key进行hash,然后对partition分区数量取模,决定落在哪个分区上,所以,对于相同key的消息来说,总是会发送到同一个分区上,也是我们常说的消息分区有序性。很常见的场景就是我们希望下单、支付消息有顺序,这样以订单ID作为key发送消息就达到了分区有序性的目的。如果没有指定key,会执行默认的轮询负载均衡策略,比如第一条消息落在P0,第二条消息落在P1,然后第三条又在P1。除此之外,对于一些特定的业务场景和需求,还可以通过实现Partitioner接口,重写configure和partition方法来达到自定义分区的效果。(四)为什么需要分区?有什么好处?这个问题很简单,如果说不分区的话,我们发消息写数据都只能保存到一个节点上,这样的话就算这个服务器节点性能再好最终也支撑不住。实际上分布式系统都面临这个问题,要么收到消息之后进行数据切分,要么提前切分,kafka正是选择了前者,通过分区可以把数据均匀地分布到不同的节点。分区带来了负载均衡和横向扩展的能力。发送消息时可以根据分区的数量落在不同的Kafka服务器节点上,提升了并发写消息的性能,消费消息的时候又和消费者绑定了关系,可以从不同节点的不同分区消费消息,提高了读消息的能力。另外一个就是分区又引入了副本,冗余的副本保证了Kafka的高可用和高持久性。(五)详细说说消费者组和消费者重平衡?Kafka中的消费者组订阅topic主题的消息,一般来说消费者的数量最好要和所有主题分区的数量保持一致最好(举例子用一个主题,实际上当然是可以订阅多个主题)。当消费者数量小于分区数量的时候,那么必然会有一个消费者消费多个分区的消息。而消费者数量超过分区的数量的时候,那么必然会有消费者没有分区可以消费。所以,消费者组的好处一方面在上面说到过,可以支持多种消息模型,另外的话根据消费者和分区的消费关系,支撑横向扩容伸缩。当我们知道消费者如何消费分区的时候,就显然会有一个问题出现了,消费者消费的分区是怎么分配的,有先加入的消费者时候怎么办?旧版本的重平衡过程主要通过ZK监听器的方式来触发,每个消费者客户端自己去执行分区分配算法。新版本则是通过协调者来完成,每一次新的消费者加入都会发送请求给协调者去获取分区的分配,这个分区分配的算法逻辑由协调者来完成。而重平衡Rebalance就是指的有新消费者加入的情况,比如刚开始我们只有消费者A在消费消息,过了一段时间消费者B和C加入了,这时候分区就需要重新分配,这就是重平衡,也可以叫做再平衡,但是重平衡的过程和我们的GC时候STW很像,会导致整个消费群组停止工作,重平衡期间都无法消息消息。另外,发生重平衡并不是只有这一种情况,因为消费者和分区总数是存在绑定关系的,上面也说了,消费者数量最好和所有主题的分区总数一样。那只要消费者数量、主题数量(比如用的正则订阅的主题)、分区数量任何一个发生改变,都会触发重平衡。下面说说重平衡的过程。重平衡的机制依赖消费者和协调者之间的心跳来维持,消费者会有一个独立的线程去定时发送心跳给协调者,这个可以通过参数heartbeat.interval.ms来控制发送心跳的间隔时间。每个消费者第一次加入组的时候都会向协调者发送JoinGroup请求,第一个发送这个请求的消费者会成为“群主”,协调者会返回组成员列表给群主。群主执行分区分配策略,然后把分配结果通过SyncGroup请求发送给协调者,协调者收到分区分配结果。其他组内成员也向协调者发送SyncGroup,协调者把每个消费者的分区分配分别响应给他们。(六)具体讲讲分区分配策略?主要有3种分配策略Range对分区进行排序,排序越靠前的分区能够分配到更多的分区。比如有3个分区,消费者A排序更靠前,所以能够分配到P0\P1两个分区,消费者B就只能分配到一个P2。如果是4个分区的话,那么他们会刚好都是分配到2个。但是这个分配策略会有点小问题,他是根据主题进行分配,所以如果消费者组订阅了多个主题,那就有可能导致分区分配不均衡。比如下图中两个主题的P0\P1都被分配给了A,这样A有4个分区,而B只有2个,如果这样的主题数量越多,那么不均衡就越严重。RoundRobin也就是我们常说的轮询了,这个就比较简单了,不画图你也能很容易理解。这个会根据所有的主题进行轮询分配,不会出现Range那种主题越多可能导致分区分配不均衡的问题。P0->A,P1->B,P1->A。。。以此类推Sticky这个从字面看来意思就是粘性策略,大概是这个意思。主要考虑的是在分配均衡的前提下,让分区的分配更小的改动。比如之前P0\P1分配给消费者A,那么下一次尽量还是分配给A。这样的好处就是连接可以复用,要消费消息总是要和broker去连接的,如果能够保持上一次分配的分区的话,那么就不用频繁的销毁创建连接了。(七)如何保证消息可靠性?生产者发送消息丢失kafka支持3种方式发送消息,这也是常规的3种方式,发送后不管结果、同步发送、异步发送,基本上所有的消息队列都是这样玩的。发送并忘记,直接调用发送send方法,不管结果,虽然可以开启自动重试,但是肯定会有消息丢失的可能。同步发送,同步发送返回Future对象,我们可以知道发送结果,然后进行处理。异步发送,发送消息,同时指定一个回调函数,根据结果进行相应的处理。为了保险起见,一般我们都会使用异步发送带有回调的方式进行发送消息,再设置参数为发送消息失败不停地重试。acks=all,这个参数有可以配置0|1|all。0表示生产者写入消息不管服务器的响应,可能消息还在网络缓冲区,服务器根本没有收到消息,当然会丢失消息。1表示至少有一个副本收到消息才认为成功,一个副本那肯定就是集群的Leader副本了,但是如果刚好Leader副本所在的节点挂了,Follower没有同步这条消息,消息仍然丢失了。配置all的话表示所有ISR都写入成功才算成功,那除非所有ISR里的副本全挂了,消息才会丢失。retries=N,设置一个非常大的值,可以让生产者发送消息失败后不停重试Kafka 自身消息丢失。kafka因为消息写入是通过PageCache异步写入磁盘的,因此仍然存在丢失消息的可能。因此针对kafka自身丢失的可能设置参数:replication.factor=N,设置一个比较大的值,保证至少有2个或者以上的副本。min.insync.replicas=N,代表消息如何才能被认为是写入成功,设置大于1的数,保证至少写入1个或者以上的副本才算写入消息成功。unclean.leader.election.enable=false,这个设置意味着没有完全同步的分区副本不能成为Leader副本,如果是true的话,那些没有完全同步Leader的副本成为Leader之后,就会有消息丢失的风险。消费者消息丢失消费者丢失的可能就比较简单,关闭自动提交位移即可,改为业务处理成功手动提交。因为重平衡发生的时候,消费者会去读取上一次提交的偏移量,自动提交默认是每5秒一次,这会导致重复消费或者丢失消息。enable.auto.commit=false,设置为手动提交。还有一个参数我们可能也需要考虑进去的:auto.offset.reset=earliest,这个参数代表没有偏移量可以提交或者broker上不存在偏移量的时候,消费者如何处理。earliest代表从分区的开始位置读取,可能会重复读取消息,但是不会丢失,消费方一般我们肯定要自己保证幂等,另外一种latest表示从分区末尾读取,那就会有概率丢失消息。综合这几个参数设置,我们就能保证消息不会丢失,保证了可靠性。(八)聊聊副本和它的同步原理吧?Kafka副本的之前提到过,分为Leader副本和Follower副本,也就是主副本和从副本,和其他的比如Mysql不一样的是,Kafka中只有Leader副本会对外提供服务,Follower副本只是单纯地和Leader保持数据同步,作为数据冗余容灾的作用。在Kafka中我们把所有副本的集合统称为AR(Assigned Replicas),和Leader副本保持同步的副本集合称为ISR(InSyncReplicas)。ISR是一个动态的集合,维持这个集合会通过replica.lag.time.max.ms参数来控制,这个代表落后Leader副本的最长时间,默认值10秒,所以只要Follower副本没有落后Leader副本超过10秒以上,就可以认为是和Leader同步的(简单可以认为就是同步时间差)。另外还有两个关键的概念用于副本之间的同步:HW(High Watermark):高水位,也叫做复制点,表示副本间同步的位置。如下图所示,04绿色表示已经提交的消息,这些消息已经在副本之间进行同步,消费者可以看见这些消息并且进行消费,46黄色的则是表示未提交的消息,可能还没有在副本间同步,这些消息对于消费者是不可见的。LEO(Log End Offset):下一条待写入消息的位移副本间同步的过程依赖的就是HW和LEO的更新,以他们的值变化来演示副本同步消息的过程,绿色表示Leader副本,黄色表示Follower副本。首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。(九)Kafka为什么快?主要是3个方面:顺序IOkafka写消息到分区采用追加的方式,也就是顺序写入磁盘,不是随机写入,这个速度比普通的随机IO快非常多,几乎可以和网络IO的速度相媲美。Page Cache和零拷贝kafka在写入消息数据的时候通过mmap内存映射的方式,不是真正立刻写入磁盘,而是利用操作系统的文件缓存PageCache异步写入,提高了写入消息的性能,另外在消费消息的时候又通过sendfile实现了零拷贝。批量处理和压缩Kafka在发送消息的时候不是一条条的发送的,而是会把多条消息合并成一个批次进行处理发送,消费消息也是一个道理,一次拉取一批次的消息进行消费。并且Producer、Broker、Consumer都使用了优化后的压缩算法,发送和消息消息使用压缩节省了网络传输的开销,Broker存储使用压缩则降低了磁盘存储的空间。参考资料:1.《深入理解Kafka:核心设计实践原理》2.状态机程序设计套路3.raft算法源码4.https://www.bbsmax.com/A/QW5Y3kaBzm/总结认为就是同步时间差)。另外还有两个关键的概念用于副本之间的同步:HW(High Watermark):高水位,也叫做复制点,表示副本间同步的位置。如下图所示,04绿色表示已经提交的消息,这些消息已经在副本之间进行同步,消费者可以看见这些消息并且进行消费,46黄色的则是表示未提交的消息,可能还没有在副本间同步,这些消息对于消费者是不可见的。LEO(Log End Offset):下一条待写入消息的位移[外链图片转存中…(img-xSblaHHK-1650934432034)]副本间同步的过程依赖的就是HW和LEO的更新,以他们的值变化来演示副本同步消息的过程,绿色表示Leader副本,黄色表示Follower副本。首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。[外链图片转存中…(img-pLogNcDZ-1650934432035)]此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。[外链图片转存中…(img-hV0shtYL-1650934432035)]之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。[外链图片转存中…(img-xdjmej3f-1650934432036)](九)Kafka为什么快?主要是3个方面:顺序IOkafka写消息到分区采用追加的方式,也就是顺序写入磁盘,不是随机写入,这个速度比普通的随机IO快非常多,几乎可以和网络IO的速度相媲美。Page Cache和零拷贝kafka在写入消息数据的时候通过mmap内存映射的方式,不是真正立刻写入磁盘,而是利用操作系统的文件缓存PageCache异步写入,提高了写入消息的性能,另外在消费消息的时候又通过sendfile实现了零拷贝。批量处理和压缩Kafka在发送消息的时候不是一条条的发送的,而是会把多条消息合并成一个批次进行处理发送,消费消息也是一个道理,一次拉取一批次的消息进行消费。并且Producer、Broker、Consumer都使用了优化后的压缩算法,发送和消息消息使用压缩节省了网络传输的开销,Broker存储使用压缩则降低了磁盘存储的空间。参考资料:1.《深入理解Kafka:核心设计实践原理》2.状态机程序设计套路3.raft算法源码4.https://www.bbsmax.com/A/QW5Y3kaBzm/总结如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看,您的支持是我坚持写作最大的动力。
Kafka 3.0新特性 详解(一)
一、Kafka简介Kafka是一款开源的消息引擎系统。一个典型的Kafka体系架构包括若干Producer、若干Broker、若干Consumer,以及一个ZooKeeper集群,如上图所示。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。(一)Kafka核心组件producer:消息生产者,就是向broker发送消息的客户端。consumer:消息消费者,就是从broker拉取数据的客户端。consumer group:消费者组,由多个消费者consumer组成。消费者组内每个消费者负责消费不同的分区,一个分区只能由同一个消费者组内的一个消费者消费;消费者组之间相互独立,互不影响。所有的消费者都属于某个消费者组,即消费者组是一个逻辑上的订阅者。broker:一台服务器就是一个broker,一个集群由多个broker组成,一个broker可以有多个topic。topic:可以理解为一个队列,所有的生产者和消费者都是面向topic的。partition:分区,kafka中的topic为了提高拓展性和实现高可用而将它分布到不同的broker中,一个topic可以分为多个partition,每个partition都是有序的,即消息发送到队列的顺序跟消费时拉取到的顺序是一致的。replication:副本。一个topic对应的分区partition可以有多个副本,多个副本中只有一个为leader,其余的为follower。为了保证数据的高可用性,leader和follower会尽量均匀的分布在各个broker中,避免了leader所在的服务器宕机而导致topic不可用的问题。(二)kafka2当中zk的作用/admin:主要保存kafka当中的核心的重要信息,包括类似于已经删除的topic就会保存在这个路径下面。/brokers:主要用于保存kafka集群当中的broker信息,以及没被删除的topic信息。/cluster: 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号。/config: 集群配置信息。/controller:kafka集群当中的控制器信息,控制器组件(Controller),是Apache Kafka的核心组件。它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。/controller_epoch:主要用于保存记录controller的选举的次数。/isr_change_notification:isr列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。/latest_producer_id_block:使用/latest_producer_id_block节点来保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。/log_dir_event_notification:主要用于保存当broker当中某些LogDir出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZK当中增加一个通知序号,controller监听到这个节点的变化之后,就会做出对应的处理操作。以上就是kafka在zk当中保留的所有的所有的相关的元数据信息,这些元数据信息保证了kafka集群的正常运行。二、kafka3的安装配置在kafka3的版本当中已经彻底去掉了对zk的依赖,如果没有了zk集群,那么kafka当中是如何保存元数据信息的呢,这里我们通过kafka3的集群来一探究竟。(一)kafka安装配置核心重要参数Controller服务器不管是kafka2还是kafka3当中,controller控制器都是必不可少的,通过controller控制器来维护kafka集群的正常运行,例如ISR列表的变更,broker的上线或者下线,topic的创建,分区的指定等等各种操作都需要依赖于Controller,在kafka2当中,controller的选举需要通过zk来实现,我们没法控制哪些机器选举成为Controller,而在kafka3当中,我们可以通过配置文件来自己指定哪些机器成为Controller,这样做的好处就是我们可以指定一些配置比较高的机器作为Controller节点,从而保证controller节点的稳健性。被选中的controller节点参与元数据集群的选举,每个controller节点要么是Active状态,或者就是standBy状态。Process.Roles使用KRaft模式来运行kafka集群的话,我们有一个配置叫做Process.Roles必须配置,这个参数有以下四个值可以进行配置:Process.Roles=Broker, 服务器在KRaft模式中充当Broker。Process.Roles=Controller, 服务器在KRaft模式下充当Controller。Process.Roles=Broker,Controller,服务器在KRaft模式中同时充当Broker和Controller。如果process.roles没有设置。那么集群就假定是运行在ZooKeeper模式下。如果需要从zookeeper模式转换成为KRaft模式,那么需要进行重新格式化。如果一个节点同时是Broker和Controller节点,那么就称之为组合节点。实际工作当中,如果有条件的话,尽量还是将Broker和Controller节点进行分离部署。避免由于服务器资源不够的情况导致OOM等一系列的问题Quorum Voters通过controller.quorum.voters配置来实习哪些节点是Quorum的投票节点,所有想要成为控制器的节点,都必须放到这个配置里面。每个Broker和每个Controller都必须配置Controller.quorum.voters,该配置当中提供的节点ID必须与提供给服务器的节点ID保持一直。每个Broker和每个Controller 都必须设置 controller.quorum.voters。需要注意的是,controller.quorum.voters 配置中提供的节点ID必须与提供给服务器的节点ID匹配。比如在Controller1上,node.Id必须设置为1,以此类推。注意,控制器id不强制要求你从0或1开始。然而,分配节点ID的最简单和最不容易混淆的方法是给每个服务器一个数字ID,然后从0开始。(二)下载并解压安装包bigdata01下载kafka的安装包,并进行解压:[hadoop@bigdata01 kraft]$ cd /opt/soft/[hadoop@bigdata01 soft]$ wget http://archive.apache.org/dist/kafka/3.1.0/kafka_2.12-3.1.0.tgz[hadoop@bigdata01 soft]$ tar -zxf kafka_2.12-3.1.0.tgz -C /opt/install/修改kafka的配置文件broker.properties:[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0/config/kraft/[hadoop@bigdata01 kraft]$ vim broker.properties修改编辑内容如下:node.id=1controller.quorum.voters=1@bigdata01:9093listeners=PLAINTEXT://bigdata01:9092advertised.listeners=PLAINTEXT://bigdata01:9092log.dirs=/opt/install/kafka_2.12-3.1.0/kraftlogs创建两个文件夹:[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/kraftlogs[hadoop@bigdata01 kafka_2.12-3.1.0]$ mkdir -p /opt/install/kafka_2.12-3.1.0/topiclogs同步安装包到其他机器上面去。(三)服务器集群启动启动kafka服务:[hadoop@bigdata01 kafka_2.12-3.1.0]$ ./bin/kafka-storage.sh random-uuidYkJwr6RESgSJ(四)创建kafka的topic集群启动成功之后,就可以来创建kafka的topic了,使用以下命令来创建kafka的topic:./bin/kafka-topics.sh --create --topic kafka_test --partitions 3 --replication-factor 2 --bootstr(五)任意一台机器查看kafka的topic组成集群之后,任意一台机器就可以通过以下命令来查看到刚才创建的topic了:[hadoop@bigdata03 ~]$ cd /opt/install/kafka_2.12-3.1.0/[hadoop@bigdata03 kafka_2.12-3.1.0]$ bin/kafka-topics.sh --list --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092(六)消息生产与消费使用命令行来生产以及消费kafka当中的消息:[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test
[hadoop@bigdata02 kafka_2.12-3.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --topic kafka_test --from-beginning三、Kafka当中Raft的介绍(一)kafka强依赖zk所引发的问题前面我们已经看到了kafka3集群在没有zk集群的依赖下,也可以正常运行,那么kafka2在zk当中保存的各种重要元数据信息,在kafka3当中如何实现保存的呢?kafka一直都是使用zk来管理集群以及所有的topic的元数据,并且使用了zk的强一致性来选举集群的controller,controller对整个集群的管理至关重要,包括分区的新增,ISR列表的维护,等等很多功能都需要靠controller来实现,然后使用zk来维护kafka的元数据也存在很多的问题以及存在性能瓶颈。以下是kafka将元数据保存在zk当中的诸多问题。元数据存取困难元数据的存取过于困难,每次重新选举的controller需要把整个集群的元数据重新restore,非常的耗时且影响集群的可用性。元数据更新网络开销大整个元数据的更新操作也是以全量推的方式进行,网络的开销也会非常大。强耦合违背软件设计原则Zookeeper对于运维来说,维护Zookeeper也需要一定的开销,并且kafka强耦合与zk也并不好,还得时刻担心zk的宕机问题,违背软件设计的高内聚,低耦合的原则。网络分区复杂度高Zookeeper本身并不能兼顾到broker与broker之间通信的状态,这就会导致网络分区的复杂度成几何倍数增长。zk本身不适合做消息队列zookeeper不适合做消息队列,因为zookeeper有1M的消息大小限制 zookeeper的children太多会极大的影响性能znode太大也会影响性能 znode太大会导致重启zkserver耗时10-15分钟 zookeeper仅使用内存作为存储,所以不能存储太多东西。并发访问zk问题多最好单线程操作zk客户端,不要并发,临界、竞态问题太多。基于以上各种问题,所以提出了脱离zk的方案,转向自助研发强一致性的元数据解决方案,也就是KIP-500。KIP-500议案提出了在Kafka中处理元数据的更好方法。基本思想是"Kafka on Kafka",将Kafka的元数据存储在Kafka本身中,无需增加额外的外部存储比如ZooKeeper等。去zookeeper之后的kafka新的架构在KIP-500中,Kafka控制器会将其元数据存储在Kafka分区中,而不是存储在ZooKeeper中。但是,由于控制器依赖于该分区,因此分区本身不能依赖控制器来进行领导者选举之类的事情。而是,管理该分区的节点必须实现自我管理的Raft仲裁。在kafka3.0的新的版本当中,使用了新的KRaft协议,使用该协议来保证在元数据仲裁中准确的复制元数据,这个协议类似于zk当中的zab协议以及类似于Raft协议,但是KRaft协议使用的是基于事件驱动的模式,与ZAB协议和Raft协议还有点不一样在kafka3.0之前的的版本当中,主要是借助于controller来进行leader partition的选举,而在3.0协议当中,使用了KRaft来实现自己选择leader,并最终令所有节点达成共识,这样简化了controller的选举过程,效果更加高效。(二)kakfa3 Raft前面我们已经知道了在kafka3当中可以不用再依赖于zk来保存kafka当中的元数据了,转而使用Kafka Raft来实现元数据的一致性,简称KRaft,并且将元数据保存在kafka自己的服务器当中,大大提高了kafka的元数据管理的性能。KRaft运行模式的Kafka集群,不会将元数据存储在Apache ZooKeeper中。即部署新集群的时候,无需部署ZooKeeper集群,因为Kafka将元数据存储在Controller节点的KRaft Quorum中。KRaft可以带来很多好处,比如可以支持更多的分区,更快速的切换Controller,也可以避免Controller缓存的元数据和Zookeeper存储的数据不一致带来的一系列问题。在新的版本当中,控制器Controller节点我们可以自己进行指定,这样最大的好处就是我们可以自己选择一些配置比较好的机器成为Controller节点,而不像在之前的版本当中,我们无法指定哪台机器成为Controller节点,而且controller节点与broker节点可以运行在同一台机器上,并且控制器controller节点不再向broker推送更新消息,而是让Broker从这个Controller Leader节点进行拉去元数据的更新。(三)如何查看kafka3当中的元数据信息在kafka3当中,不再使用zk来保存元数据信息了,那么在kafka3当中如何查看元数据信息呢,我们也可以通过kafka自带的命令来进行查看元数据信息,在KRaft中,有两个命令常用命令脚本,kafka-dump-log.sh和kakfa-metadata-shell.sh需要我们来进行关注,因为我们可以通过这两个脚本来查看kafka当中保存的元数据信息。Kafka-dump-log.sh脚本来导出元数据信息KRaft模式下,所有的元数据信息都保存到了一个内部的topic上面,叫做@metadata,例如Broker的信息,Topic的信息等,我们都可以去到这个topic上面进行查看,我们可以通过kafka-dump-log.sh这个脚本来进行查看该topic的信息。Kafka-dump-log.sh是一个之前就有的工具,用来查看Topic的的文件内容。这工具加了一个参数–cluster-metadata-decoder用来,查看元数据日志,如下所示:[hadoop@bigdata01 kafka_2.12-3.1.0]$ cd /opt/install/kafka_2.12-3.1.0[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.index,/opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log >>/opt/metadata.txtkafka-metadata-shell.sh直接查看元数据信息平时我们用zk的时候,习惯了用zk命令行查看数据,简单快捷。bin目录下自带了kafka-metadata-shell.sh工具,可以允许你像zk一样方便的查看数据。使用kafka-metadata-shell.sh脚本进入kafka的元数据客户端[hadoop@bigdata01 kafka_2.12-3.1.0]$ bin/kafka-metadata-shell.sh --snapshot /opt/install/kafka_2.12-3.1.0/topiclogs/__cluster_metadata-0/00000000000000000000.log四、Raft算法介绍raft算法中文版本翻译介绍:https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md著名的CAP原则又称CAP定理的提出,真正奠基了分布式系统的诞生,CAP定理指的是在一个分布式系统中,[一致性]、[可用性](Availability)、[分区容错性](Partition tolerance),这三个要素最多只能同时实现两点,不可能三者兼顾(nosql)。分布式系统为了提高系统的可靠性,一般都会选择使用多副本的方式来进行实现,例如hdfs当中数据的多副本,kafka集群当中分区的多副本等,但是一旦有了多副本的话,那么久面临副本之间一致性的问题,而一致性算法就是 用于解决分布式环境下多副本的数据一致性的问题。业界最著名的一致性算法就是大名鼎鼎的Paxos,但是Paxos比较晦涩难懂,不太容易理解,所以还有一种叫做Raft的算法,更加简单容易理解的实现了一致性算法。(一)Raft协议的工作原理Raft协议当中的角色分布Raft协议将分布式系统当中的角色分为Leader(领导者),Follower(跟从者)以及Candidate(候选者)Leader:主节点的角色,主要是接收客户端请求,并向Follower同步日志,当日志同步到过半及以上节点之后,告诉follower进行提交日志。Follower:从节点的角色,接受并持久化Leader同步的日志,在Leader通知可以提交日志之后,进行提交保存的日志。Candidate:Leader选举过程中的临时角色。Raft协议当中的底层原理Raft协议当中会选举出Leader节点,Leader作为主节点,完全负责replicate log的管理。Leader负责接受所有客户端的请求,然后复制到Follower节点,如果leader故障,那么follower会重新选举leader,Raft协议的一致性,概括主要可以分为以下三个重要部分:Leader选举日志复制安全性其中Leader选举和日志复制是Raft协议当中最为重要的。Raft协议要求系统当中,任意一个时刻,只有一个leader,正常工作期间,只有Leader和Follower角色,并且Raft协议采用了类似网络租期的方式来进行管理维护整个集群,Raft协议将时间分为一个个的时间段(term),也叫作任期,每一个任期都会选举一个Leader来管理维护整个集群,如果这个时间段的Leader宕机,那么这一个任期结束,继续重新选举leader。Raft算法将时间划分成为任意不同长度的任期(term)。任期用连续的数字进行表示。每一个任期的开始都是一次选举(election),一个或多个候选人会试图成为领导人。如果一个候选人赢得了选举,它就会在该任期的剩余时间担任领导人。在某些情况下,选票会被瓜分,有可能没有选出领导人,那么,将会开始另一个任期,并且立刻开始下一次选举。Raft算法保证在给定的一个任期最多只有一个领导人。Leader选举的过程Raft使用心跳来进行触发leader选举,当服务器启动时,初始化为follower角色。leader向所有Follower发送周期性心跳,如果Follower在选举超时间内没有收到Leader的心跳,就会认为leader宕机,稍后发起leader的选举。每个Follower都会有一个倒计时时钟,是一个随机的值,表示的是Follower等待成为Leader的时间,倒计时时钟先跑完,就会当选成为Leader,这样做得好处就是每一个节点都有机会成为Leader。当满足以下三个条件之一时,Quorum中的某个节点就会触发选举:向Leader发送Fetch请求后,在超时阈值quorum.fetch.timeout.ms之后仍然没有得到Fetch响应,表示Leader疑似失败。从当前Leader收到了EndQuorumEpoch请求,表示Leader已退位。Candidate状态下,在超时阈值quorum.election.timeout.ms之后仍然没有收到多数票,也没有Candidate赢得选举,表示此次选举作废,重新进行选举。具体详细过程实现描述如下:增加节点本地的current term,切换到candidate状态。自己给自己投一票。给其他节点发送RequestVote RPCs,要求其他节点也投自己一票。等待其他节点的投票回复。整个过程中的投票过程可以用下图进行表述。leader节点选举的限制每个节点只能投一票,投给自己或者投给别人。候选人所知道的日志信息,一定不能比自己的更少,即能被选举成为leader节点,一定包含了所有已经提交的日志。先到先得的原则数据一致性保证(日志复制机制)前面通过选举机制之后,选举出来了leader节点,然后leader节点对外提供服务,所有的客户端的请求都会发送到leader节点,由leader节点来调度这些并发请求的处理顺序,保证所有节点的状态一致,leader会把请求作为日志条目(Log entries)加入到他的日志当中,然后并行的向其他服务器发起AppendEntries RPC复制日志条目。当这条请求日志被成功复制到大多数服务器上面之后,Leader将这条日志应用到它的状态机并向客户端返回执行结果。客户端的每个请求都包含被复制状态机执行的指令leader将客户端请求作为一条心得日志添加到日志文件中,然后并行发起RPC给其他的服务器,让他们复制这条信息到自己的日志文件中保存。如果这条日志被成功复制,也就是大部分的follower都保存好了执行指令日志,leader就应用这条日志到自己的状态机中,并返回给客户端。如果follower宕机或者运行缓慢或者数据丢失,leader会不断地进行重试,直至所有在线的follower都成功复制了所有的日志条目。
常用消息队列 Kafka、RabbitMQ、RocketMQ、ActiveMQ 综合对比(18个方面)
一、简介Kafka:中。有kafka作者自己写的书,网上资料也有一些。rabbitmq:多。有一些不错的书,网上资料多。zeromq:少。没有专门写zeromq的书,网上的资料多是一些代码的实现和简单介绍rocketmq:少。没有专门写rocketmq的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。activemq:多。没有专门写activemq的书,网上资料多。二、开发语言Kafka:Scalarabbitmq:Erlangzeromq:crocketmq:javaactivemq:java三、支持的协议Kafka:自己定义的一套…(基于TCP) rabbitmq:AMQP zeromq:TCP、UDP rocketmq:自己定义的一套… activemq:OpenWire、STOMP、REST、XMPP、AMQP四、消息存储Kafka:内存、磁盘、数据库。支持大量堆积。kafka的最小存储单元是分区,一个topic包含多个分区,kafka创建主题时,这些分区会被分配在多个服务器上,通常一个broker一台服务器。分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的broker加入集群的时候,部分副本会被移动到新的broker上。根据配置文件中的目录清单,kafka会把新的分区分配给目录清单里分区数最少的目录。默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了key的情况,会根据key的hashcode取模后的值存到对应的分区中。rabbitmq:内存、磁盘。支持少量堆积。rabbitmq的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。引入镜像队列机制,可将重要队列“复制”到集群中的其他broker上,保证这些队列的消息不会丢失。配置镜像的队列,都包含一个主节点master和多个从节点slave,如果master失效,加入时间最长的slave会被提升为新的master,除发送消息外的所有动作都向master发送,然后由master将命令执行结果广播给各个slave,rabbitmq会让master均匀地分布在不同的服务器上,而同一个队列的slave也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。zeromq:消息发送端的内存或者磁盘中。不支持持久化。rocketmq:磁盘。支持大量堆积。commitLog文件存放实际的消息数据,每个commitLog上限是1G,满了之后会自动新建一个commitLog文件保存数据。ConsumeQueue队列只存放offset、size、tagcode,非常小,分布在多个broker上。ConsumeQueue相当于CommitLog的索引文件,消费者消费时会从consumeQueue中查找消息在commitLog中的offset,再去commitLog中查找元数据。ConsumeQueue存储格式的特性,保证了写过程的顺序写盘(写CommitLog文件),大量数据IO都在顺序写同一个commitLog,满1G了再写新的。加上rocketmq是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。activemq:内存、磁盘、数据库。支持少量堆积。五、消息事务Kafka:支持 rabbitmq:支持。客户端将信道设置为事务模式,只有当消息被rabbitMq接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降 zeromq:不支持 rocketmq:支持 activemq:支持六、负载均衡Kafka:支持负载均衡。1>一个broker通常就是一台服务器节点。对于同一个Topic的不同分区,Kafka会尽力将这些分区分布到不同的Broker服务器上,zookeeper保存了broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka分区首领会被分配到不同的broker服务器上,让不同的broker服务器共同分担任务。每一个broker都缓存了元数据信息,客户端可以从任意一个broker获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。2>kafka的消费者组订阅同一个topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。3>当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。kafka的负载均衡大部分是自动完成的,分区的创建也是kafka完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。4>发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。rabbitmq:对负载均衡的支持不好。1>消息被投递到哪个队列是由交换器和key决定的,交换器、路由键、队列都需要手动创建。rabbitmq客户端发送消息要和broker建立连接,需要事先知道broker上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在broker上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个broker上,那么这个broker的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,rabbitmq的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)使用镜像队列机制建立rabbitmq集群可以解决这个问题,形成master-slave的架构,master节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave节点只是负责转发,在master失效时会选择加入时间最长的slave成为master。当新节点加入镜像队列的时候,队列中的消息不会同步到新的slave中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。2>当rabbitmq队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq不再向这个消费者发送任何消息。3>对于rabbitmq而言,客户端与集群建立的TCP连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。但是rabbitmq集群可以借助HAProxy、LVS技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。客户端均衡算法:1)轮询法。按顺序返回下一个服务器的连接地址。2)加权轮询法。给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载。3)随机法。随机选取一个服务器的连接地址。4)加权随机法。按照概率随机选取连接地址。5)源地址哈希法。通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算。6)最小连接数法。动态选择当前连接数最少的一台服务器的连接地址。zeromq:去中心化,不支持负载均衡。本身只是一个多线程网络库。rocketmq:支持负载均衡。一个broker通常是一个服务器节点,broker分为master和slave,master和slave存储的数据一样,slave从master同步数据。1>nameserver与每个集群成员保持心跳,保存着Topic-Broker路由信息,同一个topic的队列会分布在不同的服务器上。2>发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。tags选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。keys选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。3>rocketmq的负载均衡策略规定:Consumer数量应该小于等于Queue数量,如果Consumer超过Queue数量,那么多余的Consumer 将不能消费消息。这一点和kafka是一致的,rocketmq会尽可能地为每一个Consumer分配相同数量的队列,分摊负载。activemq:支持负载均衡。可以基于zookeeper实现负载均衡。七、集群方式Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave。分区首领均匀地分布在不同的kafka服务器上,分区副本也均匀地分布在不同的kafka服务器上,所以每一台kafka服务器既含有分区首领,同时又含有分区副本,每一台kafka服务器是某一台kafka服务器的Slave,同时也是某一台kafka服务器的leader。kafka的集群依赖于zookeeper,zookeeper支持热扩展,所有的broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠zookeeper集群的mq相比,这是最大的优势。rabbitmq:支持简单集群,'复制’模式,对高级集群模式支持不好。rabbitmq的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。在rabbitmq集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。zeromq:去中心化,不支持集群。rocketmq:常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成MasterName Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。客户端先找到NameServer, 然后通过NameServer再找到 Broker。一个topic有多个队列,这些队列会均匀地分布在不同的broker服务器上。rocketmq队列的概念和kafka的分区概念是基本一致的,kafka同一个topic的分区尽可能地分布在不同的broker上,分区副本也会分布在不同的broker上。rocketmq集群的slave会从master拉取数据备份,master分布在不同的broker上。activemq:支持简单集群模式,比如’主-备’,对高级集群模式支持不好。八、管理界面Kafka:一般 rabbitmq:好 zeromq:无 rocketmq:无 activemq:一般九、可用性Kafka:非常高(分布式) rabbitmq:高(主从) zeromq:高。rocketmq:非常高(分布式) activemq:高(主从)十、消息重复Kafka:支持at least once、at most oncerabbitmq:支持at least once、at most oncezeromq:只有重传机制,但是没有持久化,消息丢了重传也没有用。既不是at least once、也不是at most once、更不是exactly only oncerocketmq:支持at least onceactivemq:支持at least once十一、吞吐量TPSKafka:极大 Kafka按批次发送消息和消费消息。发送端将多个小消息合并,批量发向Broker,消费端每次取出一个批次的消息批量处理。rabbitmq:比较大 zeromq:极大 rocketmq:大 rocketMQ接收端可以批量消费消息,可以配置每次消费的消息数,但是发送端不是批量发送。activemq:比较大十二、订阅形式和消息分发Kafka:基于topic以及按照topic进行正则匹配的发布订阅模式。【发送】发送端由topic和key来决定消息发往哪个分区,如果key为null,那么会使用轮询算法将消息均衡地发送到同一个topic的不同分区中。如果key不为null,那么会根据key的hashcode取模计算出要发往的分区。【接收】1>consumer向群组协调器broker发送心跳来维持他们和群组的从属关系以及他们对分区的所有权关系,所有权关系一旦被分配就不会改变除非发生再均衡(比如有一个consumer加入或者离开consumer group),consumer只会从对应的分区读取消息。2>kafka限制consumer个数要少于分区个数,每个消息只会被同一个 Consumer Group的一个consumer消费(非广播)。3>kafka的 Consumer Group订阅同一个topic,会尽可能地使得每一个consumer分配到相同数量的分区,不同 Consumer Group订阅同一个主题相互独立,同一个消息会被不同的 Consumer Group处理。rabbitmq:提供了4种:direct, topic ,Headers和fanout。【发送】先要声明一个队列,这个队列会被创建或者已经被创建,队列是基本存储单元。由exchange和key决定消息存储在哪个队列。direct>发送到和bindingKey完全匹配的队列。topic>路由key是含有"."的字符串,会发送到含有“*”、“#”进行模糊匹配的bingKey对应的队列。fanout>与key无关,会发送到所有和exchange绑定的队列headers>与key无关,消息内容的headers属性(一个键值对)和绑定键值对完全匹配时,会发送到此队列。此方式性能低一般不用【接收】rabbitmq的队列是基本存储单元,不再被分区或者分片,对于我们已经创建了的队列,消费端要指定从哪一个队列接收消息。当rabbitmq队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq不再向这个消费者发送任何消息。zeromq:点对点(p2p)rocketmq:基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式【发送】发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。tags选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。keys选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。【接收】1>广播消费。一条消息被多个Consumer消费,即使Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。2>集群消费。一个 Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有3个实例,那么每个实例只消费其中的 3 条消息。即每一个队列都把消息轮流分发给每个consumer。activemq:点对点(p2p)、广播(发布-订阅)点对点模式,每个消息只有1个消费者;发布/订阅模式,每个消息可以有多个消费者。【发送】点对点模式:先要指定一个队列,这个队列会被创建或者已经被创建。发布/订阅模式:先要指定一个topic,这个topic会被创建或者已经被创建。【接收】点对点模式:对于已经创建了的队列,消费端要指定从哪一个队列接收消息。发布/订阅模式:对于已经创建了的topic,消费端要指定订阅哪一个topic的消息。十三、顺序消息Kafka:支持。设置生产者的max.in.flight.requests.per.connection为1,可以保证消息是按照发送顺序写入服务器的,即使发生了重试。kafka保证同一个分区里的消息是有序的,但是这种有序分两种情况1>key为null,消息逐个被写入不同主机的分区中,但是对于每个分区依然是有序的2>key不为null , 消息被写入到同一个分区,这个分区的消息都是有序。rabbitmq:不支持zeromq:不支持rocketmq:支持activemq:不支持十四、消息确认Kafka:支持。1>发送方确认机制ack=0,不管消息是否成功写入分区ack=1,消息成功写入首领分区后,返回成功ack=all,消息成功写入所有分区后,返回成功。2>接收方确认机制自动或者手动提交分区偏移量,早期版本的kafka偏移量是提交给Zookeeper的,这样使得zookeeper的压力比较大,更新版本的kafka的偏移量是提交给kafka服务器的,不再依赖于zookeeper群组,集群的性能更加稳定。rabbitmq:支持。1>发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。2>接收方确认机制,设置autoAck为false,需要显式确认,设置autoAck为true,自动确认。当autoAck为false的时候,rabbitmq队列会分成两部分,一部分是等待投递给consumer的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且consumer已经断开连接,rabbitmq会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,rabbitmq会一直等待,rabbitmq允许一条消息处理的时间可以很久很久。zeromq:支持。rocketmq:支持。activemq:支持。十五、消息回溯Kafka:支持指定分区offset位置的回溯。rabbitmq:不支持 zeromq:不支持 rocketmq:支持指定时间点的回溯。activemq:不支持十六、消息重试Kafka:不支持,但是可以实现。kafka支持指定分区offset位置的回溯,可以实现消息重试。rabbitmq:不支持,但是可以利用消息确认机制实现。rabbitmq接收方确认机制,设置autoAck为false。当autoAck为false的时候,rabbitmq队列会分成两部分,一部分是等待投递给consumer的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且consumer已经断开连接,rabbitmq会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。zeromq:不支持,rocketmq:支持。消息消费失败的大部分场景下,立即重试99%都会失败,所以rocketmq的策略是在消费失败时定时重试,每次时间间隔相同。1>发送端的 send 方法本身支持内部重试,重试逻辑如下:a)至多重试3次;b)如果发送失败,则轮转到下一个broker;c)这个方法的总耗时不超过sendMsgTimeout 设置的值,默认 10s,超过时间不在重试。2>接收端。Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以分为以下两种情况:由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。定时重试机制,比如过 10s 秒后再重试。由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况可以 sleep 30s,再消费下一条消息,减轻 Broker 重试消息的压力。activemq:不支持十七、并发度Kafka:高一个线程一个消费者,kafka限制消费者的个数要小于等于分区数,如果要提高并行度,可以在消费者中再开启多线程,或者增加consumer实例数量。rabbitmq:极高本身是用Erlang语言写的,并发性能高。可在消费者中开启多线程,最常用的做法是一个channel对应一个消费者,每一个线程把持一个channel,多个线程复用connection的tcp连接,减少性能开销。当rabbitmq队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。这种方式非常适合扩展,而且是专门为并发程序设计的。如果某些消费者的任务比较繁重,那么可以设置basicQos限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq不再向这个消费者发送任何消息。zeromq:高rocketmq:高1>rocketmq限制消费者的个数少于等于队列数,但是可以在消费者中再开启多线程,这一点和kafka是一致的,提高并行度的方法相同。修改消费并行度方法a) 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度,超过订阅队列数的 Consumer实例无效。b) 提高单个 Consumer 的消费并行线程,通过修改参数consumeThreadMin、consumeThreadMax2>同一个网络连接connection,客户端多个线程可以同时发送请求,连接会被复用,减少性能开销。activemq:高单个ActiveMQ的接收和消费消息的速度在1万笔/秒(持久化 一般为1-2万, 非持久化 2 万以上),在生产环境中部署10个Activemq就能达到10万笔/秒以上的性能,部署越多的activemq broker 在MQ上latency也就越低,系统吞吐量也就越高。总结如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看,您的支持是我坚持写作最大的动力。
秒懂消息队列MQ,万字总结带你全面了解消息队列MQ
前面介绍了分布式锁以及如何使用Redis实现分布式锁,接下来介绍分布式系统中另外一个非常重要的组件:消息队列。消息队列是大型分布式系统不可缺少的中间件,也是高并发系统的基石中间件,所以掌握好消息队列MQ就变得极其重要。接下来我就将从零开始介绍什么是消息队列?消息队列的应用场景?如何进行选型?如何在Spring Boot项目中整合集成消息队列。一、消息队列概述消息队列(Message Queue,简称MQ)指保存消息的一个容器,其实本质就是一个保存数据的队列。消息中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的构建。消息中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性的系统架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。二、消息队列应用场景消息中间件在互联网公司使用得越来越多,主要用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削峰和消息通讯四个场景。2.1 异步处理异步处理,就是将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。以下单为例,用户下单后需要生成订单、赠送活动积分、赠送红包、发送下单成功通知等一系列业务处理。假设三个业务节点每个使用100毫秒钟,不考虑网络等其他开销,则串行方式的时间是400毫秒,并行的时间只需要200毫秒。这样就大大提高了系统的吞吐量。2.2 应用解耦应用解耦,顾名思义就是解除应用系统之间的耦合依赖。通过消息队列,使得每个应用系统不必受其他系统影响,可以更独立自主。以电商系统为例,用户下单后,订单系统需要通知积分系统。一般的做法是:订单系统直接调用积分系统的接口。这就使得应用系统间的耦合特别紧密。如果积分系统无法访问,则积分处理失败,从而导致订单失败。加入消息队列之后,用户下单后,订单系统完成下单业务后,将消息写入消息队列,返回用户订单下单成功。积分系统通过订阅下单消息的方式获取下单通知消息,从而进行积分操作。实现订单系统与库存系统的应用解耦。如果,在下单时积分系统系统异常,也不影响用户正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作。2.3 流量削峰流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。以秒杀活动为例,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,秒杀业务处理系统根据消息队列中的请求信息,再做后续处理。如上图所示,服务器接收到用户的请求后,首先写入消息队列,秒杀业务处理系统根据消息队列中的请求信息,做后续业务处理。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。2.4 消息通讯消息通讯是指应用间的数据通信。消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等点对点通讯。以上实际是消息队列的两种消息模式,点对点或发布订阅模式。三、如何选择合适的消息队列目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。面对这么多的中消息队列中间件,如何选择适合我们自身业务的消息中间件呢?3.1 衡量标准虽然这些消息队列在功能和特性方面各有优劣,但我们在选型时要有基本衡量标准:1、首先,是开源。开源意味着,如果有一天你使用的消息队列遇到了一个影响你系统业务的Bug,至少还有机会通过修改源代码来迅速修复或规避这个Bug,解决你的系统的问题,而不是等待开发者发布的下一个版本来解决。2、其次,是社区活跃度。这个产品必须是近年来比较流行并且有一定社区活跃度的产品。我们知道,开源产品越流行 Bug 越少,因为大部分遇到的 Bug,其他人早就遇到并且修复了。而且在使用过程中遇到的问题,也比较容易在网上搜索到类似的问题并快速找到解决方案。同时,流行开源产品一般与周边生态系统会有一个比较好的集成和兼容。3、最后,作为一款及格的消息队列,必须具备的几个特性包括:消息的可靠传递:确保不丢消息;支持集群:确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;性能:具备足够好的性能,能满足绝大多数场景的性能要求。3.2 选型对比接下来我们一起看一下有哪些符合上面这些条件,可供选择的开源消息队列产品。以下是关于各个消息队列中间件的选型对比:特性KafkaRocketMQRabbitMQActiveMQ单机吞吐量10万级10万级万级10万级开发语言ScalaJavaErlangJava高可用分布式分布式主从分布式消息延迟ms级ms级us级ms级消息丢失理论上不会丢失理论上不会丢失低低消费模式拉取推拉推拉 持久化 文件内存,文件内存,文件,数据库支持协议自定义协议自定义协议AMQP,XMPP, SMTP,STOMPAMQP,MQTT,OpenWire,STOMP社区活跃度高中高高管理界面 web console好一般部署难度中 低 部署方式独立独立独立独立,嵌入成熟度成熟比较成熟成熟成熟综合评价优点:拥有强大的性能及吞吐量,兼容性很好。 缺点:由于支持消息堆积,导致延迟比较高。优点:性能好,稳定可靠,有活跃的中文社区,特点响应快。 缺点:兼容性较差,但随着影响力的扩大,该问题会有改善。优点:产品成熟,容易部署和使用,拥有灵活的路由配置。 缺点:性能和吞吐量较差,不易进行二次开发。优点:产品成熟,支持协议多,支持多种语言的客户端。 缺点:社区不活跃,存在消息丢失的可能。以上四种消息队列都有各自的优劣势,需要根据现有系统的情况,选择最适合的消息队列。总结起来,电商、金融等对事务性要求很高的,可以考虑RocketMQ;技术挑战不是特别高,用 RabbitMQ 是不错的选择;如果是大数据领域的实时计算、日志采集等场景可以考虑 Kafka。四、Spring Boot整合RabbitMQ实现消息队列Spring Boot提供了spring-bootstarter-amqp组件对消息队列进行支持,使用非常简单,仅需要非常少的配置即可实现完整的消息队列服务。接下来介绍Spring Boot对RabbitMQ的支持。如何在SpringBoot项目中使用RabbitMQ?4.1 Spring Boot集成RabbitMQSpring Boot提供了spring-boot-starter-amqp组件,只需要简单的配置即可与Spring Boot无缝集成。下面通过示例演示集成RabbitMQ实现消息的接收和发送。第一步,配置pom包。创建Spring Boot项目并在pom.xml文件中添加spring-bootstarter-amqp等相关组件依赖:<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>在上面的示例中,引入Spring Boot自带的amqp组件spring-bootstarter-amqp。第二步,修改配置文件。修改application.properties配置文件,配置rabbitmq的host地址、端口以及账户信息。spring.rabbitmq.host=10.2.1.231
spring.rabbitmq.port=5672
spring.rabbitmq.username=zhangweizhong
spring.rabbitmq.password=weizhong1988
spring.rabbitmq.virtualHost=order在上面的示例中,主要配置RabbitMQ服务的地址。RabbitMQ配置由spring.rabbitmq.*配置属性控制。virtual-host配置项指定RabbitMQ服务创建的虚拟主机,不过这个配置项不是必需的。第三步,创建消费者消费者可以消费生产者发送的消息。接下来创建消费者类Consumer,并使用@RabbitListener注解来指定消息的处理方法。示例代码如下:@Component
public class Consumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue"))
public void process(String message) {
System.out.println("消费者消费消息111=====" + message);
}
}在上面的示例中,Consumer消费者通过@RabbitListener注解创建侦听器端点,绑定rabbitmq_queue队列。(1)@RabbitListener注解提供了@QueueBinding、@Queue、@Exchange等对象,通过这个组合注解配置交换机、绑定路由并且配置监听功能等。(2)@RabbitHandler注解为具体接收的方法。第四步,创建生产者生产者用来产生消息并进行发送,需要用到RabbitTemplate类。与之前的RedisTemplate类似,RabbitTemplate是实现发送消息的关键类。示例代码如下:@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void produce() {
String message = new Date() + "Beijing";
System.out.println("生产者产生消息=====" + message);
rabbitTemplate.convertAndSend("rabbitmq_queue", message);
}
}如上面的示例所示,RabbitTemplate提供了 convertAndSend方法发送消息。convertAndSend方法有routingKey和message两个参数:(1)routingKey为要发送的路由地址。(2)message为具体的消息内容。发送者和接收者的queuename必须一致,不然无法接收。第五步,测试验证。创建对应的测试类ApplicationTests,验证消息发送和接收是否成功。@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
Producer producer;
@Test
public void contextLoads() throws InterruptedException {
producer.produce();
Thread.sleep(1*1000);
}
}在上面的示例中,首先注入生产者对象,然后调用produce()方法来发送消息。最后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,结果如下图所示。通过上面的程序输出日志可以看到,消费者已经收到了生产者发送的消息并进行了处理。这是常用的简单使用示例。4.2 发送和接收实体对象Spring Boot支持对象的发送和接收,且不需要额外的配置。下面通过一个例子来演示RabbitMQ发送和接收实体对象。4.2.1 定义消息实体首先,定义发送与接收的对象实体User类,代码如下:public class User implements Serializable {
public String name;
public String password;
// 省略get和set方法
}在上面的示例中,定义了普通的User实体对象。需要注意的是,实体类对象必须继承Serializable序列化接口,否则会报数据无法序列化的错误。4.2.2 定义消费者修改Consumer类,将参数换成User对象。示例代码如下:@Component
public class Consumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue("rabbitmq_queue_object"))
public void process(User user) {
System.out.println("消费者消费消息111user=====name:" + user.getName()+",password:"+user.getPassword());
}
}其实,消费者类和消息处理方法和之前的类似,只不过将参数换成了实体对象,监听rabbitmq_queue_object队列。4.2.3 定义生产者修改Producer类,定义User实体对象,并通过convertAndSend方法发送对象消息。示例代码如下:@Component
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void produce() {
User user=new User();
user.setName("weiz");
user.setPassword("123456");
System.out.println("生产者生产消息111=====" + user);
rabbitTemplate.convertAndSend("rabbitmq_queue_object", user);
}
}在上面的示例中,还是调用convertAndSend()方法发送实体对象。convertAndSend()方法支持String、Integer、Object等基础的数据类型。4.2.4 验证测试创建单元测试类,注入生产者对象,然后调用produceObj()方法发送实体对象消息,从而验证消息能否被成功接收。@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
Producer producer;
@Test
public void testProduceObj() throws InterruptedException {
producer.produceObj();
Thread.sleep(1*1000);
}
}最后,单击Run Test或在方法上右击,选择Run 'contextLoads()',运行单元测试程序,查看后台输出情况,运行结果如下图所示。通过上面的示例成功实现了RabbitMQ发送和接收实体对象,使得消息的数据结构更加清晰,也更加贴合面向对象的编程思想。五、实现消息的100%可靠性发送5.1 什么是实现消息的100%可靠性发送?在使用消息队列时,因为生产者和消费者不直接交互,所以面临下面几个问题:1)要把消息添加到队列中,怎么保证消息成功添加?2)如何保证消息发送出去时一定会被消费者正常消费?3)消费者正常消费了,生产者或者队列如何知道消费者已经成功消费了消息?要解决前面这些问题,就要保证消息的可靠性发送,实现消息的100%可靠性发送。5.2 技术实现方案RabbitMQ为我们提供了解决方案,下面以常见的创建订单业务为例进行介绍,假设订单创建成功后需要发送短信通知用户。实现消息的100%可靠性发送需要以下条件:1)完成订单业务处理后,生产者发送一条消息到消息队列,同时记录这条操作日志(发送中)。2)消费者收到消息后处理进行;3)消费者处理成功后给消息队列发送ack应答;4)消息队列收到ack应答后,给生成者的Confirm Listener发送确认;5)生产者对消息日志表进行操作,修改之前的日志状态(发送成功);6)在消费端返回应答的过程中,可能发生网络异常,导致生产者未收到应答消息,因此需要一个定时任务去提取其状态为“发送中”并已经超时的消息集合;7)使用定时任务判断为消息事先设置的最大重发次数,大于最大重发次数就判断消息发送失败,更新日志记录状态为发送失败。具体流程如下图所示 5.3 实现消息的100%可靠性发送前面介绍了实现消息的100%可靠性发送的解决方案,接下来从项目实战出发演示如何实现消息的可靠性发送。1. 创建生产者首先把核心生产者的代码编写好,生产者由基本的消息发送和监听组成:@Component
public class RabbitOrderSender {
private static Logger logger = LoggerFactory.getLogger(RabbitOrderSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageLogMapper messageLogMapper;
/**
* Broker应答后,会调用该方法区获取应答结果
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("correlationData:"+correlationData);
String messageId = correlationData.getId();
logger.info("消息确认返回值:"+ack);
if (ack){
//如果返回成功,则进行更新
messageLogMapper.changeMessageLogStatus(messageId, Constans.ORDER_SEND_SUCCESS,new Date());
}else {
//失败进行操作:根据具体失败原因选择重试或补偿等手段
logger.error("异常处理,返回结果:"+cause);
}
}
};
/**
* 发送消息方法调用: 构建自定义对象消息
* @param order
* @throws Exception
*/
public synchronized void sendOrder(OrderInfo order) throws Exception {
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("order.exchange", "order.message", order, correlationData);
}
}上面的消息发送示例代码和之前的没什么区别,只是增加了confirmCallback应答结果回调。通过实现ConfirmCallback接口,消息发送到Broker后触发回调,确认消息是否到达Broker服务器。因此,ConfirmCallback只能确认消息是否正确到达交换机中。2. 消息重发定时任务实现消息重发的定时任务,示例代码如下:@Component
public class RetryMessageTasker {
private static Logger logger = LoggerFactory.getLogger(RetryMessageTasker.class);
@Autowired
private RabbitOrderSender rabbitOrderSender;
@Autowired
private MessageLogMapper messageLogMapper;
/**
* 定时任务
*/
@Scheduled(initialDelay = 5000, fixedDelay = 10000)
public void reSend(){
logger.info("-----------定时任务开始-----------");
//抽取消息状态为0且已经超时的消息集合
List<MessageLog> list = messageLogMapper.query4StatusAndTimeoutMessage();
list.forEach(messageLog -> {
//投递三次以上的消息
if(messageLog.getTryCount() >= 3){
//更新失败的消息
messageLogMapper.changeMessageLogStatus(messageLog.getMessageId(), Constans.ORDER_SEND_FAILURE, new Date());
} else {
// 重试投递消息,将重试次数递增
messageLogMapper.update4ReSend(messageLog.getMessageId(), new Date());
OrderInfo reSendOrder = JsonUtil.jsonToObject(messageLog.getMessage(), OrderInfo.class);
try {
rabbitOrderSender.sendOrder(reSendOrder);
} catch (Exception e) {
e.printStackTrace();
logger.error("-----------异常处理-----------");
}
}
});
}
}在上面的定时任务程序中,每10秒钟提取状态为0且已经超时的消息,重发这些消息,如果发送次数已经在3次以上,则认定为发送失败。3. 创建消费者创建消费者程序负责接收处理消息,处理成功后发送消息确认。示例代码如下:@Component
public class OrderReceiver {
/**
* @RabbitListener 消息监听,可配置交换机、队列、路由key
* 该注解会创建队列和交互机 并建立绑定关系
* @RabbitHandler 标识此方法如果有消息过来,消费者要调用这个方法
* @Payload 消息体
* @Headers 消息头
* @param order
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order.queue",declare = "true"),
exchange = @Exchange(name = "order.exchange",declare = "true",type = "topic"),
key = "order.message"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers,
Channel channel) throws Exception{
//消费者操作
try {
System.out.println("------收到消息,开始消费------");
System.out.println("订单ID:"+order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//现在是手动确认消息 ACK
channel.basicAck(deliveryTag,false);
} finally {
channel.close();
}
}
}
消息处理程序和一般的接收者类似,都是通过@RabbitListener注解监听消息队列。不同的是,发送程序处理成功后,通过channel.basicAck(deliveryTag,false)发送消息确认ACK。4. 运行测试创建单元测试程序。创建一个生成订单的测试方法,测试代码如下:@RunWith(SpringRunner.class)
@SpringBootTest
public class MqApplicationTests {
@Autowired
private OrderService orderService;
/**
* 测试订单创建
*/
@Test
public void createOrder(){
OrderInfo order = new OrderInfo();
order.setId("201901236");
order.setName("测试订单6");
order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());
try {
orderService.createOrder(order);
} catch (Exception e) {
e.printStackTrace();
}
}
}启动消费者程序,启动成功之后,运行createOrder创建订单测试方法。结果表明发送成功并且入库正确,业务表和消息记录表均有数据且status状态=1,表示成功。如果消费者程序处理失败或者超时,未返回ack确认;则生产者的定时程序会重新投递消息。直到三次投递均失败。六、MQ常见问题总结6.1 怎么保证消息没有重复消费?使用消息队列如何保证幂等性?幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用问题出现原因我们先来了解一下产生消息重复消费的原因,对于MQ的使用,有三个角色:生产者、MQ、消费者,那么消息的重复这三者会出现:生产者:生产者可能会推送重复的数据到MQ中,有可能controller接口重复提交了两次,也可能是重试机制导致的MQ:假设网络出现了波动,消费者消费完一条消息后,发送ack时,MQ还没来得及接受,突然挂了,导致MQ以为消费者还未消费该条消息,MQ回复后会再次推送了这条消息,导致出现重复消费。消费者:消费者接收到消息后,正准备发送ack到MQ,突然消费者挂了,还没得及发送ack,这时MQ以为消费者还没消费该消息,消费者重启后,MQ再次推送该条消息。解决方案在正常情况下,生产者是客户,我们很难避免出现用户重复点击的情况,而MQ是允许存在多条一样的消息,但消费者是不允许出现消费两条一样的数据,所以幂等性一般是在消费端实现的:状态判断:消费者把消费消息记录到redis中,再次消费时先到redis判断是否存在该数据,存在则表示消费过,直接丢弃业务判断:消费完数据后,都是需要插入到数据库中,使用数据库的唯一约束防止重复消费。插入数据库前先查询是否存在该数据,存在则直接丢弃消息,这种方式是比较简单粗暴地解决问题6.2 消息丢失的情况消息丢失属于比较常见的问题。一般有生产端丢失、MQ服务丢失、消费端丢失等三种情况。针对各种情况应对方式也不一样。1.生产端丢失的解决方案主要有开启confirm模式,生产着收到MQ发回的confirm确认之后,再进行消息删除,否则消息重推。生产者端消息保存的数据库,由后台定时程序异步推送,收到confirm确认则认为成功,否则消息重推,重推多次均未成功,则认为发送失败。2.MQ服务丢失则主要是开启消息持久化,让消息及时保存到磁盘。3.消费端消息丢失则关闭自动ack确认,消息消费成功后手动发送ack确认。消息消费失败,则重新消费。6.3 消息的传输顺序性解决思路在生产端发布消息时,每次法发布消息都把上一条消息的ID记录到消息体中,消费者接收到消息时,做如下操作先根据上一条Id去检查是否存在上一条消息还没被消费,如果不存在(消费后去掉id),则正常进行,如果正常操作如果存在,则根据id到数据库检查是否被消费,如果被消费,则正常操作如果还没被消费,则休眠一定时间(比如30ms),再重新检查,如被消费,则正常操作如果还没被消费,则抛出异常6.4 怎么解决消息积压问题?所谓的消息积压,就是生成者生成消息太快,而消费者处理消息太慢,从而导致消费端消息积压在MQ中无法处理的问题。遇到这种消息积压的情况,可以根据消息重要程度,分为两种情况处理:如果消息可以被丢弃,那么直接丢弃就好了一般情况下,消息是不可以被丢弃的,那么这样需要考虑策略了,我们可以把原来的消费端重新当做生产端,重新部署一天MQ,再后面出现增加消费端,这样形成另一条生产-消息-消费的线路最后以上,我们就把消息队列介绍完了。消息中间件在互联网公司使用得越来越多,希望大家能够熟悉其使用。