RabbitMq连接Java与Python

简介: RabbitMq连接Java与Python
最近用Python写了一个爬虫项目,为了方便,用Java做了一个控制端,然后用RabbitMq将他们串起来

首先Java端的代码,生产者与消费者都采用的单例模式,其中消费者在tomcat启动时自动进行消费。话不多说,上代码

//消费者
public class ScrapyRabbitCon{
    //队列名
    private final static String QUEUE_NAME = "pythonjava";
    private static ScrapyRabbitCon rabbitmq;

    public static ScrapyRabbitCon getRabbit() {
        if(rabbitmq==null){
            try {
                rabbitmq = new ScrapyRabbitCon();
            } catch (IOException | TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return rabbitmq;
    }
    private ScrapyRabbitCon() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
//        factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                //此处采用Swing弹窗显示接收到的消息
                JOptionPane.showMessageDialog(null, message, "ERROR", JOptionPane.ERROR_MESSAGE);
                System.out.println(message);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, consumer);
    }

    //生产者
    public class ScrapyRabbitPro {
    //队列名
    private final static String QUEUE_NAME = "javapython";
    private Channel channel;
    private static ScrapyRabbitPro sendRabbit;
    public static ScrapyRabbitPro getSendRabbit(){
        if(sendRabbit==null){
            try {
                sendRabbit = new ScrapyRabbitPro();
            } catch (IOException | TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return sendRabbit;
    }
    private ScrapyRabbitPro() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
//        factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    }
    public void send(JSONObject message){
        try {
            channel.basicPublish("", QUEUE_NAME, null, message.toString().getBytes("utf-8"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("Producer Send +'" + message + "'");
    }

以上是Java实现RabbitMq的代码,其中生产者封装了一个send方法,调用send方法可将对应的json格式消息发送,由Python端的消费者进行消费。

def callback(ch, method, properties, body):  # 定义一个回调函数,用来接收生产者发送的消息
    global TASKINFO, TASKSTATUS
    body = body.decode('utf-8')
    js = json.loads(body)
    taskid = js.get("taskid")
    TASKINFO = get_taskinfo(taskid)
    TASKSTATUS = get_taskstatus(taskid)
    mq = get_or_save_mq("pythonjava")
    if js.get("method") == 'start':
        writeconf(taskid)
        t1 = threading.Thread(target=go, args=(mq,))
        t1.start()
    if js.get("method") == 'stop':
        t2 = threading.Thread(target=ki, args=(mq,))
        t2.start()


credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='javapython')

channel.basic_consume(callback,
                      queue='javapython',
                      no_ack=True)
print('[消费者] waiting for msg .')
channel.start_consuming()  # 开始循环取消息

以上是Python消费者的代码,目前消费者代码是以脚本形式完成的,作为整个爬虫的入口,消费者代码监听来自Java控制端的命令来控制整个爬虫的运行。

def get_or_save_mq(queue_name):
    mq = MQ_DICT.get(queue_name)
    if mq:
        return mq
    else:
        mq = InitMq(queue_name)
        MQ_DICT[queue_name] = mq
        return mq


class InitMq:
    def __init__(self, uuid):
        queue = uuid
        print("***********初始化MQ驱动*************")
        credentials = pika.PlainCredentials('guest', 'guest')
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
        self.channel = connection.channel()
        self.channel.queue_declare(queue=queue)
        self.routing_key = queue

    def send_data(self, body):
        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=body.encode('utf-8'))

以上是Python中生产者代码,此生产者将爬虫端产生的错误信息与提示信息发到Java控制端。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
24天前
|
存储 Java 关系型数据库
高效连接之道:Java连接池原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。频繁创建和关闭连接会消耗大量资源,导致性能瓶颈。为此,Java连接池技术通过复用连接,实现高效、稳定的数据库连接管理。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接池的基本操作、配置和使用方法,以及在电商应用中的具体应用示例。
44 5
|
1月前
|
数据采集 缓存 Java
Python vs Java:爬虫任务中的效率比较
Python vs Java:爬虫任务中的效率比较
|
26天前
|
关系型数据库 MySQL 数据库连接
python脚本:连接数据库,检查直播流是否可用
【10月更文挑战第13天】本脚本使用 `mysql-connector-python` 连接MySQL数据库,检查 `live_streams` 表中每个直播流URL的可用性。通过 `requests` 库发送HTTP请求,输出每个URL的检查结果。需安装 `mysql-connector-python` 和 `requests` 库,并配置数据库连接参数。
124 68
|
2天前
|
人工智能 安全 Java
Java和Python在企业中的应用情况
Java和Python在企业中的应用情况
20 7
|
21天前
|
SQL Java 数据库连接
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率
在Java应用中,数据库访问常成为性能瓶颈。连接池技术通过预建立并复用数据库连接,有效减少连接开销,提升访问效率。本文介绍了连接池的工作原理、优势及实现方法,并提供了HikariCP的示例代码。
36 3
|
21天前
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
22 1
|
21天前
|
Java 数据库连接 数据库
Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销
本文深入探讨了Java连接池在数据库性能优化中的重要作用。连接池通过预先创建和管理数据库连接,避免了频繁创建和关闭连接的开销,显著提升了系统的响应速度和吞吐量。文章介绍了连接池的工作原理,并以HikariCP为例,展示了如何在Java应用中使用连接池。通过合理配置和优化,连接池技术能够有效提升应用性能。
34 1
|
27天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
1月前
|
SQL 存储 Java
Java中使用ClickHouseDriver连接和基本操作
通过上述步骤,你可以轻松地在Java应用中集成ClickHouse数据库,执行基本的CRUD操作。需要注意的是,实际开发中应当根据实际情况调整数据库连接配置(如URL中的主机、端口、数据库名等),并根据应用需求选择合适的异常处理策略,确保代码的健壮性和资源的有效管理。此外,对于复杂查询和大批量数据处理,建议充分利用ClickHouse的特性(如分布式处理、列式存储优化等),以进一步提升性能。
100 2
|
1月前
|
IDE 网络安全 开发工具
IDE之pycharm:专业版本连接远程服务器代码,并配置远程python环境解释器(亲测OK)。
本文介绍了如何在PyCharm专业版中连接远程服务器并配置远程Python环境解释器,以便在服务器上运行代码。
320 0
IDE之pycharm:专业版本连接远程服务器代码,并配置远程python环境解释器(亲测OK)。