Flink: reading from Kafka and writing to a Kerberos-secured Hive environment

Summary: an end-to-end example of a Flink job that consumes data from Kafka and writes it into a Kerberos-secured Hive environment.

1: pom.xml dependencies

The project is built into a fat jar with the maven-shade-plugin. flink-streaming-java is marked as provided because it ships with the Flink runtime, while the Kafka connector and the Hive JDBC driver must end up in the shaded job jar.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>demo</artifactId>
        <groupId>com.zyf</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>KafkaFlink1.12.2HiveSink</artifactId>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hive.version>2.1.1-cdh6.2.0</hive.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- The Kafka connector is not part of the Flink distribution,
                 so it is bundled into the shaded job jar instead of being marked as provided. -->
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>kafkaFlinkHive</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2: ConfigUtils class

ConfigUtils loads the properties file passed to the job and splits its entries into three groups based on the key prefix: keys starting with "producer." go into producerProp, keys starting with "consumer." go into consumerProp (with the prefix stripped), and everything else goes into commonProp.

import java.io.FileInputStream;
import java.util.Properties;
public class ConfigUtils {
    private Properties producerProp = new Properties();
    private Properties consumerProp = new Properties();
    private Properties commonProp = new Properties();
    public ConfigUtils(String kafkaConfigPath) {
        // Load the properties file and route every key into the producer,
        // consumer or common group based on its prefix.
        try (FileInputStream in = new FileInputStream(kafkaConfigPath)) {
            Properties prop = new Properties();
            prop.load(in);
            String keyPrefix;
            String keyValue;
            for (String key : prop.stringPropertyNames()) {
                keyPrefix = key.trim().split("\\.")[0];
                keyValue = key.trim().substring(key.trim().indexOf(".") + 1);
                switch (keyPrefix.toLowerCase()) {
                    case "producer":
                        this.producerProp.put(keyValue, prop.getProperty(key));
                        break;
                    case "consumer":
                        this.consumerProp.put(keyValue, prop.getProperty(key));
                        break;
                    default:
                        this.commonProp.put(key, prop.getProperty(key));
                        break;
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    public Properties getProducerProp() {
        return producerProp;
    } 
    public void setProducerProp(Properties producerProp) {
        this.producerProp = producerProp;
    }
    public Properties getConsumerProp() {
        return consumerProp;
    }
    public void setConsumerProp(Properties consumerProp) {
        this.consumerProp = consumerProp;
    }
    public Properties getCommonProp() {
        return commonProp;
    }
    public void setCommonProp(Properties commonProp) {
        this.commonProp = commonProp;
    }
    @Override
    public String toString() {
        return "ConfigUtils{" +
                "producerProp=" + producerProp +
                ", consumerProp=" + consumerProp +
                ", commonProp=" + commonProp +
                '}';
    }
    public static void main(String[] args) {
        // Simple local test: parse the file once and print each property group.
        ConfigUtils configUtils = new ConfigUtils("d://ConfigUtils.properties");
        System.out.println(configUtils.getProducerProp());
        System.out.println(configUtils.getConsumerProp());
        System.out.println(configUtils.getCommonProp());
    }
}

3: kafkaFlinkHive class

This is the main job: it reads strings from the configured Kafka topic (optionally over SASL/GSSAPI), checks that each record has the form "int,string", writes valid records to HDFS with a StreamingFileSink, and attaches the SinkToHive function that loads the written files into the Hive table.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class kafkaFlinkHive {
    static Logger logger = LoggerFactory.getLogger(kafkaFlinkHive.class);
    public static void main(String[] args) throws Exception {
        final MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);
        if (!multipleParameterTool.has("path")) {
            System.out.println("Error: not exist --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(0);
        }
        ConfigUtils configUtils = new ConfigUtils(multipleParameterTool.get("path"));
        logger.info(configUtils.toString());
        Properties commonProp = configUtils.getCommonProp();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//default mode
        Properties consumerProp = configUtils.getConsumerProp();
        if (commonProp.containsKey("source.kafka.security.enable") && commonProp.getProperty("source.kafka.security.enable")
                .equalsIgnoreCase("true")) {
            consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
            consumerProp.setProperty("sasl.mechanism", "GSSAPI");
            consumerProp.setProperty("sasl.kerberos.service.name", "kafka");
        }
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(commonProp.getProperty("source.kafka.topic"),
                new SimpleStringSchema(), consumerProp);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        if (commonProp.containsKey("source.kafka.offset.reset") && commonProp.getProperty("source.kafka.offset.reset")
                .equalsIgnoreCase("latest")) {
            kafkaConsumer.setStartFromLatest();
        }
        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer).uid("source_kafka")
                .setParallelism(Integer.valueOf(commonProp.getProperty("source.kafka.parallelism")));
        DataStream<String> convertData = sourceDataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        }).uid("convert_data").setParallelism(Integer.valueOf(commonProp.getProperty("convert.format.parallelism")));
        final StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp"), (Encoder<String>) (element, stream) -> {
                    // Only write records that match the expected "int,string" layout.
                    PrintStream out = new PrintStream(stream);
                    String[] strArr = element.split(",");
                    if (strArr.length == 2 && isInteger(strArr[0])) {
                        out.println(element);
                    } else {
                        logger.error("data format must be: int,string");
                    }
                }).withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.rollover.interval.sec"))))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.inactivity.interval.sec"))))
                        .withMaxPartSize(1024L * 1024L * 256L).build())
                .build();
        convertData.addSink(hdfsSink).uid("sink_hdfs").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        convertData.addSink(new SinkToHive<>(commonProp)).uid("sink_hive").setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        env.execute("Kafka2Flink2Hive kerberos Example");
    }
    public static boolean isInteger(String str) {
        Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
        return pattern.matcher(str).matches();
    }
}
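
Note that when source.kafka.security.enable=true the code only switches the consumer to SASL_PLAINTEXT with the GSSAPI mechanism; the actual Kerberos credentials for the Kafka source (and for HDFS access by the StreamingFileSink) are normally supplied through Flink's own security options in flink-conf.yaml. A minimal sketch, reusing the keytab and principal from the sample configuration in step 5 (adjust to your cluster):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /opt/useradmin.keytab
security.kerberos.login.principal: useradmin
# "Client" covers ZooKeeper, "KafkaClient" covers the SASL/GSSAPI Kafka consumer
security.kerberos.login.contexts: Client,KafkaClient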

4: SinkToHive class

SinkToHive opens a Hive JDBC connection in open() (performing a Kerberos keytab login first when sink.hive.security.enable=true) and, for each incoming record, issues a LOAD DATA INPATH statement that moves the part files written by the HDFS sink into the target table.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.cli.HiveSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;
public class SinkToHive<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private static final Logger logger = LoggerFactory.getLogger(SinkToHive.class);
    private final String url;
    private final String tableName;
    private final boolean securityEnable;
    private final String keytabPrincipal;
    private final String keytabPath;
    private Connection conn = null;
    private PreparedStatement preparedStatement = null;
    SinkToHive(Properties properties) {
        this.url = properties.getProperty("sink.hive.jdbc.url");
        this.tableName = properties.getProperty("sink.hive.table.name");
        this.securityEnable = "true".equalsIgnoreCase(properties.getProperty("sink.hive.security.enable"));
        this.keytabPrincipal = properties.getProperty("sink.hive.keytab.principal");
        this.keytabPath = properties.getProperty("sink.hive.keytab.path");
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        // When Hive is kerberized, log in from the keytab on the task manager
        // before the JDBC connection is opened.
        if (securityEnable) {
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            hadoopConf.set("hadoop.security.authentication", "Kerberos");
            UserGroupInformation.setConfiguration(hadoopConf);
            UserGroupInformation.loginUserFromKeytab(keytabPrincipal, keytabPath);
        }
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        conn = DriverManager.getConnection(url);
        logger.info("================open");
    }
    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            logger.error(ex.getMessage());
        }
    }
    @Override
    public void invoke(IN value, Context context) throws Exception {
        // Each incoming record triggers a LOAD of the part files written by the HDFS sink.
        try {
            preparedStatement = conn.prepareStatement("load data inpath '/tmp/2020*/part-*' into table " + tableName);
            preparedStatement.execute();
        } catch (HiveSQLException ex) {
            // Failed loads (e.g. when there is nothing new to load yet) are only logged at debug level.
            logger.debug(ex.getMessage());
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
}
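
Because SinkToHive only issues LOAD DATA INPATH, the target table must already exist as a plain text table whose delimiter matches the "int,string" records written by the HDFS sink. A possible DDL for such a table (column names are illustrative):

CREATE TABLE h3c_table (
  id   INT,
  name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;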

5: FlinkJob_Kafka2Hive.properties configuration file

All job parameters are read from this properties file, which is passed to the job with --path when it is submitted.

# Kafka source configuration
source.kafka.topic=kafka_hive
source.kafka.offset.reset=latest
source.kafka.parallelism=1
source.kafka.security.enable=false
###################################### Hive configuration ###############################
convert.format.parallelism=1
sink.hive.table.name=h3c_table
sink.hdfs.parallelism=1
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
hdfs.rollover.interval.sec=1
hdfs.inactivity.interval.sec=1
sink.hive.security.enable=false
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin
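
The file above only lists the keys that the job reads directly. The Kafka consumer itself is configured through keys prefixed with consumer., which ConfigUtils strips before handing them to FlinkKafkaConsumer; at a minimum the bootstrap servers and group id are needed. A sketch of those entries (the broker hosts reuse the host names from the JDBC URL for illustration; the port and group id are placeholders):

# Kafka consumer settings (the "consumer." prefix is stripped by ConfigUtils)
consumer.bootstrap.servers=zyf53.hde.com:9092,zyf54.hde.com:9092,zyf55.hde.com:9092
consumer.group.id=flink_kafka2hive

Also note that, depending on how HiveServer2 is set up, a kerberized JDBC connection may additionally need the HiveServer2 principal in the URL (a ;principal=hive/_HOST@<REALM> suffix, with <REALM> replaced by the cluster's Kerberos realm); with ZooKeeper service discovery this is often published by ZooKeeper.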
