
How to create the Job object when writing a Flink DataSet to HDFS

"I want to write a DataSet to Hive. I tried Hive JDBC, but it does not support batchExecute, so I decided to write the data to HDFS first and then build the Hive table on top of it.

I tried the following code to write to HDFS:

package test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.io.IntWritable;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.Text;
import org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.flink.util.Collector;
import org.apache.hadoop.mapreduce.Job;

public class Test {

public static void main(String[] args) {

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        DataSet<Tuple2<String, Integer>> hadoopResult = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);
        // job and jobConf are null; I do not know how to initialize them (new)
    Job job = null;
    Job jobConf = null;
    HadoopOutputFormat<String, Integer> hadoopOF =
              new HadoopOutputFormat<String, Integer>(
                new TextOutputFormat<String, Integer>(), job
              );
            hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
            TextOutputFormat.setOutputPath(jobConf, new Path("hdfs://somewhere/"));
            hadoopResult.output(hadoopOF);
}

 public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 3100297611484689639L;
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split("" "")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
But there are many compile errors. All of this code was copied from the official website and stitched together.

My question: how do I create the Job and JobConf objects so that I can write the DataSet to HDFS?"

flink小助手 2018-11-28 16:12:36 3969 0
1 answer

    Create the Job:

    Job job = Job.getInstance();

    I don't think you need a JobConf object; it looks like you can use the Job object in both places.
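    Putting the answer together with the snippet from the question, a complete version might look like the sketch below. It is an assumption-laden illustration, not verbatim from the thread. Beyond `Job.getInstance()`, two other things in the original snippet would also need fixing for it to compile: the imports should come from plain `org.apache.hadoop` packages (not the s3-shaded Flink packages the IDE picked), and `TextOutputFormat` expects Hadoop `Writable` key/value types, so the result is mapped to `Tuple2<Text, IntWritable>` before output. The HDFS path is the placeholder from the question.

    ```java
    package test;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class Test {

        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> text = env.fromElements(
                    "Who's there?",
                    "I think I hear them. Stand, ho! Who's there?");

            // Word count on plain Java types, then convert to Hadoop
            // Writables so TextOutputFormat<Text, IntWritable> accepts them.
            DataSet<Tuple2<Text, IntWritable>> hadoopResult = text
                    .flatMap(new LineSplitter())
                    .groupBy(0)
                    .sum(1)
                    .map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>>() {
                        @Override
                        public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> t) {
                            return new Tuple2<>(new Text(t.f0), new IntWritable(t.f1));
                        }
                    });

            // The missing piece from the question: create the Job via its
            // static factory instead of leaving it null.
            Job job = Job.getInstance();

            HadoopOutputFormat<Text, IntWritable> hadoopOF =
                    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);
            hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
            // The same Job object is used here as well; no separate JobConf is needed.
            TextOutputFormat.setOutputPath(job, new Path("hdfs://somewhere/"));
            hadoopResult.output(hadoopOF);

            env.execute("write dataset to hdfs");
        }

        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            private static final long serialVersionUID = 3100297611484689639L;

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
    ```

    Since `output(...)` is a sink on a lazily evaluated DataSet, nothing is written until `env.execute()` is called, which the original snippet also omitted.
    
    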

    2019-07-17 23:16:49