ROS2通讯中间件安装与使用

本文涉及的产品
资源编排,不限时长
简介: 本文记录主要ROS2的安装、如何JAVA和PYTHON对ROS2消息的订阅和发布。

1    安装

参考官网安装步骤《UbuntuDebian — ROS 2 文档:银河文档

1.1  添加ROS2

sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg

 

echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null

1.2  更新源

sudo apt update

sudo apt upgrade

1.3  安装版本

ubuntu20.04版本,选择galactic

选择以下任意版本安装,若安装较慢,可将ubuntu源切换成国内源。

版本

安装命令

桌面安装(推荐):ROSRViz,演示,教程

sudo apt install ros-galactic-desktop

ROS 基本安装(裸骨):通信库、消息包、命令行工具。 没有图形用户界面工具。

sudo apt install ros-galactic-ros-base

开发工具:用于构建 ROS 包的编译器和其他工具

sudo apt install ros-dev-tools

1.4  将源添加到 shell 启动脚本中

echo "source /opt/ros/galactic/setup.bash" >> ~/.bashrc

1.5  检查环境变量

printenv | grep -i ROS

 

1.6  bridge-server

该服务可用于websocket服务通信

apt install ros-galactic-rosbridge-server

 

1.7  注意事项

1)     配置ros2源无法地址403时,

解决:手动配置域名映射185.199.108.133 raw.githubusercontent.com

2)     更换阿里云apt源,以20.04为例

mv /etc/apt/sources.list  /etc/apt/sources.list_bak

vi /etc/apt/sources.list

deb https://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse

deb-src https://mirrors.aliyun.com/ubuntu/ focal main restricted universe multiverse

 

deb https://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse

deb-src https://mirrors.aliyun.com/ubuntu/ focal-security main restricted universe multiverse

 

deb https://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse

deb-src https://mirrors.aliyun.com/ubuntu/ focal-updates main restricted universe multiverse

 

# deb https://mirrors.aliyun.com/ubuntu/ focal-proposed main restricted universe multiverse

# deb-src https://mirrors.aliyun.com/ubuntu/ focal-proposed main restricted universe multiverse

 

deb https://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse

deb-src https://mirrors.aliyun.com/ubuntu/ focal-backports main restricted universe multiverse

 

3)    

2    使用

2.1  编译工具colcon安装

# 安装

sudo apt-get install python3-colcon-common-extensions

# 编译

colcon build --symlink-install

# 编译生成build/install/log文件夹,执行setup.bash使生效

source install/setup.bash

2.2  使用JAVA发布/订阅ROS2

相当于使用websocket进行消息通信,ros2需要启动rosbridge服务,java去订阅这个服务。

2.2.1 启动rosbridge_server服务

ros2 launch rosbridge_server rosbridge_websocket_launch.xml

2.2.2 Java订阅

maven依赖

<dependency>

    <groupId>edu.wpi.rail</groupId>

    <artifactId>jrosbridge</artifactId>

    <version>0.2.0</version>

</dependency>

代码实现

import com.alibaba.fastjson.JSON;

import edu.wpi.rail.jrosbridge.Ros;

import edu.wpi.rail.jrosbridge.Topic;

import edu.wpi.rail.jrosbridge.handler.RosHandler;

import edu.wpi.rail.jrosbridge.messages.Message;

import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.web.bind.annotation.RestController;

import team.iOceanPlus.PB.Target.PBTarget;

import team.iOceanPlus.PB.Target.PBTarget.Builder;

 

import javax.annotation.PostConstruct;

import javax.websocket.Session;

import java.util.HashMap;

import java.util.Map;

 

@Slf4j

@Component

public class RosSubscribe implements RosHandler {

    //properties配置文件配置

 

    //ros服务器ip

//    @Value("${ros.ip}")

    private String ip = "192.168.136.138";

 

    //指定订阅的话题名称/类型

//    @Value("${ros.topic.gnss.name}")

    private String topicName = "our_topic";

//    @Value("${ros.topic.gnss.type}")

    private String msgType = "std_msgs/String";

 

    Ros ros = null;

    boolean connect = false;

    /**

     * ros服务器故障断开每隔五秒重连

     */

    @Scheduled(cron = "*/5 * * * * ?")

    void reconnect() {

        if (!connect) {

            log.error("未连接服务!正在重连ros服务器!");

            init();

        }

    }

 

    @PostConstruct

    void init() {

        try {

            ros = new Ros(ip);

            connect = ros.connect();

            if (connect) {

                Topic topic = new Topic(ros, topicName, msgType);

                Map<String,Object> msg = new HashMap<>();

                msg.put("data","测试  publish");

                Message message1 = new Message(JSON.toJSONString(msg), "std_msgs/String");

                topic.publish(message1);

                //开始订阅话题

                topic.subscribe(message -> {

                    try {

                        Map maps = (Map) JSON.parse(message.toString());   //message即该话题的广播消息

                        System.out.println(maps);

 

                    } catch (Exception e) {

                        log.error(e.toString());

                    }

                });

 

                ros.addRosHandler(this);    //监听器监听ros连接状态

            }

 

        } catch (Exception e) {

            log.error("ros服务器连接错误:" + e.toString());

            connect = false;

        }

    }

 

    @Override

    public void handleConnection(Session session) {

        log.info("ros已连接:" + session.getId());

    }

 

    @Override

    public void handleDisconnection(Session session) {

        log.error("断开ros连接:" + session.getId());

        connect = false;

    }

 

    @Override

    public void handleError(Session session, Throwable t) {

        log.error("ros连接失败:" + session.getId() + "-----失败原因:" + t.toString());

        connect = false;

    }

 

}

2.2.3 JAVA发布

import com.alibaba.fastjson.JSON;

import edu.wpi.rail.jrosbridge.Ros;

import edu.wpi.rail.jrosbridge.Topic;

import edu.wpi.rail.jrosbridge.handler.RosHandler;

import edu.wpi.rail.jrosbridge.messages.Message;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.stereotype.Component;

import team.iOceanPlus.PB.Target.PBTarget;

import team.iOceanPlus.PB.Target.PBTarget.Builder;

 

import javax.annotation.PostConstruct;

import javax.websocket.Session;

import java.util.HashMap;

import java.util.Map;

 

@Slf4j

@Component

public class RosPublish implements RosHandler {

 

 

    //properties配置文件配置

 

    //ros服务器ip

//    @Value("${ros.ip}")

    private String ip = "192.168.136.138";

 

    //指定订阅的话题名称/类型

//    @Value("${ros.topic.gnss.name}")

    private String topicName = "our_topic";

//    @Value("${ros.topic.gnss.type}")

    private String msgType = "std_msgs/String";

 

    Ros ros = null;

    boolean connect = false;

 

    /**

     * ros服务器故障断开每隔五秒重连

     */

    @Scheduled(cron = "*/5 * * * * ?")

    void reconnect() {

        if (!connect) {

            log.error("未连接服务!正在重连ros服务器!");

            init();

        }

    }

 

    @PostConstruct

    void init() {

        try {

            ros = new Ros(ip);

            connect = ros.connect();

            if (connect) {

                Topic topic = new Topic(ros, topicName, msgType);

                // 测试发布

                Map<String,Object> msg = new HashMap<>();

                msg.put("data","测试  publish");

                Message message1 = new Message(JSON.toJSONString(msg), "std_msgs/String");

                topic.publish(message1);

                ros.addRosHandler(this);    //监听器监听ros连接状态

            }

 

        } catch (Exception e) {

            log.error("ros服务器连接错误:" + e.toString());

            connect = false;

        }

    }

 

    @Override

    public void handleConnection(Session session) {

        log.info("ros已连接:" + session.getId());

    }

 

    @Override

    public void handleDisconnection(Session session) {

        log.error("断开ros连接:" + session.getId());

        connect = false;

    }

 

    @Override

    public void handleError(Session session, Throwable t) {

        log.error("ros连接失败:" + session.getId() + "-----失败原因:" + t.toString());

        connect = false;

    }

 

}

消息使用protobuf编码

使用Base64protobuf的二进制数组进行字符串编码。

PBTarget pbTarget = PBTarget.newBuilder().build();

// JAVA

String encodeStr = Base64.getEncoder().encodeToString(pbTarget.toByteArray());

byte[] bytes Base64.getDecoder().decode(encodeStr);

PBTarget pbTarget = PBTarget.parseFrom(bytes);

// JS

atob(base64编码)

 

 

2.3  使用Python发布/订阅ROS2

以下发布和订阅只适合ROS服务端和python程序部署在一起。如果跨服务器,请使用websocket

2.3.1 发布

import rclpy

from rclpy.node import Node

from std_msgs.msg import String

 

#####这部分配置似乎可有可无#####

#创建工作空间:(在ros2_ws/src下执行)ros2 pkg create --build-type ament_python topic_rclpy --dependencies rclpy

#将文件放到工作空间下的topic_rclpy目录下的topic_rclpy文件夹下

#编译工作空间:colcon build

#运行:ros2 run topic_rclpy topic_publisher(无需执行)

###########

class DataPublisher(Node):

    def __init__(self, name, topic_this):

        super().__init__(name)

        self.topic_this = topic_this

        self.get_logger().info("%s节点启动!" % name)

        self.command_publisher_ = self.create_publisher(String, topic_this, 10)

 

    def publish_command(self, value):

        msg = String()

        msg.data = value

        self.command_publisher_.publish(msg)

        self.get_logger().info(f'{self.topic_this}发布了消息:{msg.data}')

 

 

 

def main(args=None):

    rclpy.init(args=args)

    processor = DataPublisher('pubulisher',"command")  # 创建数据处理节点

    rclpy.spin(processor)

    processor.destroy_node()

    rclpy.shutdown()

 

2.3.2 订阅

原理:定义一个监听类

import rclpy

from rclpy.node import Node

from std_msgs.msg import String

 

 

class LISTENER(Node):

    def __init__(self, name, topic):

        # 初始化

        super().__init__(name)

        # 创建发布节点

        self.subscriber = self.create_subscription(String, topic, self.receive_callback, 10)

        self.get_logger().info("LISTENER: I'm listener %s, I subscribed topic %s" % (name, topic))

        #

        self.n_receive = 0

 

    # 订阅器都是通过回调函数来实现对应功能,callback()内部完成对应实现

    def receive_callback(self, msg):

        self.n_receive += 1

        self.get_logger().info("%d I heard %s" % (self.n_receive, msg.data))

 

 

def main():

    # 初始化

    rclpy.init()

    # 创建订阅节点

    node = LISTENER(name="listener_node", topic='our_topic')

    # 死循环

    rclpy.spin(node)

    # 关闭

    rclpy.shutdown()

2.3.3 Ros-bridge-server

通过roslibpy库连接ros-bridge-server服务,可以实现跨服务器对ROS的消息发布与订阅。

import base64

from time import sleep

import roslibpy

from roslibpy import Ros

 

import Target_pb2

 

'''

这是一个通过使用roslibpy库连接ros-bridge-server进行消息发布和订阅操作的例子。

使用时请填写实际的hostport端口

'''

 

 

# 消息发布

class Publisher:

    client = None

 

    def __init__(self, host: str, port: int):

        self.client = roslibpy.Ros(host=host, port=port)

        self.client.run()

 

    def publish(self, topic: str, data: str):

        if not self.client.is_connected:

            raise "ros client is not connect."

        rosTopic = roslibpy.Topic(self.client, topic, 'std_msgs/String')

        msg = {

            "data": data

        }

        rosTopic.publish(msg)

 

    def close(self):

        self.client.close()

 

 

# 消息订阅

class Listener:

    client: Ros = None

 

    def __init__(self, host: str, port: int):

        self.client = Ros(host=host, port=port)

        self.client.run()

 

    def subscribe(self, topic: str, callback):

        listener = roslibpy.Topic(self.client, topic, 'std_msgs/String')

        # 回调

        listener.subscribe(callback)

 

    def close(self):

        self.client.close()

 

 

def callbackFunc(message):

    # print('Heard talking: ' + message['data'])

    body = base64.b64decode(message['data'])

    pbTarget = Target_pb2.PBTarget()

    pbTarget.ParseFromString(body)

    print(pbTarget)

 

 

if __name__ == '__main__':

    listener = Listener(host="localhost", port=9090)

    listener.subscribe("/OnLineTruthTargetTracks", callbackFunc)

 

    publisher = Publisher(host=" localhost", port=9090)

    pbTarget = Target_pb2.PBTarget()

    for num in range(100):

        pbTarget.enum_Sender_Software = num

        # 序列化并转为base64字符串

        data = base64.b64encode(pbTarget.SerializeToString()).decode("UTF-8")

        publisher.publish("/OnLineTruthTargetTracks", data)

        # 消息发送太快可能会丢失

        sleep(0.0001)

 

    try:

        while True:

            pass

    except KeyboardInterrupt:

        listener.client.terminate()

        publisher.client.terminate()

 

2.3.4 注意事项

2.3.4.1 导入rclpy依赖缺失

python项目(非服务方式)中,通常使用pip install rclpy 或者conda来安装管理依赖,由于在pipy源中找不到该依赖。所以,可从ros2的安装目录下找到对应的依赖文件,操作步骤如下:

1、确认ros版本,如:galactic

2、临时生效:export PYTHONPATH=/opt/ros/galactic/lib/python3.8/site-packages

3、永久生效:echo “export PYTHONPATH=/opt/ros/galactic/lib/python3.8/site-packages” >> ~/.bashrc && source ~/.bashrc

3    参考资料

1)     UbuntuDebian — ROS 2 文档:银河文档

2)     GitHub - h2r/java_rosbridge:一个简单的库,使Jetty 9Java连接到rosbridge服务器。支持使用不同的主题委托发布和订阅

3)     java--Rosbridge--ros通信(基于webscoket

4)     ROS2前置基础教程| Python依赖查找流程

5)     gramaziokohler/roslibpy: Python ROS Bridge library (github.com)

相关实践学习
使用ROS创建VPC和VSwitch
本场景主要介绍如何利用阿里云资源编排服务,定义资源编排模板,实现自动化创建阿里云专有网络和交换机。
阿里云资源编排ROS使用教程
资源编排(Resource Orchestration)是一种简单易用的云计算资源管理和自动化运维服务。用户通过模板描述多个云计算资源的依赖关系、配置等,并自动完成所有资源的创建和配置,以达到自动化部署、运维等目的。编排模板同时也是一种标准化的资源和应用交付方式,并且可以随时编辑修改,使基础设施即代码(Infrastructure as Code)成为可能。 产品详情:https://www.aliyun.com/product/ros/
相关文章
|
3月前
|
存储 Ubuntu 安全
ROS2教程02 ROS2的安装、配置和测试
本文是关于ROS2(机器人操作系统2)的安装、配置和测试的教程。内容包括使用一键安装脚本快速安装ROS2 Humble版,手动安装步骤,设置语言环境、添加软件源、更新软件包、安装ROS2桌面版和开发工具,配置ROS2环境,创建工作空间,配置ROS2领域以避免网络冲突,以及如何删除ROS2。此外,还包括了测试ROS2是否安装成功的两个案例:基本的Topic通信测试和使用Turtlesim演示程序。适用于Ubuntu 22.04操作系统。
273 1
ROS2教程02 ROS2的安装、配置和测试
|
3月前
|
Ubuntu Shell C++
在Ubuntu18.04上安装ros2的环境,ros2的常用命令:播放包、录制包等
在Ubuntu18.04上安装ros2的环境,ros2的常用命令:播放包、录制包等
181 1
|
3月前
|
Ubuntu Apache
Ubuntu20.04下一键安装ROS1 Noetic
本文提供了一个简化在Ubuntu 20.04系统上安装ROS1 Noetic过程的一键安装脚本工具,该脚本通过优化配置和使用清华大学镜像源,加速了国内用户的下载速度,并自动完成环境设置和依赖安装,同时提供了详细的使用说明和源码。
345 0
Ubuntu20.04下一键安装ROS1 Noetic
|
6月前
|
开发框架 JavaScript 中间件
安装中间件
【5月更文挑战第19天】安装中间件
47 3
|
消息中间件 Linux 虚拟化
消息中间件系列教程(04) -RabbitMQ -简介&安装
消息中间件系列教程(04) -RabbitMQ -简介&安装
69 0
|
消息中间件 NoSQL 中间件
Linux-中间件安装指南
Linux-中间件安装指南
171 0
|
6月前
|
Ubuntu
【ubuntu】ubuntu20.04安装ros noetic(亲测有效,附操作步骤)
【ubuntu】ubuntu20.04安装ros noetic(亲测有效,附操作步骤)
|
6月前
|
弹性计算 关系型数据库 API
ECS安装问题之安装资源编排服务(ROS)如何解决
ECS(Elastic Compute Service,弹性计算服务)是云计算服务提供商提供的一种基础云服务,允许用户在云端获取和配置虚拟服务器。以下是ECS服务使用中的一些常见问题及其解答的合集:
|
6月前
[ROS2] --- ROS2安装
[ROS2] --- ROS2安装
216 0
|
6月前
|
中间件 关系型数据库 MySQL
史上最详细Docker安装Mycat中间件 | 实现主从的读写分离
史上最详细Docker安装Mycat中间件 | 实现主从的读写分离
405 2

推荐镜像

更多