想利用 Flink 提供的 Python API 实现流式计算，但碰到了下面这个无法导入第三方包的问题。
通过 Flink 安装包自带的 shell 脚本执行 Python 脚本时，无法引用第三方模块（kafka）；但直接用 python 命令进入解释器时，可以正常导入 kafka 模块。
./bin/pyflink-stream.sh examples/python/streaming/word_count_bamboo.py
脚本详情:
from __future__ import absolute_import
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from kafka.errors import KafkaError
class Kafka_Consumer(SourceFunction):
    """Flink source that emits the values of messages read from one Kafka topic.

    NOTE(review): the ``org.apache.flink...`` imports in this script only
    resolve under a JVM Python (Jython, which ``pyflink-stream.sh`` appears to
    use), so the ``kafka`` package must be importable from *that*
    interpreter's module path -- being installed for CPython is not enough.
    Confirm it is visible on Jython's ``sys.path``.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Store the connection settings; the consumer is created in ``run``.

        :param kafkahost: broker host name or IP address.
        :param kafkaport: broker port.
        :param kafkatopic: topic to subscribe to.
        :param groupid: Kafka consumer group id.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkaTopic = kafkatopic
        self.groupId = groupid
        # Created lazily in run(): a live consumer holds open sockets and
        # presumably cannot be serialized when Flink ships this source to a
        # task -- TODO confirm against the Flink Python runtime.
        self.consumer = None
        self._running = True

    def run(self, ctx):
        """Poll Kafka until cancelled, emitting each message value via ``ctx``.

        :param ctx: Flink source context; records are emitted with
            ``ctx.collect``.
        """
        # Original code called the undefined name ``kafkaConsumer``.
        self.consumer = KafkaConsumer(
            self.kafkaTopic,
            group_id=self.groupId,
            bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                kafka_host=self.kafkaHost,
                kafka_port=self.kafkaPort))
        try:
            while self._running:
                # A bounded poll (instead of blocking iteration) lets cancel()
                # take effect even while the topic is idle.
                batches = self.consumer.poll(timeout_ms=1000)
                for records in batches.values():
                    for record in records:
                        if record.value is None:
                            # Value-less records carry no text to tokenize.
                            continue
                        # assumes UTF-8 payloads -- TODO confirm with producer
                        ctx.collect(record.value.decode('utf-8'))
        finally:
            self.consumer.close()

    def cancel(self):
        """Ask ``run`` to stop after its current poll returns."""
        self._running = False
class Tokenizer(FlatMapFunction):
    """Split each line into lowercase words and emit a ``(1, word)`` pair per word."""

    def flatMap(self, value, collector):
        """Emit ``(1, word)`` for every whitespace-separated word in ``value``.

        :param value: one input line.
        :param collector: Flink collector receiving the emitted tuples.
        """
        words = value.lower().split()
        for token in words:
            pair = (1, token)
            collector.collect(pair)
class Selector(KeySelector):
    """Key ``(count, word)`` tuples by their word component."""

    def getKey(self, input):
        """Return the element at index 1 (the word) of ``input``."""
        word = input[1]
        return word
class Sum(ReduceFunction):
    """Merge two ``(count, word)`` tuples by adding their counts."""

    def reduce(self, input1, input2):
        """Return ``(count1 + count2, word1)``.

        Both inputs must be 2-tuples; unpacking raises otherwise.
        """
        (first_count, first_word), (second_count, _) = input1, input2
        total = first_count + second_count
        return (total, first_word)
def main(factory):
    """Build and run the Kafka word-count streaming job.

    Nothing in this file calls ``main``; it is presumably invoked by the
    ``pyflink-stream.sh`` launcher, which passes in ``factory``.

    :param factory: object whose ``get_execution_environment()`` returns the
        streaming environment.
    """
    env = factory.get_execution_environment()
    kafka_source = Kafka_Consumer("172.19.78.11", 9092, "demo", 'test-python')
    lines = env.create_python_source(kafka_source)
    # Tokenize, group by word, then sum counts over 50 ms windows.
    counts = (lines.flat_map(Tokenizer())
                   .key_by(Selector())
                   .time_window(milliseconds(50))
                   .reduce(Sum()))
    counts.output()
    env.execute()
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。