Kafka安全机制与授权(linux)
下载kafka
地址:https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
下载:wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
解压:tar zxvf kafka_2.12-2.5.0.tgz
进入目录:cd kafka_2.12-2.5.0/
[root@localhost kafka_2.12-2.5.0]# ll
总用量 56
drwxr-xr-x 3 root root 4096 4月 8 09:16 bin
drwxr-xr-x 2 root root 4096 4月 8 09:16 config
drwxr-xr-x 2 root root 8192 6月 30 13:01 libs
-rw-r--r-- 1 root root 32216 4月 8 09:13 LICENSE
-rw-r--r-- 1 root root 337 4月 8 09:13 NOTICE
drwxr-xr-x 2 root root 44 4月 8 09:16 site-docs
[root@localhost kafka_2.12-2.5.0]#
bin
目录放的都是可执行文件,config
目录放的都是配置文件。
首先我们先创建一个证书
创建证书
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=test],SCRAM-SHA-512=[password=test]' --entity-type users --entity-name test
验证证书
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name test
这地方的admin
就是下面的admin
我们先进入config
目录下,创建一个名为kafka_server_jaas.conf
文件,内容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
具体操作
[root@localhost kafka_2.12-2.5.0]# cd config/
[root@localhost config]# touch kafka_server_jaas.conf
[root@localhost config]# vim kafka_server_jaas.conf
[root@localhost config]#
创建好这个文件后,然后我们去修改server.properties
# 设置监听使用SASL而不是SSL
listeners=SASL_PLAINTEXT://192.168.0.98:9092
advertised.listeners=SASL_PLAINTEXT://192.168.0.98:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACL
allow.everyone.if.no.acl.found=false
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#这里加上这句话就不需要kafka_server_jaas.conf这个文件了
#如果没加则需要执行
#export KAFKA_OPTS="-Djava.security.auth.login.config=/你的目录/kafka_2.12-2.5.0/config/kafka_server_jaas.conf"
listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin";
启动zookeeper
cd bin/
./zookeeper-server-start.sh ../config/zookeeper.properties
启动kafka
./kafka-server-start.sh ../config/server.properties
创建Topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_test
添加读写权限
# 添加读权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic kafka_test
# 添加写权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Write --topic kafka_test
# 添加消费者组权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --group group_test
# 查看权限列表
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
生产者
/**
* 生产者配置
*/
@Configuration
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
public ProducerFactory<String,String> producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}
private Map<String, Object> producerConfigs() {
HashMap<String, Object> properties = new HashMap<String, Object>();
//配置的是kafka的端口
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
//配置key的序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//配置value的序列化类
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
//这有助于提升客户端和服务端之间的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
//0生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。
//1只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。
//all或-1只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
properties.put(ProducerConfig.ACKS_CONFIG, "1");
//producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
//
//配置文件设置sasl_plaintext认证
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test\";");
return properties;
}
}
生产消息
@RestController
public class TestController {
private static Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("test")
public String test(){
HashMap<String,String> hashMap = new HashMap<>();
hashMap.put("name","zd");
hashMap.put("age","18");
//这地方如果有topic:testTopic会往这个topic添加消息,如果没有则创建这个testTopic然后往里面添加消息
ListenableFuture<SendResult<String, String>> testTopic = kafkaTemplate.send("kafka_test", JSONObject.toJSONString(hashMap));
//可有可无看自己需求
testTopic.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
logger.info("发送失败");
//失败do something
}
@Override
public void onSuccess(SendResult<String, String> integerStringSendResult) {
logger.info("发送成功");
//成功do something
}
});
return "ok";
}
}
消费者
/**
* 消费者配置
*/
@Configuration
public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() { Map<String, Object> properties = new HashMap<>();
//配置的是kafka的端口
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.98:9092");
//消息反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group,默认:""
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用,默认:true
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//consumer自动向zookeeper提交offset的频率,默认:5000
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);
//没有初始化的offset时,可以设置以下三种情况:(默认:latest)
//earliest
//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest
//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none
//topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//消费者进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。。默认:""
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "zdpower");
//配置文件设置sasl_plaintext认证
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test1\" password=\"test1\";");
return properties;
}
}
消费消息
@Component
public class MyKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
@KafkaListener(containerFactory="kafkaListenerContainerFactory",topics = {"kafka_test"})
public void preCommandTicket1(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("----------------- record =" + record);
logger.info("------------------ message =" + message);
}
}
}