Flink reads from Kafka and writes to a Kerberos-authenticated Hive environment

Summary: Flink reads from Kafka and writes to a Kerberos-authenticated Hive environment.

1: pom.xml dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>demo</artifactId>
        <groupId>com.zyf</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>KafkaFlink1.12.2HiveSink</artifactId>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>2.1.1-cdh6.2.0</hive.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- The Kafka connector is not part of the Flink distribution; bundle it in the
                 fat jar (default compile scope) rather than marking it provided, unless the
                 connector jar is placed in flink/lib on the cluster. -->
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!-- The StreamingFileSink used below ships with flink-streaming-java, so the legacy
             flink-connector-filesystem module (BucketingSink, removed in Flink 1.12) is not
             declared here. -->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>kafkaFlinkHive</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2: The ConfigUtils class

import java.io.FileInputStream;
import java.util.Properties;
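/**
 * Splits a flat properties file into three groups: keys starting with "producer." or
 * "consumer." go into producerProp/consumerProp with the prefix stripped; every other
 * key is kept under its full name in commonProp.
 */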
public class ConfigUtils {
    private Properties producerProp = new Properties();
    private Properties consumerProp = new Properties();
    private Properties commonProp = new Properties();
    public ConfigUtils(String KafkaConfigPath) {
        try {
            Properties prop = new Properties();
            FileInputStream in = new FileInputStream(KafkaConfigPath);
            prop.load(in);
            in.close();
            String keyPrefix;
            String keyValue;
            for (String key : prop.stringPropertyNames()) {
                keyPrefix = key.trim().split("\\.")[0];
                keyValue = key.trim().substring(key.trim().indexOf(".") + 1);
                switch (keyPrefix.toLowerCase()) {
                    case "producer":
                        this.producerProp.put(keyValue, prop.getProperty(key));
                        break;
                    case "consumer":
                        this.consumerProp.put(keyValue, prop.getProperty(key));
                        break;
                    default:
                        this.commonProp.put(key, prop.getProperty(key));
                        break;
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    public Properties getProducerProp() {
        return producerProp;
    } 
    public void setProducerProp(Properties producerProp) {
        this.producerProp = producerProp;
    }
    public Properties getConsumerProp() {
        return consumerProp;
    }
    public void setConsumerProp(Properties consumerProp) {
        this.consumerProp = consumerProp;
    }
    public Properties getCommonProp() {
        return commonProp;
    }
    public void setCommonProp(Properties commonProp) {
        this.commonProp = commonProp;
    }
    @Override
    public String toString() {
        return "ConfigUtils{" +
                "producerProp=" + producerProp +
                ", consumerProp=" + consumerProp +
                ", commonProp=" + commonProp +
                '}';
    }
    public static void main(String[] args) {
        // Quick local check: print the three property groups parsed from the file.
        ConfigUtils config = new ConfigUtils("d://ConfigUtils.properties");
        System.out.println(config.getProducerProp());
        System.out.println(config.getConsumerProp());
        System.out.println(config.getCommonProp());
    }
}
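The prefix handling is easy to miss: for producer.* and consumer.* keys the prefix is stripped before the key is stored, while any other key (for example source.kafka.topic) keeps its full name and lands in commonProp. A minimal sketch of the resulting split, assuming a hypothetical file at /opt/FlinkJob_Kafka2Hive.properties with the keys shown in the comments:

// Hypothetical file contents:
//   consumer.bootstrap.servers=broker1:9092
//   consumer.group.id=flink_kafka_hive
//   source.kafka.topic=kafka_hive
ConfigUtils cfg = new ConfigUtils("/opt/FlinkJob_Kafka2Hive.properties");
System.out.println(cfg.getConsumerProp()); // {bootstrap.servers=broker1:9092, group.id=flink_kafka_hive} -- prefix stripped
System.out.println(cfg.getCommonProp());   // {source.kafka.topic=kafka_hive}                             -- full key kept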

3: The kafkaFlinkHive class

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class kafkaFlinkHive {
    static Logger logger = LoggerFactory.getLogger(kafkaFlinkHive.class);
    public static void main(String[] args) throws Exception {
        final MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);
        if (!multipleParameterTool.has("path")) {
            System.out.println("Error: missing required argument --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(1);
        }
        ConfigUtils configUtils = new ConfigUtils(multipleParameterTool.get("path"));
        logger.info(configUtils.toString());
        Properties commonProp = configUtils.getCommonProp();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//default mode
        Properties consumerProp = configUtils.getConsumerProp();
        if (commonProp.containsKey("source.kafka.security.enable") && commonProp.getProperty("source.kafka.security.enable")
                .equalsIgnoreCase("true")) {
            consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
            consumerProp.setProperty("sasl.mechanism", "GSSAPI");
            consumerProp.setProperty("sasl.kerberos.service.name", "kafka");
        }
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(commonProp.getProperty("source.kafka.topic"),
                new SimpleStringSchema(), consumerProp);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        if (commonProp.containsKey("source.kafka.offset.reset") && commonProp.getProperty("source.kafka.offset.reset")
                .equalsIgnoreCase("latest")) {
            kafkaConsumer.setStartFromLatest();
        }
        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer).uid("source_kafka")
                .setParallelism(Integer.valueOf(commonProp.getProperty("source.kafka.parallelism")));
        DataStream<String> convertData = sourceDataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        }).uid("convert_data").setParallelism(Integer.valueOf(commonProp.getProperty("convert.format.parallelism")));
        final StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp"), (Encoder<String>) (element, stream) -> {
                    PrintStream out = new PrintStream(stream);
                    String[] strArr = element.split(",");
                    // Only write records that match the expected "int,string" layout.
                    if (strArr.length == 2 && isInteger(strArr[0])) {
                        out.println(element);
                        // PrintStream buffers internally; flush so the record actually reaches the part file.
                        out.flush();
                    } else {
                        logger.error("data format must be : int,string");
                    }
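                    // Kerberos login used for the HDFS part files and the Hive LOAD; note that because
                    // it sits inside the row encoder it runs once per record rather than once per subtask.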
                    if (commonProp.containsKey("sink.hive.security.enable") && commonProp.getProperty("sink.hive.security.enable")
                            .equalsIgnoreCase("true")) {
                        Configuration conf = new Configuration();
                        conf.set("hadoop.security.authentication", "Kerberos");
                        UserGroupInformation.setConfiguration(conf);
                        try {
                            UserGroupInformation.loginUserFromKeytab(commonProp.getProperty("sink.hive.keytab.principal"),
                                    commonProp.getProperty("sink.hive.keytab.path"));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.rollover.interval.sec"))))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.inactivity.interval.sec"))))
                        .withMaxPartSize(1024L * 1024L * 256L).build())
                .build();
        convertData.addSink(hdfsSink).uid("sink_hdfs").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        convertData.addSink(new SinkToHive<>(commonProp)).uid("sink_hive").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        env.execute("Kafka2Flink2Hive kerberos Example");
    }
    public static boolean isInteger(String str) {
        Pattern pattern = Pattern.compile("^[-\\+]?[\\d]+$"); // "+" so empty strings and a bare sign are rejected
        return pattern.matcher(str).matches();
    }
}
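With source.kafka.security.enable=true the code above only sets the protocol, SASL mechanism and service name on the consumer; the actual Kerberos login (keytab and principal) still has to be provided, typically through Flink's security.kerberos.login.* options in flink-conf.yaml or an external JAAS file. As an alternative sketch, with a placeholder keytab path and principal, the JAAS configuration can also be passed inline as a consumer property:

// Placeholder keytab/principal -- adjust to your cluster. Supplies the Kerberos JAAS login
// inline instead of relying on flink-conf.yaml or a -Djava.security.auth.login.config file.
consumerProp.setProperty("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true "
                + "keyTab=\"/opt/useradmin.keytab\" "
                + "principal=\"useradmin@EXAMPLE.COM\";");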

4: The SinkToHive class

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hive.service.cli.HiveSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;
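/**
 * Sink that issues a Hive "LOAD DATA INPATH" statement over JDBC for each incoming record,
 * moving the part files written to HDFS by the StreamingFileSink into the target Hive table.
 */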
public class SinkToHive<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private Logger logger = LoggerFactory.getLogger(SinkToHive.class);
    private String url;
    private String tableName;
    private Connection conn = null;
    private PreparedStatement preparedStatement = null;
    SinkToHive(Properties properties) {
        this.url = properties.getProperty("sink.hive.jdbc.url");
        this.tableName = properties.getProperty("sink.hive.table.name");
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        conn = DriverManager.getConnection(url);
        logger.info("================open");
    }
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            logger.error(ex.getMessage());
        }
    }
    @Override
    public void invoke(IN value, Context context) throws Exception {
        try {
            // Move the date-bucketed part files written by the StreamingFileSink into the Hive table.
            preparedStatement = conn.prepareStatement("load data inpath '/tmp/2020*/part-*' into table " + tableName);
            preparedStatement.execute();
        } catch (HiveSQLException ex) {
            // Typically raised when no new files match the pattern (they were already loaded).
            logger.debug(ex.getMessage());
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
}
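In the code above the Kerberos login for the Hive side lives inside the HDFS encoder. A minimal sketch of moving it into this sink instead, assuming the constructor also keeps the full Properties object in a hypothetical commonProp field: the keytab login then runs once per subtask in open() before the JDBC connection is created. (With a direct, non-ZooKeeper HiveServer2 URL the cluster-specific ";principal=hive/_HOST@YOUR_REALM" parameter would also have to be appended to the JDBC URL.)

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        if ("true".equalsIgnoreCase(commonProp.getProperty("sink.hive.security.enable"))) {
            // Fully qualified to avoid clashing with org.apache.flink.configuration.Configuration.
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            hadoopConf.set("hadoop.security.authentication", "Kerberos");
            org.apache.hadoop.security.UserGroupInformation.setConfiguration(hadoopConf);
            org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab(
                    commonProp.getProperty("sink.hive.keytab.principal"),
                    commonProp.getProperty("sink.hive.keytab.path"));
        }
        conn = DriverManager.getConnection(url);
        logger.info("================open");
    }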

5: FlinkJob_Kafka2Hive.properties configuration file

# Kafka settings
source.kafka.topic=kafka_hive
source.kafka.offset.reset=latest
source.kafka.parallelism=1
source.kafka.security.enable=false
###################################### Hive settings ###############################
convert.format.parallelism=1
sink.hive.table.name=h3c_table
sink.hdfs.parallelism=1
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
hdfs.rollover.interval.sec=1
hdfs.inactivity.interval.sec=1
sink.hive.security.enable=false
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin
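
Note that the sample file leaves both security switches off and contains no consumer.* entries, although the FlinkKafkaConsumer above needs at least bootstrap.servers and group.id. An example of what a Kerberos-enabled file could look like (the broker list, group id, realm and keytab below are placeholders for your cluster, not values from the original setup):

# Kafka source (placeholder broker list and group id)
consumer.bootstrap.servers=zyf53.hde.com:9092,zyf54.hde.com:9092,zyf55.hde.com:9092
consumer.group.id=flink_kafka_hive
source.kafka.topic=kafka_hive
source.kafka.offset.reset=latest
source.kafka.parallelism=1
source.kafka.security.enable=true

# Hive sink (placeholder principal and keytab)
convert.format.parallelism=1
sink.hive.table.name=h3c_table
sink.hdfs.parallelism=1
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
hdfs.rollover.interval.sec=1
hdfs.inactivity.interval.sec=1
sink.hive.security.enable=true
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin@EXAMPLE.COM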
