复制安全组小工具

本文涉及的产品
云服务器 ECS,每月免费额度200元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: 通过调用openApi接口,主要实现了已存在安全组的复制(包括其所有规则),支持VPC类型安全组复制,支持跨Region复制。
  1. 小工具解决的问题:

    • 由于安全组不能垮Region,当一个Region中的安全组比较好,我们在另一个Region中也想使用同样的规则时,通过这个小工具实现安全组约束的拷贝。
  2. 数量限制:

  3. 都拷贝了什么:

    • 安全组 :Name,IpProtocol,PortRange,SourceGroupOwnerAccount,Priority,SourceGroupId,SourceCidrIp,Policy,NicType
    • 所属VPC :Name,UserCidrs,CidrBlock,Description
  4. 如何使用:

    • 公共参数设置:(1)填写Access Key Id 和 Secrect。(2)regionIdLocal:被复制的regionId。(3)securityGroupIdLocal:regionIdLocal中被复制的安全组Id。(4)regionId:要复制到的regionId。
    • 复制classic类型安全组,直接运行。
    • 复制vpc类型安全组,当isVpcKeepSame为True,表示复制的安全组所属的VPC也是相似的。本工具首先根据UserCidrs和CidrBlock在新Region中查找是否有相似的VPC,如果找到,就把安全组复制到这个VPC中,如果没找到,根据原始Region中VPC创建类似的VPC,然后复制安全组,当isVpcKeepSame为False,直接在新Region中查找Available状态VPC,然后复制安全组,如果没有找到,新建VPC。
#  coding=utf-8

# if the python sdk is not install using 'sudo pip install aliyun-python-sdk-ecs'
# if the python sdk is install using 'sudo pip install --upgrade aliyun-python-sdk-ecs'
# make sure the sdk version is 2.1.2, you can use command 'pip show aliyun-python-sdk-ecs' to check

import json
import logging
import time

from aliyunsdkcore import client
from aliyunsdkecs.request.v20140526.AuthorizeSecurityGroupRequest import \
    AuthorizeSecurityGroupRequest
from aliyunsdkecs.request.v20140526.DescribeSecurityGroupAttributeRequest import \
    DescribeSecurityGroupAttributeRequest
from aliyunsdkecs.request.v20140526.CreateSecurityGroupRequest import \
    CreateSecurityGroupRequest
from aliyunsdkecs.request.v20140526.CreateVpcRequest import CreateVpcRequest
from aliyunsdkecs.request.v20140526.DescribeVpcsRequest import DescribeVpcsRequest

# configuration the log output formatter, if you want to save the output to file,
# append ",filename='ecs_invoke.log'" after datefmt.

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%a, %d %b %Y %H:%M:%S')

# send open api request
def _send_request(request):
    request.set_accept_format('json')
    try:
        response_str = clt.do_action(request)
        logging.info(response_str)
        response_detail = json.loads(response_str)
        return response_detail
    except Exception as e:
        logging.error(e)

# To add rules to  classic network cloud server
def execute_add_classical_intranat_ingress(regionId,source_sg_id, permission, dry_run):
    request = AuthorizeSecurityGroupRequest()
    clt.set_region_id(regionId)
    request.set_SecurityGroupId(source_sg_id)
    request.set_IpProtocol(permission.get("IpProtocol"))
    request.set_PortRange(permission.get("PortRange"))
    request.set_SourceGroupOwnerAccount(permission.get("SourceGroupOwnerAccount"))
    request.set_Priority(permission.get("Priority"))
    request.set_SourceGroupId(permission.get("SourceGroupId"))
    request.set_SourceCidrIp(permission.get("SourceCidrIp"))
    request.set_Policy(permission.get("Policy"))
    request.set_NicType(permission.get("NicType"))

    if dry_run:
        logging.info("add sg id %s as the in security group id %s", in_sg_id, source_sg_id)
    else:
        response = _send_request(request)
        return response

# How many rules are there under the current security group
def query_sg_rules_for_sg(regionId,security_group_id):
    request = DescribeSecurityGroupAttributeRequest()
    clt.set_region_id(regionId)
    request.set_SecurityGroupId(security_group_id)
    response = _send_request(request)
    if (str(response.get("Code")) == "InvalidSecurityGroupId.NotFound"):
        logging.error("please check if there is an securityGroupId in your region")
        return "null"
    return response

# create VPC
def createVPC(vpcLocal):
    request = CreateVpcRequest()
    request.set_CidrBlock(vpcLocal.get("CidrBlock"))
    request.set_Description(vpcLocal.get("Description"))
    UserCidrs = ''
    for uc in vpcLocal.get("UserCidrs").get("UserCidr"):
        UserCidrs = UserCidrs + str(uc)+","
    if(UserCidrs != ''):
        request.set_UserCidr(UserCidrs)
    request.set_VpcName(vpcLocal.get("VpcName")+"copy")
    response = _send_request(request)
    newVPCId = response.get("VpcId")
    if(newVPCId == None):
        logging.error(response)
        newVPCId = "null"
    return newVPCId

# compare UserCidrs
def compareUserCidrs(usercidrs1,usercidrs2):
    l1 = len(usercidrs1)
    l2 = len(usercidrs2)
    flag = False
    if(l1 == l2):
        flag = True
        for cidr in usercidrs1:
            if(cidr in usercidrs2):
                logging.info(cidr+"passed")
            else:
                flag = False
                break
    else:
        return flag
    return flag

# find sameVpc according to UserCidrs and CidrBlock
def findSameVpc(vpcLocal,isVpcKeepSame):
    request = DescribeVpcsRequest()
    response = _send_request(request)
    vpcs = response.get("Vpcs").get("Vpc")
    if(isVpcKeepSame == False):
        if(len(vpcs)>=1):
            for vpc in vpcs:
                if(vpc.get("Status") == 'Available'):
                    return vpc.get("VpcId")
        else:
            return "null"
    for vpc in vpcs:
        if(str(vpc.get("Status")) == 'Available'):
            usercidrs1 = vpc.get("UserCidrs").get("UserCidr")
            usercidrs2 = vpcLocal.get("UserCidrs").get("UserCidr")
            if((vpc.get("CidrBlock") == vpcLocal.get("CidrBlock")) and (compareUserCidrs(usercidrs1,usercidrs2))):
                return vpc.get("VpcId")
    return "null"

# find VPC by vpcId
def GetVpc(vpcId):
    request = DescribeVpcsRequest()
    request.set_VpcId(vpcId)
    response = _send_request(request)
    if(response.get("TotalCount") >= 1):
        return response.get("Vpcs").get("Vpc")[0]
    return "null"

# create SecurityGroup
def CreateSecurityGroup(regionId,securityGroupName,vpcId,vpcLocal,isVpcKeepSame):
    request = CreateSecurityGroupRequest()
    clt.set_region_id(regionId)
    request.set_SecurityGroupName(securityGroupName)
    newVpcId = ''
    if(vpcId != ''):
        vpcIdAvailable = findSameVpc(vpcLocal,isVpcKeepSame)
        if(vpcIdAvailable == 'null'):
            newVpcId = createVPC(vpcLocal)
            time.sleep(10)
            if(newVpcId == "null"):
                return "null"
        else:
            newVpcId = vpcIdAvailable
    request.set_VpcId(str(newVpcId))
    response = _send_request(request)
    return response

def copy_securityGroup(regionIdLocal,securityGroupIdLocal,regionId,isVpcKeepSame):
    # Query a region under a security group all rules
    responsepermissions = query_sg_rules_for_sg(regionIdLocal, securityGroupIdLocal)
    if (responsepermissions == "null"):
        logging.error("error:please check if there is an securityGroupId in your region")
        return "null"
    else:
        permissions = responsepermissions.get("Permissions").get("Permission")
        vpcId = responsepermissions.get("VpcId")
        vpcLocal = ''
        if (vpcId != ''):
            vpcLocal = GetVpc(vpcId)
            if(vpcLocal == "null"):
                logging.error("the vpc=vpcId:%s not exist",vpcId)
                return vpcId
        securityGroupNameLocal = responsepermissions.get("SecurityGroupName")
        # Create security group
        securityGroupName = securityGroupNameLocal + "_copyfrom_" + regionIdLocal
        response = CreateSecurityGroup(regionId, securityGroupName, vpcId, vpcLocal,isVpcKeepSame)
        if(response == "null"):
            return "null"
        securityGroupId = response.get("SecurityGroupId")

        # Copy rules
        for permission in permissions:
            execute_add_classical_intranat_ingress(regionId, securityGroupId, permission, False)
        logging.info("In region %s ,create securityGroup:%s success", regionId, securityGroupName)


if __name__ == '__main__':

    clt = client.AcsClient('Your Access Key Id', 'Your Access Key Secrect', 'cn-beijing')

    #The regionId of the security group being copied
    regionIdLocal = 'cn-shanghai'
    #Copy securityGroupId in regionIdLocal
    securityGroupIdLocal = 'sg-uf62j00kbggavkplfb56'
    #Copy to region regionId
    regionId = 'ap-northeast-1'
    #whether VPC need to be consistent
    isVpcKeepSame = False
    #start copy
相关实践学习
一小时快速掌握 SQL 语法
本实验带您学习SQL的基础语法,快速入门SQL。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情: https://www.aliyun.com/product/ecs
相关文章
|
7月前
|
小程序 JavaScript API
小程序点击复制功能制作
小程序点击复制功能制作
125 0
|
移动开发 机器学习/深度学习 安全
|
10月前
代码对比工具
代码对比工具
|
Web App开发 JavaScript 前端开发
前端er怎样操作剪切复制以及禁止复制+破解等
前言 有时候我们经常会碰到这些场景:玩掘金、知乎的时候复制一段文字,总是会在内容后面加上一些版权信息,以及像小说网站等都有禁止选中,禁止复制这种功能,还有点击自动复制账号的功能。 我也经常遇到这些场景,有时候会去想这后面到底是怎么做,周末趁着有空去研究了一下,然后发现这些都跟操作剪贴板有关系,并且都不难,了解一下基本都知道怎么做了,整理分享一波给大家。 个人博客了解一下:obkoro1.com 目录: API介绍 实现类知乎/掘金复制大段文本添加版权信息 实现类起点网的防复制功能 破解防复制 点击复制功能 API介绍: 复制、剪切、粘贴事件: copy 发生复制操作时触发;
462 0
前端er怎样操作剪切复制以及禁止复制+破解等
|
编解码 Windows
Win系统 - 如何实现N卡四屏复制输出呢?
Win系统 - 如何实现N卡四屏复制输出呢?
157 0
Win系统 - 如何实现N卡四屏复制输出呢?
BAT 批处理命令 - 文件批量复制、克隆功能实例演示
BAT 批处理命令 - 文件批量复制、克隆功能实例演示
519 0
BAT 批处理命令 - 文件批量复制、克隆功能实例演示
|
XML 数据可视化 Java
代码对比工具,我就用这 6 个!
在程序开发的过程中,程序员会经常对源代码以及库文件进行代码对比,在这篇文章里我们向大家介绍六款程序员常用的代码比较工具。
271 0
代码对比工具,我就用这 6 个!
VSCode 在保存时自动对代码进行修复
在设置中更改如下内容 // 自动修复 "tslint.autoFixOnSave": true
3119 0
|
存储 NoSQL Redis