• 关于 python新建数据库 的搜索结果

回答

本页目录 步骤一:安装Django 步骤二:创建应用 步骤三:打包应用 步骤四:部署应用至Web+ 更多信息 Django是Python的一个开放源代码的Web应用框架。本文档将演示如何使用Django创建一个应用和给应用添加MySQL数据库,并将其部署到Web+上。 步骤一:安装Django 执行以下命令安装Django。由于在本文档中将使用MySQL,因此需要安装pymysql模块。 pip install Django pymysql 注意 使用Django(2.2版本以上)需要Python 3.5以上版本,在本文档将使用3.7.4版本作为示例。 步骤二:创建应用 执行以下命令执行django-admin命令来快速创建一个项目。 django-admin startproject webplusdemo 可以看到创建的目录结构如下。 webplusdemo/ ├── manage.py └── webplusdemo ├── init.py ├── settings.py ├── urls.py └── wsgi.py 执行以下命令执行django-admin命令来快速创建一个项目。 django-admin startproject webplusdemo 步骤三:打包应用 执行以下命令来修改settings.py中ALLOWED_HOSTS配置项,允许所有域名的访问。 ALLOWED_HOSTS = ['*'] 执行以下命令改写settings.py中的数据库配置。Django默认使用sqlite数据库,本示例中将使用RDS中的MySQL数据库。 Database https://docs.djangoproject.com/en/2.2/ref/settings/#databases DATABASES = { 'default': { 'ENGINE': 'django.db.backends.mysql', 'NAME': os.environ['WP_RDS_DATABASE'], 'USER': os.environ['WP_RDS_ACCOUNT_NAME'], 'PASSWORD': os.environ['WP_RDS_ACCOUNT_PASSWORD'], 'HOST': os.environ['WP_RDS_CONNECTION_ADDRESS'], 'PORT': os.environ['WP_RDS_PORT'], } } 进入webplusdemo目录,执行以下命令完成应用打包,生成部署包文件webplusdemo.zip。 zip -r webplusdemo.zip ./ 步骤四:部署应用至Web+ 登录 Web+控制台,并在页面左上角选择所需地域。 在概览页最近更新的部署环境区域的右上角单击新建。 在应用基本信息页面选择技术栈类型为Python,设置应用基本信息,设置完成后单击下一步。 在部署环境信息页面设置部署环境名称,部署包来源选择上传本地程序,上传您刚打包的webplusdemo.zip,设置部署包版本后单击下一步。 在配置页面选择预设配置为自定义模式。 展开云数据库RDS,按图所示配置云数据库类型为MySQL,并选择数据库版本、系列和类型等数据库基本信息。 展开生命周期挂钩,在PostPrepareApp编辑框内输入以下内容。配置完成后单击完成创建。 source /etc/bashrc && cd $APP_HOME && python manage.py migrate 在完成创建页面单击查看该应用或完成创建可进入应用详情页面。单击部署环境名称进入部署环境详情页面,然后单击公网访问地址右侧的链接进入应用首页。 上面步骤配置了数据库,因此可以访问登录页。您可以通过在数据库写入用户表或执行python manage.py createsupersuer的方式来创建用户。 更多信息 在控制台部署应用的详细配置步骤请参见部署应用。 使用CLI完成应用创建和部署的操作请参见在CLI快速部署应用。 想了解更多Django信息,请进入Django官方网站或Django Github项目查看。

1934890530796658 2020-03-23 14:27:14 0 浏览量 回答数 0

问题

在 Debian 9 里安装 odoo 12

dongshan8 2019-12-01 21:48:43 2040 浏览量 回答数 3

回答

1 连接登入很多人第一次使用某工具时,往往打怵,排斥,感觉很不好用。我们先从连接操作的具体细节的讲起,来减少这种不适。step 1 点击上面工具栏的【Connect】按钮step 2 在弹出的Connections 界面中 点击[Create]按钮。因为是第一次,连接信息要新建。step 3 在弹出的Connection Editor 界面编辑登入信息。主要有三类信息要求输入 1. Basic;2.Authentication;3.Default Database。Basic 编辑界面;这时候大家一定要注意Port端口,因为它默认的是27017,大家要根据实际需求调整修改。还有就是Name是显示名称,可以修改为更有代表性的名称。2.点击【Authentication】,进入Authentication界面。此处需输入 Auth DB 数据(数据库名称),用户数据 和 密码数据3.点击【Default DataBase】,进入Default DataBase界面。请一定要输入指定的数据库,否则可能提示错误,如下。MongoError:Authentication failed或者 可以登入进去,但是看不到任何 集合。但是,随着版本的升级,新版本这个栏位的值在登入时可能会自动获取前面输入Auth DB的 输入值,但是目前来看还不是很稳定。所以,还以建议大家手动输入Default DataBase 数据。2.打开一个新的查询界面在很多工具,都会有一个打开查询界面的按钮。例如连接SQL Server的 SSMS客户端,工具栏很明显就有一个功能按钮【新建查询】单nosqlbooster 工具没有,然人着急,无从下手。其实,它可以通过快捷方式来实现。如下:【注意点击时,请先用鼠标点击选中要指定的集合或数据库】3.查询代码生成器这个工具还有一个查询代码生成器,可以将用户编写的查询语言装换成 MongoDB Shell, JavaScript (Node.js), Java, C# and Python 等各种语言。特别有利于初学者 对MongoDB上手,熟悉各种语法。例如 将以下的工具语言转换为可以执行C# 语句。从这儿我们可以看到很多C# 语言关于MongoDB的操作写法。查询语句生成器刚学习MongoDB,可能对一些查询写法比较陌生,没关系,这个工具可以自动生成一些查询语句。生成器按钮,点击红色标注的[Query]弹出 可视化的查询编辑器,如下:点击【OK and Run】就可以生成MongoDB Shell 查询语句。【一定要在生成了db.collectionname..find({}) 命令的界面上编辑,否则,点击Query无效】5 可以使用SQL(结构化查询语言)查询在前面的2中,我们说过了,点击【Ctrl+Alt+T】可以打开一个 SQL 查询界面。说明nosqlbooster支持SQL查询语言。例如 db.employees.aggregate([ { $group: { _id: "$department", total: { $sum: "$salary" } }, } ])可以转换为SQL语言,如下:mb.runSQLQuery( SELECT department, SUM(salary) AS total FROM employees GROUP BY department );其执行结果是一样的。【另外,为了促使自己尽快的熟悉mongo语言和其更高的执行性能,推荐大家还是使用mongo这种JSON类的语言,而不是SQL的语言】6 将查询出的数据导出到Excel文件中在MongoDB的导出功能中支持JSON和CSV格式,而大家熟悉的Excel一般的工具很难支持,而我们可以通过nosqlbooster工具将少量数据导出到Excel中(所谓的少量数据要求主要受限于本地内存)。下面以导出集合testexportToexcel的数据为例,进行演示说明。step 1 执行查询语句 step 2 将显示格式调整为 Table 格式step 3 按Shift 键,选中所要导出的数据step 4 在选中的数据区域中,鼠标右击,选中【Copy Document(s) to Clipboard -Tab-Separted Values】step 5 粘贴至excel文件中,即可。

景凌凯 2019-12-02 02:05:07 0 浏览量 回答数 0

海外云虚拟主机包年25元/月起

海外独享虚拟主机全面上线,助力构建海外网站,提升公司国际形象;全球有效覆盖,超高性价比;建站入门首选,助力出口,适合跨境贸易企业。

问题

社区系统 Icarus 1.3.0

寒喵 2019-12-01 21:51:20 1292 浏览量 回答数 0

问题

云效使用指南:持续交付:(待迁移)应用构建与发布

行者武松 2019-12-01 22:00:28 1443 浏览量 回答数 0

问题

【精品问答】E-MapReduce

montos 2020-04-08 15:00:05 2 浏览量 回答数 1

回答

第一步:我数据库备份某目录注明间: 运行备份脚本(注意备份目录我/home/dbback/) 查看目录否备份文件细同能发现我mysqldump没指定用户名密码啥我运行候费用输入密码呢莫着急马揭晓答案mysql5.6(具体版本编号记)密码写脚本运行警告告诉要密码写脚本危险我伙伴该办呢官给解决案期望配置my.cnf文件所现打my.cnf加入字段: 两条运行脚本提示要输入密码马测试看否功没问题我进入步 第二步:何自备份oss始前我要做两件事 1、登录阿云控制台点右管理控制台点左产品与服务第二列点击象存储OSS没通要通通直接点击右新建bucket创建bucket记住buket名字(注:其实通api直接创建bucket操作所我用控制台创建降低理解难度) 2、始写传脚本要导入osssdk所要安装比较简单跟安装其python包没啥区别解压进入目录运行python setup.py install 安装完毕接看我何使用全部代码: #!/usr/bin/python env #autor:glacier #date:2015-11-16 import os,os.path,time import operator import time from oss.oss_api import * prefix = '/home/dbback' logtime = time.strftime(time.ctime()) #filelist = [ file for file in os.listdir(os.path.dirname(os.path.abspath(file))) if os.path.isfile(file) ] filelist = [ file for file in os.listdir(prefix) if os.path.isfile(prefix + '/' + file) ] def get_time(filename): ft = os.stat(filename) return ft.st_ctime #def get_max(): # flist = [] # for file in filelist: # flist.append(os.stat(file).st_ctime) # return max(flist) def get_dist(): d = {} for file in filelist: d[file] = get_time(prefix + '/' + file) return d if name == 'main': #maxtime = get_max() d = get_dist() #dic= sorted(d.iteritems(), key=lambda d:d[1], reverse = True) upfile = max(d.iteritems(), key=operator.itemgetter(1))[0] endpoint = "your aliyun endpoint" accessKeyId, accessKeySecret="your accessKeyId","your accessKeySecret " oss = OssAPI(endpoint, accessKeyId, accessKeySecret) res = oss.put_object_from_file("bucketname",upfile,prefix + '/' + upfile) if res.status != 200: with open('/var/log/dbback.log', 'a+') as f: f.write(logtime + ' back failed' + '\n') 我接析脚本内容其脚本注释行都用看我编写程测试用始我设定备份文件目录记录志间备份目录所文件列表(列表其实式我用简单式文件列表慢)定义两函数get_time()函数获取文件创建间戳get_dist()函数获取文件名间戳字典主函数部比较难理解根据字典value排序获文件名d.iteritems()获字典每key,valuekey指定函数operator.itemgetter(1)表示用value排序(两元素key 0value1)间戳排序完返key[0]做工作脚本其部内容我说都见用没难于理解脚本介绍接进入我步 第三步:脚本写入crontab具体候执行根据家各自业务同设置没特别 答案来源网络,供参考,希望对您有帮助

KB小秘书 2019-12-02 03:00:11 0 浏览量 回答数 0

问题

MaxCompute百问集锦

yq传送门 2019-12-01 20:16:47 2404 浏览量 回答数 1

问题

MaxCompute百问集锦(持续更新20171011)

隐林 2019-12-01 20:19:23 38430 浏览量 回答数 18

问题

Python-SDK 之如何实现下载文件?

青衫无名 2019-12-01 21:40:58 1480 浏览量 回答数 0

回答

使用流程 云渲染管理系统(Render Manager 简称渲管)是一个开源的 web 应用,可以帮助用户轻松搭建阿里云上的私有渲染系统,直接调用海量计算资源,一键管控集群规模,在加速渲染任务的同时省去自建集群的烦恼。 渲管首页渲管建立在阿里云 BatchCompute 、OSS 和 ECS 的三个云产品基础之上的。详细介绍请参考官网,在使用渲管前,请确保已开通此三产品。 BatchCompute 是阿里云上的批量计算服务,可以帮助用户进行大规模并行计算。 OSS 是阿里云上的对象存储服务,可以存储海量数据。 ECS 是阿里云上的云服务器,极易运维和操作,可以方便的制作系统镜像。 渲管与这三个云产品的关系如下图rm_c 使用流程 A) 制作计算节点镜像 根据所要使用的区域,创建 ECS 按量云服务器,在云服务器中安装所需的渲染软件;保存为自定义镜像,并将镜像共享给账号1190847048572539,详见计算节点 镜像制作 章节。 B) 上传数据到OSS 将渲染所需要的数据上传到对应区域的OSS,并保持上传前的目录结构。 C) 启动渲管 在 ECS 控制台创建实例(短期使用,选择按量即可),镜像选择镜像市场中的rendermanager(也可以使用渲管安装包进行部署,详见 操作手册 部署章节)。 D) 配置渲管 登录渲管页面 https://ip/rm/login, 配置完基本信息后(AccessKeys 和 OSS bucket),在镜像管理页中添加上面制作的计算节点镜像 ID,并对该计算节点镜像配置渲染命令行。 E) 创建项目 在渲管的项目管理页面创建项目,指定 OSS 的数据映射规则(也称 OSS 挂载,在计算节点启动的时候,OSS 上的数据会被挂载到节点的本地路径),选择计算节点镜像 ID,OSS 的输出路径(用于保存渲染结果),计算节点中的临时输出路径。 F) 集群的创建和管理 在集群管理页面可以按需创建集群,指定计算节点使用的镜像 ID,节点类型和节点数量等信息。 G) 提交渲染作业 在项目页里提交渲染作业,要指定目的集群、渲染的帧范围以及节点数量等信息。提交完作业后,可实时查看渲染日志以及节点 CPU 使用率等信息。 BatchCompute 提供了测试用的计算节点镜像(windows server 2008,ID:m-wz9du0xaa1pag4ylwzsu),它预装了 blender 渲染软件。使用 blender 制作一个小场景的 演示视频 已上传 OSS(测试时,需下载并上传到您的 OSS bucket)。 实际生产时,请根据需求制作合适的计算节点镜像。 准备工作 注册阿里云账号并开通 OSS、ECS 和 BatchCompute 服务。 创建AccessKey。账号信息->AccessKeys->创建 Access Key,记录 Access Key 信息。p0 渲染示例 A) 创建 OSS bucket阿里云官网->管理控制台->对象存储 OSS->创建 bucket(例如,名字为 renderbucket),地域选择深圳(华南1),读写权限为私有。p1p2p3p4 获取blender场景并上传到您的 OSS bucket 在浏览器输入 http://openrm.oss-cn-qingdao.aliyuncs.com/blender/monkey/cube.blend 。 下载示例场景文件(BatchCompute 提供的测试场景),在 OSS 控制台创建目录结构blender/monkey,然后在该目录下上传文件,文件路径为oss://renderbucket/blender/monkey/cube.blend。 启动rendermanager A) 阿里云官网->管理控制台->云服务器 ECS->创建实例 选择按量付费,然后在镜像市场应用开发分类中搜索 rendermanager 镜像,使用 rendermanager 镜像并按下图配置购买,可适当提高带宽。 使用按量付费要求用户账户至少有 100 块金额,对于地域没有要求,看 ECS 实际售卖库存情况而定。 p8p9p001p10 B) 购买后,点击进入管理控制台,在实例列表中可看到刚才启动的云主机(创建会有延迟,请刷新几次)。p11p12 登入渲管页面 在本地浏览器输入 https://ecs_instance_ip/rm/login,ecs_instance_ip 为 ECS 实例的公网 IP(由于使用了 https,请在浏览器页面授权信任)。初始账号密码为: rm_admin rm_admin@123 生产系统,请一定更改账号和密码。 配置渲管 A) 登录后,点击右上角的配置可进入配置页面,填入 SECURITY_ID,SECURITY_KEY, OSS_BUCEKET 三个字段的值,SECURITY_ID 和 SECURITY_KEY 即上面准备工作中获取的 AccessKey 信息。p14 B) 设置 OSS_HOST 为 oss-cn-shenzhen.aliyuncs.com;REGION 的选择主要和计算节点的镜像归属有关,必须和计算节点镜像归属 REGION 保持一致;本例采用的官方计算节点镜像(该镜像部署在深圳 REGION)所以此处设置在深圳 REGION 。 p003 C) 设置 BATCHCOMPUTE_REGION 为 cn-shenzhen;设置深圳 REGION 原因同上。 p004 D) 点击保存。 添加计算节点镜像 镜像管理->添加计算节点镜像,ECS 镜像 ID:m-wz9du0xaa1pag4ylwzsu(BatchCompute 提供的公用计算节点镜像,实际生产,需要用户制作所需要的计算节点镜像,具体制作流程请参考 操作手册)。p15p16 配置渲染软件信息 A) 镜像管理->软件配置。p17 B) 添加软件。p18 C) 选择 blender 模板并确定,执行 render_cmd 渲染命令。p19 创建项目 A) 项目管理->新建项目。p20B) 填入需要映射的 OSS 路径数量(本例只映射一个OSS路径),并点击确认。p21C) 填入项目名称: blender_test。D) 镜像选择上面创建的镜像。E) OSS 映射中的选择/输入路径为 /renderbucket/blender/。F) OSS 映射的目的地为盘符 G: (本例中使用的镜像系统为 Windows2008 server)。G) OSS 输出目录填写为 /renderbucket/rm_test/output/。H) 虚拟机中的输出目录填写为 C:\render_output\,该路径用于渲染节点中临时存放渲染结果,并且该目录里的渲染结果会被传输到 OSS 上输出目录里。I) 确定提交。p22 提交渲染任务 A) 项目管理->提交渲染。p23 B) 选择场景所在的 OSS 路径前缀。p24 C) 选择项目根目录, 直到场景文件cube.blend,选中 monkey 文件夹;可以看到页面下部出现场景选择,勾选场景,选择渲染软件,填入渲染起止帧 1~5,并点击提交渲染按钮。p25 D) 选择渲染中的任务,可查看刚才提交的作业。p26 查看渲染日志 A) 点击任务名称并点击节点列表。p27 B) 点击想查看的节点,可以看到渲染器和渲管 worker 的各种日志、标准输出以及标准出错信息(计算节点运行起来后才能看到日志信息)。p28p29p30 查看渲染结果 A) 等待作业结束后,在已结束的任务中可以可以看到任务状态为 Finished。p31 B) 点击任务名称,可以查看 OSS 上的输出路径。p32 C) 在 OSS 控制台上查看对应输出路径,获取地址后点击获取 URL 并复制。p33 D) 在浏览器粘贴 URL 可以直接查看图片。p34 E) 恭喜您已跑通云上的 Blender 渲染测试。 渲管系统结构 A) 渲管与各云产品的详细关系 渲管与各云产品的依赖如下图所示。rm_c B) 渲管系统内部结构 p0渲管系统由如下 3 部分组成: render manager: 基于 flask 框架开发web 应用,主要负责和用户进行人机交互,接收用户请求。 render master:后台背景进程,根据人机交互的结果进行作业提交以调度。 本地数据库:主要存放用户提交的渲管请求,待渲管任务结束后自动删除该信息。 2. 渲管的部署 在阿里云云市场有已安装了渲管的 ECS 镜像免费售卖,在启动 ECS 实例时,将镜像指定为镜像市场中的 rendermanager,启动即可使用。 A) 获取渲管镜像 官方渲管镜像:RenderManager 镜像,创建 ECS 实例时,选择镜像市场,直接搜索以上关键字即可获取。自定义渲管镜像:基础镜像建议采用 Ubuntu 14.04 64 位,按照以下步骤安装渲管系统。 安装 flask sudo apt-get install python-flask -y 安装 uwsgi sudo apt-get install uwsgi uwsgi-plugin-python -y 安装 nginx sudo apt-get install nginx –y 修改 nginx 配置,在 http 模块里添加新的 server server { listen 1314; #listen port server_name localchost; location / { include uwsgi_params; uwsgi_pass 0.0.0.0:8818;#this must be same app_config.xml } } vi /etc/nginx/nginx.conf 启动 nginx 或重启 nginx 获取最新版渲管 wget http://openrm.oss-cn-qingdao.aliyuncs.com/render_manager_release/latest/rm.tar.gz 解压 tar –xf rm.tar.gz x.x.x 为版本号 cd rm-x.x.x 指定安装目录部署 python deploy.py /root/rm_install/ 启动 cd /root/rm_install/rm_install_s && python rm_cmd.py start 登陆渲管 http://installed_machine_ip:1314/rm/login 初始账号: rm_admin 密码: rm_admin@123 若监听在公网,建议采用https B) 开通 ECS 实例 请指定某 ECS 实例部署渲管系统,配置参数,请参考创建 Linux 实例 公网 IP 地址选择分配。 镜像市场: RenderManager 或者自定义镜像 设置密码 3. 渲管系统升级 p43页面右上角的版本信息中可以查看是否有可升级的新版本,第一次使用渲管前,建议升级到最新版本后再使用渲管(每次只能升级到下一版本,所以升级后请查看是否已是最新版本)。 渲管系统配置 p44配置页面里有渲管系统的各种系统设置。第一次使用渲管时,必须设置SECURITY_ID,SECURITY_KEY,OSS_BUCKET 三个值,不然渲管无法使用。 SECURITY_ID 和 SECURITY_KEY 即阿里云账号的 AccessKeys 信息,可以在阿里云官网控制台创建。 OSS_BUCKET 可以在 OSS 的控制台创建,用于存储渲管自身的 worker 包已经渲染数据。 渲管默认使用青岛(华北1)区域,如果使用其他区域的 BatchCompute,请修改配置中的OSS_HOST(OSS_BUCKET 必须与 OSS_HOST 属于同一个region)与 BATCHCOMPUTE_REGION,每个 REGION 的 OSS_HOST 也可以工单咨询获取。 区域的选择和计算节点的镜像区域保持一致,若计算节点镜像在深圳区域,则渲管的区域信息也必须是深圳,同时 OSS BUCKET 也必须是该 REGION 下的 BUCKET;若使用批量计算官方提供的计算节点镜像则需要选择深圳 REGION。p45 其他配置项,请参考页面上的说明。 OSS数据上传 提交渲染作业前,一定要将渲染用到的数据上传 OSS,在计算节点启动后再上传的数据将不能在计算节点中访问到。 由于 OSS 页面控制台上传数据有大小限制,所以上传数据建议使用 OSS 的 命令行工具(类 linux系统)、windows 客户端或者 MAC 客户端 。 参考 更多 OSS工具 。 计算节点镜像制作 渲染客户如希望定制计算节点镜像,请参考:自定义镜像。 计算节点镜像管理 A) 添加计算节点镜像 在镜像管理页面,可以添加计算节点镜像 ID。add_image B) 给计算节点镜像配置渲染软件信息 在添加完计算节点镜像 ID 后,在镜像信息页面可以点击添加软件并配置软件信息。image_config 在配置软件信息时,需要填入渲染软件的名称,渲染文件的后缀(用于识别渲染文件)以及执行代码。 执行代码(要求 python 语法)会在渲管 worker 中执行,render_cmd 变量即渲染时的命令行,命令行应根据实际安装的渲染软件来填写,比如渲染软件的路径,以及一些参数。渲管中的模板只是个示例,实际使用需要微调。 render_cmd 渲管已经预定义了一些变量和函数,在执行代码中可以调用这些变量和函数,例如$CPU在执行期会被替换成实际的cpu核数,$START_FRAME在执行期会被替换成起始帧号。 如果想增加自定义参数,可以选择添加参数,添加的自定义参数会需要在提交作业时填入。关于所有的可用变量可在软件配置页面点击查看。 $OUTPUT_LOCAL_DIR这个变量即创建项目时配置的节点内临时输出路径,渲染的输出结果应该放在该路径下(大部分渲染器都支持在命令行中指定输出路径),在渲染结束后该目录下的数据会被传输到 OSS。 项目管理 A) 项目创建 创建项目时需要指定 OSS 数据映射,计算节点镜像,虚拟机内的临时输出路径,OSS 输出路径。 i. 计算节点镜像 创建项目时选择的计算节点镜像(需要先在镜像管理页面添加计算节点镜像)是提交 AutoCluster 作业时使用的镜像,如果提交作业时指定了集群(在集群管理页面可以创建)则作业直接跑在所指定的集群中。 ii. OSS数据映射 OSS 数据映射(或者称 OSS 数据挂载),可以将 OSS 上的数据映射到计算节点的本地路径(windows 是盘符),一个作业中的所有计算节点可以共享访问到相同的数据。OSS 数据挂载有如下功能或限制: 映射的目的路径必须根据计算节点镜像实际的操作系统类型进行填写,否则会导致挂载失败,windows 只能映射到盘符(例 G:),linux 必须是绝对路径。 可共享读取访问 OSS 上的数据。 不支持修改 OSS 上已存在的文件和文件夹名称。 选择 WriteSupport 后,支持本地(挂载路径下)文件和文件夹的创建,以及新建文件的修改。 挂载的本地路径里的改动只是本计算节点可见,不会同步到 OSS。 在 Windows 系统中,在挂载时刻已存在的文件夹中创建的文件或文件夹将不支持删除操作,linux 系统可以。 选择 LockSupport 后,将可以使用文件锁功能(只影响 windows)。 OSS 数据挂载会有分布式cache(集群内),所以在大规模并发读取数据时性能较好(能达到 10MB~30MB,200 台并发,读取 20G 数据)。 OSS 路径必须以’/’结尾。 iii. OSS 输出目录与临时本地输出目录 渲染作业结束时,计算节点中的临时输出目录中的数据将会被传输到 OSS 输出目录中。临时输出路径格式必须与节点的操作系统类型对应,不然会出错。 B) 提交渲染任务 p41选择目的集群和场景所在的 OSS 路径前缀后进入提交的详细页面,选中场景文件的上一级目录,可以被提交渲染的场景文件则会被列出,勾选想要渲染的文件,选择配置的渲染软件和起止帧,即可提交渲染作业。 可指定节点数量,如果指定集群,并发数量上限是集群的节点数上限。填入的起止帧会均匀的分布在各个计算节点被渲染。p42 任务结束后可以在OSS上查看输出结果,如果开启自动下载(配置页面设置),渲管会在任务结束后将OSS上的输出结果下载到渲管部署的机器上。 C) 渲染日志 在节点列表页面,点击节点可以查看各种日志,渲管 worker 日志里都是渲管系统 worker 的日志,里面可以查看该计算节点中运行的实际渲染命令行。 渲染器标准输出和渲染器标准输出里的日志,就是渲染软件的输出日志。 p47 调试 新启动的渲管需要进行配置,并进行调试然后再提交大规模的渲染任务。 配置完,应该先提交1帧测试任务,查看错误日志(渲管 worker 日志和渲染器标准输出)调整渲染软件配置(主要是修改渲染命令行),走通全流程并确认结果没有问题后才进行正式生产渲染。 当作业失败的时候可以在作业信息中查看失败原因项。p46 建议创建一个集群然后将作业提交到该集群进行调试(AutoCluster 的作业需要启停计算节点,比较费时)。 集群管理 在集群管理页面可以创建自定义集群,需要选择所需的计算节点镜像 ID,节点的实例类型(BatchCompute 的不同区域可能支持的实例类型和磁盘类型不同,详细可以提工单咨询)。 磁盘类型和磁盘大小(根据实际制作的计算节点镜像的磁盘大小选择,选择过小会导致无法启动计算节点)。创建好的集群可以动态调整节点数量,甚至调整数量到 0。p48

1934890530796658 2020-03-28 20:45:20 0 浏览量 回答数 0

问题

人工智能技术百问——机器真的能取代人类吗

yq传送门 2019-12-01 20:27:57 4467 浏览量 回答数 3

问题

程序员报错QA大分享(1)

问问小秘 2020-06-18 15:46:14 8 浏览量 回答数 1

问题

【阿里云产品公测】以开发者角度看ACE服务『ACE应用构建指南』

mr_wid 2019-12-01 21:10:06 20092 浏览量 回答数 6

问题

什么是Logtail?

轩墨 2019-12-01 21:51:42 1799 浏览量 回答数 0

回答

12月17日更新 请问下同时消费多个topic的情况下,在richmap里面可以获取到当前消息所属的topic吗? 各位大佬,你们实时都是怎样重跑数据的? 有木有大神知道Flink能否消费多个kafka集群的数据? 这个问题有人遇到吗? 你们实时读取广业务库到kafka是通过什么读的?kafka connector 的原理是定时去轮询,这样如果表多了,会不会影响业务库的性能?甚至把业务库搞挂? 有没有flink 1.9 连接 hive的例子啊?官网文档试了,没成功 请问各位是怎么解决实时流数据倾斜的? 请问一下,对于有状态的任务,如果任务做代码升级的时候,可否修改BoundedOutOfOrdernessTimestampExtractor的maxOutOfOrderness呢?是否会有影响数据逻辑的地方呢? 老哥们有做过统计从0点开始截止到现在时刻的累计用户数吗? 比如五分钟输出一次,就是7点输出0点到7点的累计用户,7:05输出0点到7:05的累计用户。 但是我这里有多个维度,现在用redis来做的。 想知道有没有更好的姿势? 实时数仓用什么存储介质来存储维表,维表有大有小,大的大概5千万左右。 各位大神有什么建议和经验分享吗? 请教个问题,就是flink的窗口触发必须是有数据才会触发吗?我现在有个这样的需求,就是存在窗口内没有流数据进入,但是窗口结束是要触发去外部系统获取上一个窗口的结果值作为本次窗口的结果值!现在没有流数据进入窗口结束时如何触发? kafkaSource.setStartFromTimestamp(timestamp); 发现kafkasource从指定时间开始消费,有些topic有效,有效topic无效,大佬们有遇到过吗? 各位大佬,flink两个table join的时候,为什么打印不出来数据,已经赋了关联条件了,但是也不报错 各位大佬 请教一下 一个faile的任务 会在这里面存储展示多久啊? 各位大佬,我的程序每五分钟一个窗口做了基础指标的统计,同时还想统计全天的Uv,这个是用State就能实现吗? 大佬们,flink的redis sink是不是只适用redis2.8.5版本? 有CEP 源码中文注释的发出来学习一下吗? 有没有拿flink和tensorflow集成的? 那位大神,给一个java版的flink1.7 读取kafka数据,做实时监控和统计的功能的代码案例。 请问下风控大佬,flink为风控引擎做数据支撑的时候,怎么应对风控规则的不断变化,比如说登录场景需要实时计算近十分钟内登录次数超过20次用户,这个规则可能会变成计算近五分钟内登录次数超过20次的。 想了解一下大家线上Flink作业一般开始的时候都分配多少内存?广播没办法改CEP flink支持多流(大于2流)join吗? 谁能帮忙提供一下flink的多并行度的情况下,怎么保证数据有序 例如map并行度为2 那就可能出现数据乱序的情况啊 请教下现在从哪里可以可以看单任务的运行状况和内存占用情况,flink页面上能看单个任务的内存、cpu 大佬们 flink1.9 停止任务手动保存savepoint的命令是啥? flink 一个流计算多个任务和 还是一个流一个任务好? flink 1.9 on yarn, 自定义个connector里面用了jni, failover以后 就起不来了, 报错重复load so的问题。 我想问一下 这个,怎么解决。 难道flink 里面不能用jni吗。 ide里面调试没有问题,部署到集群就会报错了,可能什么问题? 请教一下对于长时间耗内存很大的任务,大家都是开checkpoint机制,采用rocksdb做状态后端吗? 请问下大佬,flink jdbc读取mysql,tinyin字段类型自动转化为Boolean有没有好的解决方法 Flink 1.9版本的Blink查询优化器,Hive集成,Python API这几个功能好像都是预览版,请问群里有大佬生产环境中使用这些功能了吗? 想做一个监控或数据分析的功能,如果我flink 的datastreaming实现消费Kafka的数据,但是我监控的规则数据会增加或修改,但是不想停这个正在运行的flink程序,要如何传递这个动态变化的规则数据,大神给个思路,是用ConnectedStream这个吗?还是用Broadcast ?还有一个,比如我的规则数据是存放在Mysql表中,用什么事件隔30秒去触发读取mysql规则表呢?谢谢! 想做一个监控或数据分析的功能,如果我flink 的datastreaming实现消费Kafka的数据,但是我监控的规则数据会增加或修改,但是不想停这个正在运行的flink程序,要如何传递这个动态变化的规则数据,大神给个思路,是用ConnectedStream这个吗?还是用Broadcast ?还有一个,比如我的规则数据是存放在Mysql表中,用什么事件隔30秒去触发读取mysql规则表呢?谢谢! 各位大佬,在一个 Job 计算过程中,查询 MySQL 来补全额外数据,是一个好的实践嘛?还是说流处理过程中应该尽量避免查询额外的数据? Flink web UI是jquery写的吗? 12月9日更新 成功做完一次checkpoint后,会覆盖上一次的checkpoint吗? 数据量较大时,flink实时写入hbase能够异步写入吗? flink的异步io,是不是只是适合异步读取,并不适合异步写入呀? 请问一下,flink将结果sink到redis里面会不会对存储的IO造成很大的压力,如何批量的输出结果呢? 大佬们,flink 1.9.0版本里DataStream api,若从kafka里加载完数据以后,从这一个流中获取数据进行两条业务线的操作,是可以的吗? flink 中的rocksdb状态怎么样能可视化的查看有大佬知道吗? 感觉flink 并不怎么适合做hive 中的计算引擎来提升hive 表的查询速度 大佬们,task端rocksdb状态 保存路径默认是在哪里的啊?我想挂载个新磁盘 把状态存到那里去 flink 的state 在窗口滑动到下一个窗口时候 上一个窗口销毁时候 state会自己清除吗? 求助各位大佬,一个sql里面包含有几个大的hop滑动窗口,如15个小时和24个小时,滑动步长为5分钟,这样就会产生很多overlap 数据,导致状态会很快就达到几百g,然后作业内存也很快达到瓶颈就oom了,然后作业就不断重启,很不稳定,请问这个业务场景有什么有效的解决方案么? 使用jdbcsink的时候,如果连接长时间不使用 就会被关掉,有人遇到过吗?使用的是ddl的方式 如何向云邪大佬咨询FLink相关技术问题? 请问各位公司有专门开发自己的实时计算平台的吗? 请问各位公司有专门开发自己的实时计算平台的吗? 有哪位大佬有cdh集成安装flink的文档或者手册? 有哪位大佬有cdh集成安装flink的文档或者手册? 想问下老哥们都是怎么统计一段时间的UV的? 是直接用window然后count嘛? Flink是不是也是这样的? 请问现在如有个实时程序,根据一个mysql的维表来清洗,但是我这个mysql表里面就只有几条信息且可能会变。 我想同一个定时器去读mysql,然后存在对象中,流清洗的时候读取这个数据,这个想法可行吗?我目前在主类里面定义一个对象,然后往里面更新,发现下面的map方法之类的读不到我更新进去的值 有大佬做过flink—sql的血缘分析吗? 12月3日更新 请教一下,为什么我flume已经登录成功了keytab认证的kafka集群,但是就是消费不到数据呢? flink 写入mysql 很长一段时间没有写入,报错怎么解决呢? flink timestamp转换为date类型,有什么函数吗 Run a single Flink job on YARN 我采用这种模式提交任务,出现无法找到 开启 HA 的ResourceManager Failed to connect to server: xxxxx:8032: retries get failed due to exceeded maximum allowed retries number: 0 有大佬遇到过吗 ? 各位大佬,请问有Flink写S3的方案吗? flink 连接hbase 只支持1.4.3版本? onnector: type: hbase version: "1.4.3" 请问 flink1.9能跑在hadoop3集群上吗? 滑动窗口 排序 报错这个是什么原因呢? 这个pravega和kafka有啥区别? flink 开发里数据源配置了RDS,但是在RDS里没有看到创建的表,是为什么呢? Tumbling Window里的数据,是等窗口期内的数据到齐之后一次性处理,还是到了一条就处理一条啊 双流join后再做time window grouping. 但是双流join会丢失时间属性,请问大家如何解决 stream processing with apache flink,这本书的中译版 现在可以买吗? flink on yarn时,jm和tm占用的内存最小是600M,这个可以修改吗? 各位大佬,使用默认的窗口Trigger,在什么情况下会触发两次啊?窗口关闭后,然后还来了这个窗口期内的数据,并且开了allowedLateness么? flink web里可以像storm那样 看每条数据在该算子中的平均耗时吗? 各位大佬,flink任务的并发数调大到160+以后,每隔几十分钟就会出现一次TM节点连接丢失的异常,导致任务重启。并发在100时运行比较稳定,哪位大佬可以提供下排查的思路? 感觉stateful function 是下一个要发力的点,这个现在有应用案例吗? 我有2个子网(a子网,b子网)用vpn联通,vpn几周可能会断一次。a子网有一个kafka集群,b子网运行我自己的flink集群和应用,b子网的flink应用连接到a子网的kafka集群接收消息来处理入库到数仓去。我的问题是,如果vpn断开,flink consumer会异常整个作业退出吗?如果作业退出,我重连vpn后,能从auto checkpoint再把flink应用恢复到出错时flink kafka consumer应该读取的partition/offset位置吗?flink的checkpoint除了保存自己开发的算子里的state,kafkaconsumer里的partition/offset也会保存和恢复吗? flink的反压为什么不加入metrics呢 hdfs是不是和flink共用一个集群? flink消费kafka,可以从指定时间消费的吗?目前提供的接口只是根据offset消费?有人知道怎么处理? flink 的Keyby是不是只是repartition而已?没有将key相同的数据放到一个组合里面 电商大屏 大家推荐用什么来做吗? 我比较倾向用数据库,因为有些数据需要join其他表,flink充当了什么角色,对这个有点迷,比如统计当天订单量,卖了多少钱,各个省的销量,销售金额,各个品类的销售量销售金额 开源1.9的sql中怎么把watermark给用起来,有大神知道吗? 有没有人能有一些flink的教程 代码之类的分享啊 采用了checkpoint,程序停止了之后,什么都不改,直接重启,还是能接着继续运行吗?如果可以的话,savepoint的意义又是什么呢? 有人做过flink 的tpc-ds测试吗,能不能分享一下操作的流程方法 checkpoint是有时间间隔的,也就可以理解为checkpoint是以批量操作的,那如果还没进行ckecnpoint就挂了,下次从最新的一次checkpoint重启,不是重复消费了? kafka是可以批量读取数据,但是flink是一条一条处理的,应该也可以一条一条提交吧。 各位大佬,flink sql目前是不是不支持tumbling window join,有人了解吗? 你们的HDFS是装在taskmanager上还是完全分开的,请问大佬们有遇到这种情况吗? 大佬们flink检查点存hdfs的话怎么自动清理文件啊 一个128M很快磁盘就满了 有谁遇到过这个问题? 请教一下各位,这段代码里面,我想加一个trigger,实现每次有数据进window时候,就输出,而不是等到window结束再输出,应该怎么加? 麻烦问下 flink on yarn 执行 客户端启动时 报上面错,是什么原因造成的 求大佬指点 ERROR org.apache.flink.client.program.rest.RestClusterClient - Error while shutting down cluster java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted. 大家怎么能动态的改变 flink WindowFunction 窗口数据时间 flink on yarn之后。yarn的日志目录被写满,大家如配置的? Flink1.9 启动 yarn-session报这个错误 怎么破? yarn 模式下,checkpoint 是存在 JobManager的,提交任务也是提交给 JobManager 的吧? heckpoint机制,会不会把window里面的数据全部放checkpoint里面? Flink On Yarn的模式下,如果通过REST API 停止Job,并触发savepiont呢 jenkins自动化部署flink的job,一般用什么方案?shell脚本还是api的方式? 各位大佬,开启增量checkpoint 情况下,这个state size 是总的checkpoint 大小,还是增量上传的大小? 想用状态表作为子表 外面嵌套窗口 如何实现呢 因为状态表group by之后 ctime会失去时间属性,有哪位大佬知道的? 你们有试过在同样的3台机器上部署两套kafka吗? 大家有没有比较好的sql解析 组件(支持嵌套sql)? richmapfuntion的open/close方法,和处理数据的map方法,是在同一个线程,还是不同线程调用的? flink on yarn 提交 参数 -p 20 -yn 5 -ys 3 ,我不是只启动了5个container么? Flink的乱序问题怎么解决? 我对数据流先进行了keyBy,print的时候是有数据的,一旦进行了timeWindow滑动窗口就没有数据了,请问是什么情况呢? 搭建flinksql平台的时候,怎么处理udf的呀? 怎么查看sentry元数据里哪些角色有哪些权限? 用java api写的kafka consumer能消费到的消息,但是Flink消费不到,这是为啥? 我state大小如果为2G左右 每次checkpoint会不会有压力? link-table中的udaf能用deltaTrigger么? flink1.7.2,场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温 001 Flink集群支持kerberos认证吗?也就是说flink客户端需要向Flink集群进行kerberos认证,认证通过之后客户端才能提交作业到Flink集群运行002 Flink支持多租户吗? 如果要对客户端提交作业到flink进行访问控制,你们有类似的这种使用场景吗? flink可以同时读取多个topic的数据吗? Flink能够做实时ETL(oracle端到oracle端或者多端)么? Flink是否适合普通的关系型数据库呢? Flink是否适合普通的关系型数据库呢? 流窗口关联mysql中的维度表大佬们都是怎么做的啊? 怎么保证整个链路的exactly one episode精准一次,从source 到flink到sink? 在SQL的TUMBLE窗口的统计中,如果没数据进来的,如何让他也定期执行,比如进行count计算,让他输出0? new FlinkKafkaConsumer010[String]("PREWARNING",new JSONKeyValueDeserializationSchema(true), kafkaProps).setStartFromGroupOffsets() ) 我这样new 它说要我传个KeyedDeserializationSchema接口进去 flink里面broadcast state想定时reload怎么做?我用kafka里的stream flink独立模式高可用搭建必需要hadoop吗? 有人用增量cleanupIncrementally的方式来清理状态的嘛,感觉性能很差。 flink sink to hbase继承 RichOutputFormat运行就报错 kafka 只有低级 api 才拿得到 offset 吗? 有个问题咨询下大家,我的flinksql中有一些参数是要从mysql中获取的,比如我flink的sql是select * from aa where cc=?,这个问号的参数需要从mysql中获取,我用普通的jdbc进行连接可以获的,但是有一个问题,就是我mysql的数据改了之后必须重启flink程序才能解决这个问题,但这肯定不符合要求,请问大家有什么好的办法吗? flink里怎样实现多表关联制作宽表 flink写es,因为半夜es集群做路由,导致写入容易失败,会引起source的反压,然后导致checkpoint超时任务卡死,请问有没有办法在下游es处理慢的时候暂停上游的导入来缓解反压? flink 写parquet 文件,使用StreamingFileSink streamingFileSink = StreamingFileSink.forBulkFormat( new Path(path), ParquetAvroWriters.forReflectRecord(BuyerviewcarListLog.class)). withBucketAssigner(bucketAssigner).build(); 报错 java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer 1.7.2 NoWindowInnerJoin这个实现,我看实现了CleanupState可更新过期时间删除当前key状态的接口,是不是这个1.7.2版本即使有个流的key一直没有被匹配到他的状态也会被清理掉,就不会存在内存泄漏的问题了? flink1.7.2 想在Table的UDAF中使用State,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用,大佬,如何在Table的UDAF中使用State啊? Flink有什么性能测试工具吗? 项目里用到了了KafkaTableSourceSinkFactory和JDBCTableSourceSinkFactory。maven打包后,META-INF里只会保留第一个 标签的org.apache.flink.table.factories.TableFactory内容。然后执行时就会有找不到合适factory的报错,请问有什么解决办法吗? 为什么这个这段逻辑 debug的时候 是直接跳过的 各位大佬,以天为单位的窗口有没有遇到过在八点钟的时候会生成一条昨天的记录? 想问一下,我要做一个规则引擎,需要动态改变规则,如何在flink里面执行? flink-1.9.1/bin/yarn-session.sh: line 32: construc 我要用sql做一个规则引擎,需要动态改变规则,如何在flink里面执行? 我要用sql做一个规则引擎,需要动态改变规则,如何在flink里面执行? 一般公司的flink job有没有进程进行守护?有专门的工具或者是自己写脚本?这种情况针对flink kafka能不能通过java获取topic的消息所占空间大小? Flink container was removed这个咋解决的。我有时候没有数据的时候也出现这 大家有没有这种场景,数据从binlog消费,这个信息是订单信息,同一个订单id,会有不同状态的变更 问大家个Hive问题,新建的hive外部分区表, 怎么把HDFS数据一次性全部导入hive里 ? flink里面的broadcast state值,会出现broad流的数据还没put进mapstat Flink SQL DDL 创建表时,如何定义字段的类型为proctime? 请问下窗口计算能对历史数据进行处理吗?比如kafka里的写数据没停,窗口计算的应用停掉一段时间再开起 请问下,想统计未退费的订单数量,如果一个订单退费了(发过来一个update流),flink能做到对结果进行-1吗,这样的需求sql支持吗? 使用Flink sql时,对table使用了group by操作。然后将结果转换为流时是不是只能使用的toRetractStream方法不能使用toAppendStream方法。 百亿数据实时去重,有哪位同学实践过吗? 你们的去重容许有误差?因为bloom filter其实只能给出【肯定不存在】和【可能存在】两种结果。对于可能存在这种结果,你们会认为是同一条记录? 我就运行了一个自带的示例,一运行就报错然后web页面就崩了 flink定时加载外部数据有人做过吗? NoSuchMethodError: org.apache.flink.api.java.Utils.resolveFactory(Ljava/lang/ThreadLocal;Ljava/lang/Object;)Ljava/util/Optional 各位知道这个是那个包吗? flink 可以把大量数据写入mysql吗?比如10g flink sql 解析复杂的json可以吗? 在页面上写规则,用flink执行,怎么传递给flink? 使用cep时,如何动态添加规则? 如何基于flink 实现两个很大的数据集的交集 并集 差集? flink的应用场景是?除了实时 各位好,请教一下,滑动窗口,每次滑动都全量输出结果,外部存储系统压力大,是否有办法,只输出变化的key? RichSinkFunction close只有任务结束时候才会去调用,但是数据库连接一直拿着,最后成了数据库连接超时了,大佬们有什么好的建议去处理吗?? 为啥我的自定义函数注册,然后sql中使用不了? 请问一下各位老师,flink flapmap 中的collector.collect经常出现Buffer pool is destroyed可能是什么原因呢? 用asyncIO比直接在map里实现读hbase还慢,在和hbase交互这块儿,每个算子都加了时间统计 请教一下,在yarn上运行,会找不到 org.apache.flink.streaming.util 请问下大佬,flink1.7.2对于sql的支持是不是不怎么好啊 ,跑的数据一大就会报错。 各位大佬,都用什么来监控flink集群? flink 有那种把多条消息聚合成一条的操作吗,比如说每五十条聚合成一条 如何可以让checkpoint 跳过对齐呢? 请问 阿里云实时计算(Blink)支持这4个源数据表吗?DataHub Kafka MQ MaxCompute? 为啥checkpoint时间会越来越长,请问哪位大佬知道是因为啥呢? 请问Flink的最大并行度跟kafka partition数量有关系吗? source的并行度应该最好是跟partition数量一致吧,那剩下的算子并行度呢? Flink有 MLIB库吗,为什么1.9中没有了啊? 请教一下,有没有flink ui的文章呢?在这块内存配置,我给 TM 配置的内存只有 4096 M,但是这里为什么对不上呢?请问哪里可以看 TM 内存使用了多少呢? 请教个问题,fink RichSinkFunction的invoke方法是什么时候被调用的? 请教一下,flink的window的触发条件 watermark 小于 window 的 end_time。这个 watermark 为什么是针对所有数据的呢?没有设计为一个 key 一个 watermark 呢? 就比如说有 key1、key2、key3,有3个 watermark,有 3个 window interval不支持left join那怎么可以实现把窗口内左表的数据也写到下游呢? 各位 1、sink如何只得到最终的结果而不是也输出过程结果 ;2、不同的运算如何不借助外部系统的存储作为另外一个运算的source 请教各位一个问题,flink中设置什么配置可以取消Generic这个泛型,如图报错: 有大佬在吗,线上遇到个问题,但是明明内存还有200多G,然后呢任务cancel不了,台也取消不了程序 flink遇到The assigned slot container_1540803405745_0094_01_000008_1 was removed. 有木有大佬遇到过。在flink on yarn上跑 这个报错是什么意思呢?我使用滑动窗口的时候出现报错 flink 双流union状态过期不清理有遇到的吗? 大家有没有这种场景,数据从binlog消费,这个信息是订单信息,同一个订单id,会有不同状态的变更,如果订单表与商品明细join查询,就会出现n条重复数据,这样数据就不准了,flink 这块有没有比较好的实战经验的。 大佬们、有没有人遇到过使用一分钟的TumblingEventTimeWindows,但是没有按时触发窗口、而是一直等到下一条消息进来之后才会把这个窗口的数据发送出去的? flink 有办法 读取 pytorch的 模型文件吗? 大佬们、有没有人遇到过使用一分钟的TumblingEventTimeWindows,但是没有按时触发窗口、而是一直等到下一条消息进来之后才会把这个窗口的数据发送出去的? flink timestamp转换为date类型,有什么函数吗 flink 写入mysql 很长一段时间没有写入,报错怎么解决呢? flink 有办法 读取 pytorch的 模型文件吗? 有没有大佬知道实时报表怎么做?就是统计的结果要实时更新,热数据。 刚接触flink 1.9 求问flink run脚本中怎么没有相关提交到yarn的命令了 请教一下,flink里怎么实现batch sink的操作而不导致数据丢失

问问小秘 2019-12-02 03:19:17 0 浏览量 回答数 0

回答

阿里云ECS在已有的系统事件的基础上,通过云监控新发布了状态变化类事件和抢占型实例的中断通知事件。每当ECS实例的状态发生变化的时候,都会触发一条ECS实例状态变化事件。这种变化包括您在控制台/OpenAPI/SDK操作导致的变化,也包括弹性伸缩或欠费等原因而自动触发的变化,还包括因为系统异常而触发的变化。 云监控以前发布的系统事件,主要针对告警后人工介入的场景,而这次新发布的事件属于正常类的信息通知,适合自动化的审计运维等场景。为了自动化处理ECS状态变化事件,云监控提供了两种主要途径:一种是通过函数计算,另一种是通过MNS消息队列。本文将为您介绍利用MNS消息队列自动化处理ECS事件的三种最佳实践。 自动化处理ECS状态变化事件的准备工作 创建消息队列 登录MNS控制台。 在队列列表页面,选择地域,单击右上角的创建队列,进入新建队列页面。 输入队列的名称(例如“ecs-cms-event”)等信息,单击确认即可完成创建消息队列。 创建事件报警规则 登录云监控控制台。 单击左侧导航栏中的事件监控,进入事件查询页面 单击报警规则页签,然后单击右上角的创建事件报警,弹出创建/修改事件报警对话框。 在基本信息区域,填写报警规则名称,例如如“ecs-test-rule”。 设置事件报警规则:选择事件类型为系统事件。 产品类型、事件等级、事件名称:产品类型选择云服务器ECS,事件类型选择StatusNotification,其余按照实际情况填写。 资源范围:选择全部资源时,任何资源发生相关事件,都会按照配置发送通知;选择应用分组时,只有指定分组内的资源发生相关事件时,才会发送通知。 在报警方式中,选择消息队列,然后选择地域和队列(例如ecs-cms-event)。 完成以上设置后,单击确定按钮即可完成创建事件报警规则。 安装Python依赖 本文所有的代码均使用Python 3.6测试通过,您也可以使用Java等其他编程语言。 请使用Pypi安装以下Python依赖: aliyun-python-sdk-core-v3>=2.12.1 aliyun-python-sdk-ecs>=4.16.0 aliyun-mns>=1.1.5 自动化处理ECS状态变化事件的实施步骤 云监控会把云服务器ECS所有的状态变化事件都投递到MNS里面,接下来我们需要通过编写代码从MNS获取消息并进行消息处理。 实践一:对所有ECS的创建和释放事件进行记录 目前ECS控制台无法查询已经释放的实例。如果您有查询需求,可以通过ECS状态变化事件把所有ECS的生命周期记录在自己的数据库或者日志里。每当创建ECS时,会首先发送一个Pending事件,每当释放ECS时,会最后发送一个Deleted事件。我们需要对这两种事件进行记录。 编辑一个Conf文件。需包含mns的endpoint(可以登录MNS的控制台,在队列列表页,单击获取Endpoint得到)、阿里云的access key和secrect、region id(例如cn-beijing)以及mns queue的名字。 class Conf: endpoint = 'http://<id>.mns.<region>.aliyuncs.com/' access_key = '<access_key>' access_key_secret = '<access_key_secrect>' region_id = 'cn-beijing' queue_name = 'test' vsever_group_id = '<your_vserver_group_id>' 使用MNS的SDK编写一个MNS Client用来获取MNS消息。 # -*- coding: utf-8 -*- import json from mns.mns_exception import MNSExceptionBase import logging from mns.account import Account from . import Conf class MNSClient(object): def __init__(self): self.account = Account(Conf.endpoint, Conf.access_key, Conf.access_key_secret) self.queue_name = Conf.queue_name self.listeners = dict() def regist_listener(self, listener, eventname='Instance:StateChange'): if eventname in self.listeners.keys(): self.listeners.get(eventname).append(listener) else: self.listeners[eventname] = [listener] def run(self): queue = self.account.get_queue(self.queue_name) while True: try: message = queue.receive_message(wait_seconds=5) event = json.loads(message.message_body) if event['name'] in self.listeners: for listener in self.listeners.get(event['name']): listener.process(event) queue.delete_message(receipt_handle=message.receipt_handle) except MNSExceptionBase as e: if e.type == 'QueueNotExist': logging.error('Queue %s not exist, please create queue before receive message.', self.queue_name) else: logging.error('No Message, continue waiting') class BasicListener(object): def process(self, event): pass 上述代码只是对MNS消息进行拉取,调用Listener消费消息之后删除消息,后面的实践也会用到。 注册一个Listener进消费指定事件。这个简单的Listener判断收到Pending和Deleted事件时,打印一行日志。 # -*- coding: utf-8 -*- import logging from .mns_client import BasicListener class ListenerLog(BasicListener): def process(self, event): state = event['content']['state'] resource_id = event['content']['resourceId'] if state == 'Panding': logging.info(f'The instance {resource_id} state is {state}') elif state == 'Deleted': logging.info(f'The instance {resource_id} state is {state}') Main函数可以这么写: mns_client = MNSClient() mns_client.regist_listener(ListenerLog()) mns_client.run() 实际生产环境下,可能需要把事件存储在数据库里,或者利用SLS日志服务,方便后期的搜索和审计。 实践二:ECS的关机自动重启 在某些场景下,ECS会非预期的关机,您可能需要自动重启已经关机的ECS。 为了实现这一目的,我们复用实践一里面的MNS Client,添加一个新的Listener。当收到Stopped事件的时候,对该ECS执行一个Start命令。 -- coding: utf-8 -- import logging from aliyunsdkecs.request.v20140526 import StartInstanceRequest from aliyunsdkcore.client import AcsClient from .mns_client import BasicListener from .config import Conf class ECSClient(object): def init(self, acs_client): self.client = acs_client # 启动ECS实例 def start_instance(self, instance_id): logging.info(f'Start instance {instance_id} ...') request = StartInstanceRequest.StartInstanceRequest() request.set_accept_format('json') request.set_InstanceId(instance_id) self.client.do_action_with_exception(request) class ListenerStart(BasicListener): def init(self): acs_client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.ecs_client = ECSClient(acs_client) def process(self, event): detail = event['content'] instance_id = detail['resourceId'] if detail['state'] == 'Stopped': self.ecs_client.start_instance(instance_id) 在实际生产环境下,执行完Start命令后,可能还需要继续接收后续的Starting/Running/Stopped等事件,再配合计时器和计数器,进行Start成功或失败之后的处理。 实践三:抢占型实例释放前,自动从SLB移除 抢占型实例在释放之前五分钟左右,会发出释放告警事件,您可以利用这短暂的时间运行一些业务不中断的逻辑。例如,主动从SLB的后端服务器中去掉这台即将被释放的抢占型实例,而不是被动等待实例释放后SLB的自动处理。 我们还是复用实践一的MNS Client,添加一个新的Listener,当收到抢占型实例的释放告警时,调用SLB的SDK。 -- coding: utf-8 -- from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest from .mns_client import BasicListener from .config import Conf class SLBClient(object): def init(self): self.client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.request = CommonRequest() self.request.set_method('POST') self.request.set_accept_format('json') self.request.set_version('2014-05-15') self.request.set_domain('slb.aliyuncs.com') self.request.add_query_param('RegionId', Conf.region_id) def remove_vserver_group_backend_servers(self, vserver_group_id, instance_id): self.request.set_action_name('RemoveVServerGroupBackendServers') self.request.add_query_param('VServerGroupId', vserver_group_id) self.request.add_query_param('BackendServers', "[{'ServerId':'" + instance_id + "','Port':'80','Weight':'100'}]") response = self.client.do_action_with_exception(self.request) return str(response, encoding='utf-8') class ListenerSLB(BasicListener): def init(self, vsever_group_id): self.slb_caller = SLBClient() self.vsever_group_id = Conf.vsever_group_id def process(self, event): detail = event['content'] instance_id = detail['instanceId'] if detail['action'] == 'delete': self.slb_caller.remove_vserver_group_backend_servers(self.vsever_group_id, instance_id)

景凌凯 2020-03-25 22:23:53 0 浏览量 回答数 0

回答

本文通过实践案例为您介绍云监控如何利用MNS消息队列实现自动化处理ECS状态变化事件。 背景信息 阿里云ECS在已有的系统事件的基础上,通过云监控新发布了状态变化类事件和抢占型实例的中断通知事件。每当ECS实例的状态发生变化的时候,都会触发一条ECS实例状态变化事件。这种变化包括您在控制台/OpenAPI/SDK操作导致的变化,也包括弹性伸缩或欠费等原因而自动触发的变化,还包括因为系统异常而触发的变化。 云监控以前发布的系统事件,主要针对告警后人工介入的场景,而这次新发布的事件属于正常类的信息通知,适合自动化的审计运维等场景。为了自动化处理ECS状态变化事件,云监控提供了两种主要途径:一种是通过函数计算,另一种是通过MNS消息队列。本文将为您介绍利用MNS消息队列自动化处理ECS事件的三种最佳实践。 自动化处理ECS状态变化事件的准备工作 创建消息队列 登录MNS控制台。 在队列列表页面,选择地域,单击右上角的创建队列,进入新建队列页面。 输入队列的名称(例如“ecs-cms-event”)等信息,单击确认即可完成创建消息队列。 创建事件报警规则 登录云监控控制台。 单击左侧导航栏中的事件监控,进入事件查询页面 单击报警规则页签,然后单击右上角的创建事件报警,弹出创建/修改事件报警对话框。 在基本信息区域,填写报警规则名称,例如如“ecs-test-rule”。 设置事件报警规则:选择事件类型为系统事件。 产品类型、事件等级、事件名称:产品类型选择云服务器ECS,事件类型选择StatusNotification,其余按照实际情况填写。 资源范围:选择全部资源时,任何资源发生相关事件,都会按照配置发送通知;选择应用分组时,只有指定分组内的资源发生相关事件时,才会发送通知。 在报警方式中,选择消息队列,然后选择地域和队列(例如ecs-cms-event)。 完成以上设置后,单击确定按钮即可完成创建事件报警规则。 安装Python依赖 本文所有的代码均使用Python 3.6测试通过,您也可以使用Java等其他编程语言。 请使用Pypi安装以下Python依赖: aliyun-python-sdk-core-v3>=2.12.1 aliyun-python-sdk-ecs>=4.16.0 aliyun-mns>=1.1.5 自动化处理ECS状态变化事件的实施步骤 云监控会把云服务器ECS所有的状态变化事件都投递到MNS里面,接下来我们需要通过编写代码从MNS获取消息并进行消息处理。 实践一:对所有ECS的创建和释放事件进行记录 目前ECS控制台无法查询已经释放的实例。如果您有查询需求,可以通过ECS状态变化事件把所有ECS的生命周期记录在自己的数据库或者日志里。每当创建ECS时,会首先发送一个Pending事件,每当释放ECS时,会最后发送一个Deleted事件。我们需要对这两种事件进行记录。 编辑一个Conf文件。需包含mns的endpoint(可以登录MNS的控制台,在队列列表页,单击获取Endpoint得到)、阿里云的access key和secrect、region id(例如cn-beijing)以及mns queue的名字。 class Conf: endpoint = 'http:// .mns. .aliyuncs.com/' access_key = '<access_key>' access_key_secret = '<access_key_secrect>' region_id = 'cn-beijing' queue_name = 'test' vsever_group_id = '<your_vserver_group_id>' 使用MNS的SDK编写一个MNS Client用来获取MNS消息。 -- coding: utf-8 -- import json from mns.mns_exception import MNSExceptionBase import logging from mns.account import Account from . import Conf class MNSClient(object): def init(self): self.account = Account(Conf.endpoint, Conf.access_key, Conf.access_key_secret) self.queue_name = Conf.queue_name self.listeners = dict() def regist_listener(self, listener, eventname='Instance:StateChange'): if eventname in self.listeners.keys(): self.listeners.get(eventname).append(listener) else: self.listeners[eventname] = [listener] def run(self): queue = self.account.get_queue(self.queue_name) while True: try: message = queue.receive_message(wait_seconds=5) event = json.loads(message.message_body) if event['name'] in self.listeners: for listener in self.listeners.get(event['name']): listener.process(event) queue.delete_message(receipt_handle=message.receipt_handle) except MNSExceptionBase as e: if e.type == 'QueueNotExist': logging.error('Queue %s not exist, please create queue before receive message.', self.queue_name) else: logging.error('No Message, continue waiting') class BasicListener(object): def process(self, event): pass 上述代码只是对MNS消息进行拉取,调用Listener消费消息之后删除消息,后面的实践也会用到。 注册一个Listener进消费指定事件。这个简单的Listener判断收到Pending和Deleted事件时,打印一行日志。 # -- coding: utf-8 -- import logging from .mns_client import BasicListener class ListenerLog(BasicListener): def process(self, event): state = event['content']['state'] resource_id = event['content']['resourceId'] if state == 'Panding': logging.info(f'The instance {resource_id} state is {state}') elif state == 'Deleted': logging.info(f'The instance {resource_id} state is {state}') Main函数可以这么写: mns_client = MNSClient() mns_client.regist_listener(ListenerLog()) mns_client.run() 实际生产环境下,可能需要把事件存储在数据库里,或者利用SLS日志服务,方便后期的搜索和审计。 实践二:ECS的关机自动重启 在某些场景下,ECS会非预期的关机,您可能需要自动重启已经关机的ECS。 为了实现这一目的,我们复用实践一里面的MNS Client,添加一个新的Listener。当收到Stopped事件的时候,对该ECS执行一个Start命令。 -- coding: utf-8 -- import logging from aliyunsdkecs.request.v20140526 import StartInstanceRequest from aliyunsdkcore.client import AcsClient from .mns_client import BasicListener from .config import Conf class ECSClient(object): def init(self, acs_client): self.client = acs_client # 启动ECS实例 def start_instance(self, instance_id): logging.info(f'Start instance {instance_id} ...') request = StartInstanceRequest.StartInstanceRequest() request.set_accept_format('json') request.set_InstanceId(instance_id) self.client.do_action_with_exception(request) class ListenerStart(BasicListener): def init(self): acs_client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.ecs_client = ECSClient(acs_client) def process(self, event): detail = event['content'] instance_id = detail['resourceId'] if detail['state'] == 'Stopped': self.ecs_client.start_instance(instance_id) 在实际生产环境下,执行完Start命令后,可能还需要继续接收后续的Starting/Running/Stopped等事件,再配合计时器和计数器,进行Start成功或失败之后的处理。 实践三:抢占型实例释放前,自动从SLB移除 抢占型实例在释放之前五分钟左右,会发出释放告警事件,您可以利用这短暂的时间运行一些业务不中断的逻辑。例如,主动从SLB的后端服务器中去掉这台即将被释放的抢占型实例,而不是被动等待实例释放后SLB的自动处理。 我们还是复用实践一的MNS Client,添加一个新的Listener,当收到抢占型实例的释放告警时,调用SLB的SDK。 -- coding: utf-8 -- from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest from .mns_client import BasicListener from .config import Conf class SLBClient(object): def init(self): self.client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id) self.request = CommonRequest() self.request.set_method('POST') self.request.set_accept_format('json') self.request.set_version('2014-05-15') self.request.set_domain('slb.aliyuncs.com') self.request.add_query_param('RegionId', Conf.region_id) def remove_vserver_group_backend_servers(self, vserver_group_id, instance_id): self.request.set_action_name('RemoveVServerGroupBackendServers') self.request.add_query_param('VServerGroupId', vserver_group_id) self.request.add_query_param('BackendServers', "[{'ServerId':'" + instance_id + "','Port':'80','Weight':'100'}]") response = self.client.do_action_with_exception(self.request) return str(response, encoding='utf-8') class ListenerSLB(BasicListener): def init(self, vsever_group_id): self.slb_caller = SLBClient() self.vsever_group_id = Conf.vsever_group_id def process(self, event): detail = event['content'] instance_id = detail['instanceId'] if detail['action'] == 'delete': self.slb_caller.remove_vserver_group_backend_servers(self.vsever_group_id, instance_id) 注意 抢占型实例释放告警的event name与前面不同,应该是“Instance:PreemptibleInstanceInterruption”, mns_client.regist_listener(ListenerSLB(Conf.vsever_group_id), 'Instance:PreemptibleInstanceInterruption') 在实际生产环境下,您可能需要再申请一台新的抢占型实例,挂载到SLB上,来保证服务能力。

1934890530796658 2020-03-25 19:15:43 0 浏览量 回答数 0
阿里云大学 云服务器ECS com域名 网站域名whois查询 开发者平台 小程序定制 小程序开发 国内短信套餐包 开发者技术与产品 云数据库 图像识别 开发者问答 阿里云建站 阿里云备案 云市场 万网 阿里云帮助文档 免费套餐 开发者工具 云栖号物联网 小程序开发制作 视频内容分析 企业网站制作 视频集锦 代理记账服务 2020阿里巴巴研发效能峰会 企业建站模板 云效成长地图 高端建站 云栖号弹性计算 阿里云云栖号 云栖号案例 云栖号直播