最近用Python写了一个爬虫项目,为了方便,用Java做了一个控制端,然后用RabbitMQ将它们串起来。
首先是Java端的代码:生产者与消费者都采用单例模式,其中消费者在Tomcat启动时自动开始消费。话不多说,上代码。
//消费者
public class ScrapyRabbitCon{
//队列名
private final static String QUEUE_NAME = "pythonjava";
private static ScrapyRabbitCon rabbitmq;
public static ScrapyRabbitCon getRabbit() {
if(rabbitmq==null){
try {
rabbitmq = new ScrapyRabbitCon();
} catch (IOException | TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return rabbitmq;
}
private ScrapyRabbitCon() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
// factory.setConnectionTimeout(2);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
//此处采用Swing弹窗显示接收到的消息
JOptionPane.showMessageDialog(null, message, "ERROR", JOptionPane.ERROR_MESSAGE);
System.out.println(message);
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
//生产者
public class ScrapyRabbitPro {
//队列名
private final static String QUEUE_NAME = "javapython";
private Channel channel;
private static ScrapyRabbitPro sendRabbit;
public static ScrapyRabbitPro getSendRabbit(){
if(sendRabbit==null){
try {
sendRabbit = new ScrapyRabbitPro();
} catch (IOException | TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return sendRabbit;
}
private ScrapyRabbitPro() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
// factory.setConnectionTimeout(2);
Connection connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
}
public void send(JSONObject message){
try {
channel.basicPublish("", QUEUE_NAME, null, message.toString().getBytes("utf-8"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Producer Send +'" + message + "'");
}
以上是Java端实现RabbitMQ收发的代码,其中生产者封装了一个send方法,调用send方法即可将对应的JSON格式消息发送出去,由Python端的消费者进行消费。
def callback(ch, method, properties, body):
    """Handle one control command received from the producer.

    ``body`` is expected to be a UTF-8 encoded JSON object carrying a
    ``taskid`` and a ``method`` field. The module-level ``TASKINFO`` and
    ``TASKSTATUS`` are refreshed for that task, then:

    * ``method == 'start'``: ``writeconf(taskid)`` is called and ``go(mq)``
      runs in a new thread;
    * ``method == 'stop'``: ``ki(mq)`` runs in a new thread;
    * any other value: nothing further happens.

    ``mq`` is the cached publisher for the "pythonjava" queue. A body that
    is not valid UTF-8, not valid JSON, or not a JSON object is reported and
    ignored, so that one bad message cannot raise out of the callback and
    end the consume loop.

    ``ch``, ``method`` and ``properties`` are part of the consumer callback
    signature and are not used here.
    """
    global TASKINFO, TASKSTATUS
    try:
        js = json.loads(body.decode('utf-8'))
    except ValueError as exc:
        # UnicodeDecodeError and JSON decode errors are both ValueError subclasses.
        print('[consumer] ignoring malformed message: %s' % exc)
        return
    if not isinstance(js, dict):
        print('[consumer] ignoring non-object message: %r' % (js,))
        return

    taskid = js.get("taskid")
    TASKINFO = get_taskinfo(taskid)
    TASKSTATUS = get_taskstatus(taskid)
    mq = get_or_save_mq("pythonjava")

    command = js.get("method")
    if command == 'start':
        writeconf(taskid)
        threading.Thread(target=go, args=(mq,)).start()
    elif command == 'stop':
        threading.Thread(target=ki, args=(mq,)).start()
# Script entry point: connect to the local broker and dispatch every message
# arriving on the "javapython" queue to callback().
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='javapython')
# NOTE(review): passing the callback first and using `no_ack` matches the
# pika 0.x signature; pika 1.0 changed this to
# basic_consume(queue, on_message_callback, auto_ack=...) -- confirm the
# installed version before upgrading.
channel.basic_consume(callback,
                      queue='javapython',
                      no_ack=True)
print('[消费者] waiting for msg .')
try:
    channel.start_consuming()  # blocks, looping over incoming messages
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this script: leave the loop cleanly.
    channel.stop_consuming()
finally:
    # Release the broker connection; skip if it has already been closed.
    if connection.is_open:
        connection.close()
以上是Python消费者的代码,目前消费者代码是以脚本形式完成的,作为整个爬虫的入口,消费者代码监听来自Java控制端的命令来控制整个爬虫的运行。
def get_or_save_mq(queue_name):
    """Return the publisher cached for ``queue_name``, creating it on a miss.

    Looks ``queue_name`` up in the module-level ``MQ_DICT``. When no truthy
    entry is found, a new ``InitMq(queue_name)`` is built, stored under that
    name, and returned.
    """
    cached = MQ_DICT.get(queue_name)
    if not cached:
        cached = InitMq(queue_name)
        MQ_DICT[queue_name] = cached
    return cached
class InitMq:
    """Publisher bound to one queue on the local RabbitMQ broker."""

    def __init__(self, uuid):
        """Connect to the broker and declare the queue named ``uuid``.

        The same name is kept as ``self.routing_key`` so that ``send_data``
        publishes to that queue; the open channel is kept as
        ``self.channel``.
        """
        print("***********初始化MQ驱动*************")
        auth = pika.PlainCredentials('guest', 'guest')
        params = pika.ConnectionParameters('localhost', 5672, '/', auth)
        conn = pika.BlockingConnection(params)
        self.channel = conn.channel()
        self.channel.queue_declare(queue=uuid)
        self.routing_key = uuid

    def send_data(self, body):
        """Publish the text ``body``, UTF-8 encoded, via the default exchange."""
        payload = body.encode('utf-8')
        self.channel.basic_publish(
            exchange='',
            routing_key=self.routing_key,
            body=payload,
        )
以上是Python中生产者代码,此生产者将爬虫端产生的错误信息与提示信息发到Java控制端。