Apache Flink: cannot import third-party modules when doing stream processing with the Python API - Q&A - Alibaba Cloud Developer Community


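I want to use the Python API that Flink provides to implement stream processing, but I ran into the following problem: third-party packages cannot be imported.
When the Python script is run through the shell command shipped with the Flink distribution, it cannot import third-party modules. In an interpreter started with the plain python command, however, the kafka module imports without error.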
./bin/pyflink-stream.sh examples/python/streaming/word_count_bamboo.py
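
A quick way to narrow this down is to check which interpreter actually runs the script and where it searches for modules. The script imports Java classes such as `org.apache.flink...` directly, which suggests the launcher runs it on a JVM-based Python (Jython, as far as I know) rather than the system CPython, so packages installed with pip for CPython would not necessarily be on its search path. The sketch below is only a diagnostic under that assumption; the helper names `describe_interpreter` and `can_import` are made up for illustration and are not part of Flink.

```python
from __future__ import print_function

import importlib
import platform
import sys


def describe_interpreter():
    """Collect the facts that decide where `import` looks for modules."""
    return {
        # Typically "CPython" for the python command, "Jython" on a JVM.
        "implementation": platform.python_implementation(),
        "version": platform.python_version(),
        "search_path": list(sys.path),
    }


def can_import(module_name):
    """Return True if module_name is importable in this interpreter."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


if __name__ == "__main__":
    info = describe_interpreter()
    print("implementation:", info["implementation"])
    print("version:", info["version"])
    for entry in info["search_path"]:
        print("search path entry:", entry)
    # "kafka" is the module the question fails to import.
    print("kafka importable:", can_import("kafka"))
```

Running the same file once with the python command and once through the Flink launcher, then comparing the two outputs, should show whether the directory that holds the kafka package is missing from the second search path.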

Script details:

from __future__ import absolute_import
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

from kafka import KafkaProducer
from kafka import KafkaConsumer 
from kafka.structs import TopicPartition
from kafka.errors import KafkaError

# Thin wrapper that holds the connection settings and a kafka-python consumer.
class Kafka_Consumer():
    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkaTopic = kafkatopic
        self.groupId = groupid
        # The imported class is KafkaConsumer; the lowercase name raised a NameError.
        self.consumer = KafkaConsumer(self.kafkaTopic,
                                      group_id = self.groupId,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
                                              kafka_host=self.kafkaHost,
                                              kafka_port=self.kafkaPort))


class Tokenizer(FlatMapFunction):
    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))


class Selector(KeySelector):
    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, word2 = input2
        return (count1 + count2, word1)

def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Kafka_Consumer("172.19.78.11", 9092, "demo", 'test-python')) \
        .flat_map(Tokenizer()) \
        .key_by(Selector()) \
        .time_window(milliseconds(50)) \
        .reduce(Sum()) \
        .output()
    env.execute()
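
For readers unfamiliar with the pipeline, here is a rough plain-Python sketch of what the Tokenizer, Selector and Sum steps compute for the lines that fall into one window. It does not use Flink or Kafka at all; `tokenize` and `count_words` are hypothetical names chosen for this illustration.

```python
from collections import defaultdict


def tokenize(line):
    """Mirror Tokenizer.flatMap: emit a (1, word) pair per lowercase word."""
    return [(1, word) for word in line.lower().split()]


def count_words(lines):
    """Mirror key_by(Selector) + reduce(Sum): add up the counts per word."""
    totals = defaultdict(int)
    for line in lines:
        for count, word in tokenize(line):
            totals[word] += count
    return dict(totals)


if __name__ == "__main__":
    print(count_words(["To be or", "not to be"]))
```

In the real job Flink would apply this per 50 ms window and per key in parallel, so the sketch only shows the arithmetic, not the streaming behaviour.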

Asked by bamboo先生, 2019-01-29 16:15:22
2 answers
  • 游客7g2tuxty5qe5c

    Hey, are you still working on PyFlink, or did you go from getting started to giving up?

    2021-01-30 14:46:01
  • 1426694731350515

    Just passing by.

    2019-07-17 23:27:16