倪完_个人页

个人头像照片 倪完
0
6
0

个人介绍

暂无个人介绍

擅长的技术

  • Python
获得更多能力
通用技术能力:

暂时未有相关通用技术能力~

云产品技术能力:

暂时未有相关云产品技术能力~

阿里云技能认证

详细说明
暂无更多信息
暂无更多信息
正在加载, 请稍后...
暂无更多信息
  • 回答了问题 2019-07-17

    请教一下 flink stream API 代码里面怎么获取window 的起始时间 和结束时间?

    TimeWindow有getStart()和getEnd()方法,参见示例代码: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3)) .rebalance() .timeWindowAll(Time.of(windowSize, MILLISECONDS)) .apply(new RichAllWindowFunction, Tuple4, TimeWindow>() { private boolean open = false; @Override public void open(Configuration parameters) { assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); open = true; } @Override public void apply( TimeWindow window, Iterable> values, Collector> out) { // validate that the function has been opened properly assertTrue(open); int sum = 0; long key = -1; for (Tuple2 value : values) { sum += value.f1.value; key = value.f0; } out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); } }) .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1); env.execute('Tumbling Window Test'); TimeWindow有getStart()和getEnd()方法。
    踩0 评论0
  • 回答了问题 2019-07-17

    Flink对事件流程的多段时间控制怎么实现(可能使用CEP)?

    我理解你的需求实际是要求window的size是可变的,如果用DataStream,可以通过自定义一个WindowAssigner实现(见方法DataStream.windowAll(WindowAssigner assigner)),基于time的自定义的WindowAssigner实现可参见类TumblingProcessingTimeWindows或TumblingEventTimeWindows。
    踩1 评论0
  • 回答了问题 2019-07-17

    [flink] 请求一个flink实时计算的代码例子

    如果使用DataStream API可参见flink的WordCount(org.apache.flink.streaming.examples.wordcount.WordCount)示例代码: public class WordCount { // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { // Checking input parameters final ParameterTool params = ParameterTool.fromArgs(args); // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); // get input data DataStream text; if (params.has('input')) { // read the text file from given input path text = env.readTextFile(params.get('input')); } else { System.out.println('Executing WordCount example with default input data set.'); System.out.println('Use --input to specify file input.'); // get default test text data text = env.fromElements(WordCountData.WORDS); } DataStream> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field '0' and sum up tuple field '1' .keyBy(0).sum(1); // emit result if (params.has('output')) { counts.writeAsText(params.get('output')); } else { System.out.println('Printing result to stdout. Use --output to specify output path.'); counts.print(); } // execute program env.execute('Streaming WordCount'); } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* /** * Implements the string tokenizer that splits sentences into words as a * user-defined FlatMapFunction. The function takes a line (String) and * splits it into multiple pairs in the form of '(word,1)' ({@code Tuple2}). */ public static final class Tokenizer implements FlatMapFunction> { @Override public void flatMap(String value, Collector> out) { // normalize and split the line String[] tokens = value.toLowerCase().split('\\W+'); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } }
    踩1 评论0
  • 回答了问题 2019-07-17

    Flink on Yarn 出现 Container Removed 问题

    贴出来的异常是现象,不是root cause,需要更详情的日志才好判断。
    踩0 评论0
  • 回答了问题 2019-07-17

    随着我数据量变大 checkpoint时间在变长 这个问题可以这么解决啊

    用RocksDBStateBackend的话使用增量checkpoint,设置配置项 state.backend.incremental 为 true。
    踩0 评论0
  • 回答了问题 2019-07-17

    用flink run -s命令指定checkpoint点手动恢复测试发现报错

    从checkpoint的metadata file手动恢复state,路径需要指定到具体的某个checkpoint, 即 /path/{job-id}/chk-{i}。
    踩0 评论0
正在加载, 请稍后...
滑动查看更多
正在加载, 请稍后...
暂无更多信息