Aapche flink:PythonAPI实现流式计算无法引入三方模块-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Aapche flink:PythonAPI实现流式计算无法引入三方模块

2019-01-29 16:15:22 6927 2

想利用flink提供的PythonAPI实现流式计算,碰到下面无法导入三方包的问题.
通过flink安装包提供的shell命令执行Python脚本无法引用三方模块 ,但通过Python命令进入可以引入kafka模块
./bin/pyflink-stream.sh examples/python/streaming/word_count_bamboo.py
_1

image

脚本详情:

from __future__ import absolute_import
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from kafka import KafkaProducer
from kafka import KafkaConsumer 
from kafka.structs import TopicPartition
from kafka.errors import KafkaError

class Kafka_Consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic,groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkaTopic = kafkatopic
        self.groupId = groupid
        self.consumer = kafkaConsumer(self.kafkaTopic,
                                      group_id = self.groupId,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                                              kafka_host=self.kafkaHost,
                                              kafka_port=self.kafkaPort))


class Tokenizer(FlatMapFunction):
    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))


class Selector(KeySelector):
    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, word2 = input2
        return (count1 + count2, word1)

def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Kafka_Consumer("172.19.78.11", 9092, "demo", 'test-python')) \
        .flat_map(Tokenizer()) \
        .key_by(Selector()) \
        .time_window(milliseconds(50)) \
        .reduce(Sum()) \
        .output()
    env.execute()
取消 提交回答
全部回答(2)
添加回答
相关问答

3

回答

Apache Flink:Python流API中的Kafka连接器,“无法加载用户类”

2018-12-06 18:02:09 10363浏览量 回答数 3

0

回答

想问下geojson的标注要叠加到tif格式的图上,一般用什么来实现?

2019-09-20 16:24:09 243浏览量 回答数 0

3

回答

挂自己搭的阿里云centos的vpn,浏览某个网页出现Apache 2 Test Page powered by CentOS

2017-01-09 13:50:25 5766浏览量 回答数 3

0

回答

DYNAMIC ON-THE-FLY MODIFICATIONS OF SPARK APPLICATIONS

2018-07-29 00:34:26 733浏览量 回答数 0

1

回答

Windows10+Apache2.4+php5.6安装服务启动失败403.10 禁止访问:配置无效

2020-05-26 12:47:10 389浏览量 回答数 1

1

回答

flink 消费kafka 出现消息积压,但是就一阵一阵的 #Flink

2020-06-30 19:02:52 2364浏览量 回答数 1

1

回答

Apache Flink是批计算还是流计算?

2021-11-02 18:24:28 307浏览量 回答数 1

1

回答

Apache Flink中流处理可弹性伸缩的应用是什么呢?

2021-12-07 21:08:00 147浏览量 回答数 1

1

回答

Apache Flinkd中启动一个Flink集群向客户端提交作业执行的过程是什么呢?

2021-12-08 20:28:29 381浏览量 回答数 1

1

回答

这个报错是什么意思? class org.apache.hc.core5.http2.H2Strea

2022-09-15 16:11:56 100浏览量 回答数 1
+关注
文章
问答
问答排行榜
最热
最新
相关电子书
更多
Alink:基于Apache Flink的算法平台
立即下载
《Apache Flink-重新定义计算》PDF下载
立即下载
基于streaming构建统一的数据处理引擎的挑战与实践
立即下载