Neutron Analysis (6): neutron-openvswitch-agent

Overview:

A code walkthrough of neutron-openvswitch-agent. The entry point is:

neutron.plugins.openvswitch.agent.ovs_neutron_agent:main

# initialize the OVS agent from agent_config:
# set up plugin_rpc, state_rpc, the message queue consumers and periodic state reports
# set up br-int, br-tun and the bridge_mappings bridges
# start sg_agent
agent = OVSNeutronAgent(**agent_config)
# start processing rpc events:
# process port up/down and the related flow updates/local VLAN binding
# process security group updates
agent.daemon_loop()

Initialization of OVSNeutronAgent

class OVSNeutronAgent(n_rpc.RpcCallback,
                      sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                      l2population_rpc.L2populationRpcCallBackMixin):
    '''Implements OVS-based tunneling, VLANs and flat networks.

    Two local bridges are created: an integration bridge (defaults to
    'br-int') and a tunneling bridge (defaults to 'br-tun'). An
    additional bridge is created for each physical network interface
    used for VLANs and/or flat networks.

    All VM VIFs are plugged into the integration bridge. VM VIFs on a
    given virtual network share a common "local" VLAN (i.e. not
    propagated externally). The VLAN id of this local VLAN is mapped
    to the physical networking details realizing that virtual network.

    For virtual networks realized as GRE tunnels, a Logical Switch
    (LS) identifier is used to differentiate tenant traffic on
    inter-HV tunnels. A mesh of tunnels is created to other
    Hypervisors in the cloud. These tunnels originate and terminate on
    the tunneling bridge of each hypervisor. Port patching is done to
    connect local VLANs on the integration bridge to inter-hypervisor
    tunnels on the tunnel bridge.

    For each virtual network realized as a VLAN or flat network, a
    veth or a pair of patch ports is used to connect the local VLAN on
    the integration bridge with the physical network bridge, with flow
    rules adding, modifying, or stripping VLAN tags as necessary.
    '''

    # history
    #   1.0 Initial version
    #   1.1 Support Security Group RPC
    RPC_API_VERSION = '1.1'

    def __init__(self, integ_br, tun_br, local_ip,
                 bridge_mappings, root_helper,
                 polling_interval, tunnel_types=None,
                 veth_mtu=None, l2_population=False,
                 minimize_polling=False,
                 ovsdb_monitor_respawn_interval=(
                     constants.DEFAULT_OVSDBMON_RESPAWN),
                 arp_responder=False,
                 use_veth_interconnection=False):
        '''Constructor.

        :param integ_br: name of the integration bridge.
        :param tun_br: name of the tunnel bridge.
        :param local_ip: local IP address of this hypervisor.
        :param bridge_mappings: mappings from physical network name to bridge.
        :param root_helper: utility to use when running shell cmds.
        :param polling_interval: interval (secs) to poll DB.
        :param tunnel_types: A list of tunnel types to enable support for in
               the agent. If set, will automatically set enable_tunneling to
               True.
        :param veth_mtu: MTU size for veth interfaces.
        :param l2_population: Optional, whether L2 population is turned on
        :param minimize_polling: Optional, whether to minimize polling by
               monitoring ovsdb for interface changes.
        :param ovsdb_monitor_respawn_interval: Optional, when using polling
               minimization, the number of seconds to wait before respawning
               the ovsdb monitor.
        :param arp_responder: Optional, enable local ARP responder if it is
               supported.
        :param use_veth_interconnection: use veths instead of patch ports to
               interconnect the integration bridge to physical bridges.
        '''
        super(OVSNeutronAgent, self).__init__()
        self.use_veth_interconnection = use_veth_interconnection
        self.veth_mtu = veth_mtu
        self.root_helper = root_helper
        # local VLAN ids 1..4093 (xrange excludes the upper bound)
        self.available_local_vlans = set(moves.xrange(q_const.MIN_VLAN_TAG,
                                                      q_const.MAX_VLAN_TAG))
        self.tunnel_types = tunnel_types or []
        self.l2_pop = l2_population
        # TODO(ethuleau): Initially, local ARP responder is be dependent to the
        #                 ML2 l2 population mechanism driver.
        self.arp_responder_enabled = (arp_responder and
                                      self._check_arp_responder_support() and
                                      self.l2_pop)
        self.agent_state = {
            'binary': 'neutron-openvswitch-agent',
            'host': cfg.CONF.host,
            'topic': q_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': bridge_mappings,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': local_ip,
                               'l2_population': self.l2_pop,
                               'arp_responder_enabled':
                               self.arp_responder_enabled},
            'agent_type': q_const.AGENT_TYPE_OVS,
            'start_flag': True}

        # Keep track of int_br's device count for use by _report_state()
        self.int_br_device_count = 0

        self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper)
        # Create the integration bridge and patch ports,
        # remove all existing flows, then switch all traffic using L2 learning
        # and add a canary flow to int_br to track OVS restarts
        self.setup_integration_br()

        # Stores port update notifications for processing in main rpc loop
        self.updated_ports = set()

        # state_rpc: periodically reports agent state on topic q-plugin
        # plugin_rpc: communicates with the plugin on topic q-agent-notifier
        # consumer topics include:
        #       q-agent-notifier-port_update
        #       q-agent-notifier-network_delete
        #       q-agent-notifier-tunnel_update
        #       q-agent-notifier-security_group_update
        #       q-agent-notifier-l2population_update (only when l2population is enabled)
        self.setup_rpc()

        # bridge_mappings = default:br-eth1
        self.bridge_mappings = bridge_mappings
        # Creates physical network bridges and links them to integration bridge using veths.
        self.setup_physical_bridges(self.bridge_mappings)

        self.local_vlan_map = {}
        self.tun_br_ofports = {p_const.TYPE_GRE: {},
                               p_const.TYPE_VXLAN: {}}

        self.polling_interval = polling_interval
        self.minimize_polling = minimize_polling
        self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval

        if tunnel_types:
            self.enable_tunneling = True
        else:
            self.enable_tunneling = False
        self.local_ip = local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
        self.dont_fragment = cfg.CONF.AGENT.dont_fragment
        self.tun_br = None

        # Creates tunnel bridge, and links it to the integration bridge
        # using a patch port.
        # create default flow tables
        if self.enable_tunneling:
            self.setup_tunnel_br(tun_br)

        # Collect additional bridges to monitor
        # Setup ancillary bridges - for example br-ex
        self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)

        # Security group agent support
        # firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
        self.sg_agent = OVSSecurityGroupAgent(self.context,
                                              self.plugin_rpc,
                                              root_helper)
        # Initialize iteration counter
        self.iter_num = 0
        self.run_daemon_loop = True
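
The constructor prepares two pieces of bookkeeping, `available_local_vlans` and `local_vlan_map`, that the rest of the agent relies on when it binds ports. The sketch below is only an illustration of how such a pool can be used: `LocalVlanPool`, `provision` and `reclaim` are hypothetical names, the tag bounds are assumed values for `q_const.MIN_VLAN_TAG` and `q_const.MAX_VLAN_TAG`, and the map stores a bare VLAN id rather than the richer mapping object the agent keeps.

```python
# Illustrative sketch only: LocalVlanPool and its methods are hypothetical
# names, not part of the Neutron code base.
MIN_VLAN_TAG = 1      # assumed value of q_const.MIN_VLAN_TAG
MAX_VLAN_TAG = 4094   # assumed value of q_const.MAX_VLAN_TAG


class LocalVlanPool(object):
    def __init__(self):
        # Same construction as in __init__ above; range() excludes the
        # upper bound, so the pool holds the ids 1..4093.
        self.available_local_vlans = set(range(MIN_VLAN_TAG, MAX_VLAN_TAG))
        self.local_vlan_map = {}  # net_uuid -> local VLAN id (simplified)

    def provision(self, net_uuid):
        """Return the local VLAN of a network, allocating one if needed."""
        if net_uuid not in self.local_vlan_map:
            if not self.available_local_vlans:
                raise RuntimeError("no local VLAN available for %s" % net_uuid)
            self.local_vlan_map[net_uuid] = self.available_local_vlans.pop()
        return self.local_vlan_map[net_uuid]

    def reclaim(self, net_uuid):
        """Give the network's local VLAN back to the pool."""
        vlan = self.local_vlan_map.pop(net_uuid, None)
        if vlan is not None:
            self.available_local_vlans.add(vlan)
```

All ports of one virtual network on a hypervisor share the same local VLAN, which is why the lookup is keyed by network rather than by port.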

Once OVSNeutronAgent has been initialized, agent.daemon_loop() is started:

    def daemon_loop(self):
        with polling.get_polling_manager(
            self.minimize_polling,
            self.root_helper,
            self.ovsdb_monitor_respawn_interval) as pm:

            self.rpc_loop(polling_manager=pm)

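daemon_loop() obtains its polling manager from a context manager, so whatever monitoring was started for `minimize_polling` is stopped again when the loop exits. The following is a rough sketch of that pattern, not the code in `neutron.agent.linux.polling`: both classes are simplified stand-ins, `MonitorBackedPoller` is a hypothetical name, and its `events_pending` flag merely imitates what an ovsdb monitor process would report.

```python
import contextlib


class AlwaysPoll(object):
    """Stand-in: polling is required on every loop iteration."""
    is_polling_required = True

    def force_polling(self):
        pass

    def polling_completed(self):
        pass


class MonitorBackedPoller(object):
    """Hypothetical stand-in for a poller driven by an ovsdb monitor."""

    def __init__(self):
        self.running = False
        self._force = False
        self.events_pending = False  # a real monitor would set this

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

    def force_polling(self):
        self._force = True

    def polling_completed(self):
        self._force = False
        self.events_pending = False

    @property
    def is_polling_required(self):
        return self._force or self.events_pending


@contextlib.contextmanager
def get_polling_manager(minimize_polling=False):
    pm = MonitorBackedPoller() if minimize_polling else AlwaysPoll()
    if minimize_polling:
        pm.start()
    try:
        yield pm
    finally:
        # Runs even if the loop body raises, so the monitor never leaks.
        if minimize_polling:
            pm.stop()
```

With `minimize_polling` disabled, the loop scans the bridge on every iteration; with it enabled, a scan happens only after `force_polling()` or when the monitor has seen an interface change.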
rpc_loop

    def rpc_loop(self, polling_manager=None):
        # sync tunnel and check ovs restart
        if not polling_manager:
            polling_manager = polling.AlwaysPoll()

        sync = True
        ports = set()
        updated_ports_copy = set()
        ancillary_ports = set()
        tunnel_sync = True
        ovs_restarted = False
        while self.run_daemon_loop:
            start = time.time()
            port_stats = {'regular': {'added': 0,
                                      'updated': 0,
                                      'removed': 0},
                          'ancillary': {'added': 0,
                                        'removed': 0}}
            LOG.debug(_("Agent rpc_loop - iteration:%d started"),
                      self.iter_num)
            if sync:
                LOG.info(_("Agent out of sync with plugin!"))
                ports.clear()
                ancillary_ports.clear()
                sync = False
                polling_manager.force_polling()
            # detect an OVS restart by checking whether the canary flow still exists
            ovs_restarted = self.check_ovs_restart()
            if ovs_restarted: 
                # reset br-int, br-tun, and other physical bridges
                self.setup_integration_br()
                self.setup_physical_bridges(self.bridge_mappings)
                if self.enable_tunneling:
                    self.setup_tunnel_br()
                    tunnel_sync = True

            # Notify the plugin of tunnel IP
            if self.enable_tunneling and tunnel_sync:
                LOG.info(_("Agent tunnel out of sync with plugin!"))
                try:
                    tunnel_sync = self.tunnel_sync()
                except Exception:
                    LOG.exception(_("Error while synchronizing tunnels"))
                    tunnel_sync = True

            # check if ports/security_groups updates and perform update
            if self._agent_has_updates(polling_manager) or ovs_restarted:
                try:
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                                "starting polling. Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # Save updated ports dict to perform rollback in
                    # case resync would be needed, and then clear
                    # self.updated_ports. As the greenthread should not yield
                    # between these two statements, this will be thread-safe
                    updated_ports_copy = self.updated_ports
                    self.updated_ports = set()
                    reg_ports = (set() if ovs_restarted else ports)
                    # port_info = {
                    #   'current': output of ['ovs-vsctl', '--format=json', '--',
                    #              '--columns=name,external_ids,ofport', 'list',
                    #              'Interface'], filtered in get_vif_port_set()
                    #   'updated': updated_ports plus the port ids returned by
                    #              check_changed_vlans() (ports that lost their
                    #              VLAN tag), then updated_ports &= cur_ports
                    #   'added'  : cur_ports - registered_ports
                    #   'removed': registered_ports - cur_ports
                    # }
                    port_info = self.scan_ports(reg_ports, updated_ports_copy)
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                                "port information retrieved. "
                                "Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    # Secure and wire/unwire VIFs and update their status
                    # on Neutron server
                    if (self._port_info_has_changes(port_info) or
                        self.sg_agent.firewall_refresh_needed() or
                        ovs_restarted):
                        LOG.debug(_("Starting to process devices in:%s"),
                                  port_info)
                        # If treat devices fails - must resync with plugin
                        sync = self.process_network_ports(port_info,
                                                          ovs_restarted)
                        LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                    "ports processed. Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        port_stats['regular']['added'] = (
                            len(port_info.get('added', [])))
                        port_stats['regular']['updated'] = (
                            len(port_info.get('updated', [])))
                        port_stats['regular']['removed'] = (
                            len(port_info.get('removed', [])))
                    ports = port_info['current']
                    # Treat ancillary devices if they exist
                    if self.ancillary_brs:
                        port_info = self.update_ancillary_ports(
                            ancillary_ports)
                        LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                    "ancillary port info retrieved. "
                                    "Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})

                        if port_info:
                            rc = self.process_ancillary_network_ports(
                                port_info)
                            LOG.debug(_("Agent rpc_loop - iteration:"
                                        "%(iter_num)d - ancillary ports "
                                        "processed. Elapsed:%(elapsed).3f"),
                                      {'iter_num': self.iter_num,
                                       'elapsed': time.time() - start})
                            ancillary_ports = port_info['current']
                            port_stats['ancillary']['added'] = (
                                len(port_info.get('added', [])))
                            port_stats['ancillary']['removed'] = (
                                len(port_info.get('removed', [])))
                            sync = sync | rc

                    polling_manager.polling_completed()
                except Exception:
                    LOG.exception(_("Error while processing VIF ports"))
                    # Put the ports back in self.updated_port
                    self.updated_ports |= updated_ports_copy
                    sync = True

            # sleep till end of polling interval
            elapsed = (time.time() - start)
            LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d "
                        "completed. Processed ports statistics: "
                        "%(port_stats)s. Elapsed:%(elapsed).3f"),
                      {'iter_num': self.iter_num,
                       'port_stats': port_stats,
                       'elapsed': elapsed})
            if (elapsed < self.polling_interval):
                time.sleep(self.polling_interval - elapsed)
            else:
                LOG.debug(_("Loop iteration exceeded interval "
                            "(%(polling_interval)s vs. %(elapsed)s)!"),
                          {'polling_interval': self.polling_interval,
                           'elapsed': elapsed})
            self.iter_num = self.iter_num + 1

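The port bookkeeping in rpc_loop() comes down to a few set operations on port ids. Here is a minimal sketch of that arithmetic; `diff_ports` is a hypothetical helper, and it takes the current port set as an argument instead of querying br-int and checking for lost VLAN tags as scan_ports() does.

```python
def diff_ports(registered_ports, cur_ports, updated_ports=None):
    """Classify ports the way the comment block in rpc_loop() describes."""
    port_info = {'current': cur_ports}

    # An updated port may have been unplugged in the meantime; keep only
    # those that are still present on the bridge.
    updated = set(updated_ports or ()) & cur_ports
    if updated:
        port_info['updated'] = updated

    if cur_ports != registered_ports:
        port_info['added'] = cur_ports - registered_ports
        port_info['removed'] = registered_ports - cur_ports
    return port_info


info = diff_ports(registered_ports={'a', 'b'},
                  cur_ports={'b', 'c'},
                  updated_ports={'b', 'z'})
```

In this example `c` is new, `a` has disappeared, and the update for `z` is dropped because `z` is no longer on the bridge. After an OVS restart the loop passes an empty registered set, so every current port is reported as added and gets rewired.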
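The two most important functions called from rpc_loop() are tunnel_sync (queries the plugin and establishes tunnels) and process_network_ports (handles port and security group changes).

tunnel_sync: query and establish tunnels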

    def tunnel_sync(self):
        resync = False
        try:
            for tunnel_type in self.tunnel_types:  # tunnel_types = vxlan, gre
                # query tunnel details from plugin_rpc by local_ip and tunnel_type
                details = self.plugin_rpc.tunnel_sync(self.context,
                                                      self.local_ip,
                                                      tunnel_type)
                # if l2_pop is disabled, establish tunnels to all other tunnel IPs;
                # if l2_pop is enabled, tunnels are set up by the agent's
                # q-agent-notifier-l2population_update consumer instead
                if not self.l2_pop:
                    tunnels = details['tunnels']
                    for tunnel in tunnels:
                        if self.local_ip != tunnel['ip_address']:
                            tunnel_id = tunnel.get('id')
                            # Unlike the OVS plugin, ML2 doesn't return an id
                            # key. So use ip_address to form port name instead.
                            # Port name must be <=15 chars, so use shorter hex.
                            remote_ip = tunnel['ip_address']
                            remote_ip_hex = self.get_ip_in_hex(remote_ip)
                            if not tunnel_id and not remote_ip_hex:
                                continue
                            tun_name = '%s-%s' % (tunnel_type,
                                                  tunnel_id or remote_ip_hex)
                            # setup tunnel_port and related flows
                            self.setup_tunnel_port(tun_name,
                                                   tunnel['ip_address'],
                                                   tunnel_type)
        except Exception as e:
            LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
                      {'local_ip': self.local_ip, 'e': e})
            resync = True
        return resync

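tunnel_sync() names each tunnel port after the tunnel type plus either the tunnel id or the remote IP in hexadecimal, which keeps the name within the 15-character limit mentioned in the code comments. The sketch below approximates that naming with the standard library `ipaddress` module; it is an assumption about how `get_ip_in_hex` behaves, not the agent's own implementation, and `tunnel_port_name` is a hypothetical helper.

```python
import ipaddress


def get_ip_in_hex(ip_address):
    """Return an IPv4 address as 8 hex digits, or None if it is invalid."""
    try:
        return '%08x' % int(ipaddress.IPv4Address(ip_address))
    except ValueError:
        return None


def tunnel_port_name(tunnel_type, remote_ip, tunnel_id=None):
    """Build the OVS port name used for a tunnel to remote_ip."""
    remote_ip_hex = get_ip_in_hex(remote_ip)
    if not tunnel_id and not remote_ip_hex:
        return None  # nothing usable to name the port with
    return '%s-%s' % (tunnel_type, tunnel_id or remote_ip_hex)


name = tunnel_port_name('vxlan', '192.168.10.11')  # 'vxlan-c0a80a0b'
```

The longest prefix, `vxlan-`, plus 8 hex digits gives 14 characters, so the name always fits.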
process_network_ports: handle port and security group changes

    def process_network_ports(self, port_info, ovs_restarted):
        resync_a = False
        resync_b = False
        # TODO(salv-orlando): consider a solution for ensuring notifications
        # are processed exactly in the same order in which they were
        # received. This is tricky because there are two notification
        # sources: the neutron server, and the ovs db monitor process
        # If there is an exception while processing security groups ports
        # will not be wired anyway, and a resync will be triggered
        # TODO(salv-orlando): Optimize avoiding applying filters unnecessarily
        # (eg: when there are no IP address changes)
        # query the security groups from the plugin via plugin_rpc, then apply them through sg_agent
        self.sg_agent.setup_port_filters(port_info.get('added', set()),
                                         port_info.get('updated', set()))
        # VIF wiring needs to be performed always for 'new' devices.
        # For updated ports, re-wiring is not needed in most cases, but needs
        # to be performed anyway when the admin state of a device is changed.
        # A device might be both in the 'added' and 'updated'
        # list at the same time; avoid processing it twice.
        devices_added_updated = (port_info.get('added', set()) |
                                 port_info.get('updated', set()))
        if devices_added_updated:
            start = time.time()
            try:
                # added or updated ports: fetch the port details, notify the plugin of port up/down, then call port_bound/port_dead
                skipped_devices = self.treat_devices_added_or_updated(
                    devices_added_updated, ovs_restarted)
                LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                            "treat_devices_added_or_updated completed. "
                            "Skipped %(num_skipped)d devices of "
                            "%(num_current)d devices currently available. "
                            "Time elapsed: %(elapsed).3f"),
                          {'iter_num': self.iter_num,
                           'num_skipped': len(skipped_devices),
                           'num_current': len(port_info['current']),
                           'elapsed': time.time() - start})
                # Update the list of current ports storing only those which
                # have been actually processed.
                port_info['current'] = (port_info['current'] -
                                        set(skipped_devices))
            except DeviceListRetrievalError:
                # Need to resync as there was an error with server
                # communication.
                LOG.exception(_("process_network_ports - iteration:%d - "
                                "failure while retrieving port details "
                                "from server"), self.iter_num)
                resync_a = True
        if 'removed' in port_info:
            start = time.time()
            # removed ports: remove their security group filters, notify the plugin that the port is down, and unbind the local VLAN
            resync_b = self.treat_devices_removed(port_info['removed'])
            LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                        "treat_devices_removed completed in %(elapsed).3f"),
                      {'iter_num': self.iter_num,
                       'elapsed': time.time() - start})
        # If one of the above operations fails => resync with plugin
        return (resync_a | resync_b)

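Stripped of logging and security group handling, process_network_ports() follows a compact pattern: merge the added and updated sets so no device is treated twice, drop skipped devices from the current set, and request a resync if either step fails. A simplified sketch under those assumptions follows; `process_ports` and its two callback parameters are hypothetical, and the exception class is a local stand-in for the agent's `DeviceListRetrievalError`.

```python
class DeviceListRetrievalError(Exception):
    """Local stand-in for the agent's exception of the same name."""


def process_ports(port_info, treat_added_or_updated, treat_removed):
    resync_a = False
    resync_b = False

    # A device can be in both 'added' and 'updated'; the union makes sure
    # it is handled only once.
    devices = port_info.get('added', set()) | port_info.get('updated', set())
    if devices:
        try:
            skipped = treat_added_or_updated(devices)
            # Skipped devices stay out of 'current', so the next scan
            # reports them as added again.
            port_info['current'] = port_info['current'] - set(skipped)
        except DeviceListRetrievalError:
            resync_a = True

    if 'removed' in port_info:
        resync_b = treat_removed(port_info['removed'])

    return resync_a | resync_b
```

The return value feeds the `sync` flag in rpc_loop(): when it is true, the next iteration clears the known ports and forces a full poll.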
neutron-ovs-cleanup

This service starts on boot and ensures that Networking has full control over the creation and management of tap devices.

def main():
    """Main method for cleaning up OVS bridges.

    The utility cleans up the integration bridges used by Neutron.
    """

    conf = setup_conf()
    conf()
    config.setup_logging(conf)

    configuration_bridges = set([conf.ovs_integration_bridge,
                                 conf.external_network_bridge])
    ovs_bridges = set(ovs_lib.get_bridges(conf.AGENT.root_helper))
    available_configuration_bridges = configuration_bridges & ovs_bridges

    if conf.ovs_all_ports:
        bridges = ovs_bridges
    else:
        bridges = available_configuration_bridges

    # Collect existing ports created by Neutron on configuration bridges.
    # After deleting ports from OVS bridges, we cannot determine which
    # ports were created by Neutron, so port information is collected now.
    ports = collect_neutron_ports(available_configuration_bridges,
                                  conf.AGENT.root_helper)

    for bridge in bridges:
        LOG.info(_("Cleaning %s"), bridge)
        ovs = ovs_lib.OVSBridge(bridge, conf.AGENT.root_helper)
        ovs.delete_ports(all_ports=conf.ovs_all_ports)

    # Remove remaining ports created by Neutron (usually veth pair)
    delete_neutron_ports(ports, conf.AGENT.root_helper)

    LOG.info(_("OVS cleanup completed successfully"))

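The bridge selection at the top of the cleanup utility can be read as a small set computation. The following sketch isolates it; `select_bridges` is a hypothetical helper, and the bridge names in the example are only illustrative.

```python
def select_bridges(configured_bridges, existing_bridges, all_ports=False):
    """Return (bridges to clean, configured bridges that actually exist)."""
    existing = set(existing_bridges)
    # Only bridges that are both configured and present in OVS count.
    available = set(configured_bridges) & existing
    # With ovs_all_ports set, every OVS bridge is cleaned, not just the
    # configured ones.
    bridges_to_clean = existing if all_ports else available
    return bridges_to_clean, available


to_clean, available = select_bridges(
    configured_bridges=['br-int', 'br-ex'],
    existing_bridges=['br-int', 'br-tun', 'br-eth1'])
```

Note that in main() the Neutron-created ports are collected from the available configured bridges before anything is deleted, regardless of which bridges end up being cleaned.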
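Finally, let's look at how nova interacts with neutron-openvswitch-agent. The diagram comes from GongYongSheng's slides at the Hong Kong Summit:

When a VM boots, nova-compute first sends a request to neutron-server to create a port. The VIF driver then creates the port on br-int. neutron-openvswitch-agent, which polls br-int in a loop, detects the new port, sets up the appropriate OpenFlow rules and local VLAN for it, and finally sets the port status to ACTIVE.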


This article is reposted from feisky's blog on cnblogs. Original link: http://www.cnblogs.com/feisky/p/3851479.html. To repost it, please contact the original author.
