kafka 磁盘扩容与数据均衡操作代码

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议

一、概述
Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议:

1)Kafka 磁盘扩容概述
添加新磁盘:在服务器上添加新的磁盘,确保磁盘有足够的容量,并且其性能符合集群的需求。
修改 Kafka 配置:在Kafka的配置文件(server.properties)中更新log.dirs属性,将新磁盘路径添加到现有的路径中。
log.dirs=/path/to/old/disk,/path/to/new/disk
重新启动 Kafka 节点:重新启动 Kafka 节点,确保新的配置生效。在进行重启之前,确保已经备份了关键的配置文件和数据。
2)Kafka 数据均衡概述
分区重新平衡:在 Kafka 中,分区数据的均衡很重要,以确保每个节点的负载相对均匀。您可以使用 Kafka 提供的工具或 API 来重新平衡分区,确保每个节点负责处理相似数量的分区和数据。
监控分区状态:使用Kafka的监控工具,例如Kafka Manager、Burrow 等,来监控分区的状态和分布情况。确保没有分区处于不平衡的状态。
手动干预:在某些情况下,可能需要手动干预来解决数据均衡问题。这可能包括手动重新分配分区或手动调整分区的副本分布。
考虑工作负载变化:在Kafka集群上部署新的生产者或消费者时,要考虑工作负载的变化。新的生产者可能导致更多的数据写入,而新的消费者可能导致更多的数据读取。
分区数量和副本:考虑适当的分区数量和副本数量。分区数太多可能导致管理和维护的困难,而分区数太少可能导致单个节点的负载过重。
使用Kafka工具:Kafka提供了一些工具,如 kafka-reassign-partitions.sh 用于手动重新分配分区,以及 kafka-preferred-replica-election.sh 用于执行首选副本选举。
在进行磁盘扩容和数据均衡时,请确保在生产环境中小心操作,并在非生产环境中进行测试和模拟。细心的规划和执行可以确保Kafka集群的可用性和性能。
二、K8s 集群部署
k8s 环境安装之前写过很多文档,可以参考我以下几篇文章:
【云原生】k8s 离线部署讲解和实战操作
【云原生】k8s 环境快速部署(一小时以内部署完)
三、kafka on k8s 环境部署
这里为了快速演示,选择了 on k8s 部署方式,当然也可以选择物理机部署方式。以前也写过很多关于 kafka的文章,可以参考一下:
Kafka原理介绍+安装+基本操作(kafka on k8s)
大数据Hadoop之——Kafka鉴权认证
大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)
大数据Hadoop之——Kafka Streams原理介绍与简单应用示例
大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
大数据Hadoop之——EFAK安全认证实现(kafka+zookeeper)
【云原生】zookeeper + kafka on k8s 环境部署
【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程
1)安装 helm

下载包

wget -O /tmp/helm-v3.7.1-linux-amd64.tar.gz

解压压缩包

tar -xf /tmp/helm-v3.7.1-linux-amd64.tar.gz -C /root/

软链

ln -s /root/linux-amd64/helm /usr/local/bin/helm
2)安装 zookeeper
1、添加源并下载部署包
helm repo add bitnami
helm pull bitnami/zookeeper --version 10.2.1
tar -xf zookeeper-10.2.1.tgz
2、修改配置
修改zookeeper/values.yaml
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: bigdata_cloudnative/zookeeper
tag: 3.8.0-debian-11-r36
...
replicaCount: 3
service:
type: NodePort
nodePorts:

#NodePort 默认范围是 30000-32767
client: "32181"
tls: "32182"

persistence:
storageClass: "zookeeper-local-storage"
size: "10Gi"

目录需要提前在宿主机上创建

local:

- name: zookeeper-0
  host: "local-168-182-110"
  path: "/opt/bigdata/servers/zookeeper/data/data1"
- name: zookeeper-1
  host: "local-168-182-111"
- name: zookeeper-2
  host: "local-168-182-112"

Enable Prometheus to access ZooKeeper metrics endpoint

metrics:
enabled: true
添加zookeeper/templates/pv.yaml

{ {- range .Values.persistence.local }}

apiVersion: v1
kind: PersistentVolume
metadata:
name: { { .name }}
labels:
name: { { .name }}
spec:
storageClassName: { { $.Values.persistence.storageClass }}
capacity:
storage: { { $.Values.persistence.size }}
accessModes:

- ReadWriteOnce
path: {
  { .path }}

nodeAffinity:
required:
nodeSelectorTerms:

    - matchExpressions:
        - key: kubernetes.io/hostname
          operator: In
          values:
            - {
  { .host }}

{ {- end }}
添加zookeeper/templates/storage-class.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
name: { { .Values.persistence.storageClass }}
provisioner: kubernetes.io/no-provisioner
设置时区,zookeeper/templates/statefulset.yaml
env:

- name: TZ
  value: Asia/Shanghai

3、开始安装 zookeeper
docker pull docker.io/bitnami/zookeeper:3.8.0-debian-11-r36

为了方便下次快速拉取镜像,将镜像推送到阿里云上

docker tag docker.io/bitnami/zookeeper:3.8.0-debian-11-r36 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36

开始安装

helm install zookeeper ./zookeeper -n zookeeper --create-namespace
NOTES
NAME: zookeeper
LAST DEPLOYED: Sun Nov 12 22:39:36 2023
NAMESPACE: zookeeper
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 10.2.1
APP VERSION: 3.8.0
Please be patient while the chart is being deployed
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.zookeeper.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace zookeeper -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
export NODE_IP=$(kubectl get nodes --namespace zookeeper -o jsonpath="{.items[0].status.addresses[0].address}")
export NODE_PORT=$(kubectl get --namespace zookeeper -o jsonpath="{.spec.ports[0].nodePort}" services zookeeper)
zkCli.sh $NODE_IP:$NODE_PORT
查看pod状态
kubectl get pods,svc -n zookeeper -owide
4、测试验证

登录zookeeper pod

kubectl exec -it zookeeper-0 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-1 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-2 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-0 -n zookeeper -- bash
5、卸载
helm uninstall zookeeper -n zookeeper
kubectl delete pod -n zookeeper kubectl get pod -n zookeeper|awk 'NR>1{print $1}' --force
kubectl patch ns zookeeper -p '{"metadata":{"finalizers":null}}'
kubectl delete ns zookeeper --force
3)安装 kafka
helm pull bitnami/kafka --version 18.4.2
tar -xf kafka-18.4.2.tgz
修改kafka/values.yaml
repository: bigdata_cloudnative/kafka
tag: 3.2.1-debian-11-r16
client: "30092"
external: "30094"
externalAccess
service:
type: NodePort
nodePorts:

   - 30001
   - 30002
   - 30003
 useHostIPs: true

storageClass: "kafka-local-storage"

- name: kafka-0
  path: "/opt/bigdata/servers/kafka/data/data1"
- name: kafka-1
- name: kafka-2

kafka:
enabled: true
image:
registry: registry.cn-hangzhou.aliyuncs.com
repository: bigdata_cloudnative/kafka-exporter
tag: 1.6.0-debian-11-r8
jmx:
repository: bigdata_cloudnative/jmx-exporter
tag: 0.17.1-debian-11-r1
annotations:
prometheus.io/path: "/metrics"
zookeeper:
enabled: false
externalZookeeper
servers:

- zookeeper-0.zookeeper-headless.zookeeper
- zookeeper-1.zookeeper-headless.zookeeper
- zookeeper-2.zookeeper-headless.zookeeper

添加kafka/templates/pv.yaml
添加kafka/templates/storage-class.yaml
设置时区,kafka/templates/statefulset.yaml
3、开始安装
docker pull docker.io/bitnami/kafka:3.2.1-debian-11-r16
docker tag docker.io/bitnami/kafka:3.2.1-debian-11-r16 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16

node-export

docker pull docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8
docker tag docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8

JXM

docker pull docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1
docker tag docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1

开始安装

helm install kafka ./kafka -n kafka --create-namespace
NAME: kafka
LAST DEPLOYED: Sun Nov 12 23:32:49 2023
NAMESPACE: kafka
CHART NAME: kafka
CHART VERSION: 18.4.2
APP VERSION: 3.2.1
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.kafka.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.kafka.svc.cluster.local:9092
kafka-1.kafka-headless.kafka.svc.cluster.local:9092
kafka-2.kafka-headless.kafka.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16 --namespace kafka --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace kafka -- bash
PRODUCER:
kafka-console-producer.sh \
--broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092,kafka-1.kafka-headless.kafka.svc.cluster.local:9092,kafka-2.kafka-headless.kafka.svc.cluster.local:9092 \
--topic test
CONSUMER:
kafka-console-consumer.sh \
--bootstrap-server kafka.kafka.svc.cluster.local:9092 \
--topic test \
--from-beginning
kubectl get pods,svc -n kafka -owide
kubectl exec -it kafka-0 -n kafka -- bash

1、创建分区

kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1

查看

kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test001
问题处理:Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 5555; nested exception is:
修改 /opt/bitnami/kafka/bin/kafka-run-class.sh 脚本,修改内容如下:

增加

ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; then
ISKAFKASERVER="true"
fi

修改

if [ $JMX_PORT ];then

if [ $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; then
修改后的完整脚本

!/bin/bash

Licensed to the Apache Software Foundation (ASF) under one or more

contributor license agreements. See the NOTICE file distributed with

this work for additional information regarding copyright ownership.

The ASF licenses this file to You under the Apache License, Version 2.0

(the "License"); you may not use this file except in compliance with

the License. You may obtain a copy of the License at

#

http://www.nanchang-company.com/sitemap.html

Unless required by applicable law or agreed to in writing, software

distributed under the License is distributed on an "AS IS" BASIS,

WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

See the License for the specific language governing permissions and

limitations under the License.

if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
exit 1

CYGWIN == 1 if Cygwin is detected, else 0.

if [[ $(uname -a) =~ "CYGWIN" ]]; then
CYGWIN=1
else
CYGWIN=0
if [ -z "$INCLUDE_TEST_JARS" ]; then
INCLUDE_TEST_JARS=false

Exclude jars not necessary for running commands.

regex="(-(test|test-sources|src|scaladoc|javadoc).jar|jar.asc|connect-file.*.jar)$"
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
fi
file=$1
if [ -z "$(echo "$file" | egrep "$regex")" ] ; then
else
return 1
}
base_dir=$(dirname $0)/..
if [ -z "$SCALA_VERSION" ]; then
SCALA_VERSION=2.13.6
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2
if [ -z "$SCALA_BINARY_VERSION" ]; then
SCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')

run ./gradlew copyDependantLibs to get all dependant jars in a local dir

shopt -s nullglob
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
for dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION};
do
CLASSPATH="$CLASSPATH:$dir/
"
done
for file in "$base_dir"/examples/build/libs/kafka-examples.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
done
clients_lib_dir=$(dirname $0)/../clients/build/libs
streams_lib_dir=$(dirname $0)/../streams/build/libs
streams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
streams_lib_dir=$clients_lib_dir
streams_dependant_clients_lib_dir=$streams_lib_dir
for file in "$clients_lib_dir"/kafka-clients
.jar;
for file in "$streams_lib_dir"/kafka-streams.jar;
for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples
.jar;
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
VERSION_NO_DOTS=echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'
SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests.jar;
CLASSPATH="$file":"$CLASSPATH"
if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
for file in "$streams_dependant_clients_lib_dir"/rocksdb
.jar;
CLASSPATH="$CLASSPATH":"$file"
for file in "$streams_dependant_clients_lib_dir"/hamcrest.jar;
for file in "$base_dir"/shell/build/libs/kafka-shell.jar;
for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}
;
CLASSPATH="$CLASSPATH:$dir/"
for file in "$base_dir"/tools/build/libs/kafka-tools
.jar;
for dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION};
for file in "$base_dir"/trogdor/build/libs/trogdor-
.jar;
for dir in "$base_dir"/trogdor/build/dependant-libs-${SCALA_VERSION};
for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}
.jar;
if [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; then
CLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"

classpath addition for release

for file in "$base_dir"/libs/;
for file in "$basedir"/core/build/libs/kafka${SCALA_BINARY_VERSION}
.jar;
shopt -u nullglob
if [ -z "$CLASSPATH" ] ; then
echo "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"

JMX settings

if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "

JMX port to use

KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "

Log directory to use

if [ "x$LOG_DIR" = "x" ]; then
LOG_DIR="$base_dir/logs"

Log4j settings

if [ -z "$KAFKA_LOG4J_OPTS" ]; then

Log to console. This is a tool.

LOG4J_DIR="$base_dir/config/tools-log4j.properties"

If Cygwin is detected, LOG4J_DIR is converted to Windows format.

(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"

create logs directory

if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"

If Cygwin is detected, LOG_DIR is converted to Windows format.

(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"

Generic jvm settings you want to add

if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS=""

Set Debug options if enabled

if [ "x$KAFKA_DEBUG" != "x" ]; then

# Use default ports
DEFAULT_JAVA_DEBUG_PORT="5005"
if [ -z "$JAVA_DEBUG_PORT" ]; then
    JAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"
# Use the defaults if JAVA_DEBUG_OPTS was not set
DEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"
if [ -z "$JAVA_DEBUG_OPTS" ]; then
    JAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"

Which java to use

if [ -z "$JAVA_HOME" ]; then
JAVA="java"
JAVA="$JAVA_HOME/bin/java"

Memory options

if [ -z "$KAFKA_HEAP_OPTS" ]; then
KAFKA_HEAP_OPTS="-Xmx256M"

JVM performance options

MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported

if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
while [ $# -gt 0 ]; do
COMMAND=$1
case $COMMAND in
-name)
DAEMON_NAME=$2
CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
shift 2
;;
-loggc)
if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
GC_LOG_ENABLED="true"
fi
shift
-daemon)
DAEMON_MODE="true"
*)
break
esac

GC options

GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX

The first segment of the version number, which is '1' for releases before Java 9

it then becomes '9', '10', ...

Some examples of the first line of java --version:

8 -> java version "1.8.0_152"

9.0.4 -> java version "9.0.4"

10 -> java version "10" 2018-03-20

10.0.1 -> java version "10.0.1" 2018-04-17

We need to match to the end of the line to prevent sed from printing the characters that do not match

JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/. version "([0-9]).$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_GC_LOG_OPTS="-Xlog:gc
:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"

Remove a possible colon prefix from the classpath (happens at lines like CLASSPATH="$CLASSPATH:$file" when CLASSPATH is blank)

Syntax used on the right side is native Bash string manipulation; for more details see

http://www.nanchang-company.com/sitemap.html, specifically the section titled "Substring Removal"

CLASSPATH=${CLASSPATH#:}

If Cygwin is detected, classpath is converted to Windows format.

(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")

Launch mode

if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
exec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
将脚本覆盖容器里的
kubectl cp kafka-run-class.sh kafka-0:/opt/bitnami/kafka/bin/kafka-run-class.sh -n kafka
再执行创建topic

参数解释:

--create: 指定创建topic动作

--topic:指定新建topic的名称

--bootstrap-server: 指定kafka连接地址

--config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration

--partitions:指定当前创建的kafka分区数量,默认为1个

--replication-factor:指定每个分区的副本数,默认1个

1、创建topic,三分区,三副本,设置数据过期时间72小时(-1表示不过期,默认是永久保存的,不会自动过期),单位ms,7236001000=259200000

kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 3 --replication-factor 3 --config retention.ms=259200000
kafka-topics.sh --list --bootstrap-server kafka.kafka:9092
生产者/消费者测试

【生产者】

kafka-console-producer.sh --broker-list kafka.kafka:9092 --topic test001
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

【消费者】

从头开始消费

kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --from-beginning

指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区

kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --partition 0 --offset 100 --group test001

查看数据积压

kafka-consumer-groups.sh --bootstrap-server kafka.kafka:9092 --describe --group test001
删除 topic

删除topic,默认是没有启用删除topic的

kafka-topics.sh --delete --topic test001 --bootstrap-server kafka.kafka:9092

配置启用可以删除topic,topic 配置文件里,delete.topic.enable=true;k8s helm chat包里开启这个参数:

deleteTopicEnable: true
helm uninstall kafka -n kafka
kubectl delete pod -n kafka kubectl get pod -n kafka|awk 'NR>1{print $1}' --force
kubectl patch ns kafka -p '{"metadata":{"finalizers":null}}'
kubectl delete ns kafka --force
四、kafka 分区与副本
Kafka中的分区(Partitions)和副本(Replicas)是关键的概念,它们有助于实现高可用性、容错性和扩展性。下面是有关Kafka分区和副本的基本概念:
1)分区(Partitions):
定义:分区是Kafka中用于存储消息的基本单元。每个主题(Topic)都可以被划分为一个或多个分区。分区中的每条消息都会被分配到一个特定的分区中。
1、作用:
水平扩展:通过将主题划分为多个分区,Kafka可以水平扩展,允许消息的并行处理和更好的性能。
顺序保证:每个分区中的消息保持有序。在同一分区中,消息的写入和读取顺序是严格有序的。
2、分区的属性:
编号:每个分区都有一个唯一的编号(从0开始),用于标识分区。
持久化:分区的数据是持久化的,可以在多个节点之间复制以提高可用性和容错性。
副本数量:每个分区可以有一个或多个副本。
3、生产者和消费者:
生产者可以指定消息发送到特定的分区。
消费者订阅主题时,会消费所有分区中的消息。
2)副本(Replicas):
定义:副本是分区的复制。每个分区可以配置多个副本,这些副本分布在Kafka集群的不同节点上。
高可用性:副本提供了故障恢复和高可用性。当某个节点或分区不可用时,仍然可以从其他节点或副本读取数据。
容错性:通过在多个节点上存储相同的数据,即使某个节点发生故障,数据仍然可用。
2、副本的属性:
同步复制:副本之间可以配置为同步或异步复制。同步复制确保写入操作在所有副本上都完成后才返回成功。
领导者和追随者:每个分区都有一个领导者(Leader)和零个或多个追随者(Follower)。生产者和消费者通常与分区的领导者进行交互。
3、ISR(In-Sync Replicas):
ISR 是指与分区领导者保持同步的副本集合。只有ISR中的副本才能成为新的领导者。当某个副本无法保持同步时,它将从ISR中移除。
生产者和消费者与分区和副本的关系:
生产者可以选择将消息发送到特定的分区,也可以根据分区键选择。
消费者订阅主题时,会消费分区中的消息,与分区中的领导者和追随者进行交互。
总体而言,Kafka的分区和副本机制提供了高度的可伸缩性、高可用性和容错性,使其成为处理大规模实时数据流的强大平台。
五、kafka 磁盘扩容
场景:可能因为数据量上涨,就得靠谱扩容磁盘了,这里每个节点增加一块磁盘,如果不新增topic的情况下,是不会写到对应新磁盘的。kafka配置文件log.dirs增加了几个目录。

log.dirs用来配置多个根目录(以逗号分隔)

log.dirs=/data1,/data2

修改完配置重启kafka即可

六、数据均衡(分区迁移)
场景:kafka配置文件log.dirs增加了几个目录,但是新目录没有分区数据写入,所以打算进行重分区一下。
1)查看topic分区情况

为了测试这里多建几个topic

kafka-topics.sh --create --topic test002 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test003 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test004 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test005 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test006 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test007 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000

查看分区情况

2)查看分区大小

显示所有的topic详情

kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092

只显示test001信息

kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test001
数据格式化:
{
"version": 1,
"brokers": [
{
"broker": 2,
"logDirs": [
{
"logDir": "/bitnami/kafka/data",
"error": null,
"partitions": [
{
"partition": "test001-0",
"size": 380,
"offsetLag": 0,
"isFuture": false
},
"partition": "test001-2",
"size": 198,
"partition": "test001-1",
"size": 190,
}
]
}
]
},
"broker": 1,
"broker": 0,
}
]
3)编写 move-json-file.json,生成执行计划
move-json-file.json 这个文件就是告知想对哪些Topic进行重新分配的计算。
【示例一】分区迁移
"topics": [{
"topic": "test002"
}],
"version": 1

查看分区

kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test002

查看分区大小

kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test002
开始执行

当前topic在,0节点,迁移到1节点

kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 --topics-to-move-json-file /tmp/move-json-file.json --broker-list "1" --generate

输出信息:生成了两条信息,第一条为现在的分配情况,第二条为计划更改的内容

当前:Current partition replica assignment

{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[0],"log_dirs":["any"]}]}

迁移:Proposed partition reassignment configuration

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}
把计划修改的结果复制,放在第二个json文件中,这里取名为reassignment-json-file.json
【注意】现在还没真正迁移,只是输出迁移信息。可以执行查看就知道了。
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test002
【温馨提示】--broker-list "1":扩容后的所有机器的broker.id。
4)开始迁移
运行kafka-reassign-partition.sh命令根据上述执行计划生成的结果进行分配,命令如下:
echo '{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}' >/tmp/reassignment-json-file.json
kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 -reassignment-json-file /tmp/reassignment-json-file.json -execute
【示例二】磁盘间、不同路径分区迁移
"partitions": [{
"topic": "test01",
"partition": 2,
"replicas": [0],
"log_dirs": ["/data1"]
}, {
"partition": 1,
"log_dirs": ["/data2"]
}]
version:固定值 1
开始执行迁移
kafka-reassign-partitions.sh --zookeeper --bootstrap-server kafka.kafka:9092 --reassignment-json-file config/move-json-file.json --execute --bootstrap-server
kafka.kafka:9092 --execute --replica-alter-log-dirs-throttle 10000 --throttle 50000000
参数讲解:
--replica-alter-log-dirs-throttle:需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上 --replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流。
--throttle 50000000:那么执行移动分区的时候,会被限制流量在50000000 B/s 。
kafka 磁盘扩容与数据均衡实在操作讲解就先到这里了,有任何疑问也可关注我公众号:大数据与云原生技术分享,进行技术交流,如本篇文章对您有所帮助,麻烦帮忙一键三连(点赞、转发、收藏)~

相关文章
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1
|
2月前
|
消息中间件 存储 缓存
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
大数据-71 Kafka 高级特性 物理存储 磁盘存储特性 如零拷贝、页缓存、mmp、sendfile
78 3
|
2月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
60 3
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
79 2
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
62 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
59 0
|
4月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
355 9
|
4月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
109 0
|
4月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
171 0
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。