Flink 操作mapper、sink解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: Flink 操作mapper、sink解析

1.pom

 

org.apache.flink
    flink-streaming-java_2.11
    1.5.6

2.main

package com.jd.xq;
import com.jd.xq.mapper.LineMapper;
import com.jd.xq.sink.SinkTest;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.InputStream;
/**
 * @author
 * @Date 2019-08-16 15:45
 **/
public class StartJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 非常关键,一定要设置启动检查点!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));
        DataStream text = env.socketTextStream("localhost", 9090, "\n");
        DataStream stream = text.flatMap(new LineMapper());
        stream.addSink(new SinkTest());
        // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
        env.setParallelism(1);
        env.execute("WordCount from Kafka data");
    }
    public static ParameterTool loadConfig(String configFileName) throws Exception {
        try (InputStream is = StartJob.class.getClassLoader().getResourceAsStream(configFileName)) {
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}

3.mapper

package com.jd.xq.mapper;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
 * @author duanxiaoqiu
 * @Date 2019-09-10 20:50
 **/
public class LineMapper extends RichFlatMapFunction {
    @Override
    public void open(Configuration parameters) {
        ExecutionConfig executionConfig = getRuntimeContext().getExecutionConfig();
        ParameterTool params = (ParameterTool) executionConfig.getGlobalJobParameters();
        System.out.println(params.getProperties().getProperty("xq.name"));
    }
    public void flatMap(String s, Collector collector) {
        System.out.println(s);
    }
}


4.sink

package com.jd.xq.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
 * @author duanxiaoqiu
 * @Date 2019-09-11 09:36
 **/
public class SinkTest extends RichSinkFunction {
    @Override
    public void open(Configuration parameters) throws Exception {
    }
    public void invoke(String value, Context context) throws Exception {
    }
}


5.配置文件  config.properties

xq.name=test

6.Flink

接数据、处理、写数据到其他数据库

open方法里面如果实例化类,最好使用单例的方式


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
78 3
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
153 0
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
147 0
|
4月前
|
SQL 关系型数据库 测试技术
实时数仓 Hologres操作报错合集之执行Flink的sink操作时出现报错,是什么原因
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
存储 SQL Java
实时数仓 Hologres产品使用合集之如何使用Flink的sink连接
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
SQL 安全 数据库
Ruby on Rails 数据库迁移操作深度解析
【7月更文挑战第19天】Rails 的数据库迁移功能是一个强大的工具,它帮助开发者以版本控制的方式管理数据库结构的变更。通过遵循最佳实践,并合理利用 Rails 提供的迁移命令和方法,我们可以更加高效、安全地管理数据库结构,确保应用的稳定性和可扩展性。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 流计算
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
资源调度 分布式计算 Hadoop
实时计算 Flink版操作报错合集之perjob提交给yarn,报错显示无法连接yarn- Connecting to ResourceManager,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

推荐镜像

更多