1: pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>demo</artifactId> <groupId>com.zyf</groupId> <version>1.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>KafkaFlink1.12.2HiveSink</artifactId> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.12.2</flink.version> <scala.binary.version>2.11</scala.binary.version> <hive.version>2.1.1-cdh6.2.0</hive.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>${hive.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>kafkaFlinkHive</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
2: ConfigUtils class
import java.io.FileInputStream;
import java.util.Properties;

/**
 * Loads a properties file and splits the entries into three groups:
 * keys starting with "producer." go to producerProp and keys starting with
 * "consumer." go to consumerProp (both with the prefix stripped); every
 * other key is kept unchanged in commonProp.
 */
public class ConfigUtils {

    private Properties producerProp = new Properties();
    private Properties consumerProp = new Properties();
    private Properties commonProp = new Properties();

    public ConfigUtils(String kafkaConfigPath) {
        try {
            Properties prop = new Properties();
            FileInputStream in = new FileInputStream(kafkaConfigPath);
            prop.load(in);
            in.close();
            String keyPrefix;
            String keyValue;
            for (String key : prop.stringPropertyNames()) {
                keyPrefix = key.trim().split("\\.")[0];
                keyValue = key.trim().substring(key.trim().indexOf(".") + 1);
                switch (keyPrefix.toLowerCase()) {
                    case "producer":
                        this.producerProp.put(keyValue, prop.getProperty(key));
                        break;
                    case "consumer":
                        this.consumerProp.put(keyValue, prop.getProperty(key));
                        break;
                    default:
                        this.commonProp.put(key, prop.getProperty(key));
                        break;
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public Properties getProducerProp() {
        return producerProp;
    }

    public void setProducerProp(Properties producerProp) {
        this.producerProp = producerProp;
    }

    public Properties getConsumerProp() {
        return consumerProp;
    }

    public void setConsumerProp(Properties consumerProp) {
        this.consumerProp = consumerProp;
    }

    public Properties getCommonProp() {
        return commonProp;
    }

    public void setCommonProp(Properties commonProp) {
        this.commonProp = commonProp;
    }

    @Override
    public String toString() {
        return "ConfigUtils{" +
                "producerProp=" + producerProp +
                ", consumerProp=" + consumerProp +
                ", commonProp=" + commonProp +
                '}';
    }

    public static void main(String[] args) {
        System.out.println(new ConfigUtils("d://ConfigUtils.properties").producerProp);
        System.out.println(new ConfigUtils("d://ConfigUtils.properties").consumerProp);
        System.out.println(new ConfigUtils("d://ConfigUtils.properties").commonProp);
    }
}
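As an illustration of how the prefixes are routed (the values below are placeholders, not from the original post), a file like this:

producer.bootstrap.servers=broker1:9092
consumer.bootstrap.servers=broker1:9092
consumer.group.id=flink_hive_group
source.kafka.topic=kafka_hive

would give a consumerProp of {bootstrap.servers=broker1:9092, group.id=flink_hive_group}, a producerProp of {bootstrap.servers=broker1:9092}, and a commonProp that keeps source.kafka.topic under its full key.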
3: kafkaFlinkHive class
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class kafkaFlinkHive {

    static Logger logger = LoggerFactory.getLogger(kafkaFlinkHive.class);

    public static void main(String[] args) throws Exception {
        final MultipleParameterTool multipleParameterTool = MultipleParameterTool.fromArgs(args);
        if (!multipleParameterTool.has("path")) {
            System.out.println("Error: missing required option --path /opt/your.properties");
            System.out.println("Usage: flink run -m yarn-cluster -d /opt/your.jar --path /opt/your.properties");
            System.exit(0);
        }
        ConfigUtils configUtils = new ConfigUtils(multipleParameterTool.get("path"));
        logger.info(configUtils.toString());
        Properties commonProp = configUtils.getCommonProp();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // default mode

        // Kafka source: only switch on SASL/Kerberos when enabled in the properties file
        Properties consumerProp = configUtils.getConsumerProp();
        if (commonProp.containsKey("source.kafka.security.enable")
                && commonProp.getProperty("source.kafka.security.enable").equalsIgnoreCase("true")) {
            consumerProp.setProperty("security.protocol", "SASL_PLAINTEXT");
            consumerProp.setProperty("sasl.mechanism", "GSSAPI");
            consumerProp.setProperty("sasl.kerberos.service.name", "kafka");
        }
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(commonProp.getProperty("source.kafka.topic"),
                new SimpleStringSchema(), consumerProp);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        if (commonProp.containsKey("source.kafka.offset.reset")
                && commonProp.getProperty("source.kafka.offset.reset").equalsIgnoreCase("latest")) {
            kafkaConsumer.setStartFromLatest();
        }
        DataStream<String> sourceDataStream = env.addSource(kafkaConsumer).uid("source_kafka")
                .setParallelism(Integer.valueOf(commonProp.getProperty("source.kafka.parallelism")));

        // Pass-through map; replace with real format conversion if needed
        DataStream<String> convertData = sourceDataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value;
            }
        }).uid("convert_data").setParallelism(Integer.valueOf(commonProp.getProperty("convert.format.parallelism")));

        // Row-format HDFS sink; the rolled files under hdfs:///tmp are later loaded into Hive by SinkToHive
        final StreamingFileSink<String> hdfsSink = StreamingFileSink
                .forRowFormat(new Path("hdfs:///tmp"), (Encoder<String>) (element, stream) -> {
                    PrintStream out = new PrintStream(stream);
                    String[] strArr = element.split(",");
                    if (strArr.length == 2 && isInteger(strArr[0])) {
                        out.println(element);
                    } else {
                        logger.error("data format must be : int,string");
                    }
                    // Kerberos login for the HDFS/Hive side (note: this sits inside the Encoder, so it runs per record)
                    if (commonProp.containsKey("sink.hive.security.enable")
                            && commonProp.getProperty("sink.hive.security.enable").equalsIgnoreCase("true")) {
                        Configuration conf = new Configuration();
                        conf.set("hadoop.security.authentication", "Kerberos");
                        UserGroupInformation.setConfiguration(conf);
                        try {
                            UserGroupInformation.loginUserFromKeytab(commonProp.getProperty("sink.hive.keytab.principal"),
                                    commonProp.getProperty("sink.hive.keytab.path"));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.rollover.interval.sec"))))
                        .withInactivityInterval(TimeUnit.SECONDS.toMillis(Integer.valueOf(commonProp.getProperty("hdfs.inactivity.interval.sec"))))
                        .withMaxPartSize(1024L * 1024L * 256L)
                        .build())
                .build();

        convertData.addSink(hdfsSink).uid("sink_hdfs")
                .setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));
        convertData.addSink(new SinkToHive<>(commonProp)).uid("sink_hive")
                .setParallelism(Integer.valueOf(commonProp.getProperty("sink.hdfs.parallelism")));

        env.execute("Kafka2Flink2Hive kerberos Example");
    }

    public static boolean isInteger(String str) {
        Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
        return pattern.matcher(str).matches();
    }
}
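One thing to be aware of in the listing above: the Kerberos login for the HDFS/Hive side is placed inside the Encoder lambda, so it is re-executed for every record. A minimal sketch of an alternative, assuming the same property names (this is not part of the original code), is to perform the login once in main() before the sinks are built:

// Sketch: perform the Kerberos login once, up front, instead of per record.
// Call this from main() before the StreamingFileSink / SinkToHive are constructed;
// loginUserFromKeytab throws IOException, which is covered by main's "throws Exception".
if (commonProp.containsKey("sink.hive.security.enable")
        && commonProp.getProperty("sink.hive.security.enable").equalsIgnoreCase("true")) {
    Configuration conf = new Configuration(); // org.apache.hadoop.conf.Configuration
    conf.set("hadoop.security.authentication", "Kerberos");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(
            commonProp.getProperty("sink.hive.keytab.principal"),
            commonProp.getProperty("sink.hive.keytab.path"));
}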
4: SinkToHive class
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hive.service.cli.HiveSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

/**
 * Simple Hive sink: opens a Hive JDBC connection in open() and, for every
 * incoming record, issues a "load data inpath" statement that moves the part
 * files written by the StreamingFileSink into the target table.
 */
public class SinkToHive<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {

    private Logger logger = LoggerFactory.getLogger(SinkToHive.class);

    private String url;
    private String tableName;
    private Connection conn = null;
    private PreparedStatement preparedStatement = null;

    SinkToHive(Properties properties) {
        this.url = properties.getProperty("sink.hive.jdbc.url");
        this.tableName = properties.getProperty("sink.hive.table.name");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        conn = DriverManager.getConnection(url);
        logger.info("================open");
    }

    @Override
    public void close() throws Exception {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            logger.error(ex.getMessage());
        }
    }

    @Override
    public void invoke(IN value, Context context) throws Exception {
        try {
            // If no part files match the pattern the statement fails; the exception is only logged at debug level
            preparedStatement = conn.prepareStatement("load data inpath '/tmp/2020*/part-*' into table " + tableName);
            preparedStatement.execute();
        } catch (HiveSQLException ex) {
            logger.debug(ex.getMessage());
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }
}
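The sink only issues `load data inpath '/tmp/2020*/part-*'`, so the target table has to exist beforehand and its layout has to match the comma-separated `int,string` records written by the StreamingFileSink. A minimal sketch of a matching DDL (the column names are assumptions, not part of the original post):

CREATE TABLE h3c_table (
  id   INT,
  name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;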
5: FlinkJob_Kafka2Hive.properties configuration file
# Other Kafka settings
source.kafka.topic=kafka_hive
source.kafka.offset.reset=latest
source.kafka.parallelism=1
source.kafka.security.enable=false

###################################### Hive settings ###############################
convert.format.parallelism=1
sink.hive.table.name=h3c_table
sink.hdfs.parallelism=1
sink.hive.jdbc.url=jdbc:hive2://zyf53.hde.com:2181,zyf55.hde.com:2181,zyf54.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
hdfs.rollover.interval.sec=1
hdfs.inactivity.interval.sec=1
sink.hive.security.enable=false
sink.hive.keytab.path=/opt/useradmin.keytab
sink.hive.keytab.principal=useradmin
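This file only carries the job-level switches. Because ConfigUtils routes every key with a consumer. prefix to the Kafka consumer, the broker connection settings have to be present in the same file as well. A hedged example (host and group id are placeholders for your environment):

consumer.bootstrap.servers=zyf53.hde.com:9092
consumer.group.id=flink_kafka_hive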