Flink reading from Kafka and writing to a Kerberos-authenticated Hive environment

Summary: Flink reads data from Kafka and writes it into a Hive environment secured with Kerberos authentication.

1: pom.xml dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>demo</artifactId>
        <groupId>com.zyf</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>KafkaFlink1.12.2HiveSink</artifactId>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>2.1.1-cdh6.2.0</hive.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- The Kafka connector is not shipped with the Flink distribution, so it is bundled
                 into the shaded jar instead of being marked as provided. -->
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>kafkaFlinkHive</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2: The ConfigUtils class

import java.io.FileInputStream;
import java.util.Properties;
public class ConfigUtils {
    private Properties producerProp = new Properties();
    private Properties consumerProp = new Properties();
    private Properties commonProp = new Properties();
    /**
     * Loads the given properties file and routes every key by its prefix:
     * keys starting with "producer." go to producerProp and keys starting with
     * "consumer." go to consumerProp (with the prefix stripped), while all other
     * keys are kept unchanged in commonProp.
     */
    public ConfigUtils(String kafkaConfigPath) {
        try (FileInputStream in = new FileInputStream(kafkaConfigPath)) {
            Properties prop = new Properties();
            prop.load(in);
            String keyPrefix;
            String keyValue;
            for (String key : prop.stringPropertyNames()) {
                keyPrefix = key.trim().split("\\.")[0];
                keyValue = key.trim().substring(key.trim().indexOf(".") + 1);
                switch (keyPrefix.toLowerCase()) {
                    case "producer":
                        this.producerProp.put(keyValue, prop.getProperty(key));
                        break;
                    case "consumer":
                        this.consumerProp.put(keyValue, prop.getProperty(key));
                        break;
                    default:
                        this.commonProp.put(key, prop.getProperty(key));
                        break;
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    public Properties getProducerProp() {
        return producerProp;
    } 
    public void setProducerProp(Properties producerProp) {
        this.producerProp = producerProp;
    }
    public Properties getConsumerProp() {
        return consumerProp;
    }
    public void setConsumerProp(Properties consumerProp) {
        this.consumerProp = consumerProp;
    }
    public Properties getCommonProp() {
        return commonProp;
    }
    public void setCommonProp(Properties commonProp) {
        this.commonProp = commonProp;
    }
    @Override
    public String toString() {
        return "ConfigUtils{" +
                "producerProp=" + producerProp +
                ", consumerProp=" + consumerProp +
                ", commonProp=" + commonProp +
                '}';
    }
    public static void main(String[] args) {
        ConfigUtils configUtils = new ConfigUtils("d://ConfigUtils.properties");
        System.out.println(configUtils.getProducerProp());
        System.out.println(configUtils.getConsumerProp());
        System.out.println(configUtils.getCommonProp());
    }
}
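
To make the prefix routing concrete, here is a minimal, hypothetical sketch (the file path and key names are placeholders): keys prefixed with consumer. land in getConsumerProp() with the prefix stripped, ready to be handed to the Kafka consumer, while unprefixed keys such as source.kafka.topic stay in getCommonProp() under their full name.

import java.io.FileWriter;

public class ConfigUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample file; the key names below are illustrative only.
        String path = "/tmp/ConfigUtilsDemo.properties";
        try (FileWriter w = new FileWriter(path)) {
            w.write("consumer.bootstrap.servers=broker1:9092\n");
            w.write("consumer.group.id=flink_hive_demo\n");
            w.write("source.kafka.topic=kafka_hive\n");
        }
        ConfigUtils cfg = new ConfigUtils(path);
        // consumerProp now holds bootstrap.servers and group.id with the "consumer." prefix stripped.
        System.out.println(cfg.getConsumerProp());
        // commonProp keeps the unprefixed key as-is: {source.kafka.topic=kafka_hive}
        System.out.println(cfg.getCommonProp());
    }
}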

3: The kafkaFlinkHive class

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class kafkaFlinkHive {
    static Logger logger = LoggerFactory.getLogger(kafkaFlinkHive.class);
    public static void main(String[] args) throws Exception {
        final MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);
        if (!multipleParameterTool.has("path")) {
            System.out.println("Error: missing required argument --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(1);
        }
        ConfigUtils configUtils = new ConfigUtils(multipleParameterTool.get("path"));
        logger.info(configUtils.toString());
        Properties commonProp = configUtils.getCommonProp();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//default mode
        Properties consumerProp = configUtils.getConsumerProp();
        if (commonProp.containsKey("source.kafka.security.enable") && commonProp.getProperty("source.kafka.security.enable")
                .equalsIgnoreCase("true")) {
            consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
            consumerProp.setProperty("sasl.mechanism", "GSSAPI");
            consumerProp.setProperty("sasl.kerberos.service.name", "kafka");
        }
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(commonProp.getProperty("source.kafka.topic"),
                new SimpleStringSchema(), consumerProp);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        if (commonProp.containsKey("source.kafka.offset.reset") && commonProp.getProperty("source.kafka.offset.reset")
                .equalsIgnoreCase("latest")) {
            kafkaConsumer.setStartFromLatest();
        }
        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer).uid("source_kafka")
                .setParallelism(Integer.valueOf(commonProp.getProperty("source.kafka.parallelism")));
        DataStream<String> convertData = sourceDataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        }).uid("convert_data").setParallelism(Integer.valueOf(commonProp.getProperty("convert.format.parallelism")));
        final StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp"), (Encoder<String>) (element, stream) -> {
                    PrintStream out = new PrintStream(stream);
                    // Only lines of the form "int,string" are written to HDFS; anything else is logged and dropped.
                    String[] strArr = element.split(",");
                    if (strArr.length == 2 && isInteger(strArr[0])) {
                        out.println(element);
                    } else {
                        logger.error("data format must be : int,string");
                    }
                    // Kerberos login for the secured HDFS/Hive cluster. Because this sits inside the encoder,
                    // it runs on the task managers (once per record), so the keytab file must exist on every
                    // task manager host.
                    if (commonProp.containsKey("sink.hive.security.enable") && commonProp.getProperty("sink.hive.security.enable")
                            .equalsIgnoreCase("true")) {
                        Configuration conf = new Configuration();
                        conf.set("hadoop.security.authentication", "Kerberos");
                        UserGroupInformation.setConfiguration(conf);
                        try {
                            UserGroupInformation.loginUserFromKeytab(commonProp.getProperty("sink.hive.keytab.principal"),
                                    commonProp.getProperty("sink.hive.keytab.path"));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.rollover.interval.sec"))))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.inactivity.interval.sec"))))
                        .withMaxPartSize(1024L * 1024L * 256L).build())
                .build();
        convertData.addSink(hdfsSink).uid("sink_hdfs").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        convertData.addSink(new SinkToHive<>(commonProp)).uid("sink_hive").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        env.execute("Kafka2Flink2Hive kerberos Example");
    }
    public static boolean isInteger(String str) {
        // Requires at least one digit, with an optional leading sign.
        Pattern pattern = Pattern.compile("^[-+]?\\d+$");
        return pattern.matcher(str).matches();
    }
}
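
Note that the code above only switches the consumer to SASL_PLAINTEXT/GSSAPI when source.kafka.security.enable=true; the Kerberos credentials themselves still have to come from somewhere, typically Flink's security.kerberos.login.* options in flink-conf.yaml (with a KafkaClient login context), or alternatively an inline JAAS configuration set directly on the consumer properties. A minimal sketch of the inline variant follows; the helper class, keytab path, and principal are hypothetical placeholders, not part of the original job.

import java.util.Properties;

public class KafkaKerberosProps {
    /**
     * Sketch only: adds Kerberos (GSSAPI) settings to Kafka consumer properties using an inline
     * JAAS configuration, as an alternative to flink-conf.yaml's security.kerberos.login.* options.
     * The keytab path and principal passed in are placeholders for your environment.
     */
    public static Properties withKerberos(Properties consumerProp, String keytab, String principal) {
        consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
        consumerProp.setProperty("sasl.mechanism", "GSSAPI");
        consumerProp.setProperty("sasl.kerberos.service.name", "kafka");
        consumerProp.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required"
                        + " useKeyTab=true storeKey=true"
                        + " keyTab=\"" + keytab + "\""
                        + " principal=\"" + principal + "\";");
        return consumerProp;
    }
}

For example, calling withKerberos(consumerProp, "/opt/useradmin.keytab", "useradmin@EXAMPLE.COM") before constructing the FlinkKafkaConsumer would authenticate the source without any JVM-level JAAS file.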

4: The SinkToHive class

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hive.service.cli.HiveSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;
public class SinkToHive<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private Logger logger = LoggerFactory.getLogger(SinkToHive.class);
    private String url;
    private String tableName;
    private Connection conn = null;
    private PreparedStatement preparedStatement = null;
    SinkToHive(Properties properties) {
        this.url = properties.getProperty("sink.hive.jdbc.url");
        this.tableName = properties.getProperty("sink.hive.table.name");
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        conn = DriverManager.getConnection(url);
        logger.info("================open");
    }
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            logger.error(ex.getMessage());
        }
    }
    @Override
    public void invoke(IN value, Context context) throws Exception {
        // Every incoming record triggers a LOAD DATA INPATH that moves the part files staged by the
        // StreamingFileSink from HDFS into the Hive table. When no files match the pattern, Hive
        // raises a HiveSQLException, which is logged at debug level and ignored.
        try {
            preparedStatement = conn.prepareStatement("load data inpath '/tmp/2020*/part-*' into table " + tableName);
            preparedStatement.execute();
        } catch (HiveSQLException ex) {
            logger.debug(ex.getMessage());
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
}
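
Against a kerberized HiveServer2, the JDBC connection in open() only succeeds if two extra things are in place: the process has already obtained a Kerberos login (which is what the UserGroupInformation.loginUserFromKeytab call in kafkaFlinkHive provides on the task managers), and the JDBC URL carries the HiveServer2 service principal. The standalone sketch below illustrates this; it reuses the ZooKeeper hosts from the properties file, while the realm, principal, and keytab values are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosHiveJdbcDemo {
    public static void main(String[] args) throws Exception {
        // Sketch only: realm, principal and keytab path are placeholders for your environment.
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab("useradmin", "/opt/useradmin.keytab");

        // With Kerberos enabled, the HiveServer2 service principal is appended to the JDBC URL.
        String url = "jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;"
                + "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;"
                + "principal=hive/_HOST@EXAMPLE.COM";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}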

5: The FlinkJob_Kafka2Hive.properties configuration file

# Other Kafka settings
source.kafka.topic=kafka_hive
source.kafka.offset.reset=latest
source.kafka.parallelism=1
source.kafka.security.enable=false
###################################### Hive settings ###############################
convert.format.parallelism=1
sink.hive.table.name=h3c_table
sink.hdfs.parallelism=1
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
hdfs.rollover.interval.sec=1
hdfs.inactivity.interval.sec=1
sink.hive.security.enable=false
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin
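
When the two security switches are set to true, the keytab entries above are actually used, and a kerberized HiveServer2 additionally needs the service principal in the JDBC URL. A sketch of the changed entries with Kerberos enabled; the realm and principal values are placeholders for your environment.

source.kafka.security.enable=true
sink.hive.security.enable=true
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin@EXAMPLE.COM
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@EXAMPLE.COM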
