首先,在没有flask的时候,我写了kafka的程序,一个生产者,一个消费者。
生产者进程,它负责读取本地的一张图片,把图片的二进制数据以消息的形式发送给kafka,代码如下:
from kafka import KafkaProducer
# get the binary data of a picture
f=open('/home/seven/Pictures/fff.png','rb')
data=f.read()
f.close()
# create a producer
producer=KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=str.encode)
# send th binary data to kafka
producer.send('img_msg',key="Hello,Assassin424214141",value=data)
消费者进程,它负责接收消息,并用消息内容还原出一张图片存储在磁盘上,代码如下:
from kafka import KafkaConsumer
# create a consumer
consumer = KafkaConsumer('img_msg',bootstrap_servers=['localhost:9092'])
# receive messages
for message in consumer:
# print the message
print ("hello %s:%d:%d: key=%s"%(message.topic, message.partition,message.offset, message.key))
# use the message's data to create a picture
img_data=message.value
outfile=open("test.png","wb")
outfile.write(img_data)
outfile.close()
代码比较简陋,是个测试用的原型,并且是可以跑的。
然而,我把生产者的代码放到flask(python-flask是一个web框架)中后。flask程序接收客户端的请求(客户端会上传一张图片),确实是接收到图片了,而且消息发送给kafka的时候也没有报错。但是怪异的是kafka的消费者那边却始终没有动静...
然后我做了一个极端的测试,服务端(即flask)把接收到的客户端上传的图片先写到磁盘上,再从磁盘上读取这张图片(就和一开始的测试原型是一样的逻辑,而且验证了图片是成功写到磁盘上的,即图片接收这一环节是没有问题的),然而这种情况下kafka的消费者却还是接收不到消息,如之奈何?
有遇到同样情况的,请不吝赐教~
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
好吧,我发现了更加灵异的问题,我把kafka的生产者放到GET请求中,消费者那边是可以收到的。
然而在POST请求中调用kafka的producer,消费者那边就收不到。
感觉自己被flask和kafka玩弄了一样...
又搞了一上午,问题解决了,详细是这样的:
1. 要在kafka的生产者发送消息后,sleep一会(一般10毫秒就够了),但是这样还不行,准确来说。当flask处理POST请求,同时接收来自客户端的图片数据和非图片数据,kafka消费者就还是收不到消息。必须只能接收图片数据,此时才行得通。我也不知道为什么,感觉好神奇。
2. 好了,总结一下。 目前的解决方案是, 第一件事是要保证flask中不要同时接收图片和非图片数据 , 第二件事是在kafka-producer发送消息后,sleep十几毫秒。两件事都要做,才能让kafka-consumer接收到消息 。虽然sleep不是一个好办法,可以说又是迂行恶首,但目前也只能如此了。彻底解决flask中向kafka写入消息,结果消费者收不到消息的问题
假设客户端一次性发送了:一个文件(键名为targetfile),两个数字(键名为x和y)
flask服务端的解决方案是这样的:
获取图片的二进制内容:
flask.request.files['targetfile'].read()
获取两个数字参数:
post_data=dict(flask.request.form)
x=post_data['x'][0]
y=post_data['y'][0]
//此处不能用flask.request.form['x']
//也不能够用flask.request.form.get('x')
//也不能够用flask.request.values.get('x')
//否则,kafka的消费者那端就会收不到消息
//很诡异,但是目前的这个方案能够解决问题