• 关于

    python 输出流

    的搜索结果

问题

Python-SDK之如何实现 中文和时间?

青衫无名 2019-12-01 21:41:09 983 浏览量 回答数 0

回答

如果您正在谈论python解释器或CMD.exe,它是您脚本的“父”,那么不可能。在每个类似POSIX的系统中(现在你正在运行Windows,看起来可能有一些我不知道的怪癖,YMMV)每个进程都有三个流,标准输入,标准输出和标准错误。Bu默认(在控制台中运行时)会将这些指向控制台,但可以使用管道符号进行重定向: python script_a.py | python script_b.py 这将脚本a的标准输出流与脚本B的标准输入流联系起来。在此示例中,标准错误仍然发送到控制台。请参阅Wikipedia 上有关标准流的文章。 如果你正在谈论一个子进程,你可以像这样从python启动它(如果你想要双向通信,stdin也是一个选项): import subprocess process = subprocess.Popen(["python", "main.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) x = process.stderr.readline() y = process.stdout.readline() process.wait() 有关管理进程的信息,请参阅Python 子进程模块。对于通信,process.stdin和process.stdout管道被视为标准文件对象。 对于管道使用,从标准输入读取lassevk建议你做这样的事情: import sys x = sys.stderr.readline() y = sys.stdin.readline() sys.stdin和sys.stdout 是上面提到的标准文件对象,在sys模块中定义。

游客gsy3rkgcdl27k 2019-12-02 02:10:33 0 浏览量 回答数 0

问题

如何使用linux将多个值从kapacitor存储到python变量中?

kun坤 2019-12-25 09:43:23 4 浏览量 回答数 1

阿里云试用中心,为您提供0门槛上云实践机会!

0元试用32+款产品,最高免费12个月!拨打95187-1,咨询专业上云建议!

问题

基础语言百问-Python

薯条酱 2019-12-01 20:12:27 56807 浏览量 回答数 30

回答

ava 编写-常用 Hadoop Streaming:使用 unix 标准的输入和输出流作为 hadooop 和应用程序之间的接口,支持像Ruby,python 等不同的编程语言编写 map 和 reduce Hadoop Pipes 是 hadoop 提供的 C++ 的接口的名称

游客6nvww5bb5kd2w 2020-02-14 19:35:05 0 浏览量 回答数 0

回答

本文介绍了 Serverless 工作流的回调功能。相比较轮询,使用回调有效地降低了延迟、减少了轮询对服务器造成的不必要压力。另外,回调功能配合队列可以实现对非 FC 任务的编排,将 Serverless 工作流的编排范围扩展到任意类型的计算资源。 简介 长时间执行的任务通常会采用异步提交任务并返回任务标识 (ID),判断异步任务结束的方法通常有两种:轮询 (polling)和回调 (callback),在任务状态轮询中我们介绍了使用轮询来判断任务结束。Serverless 工作流的回调 (callback)功能,覆盖以下的痛点或场景: 消除轮询周期长带来的不必要延迟。 消除大流量场景下高并发的轮询造成不必要的服务器资源压力和浪费 。 编排非 FC Function 的任务,例如运行在自建机房或 ECS 上的进程 。 需要人工干预的步骤,例如通知审批通过。 下图展示了使用 MNS 队列服务集成结合回调 API 编排自建资源,拓宽了 Serverless 工作流的适用场景。 fnf-doc-service-integration-mns-queue 回调使用详解 在 Task 步骤中指定 pattern: waitForCallback,如下图状态机所示:该步骤会在提交 resourceArn 指定的任务后(如 FC invocation)该步骤会将一个 taskToken 存入到该步骤的 context 对象并进入一个暂停的状态,直到 Serverless 工作流收到回调或指定的任务超时。将 taskToken 传入 ReportTaskSucceed 或 ReportTaskFailed 接口去回调会使得该步骤继续执行。 fnf-doc-callback-state-machine - type: task name: mytask resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function} pattern: waitForCallback # 指定该 Task 步骤在提交任务后等待回调。 inputMappings: - target: taskToken source: $context.task.token # 将 context 中的 taskToken 作为 input 传入 resourceArn 指定的函数。 outputMappings: - target: k source: $local.key # 将 ReportTaskSucceeded 中 output {"key": "value"} 映射成 {"k": "value"} 并作为该步骤的输出。 示例 该示例共分为以下 3 个步骤: 准备 Task Function 开始工作流 回调 步骤 1:准备 Task Function 创建下面一个简单的函数,该函数会将输入直接返回。 服务:fnf-demo。 函数:echo。 运行环境:python2.7。 函数入口:index.handler。 #!/usr/bin/env python import json def handler(event, context): return event 步骤 2:开始工作流 在 Serverless 工作流控制台创建下面的流程,并开始执行。 流程名称:fnf-demo-callback。 流程角色:配置一个有 FC Invocation 权限的角色。 version: v1 type: flow steps: - type: task name: mytask resourceArn: acs:fc:::services/fnf-demo/functions/echo pattern: waitForCallback inputMappings: - target: taskToken source: $context.task.token outputMappings: - target: s source: $local.status 流程开始后可以看到 mytask 步骤暂停在 TaskSubmitted 事件,等待回调。该事件的 output 中含有 taskToken 作为回调任务的标识。 Screen Shot 2019-08-15 at 11.00.09 AM 步骤 3:回调 使用 Serverless 工作流的 Python SDK 在本地(或其他可以运行 Python 的环境)运行 callback.py 脚本,将 {task-token} 替换为 TaskSubmitted 事件中的值。 cd /tmp mkdir fnf-demo-callback cd fnf-demo-callback 在虚拟环境中,安装 fnf python SDK。 virtualenv env source env/bin/activate pip install -t . aliyun-python-sdk-core pip install -t . aliyun-python-sdk-fnf 执行 worker 进程。 export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret} python worker.py {task-token-from-TaskSubmitted} worker.py 代码: import os import sys from aliyunsdkcore.acs_exception.exceptions import ServerException from aliyunsdkcore.client import AcsClient from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest def main(): account_id = os.environ['ACCOUNT_ID'] akid = os.environ['AK_ID'] ak_secret = os.environ['AK_SECRET'] fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou") task_token = sys.argv[1] print "task token " + task_token try: request = ReportTaskSucceededRequest.ReportTaskSucceededRequest() request.set_Output("{"status": "ok"}") request.set_TaskToken(task_token) resp = fnf_client.do_action_with_exception(request) print "Report task succeeded finished" except ServerException as e: print(e) if name == 'main': main() 上述脚本回调成功后可以看到 mytask 步骤继续执行, ReportTaskSucceeded 中指定的输出 "{"status": "ok"}" 经过 outputMappings 的映射后变成 "{"s": "ok"}"。

1934890530796658 2020-03-27 10:49:44 0 浏览量 回答数 0

回答

详细解答可以参考官方帮助文档控制台 功能 描述 直播加速域名管理 支持直播加速域名的创建,删除操作 直播录制设置 直播录制创建,存储路径选择,文件名称编辑操作 直播转码设置 针对每个 AppName 创建不同转码模板 直播流管理 分别对实时直播流与历史直播流进行查询 直播流黑名单管理 对流进行黑名单设置与删除 直播录制索引管理 对录制的直播文件创建多条索引文件,支持索引文件查询 设置 NotifyUrl 设置直播流信息推送到的 URL 地址 API 功能 描述 添加 APP 录制配置 配置 APP 录制,输出内容保存到 OSS 中 创建录制索引文件 创建录制索引文件 删除 APP 录制配置 解除录制配置 查询 APP 录制配置 查询域名下指定 App 录制配置 查询域名下录制配置列表 查询域名下所有 App 录制配置 查询在线人数 获取 RTMP 直播流的在线人数,支持基于域名和基于流的查询 查询录制内容 查询某路直播流录制内容 查询单个录制索引文件 查询单个录制索引文件 查询录制索引文件 查询录制索引文件 查询推流黑名单列表 获取域名下直播流播放的黑名单 查询流控历史 获取某个域名或应用下的直播流操作记录 查询直播流帧率和码率 获取直播流帧率和码率历史数据的查询,支持基于域名和基于流的查询 查询推流列表 查看指定域名下(或者指定域名下某个应用)的所有正在推的流的信息 查询推流历史 查看一段时间内某个域名(或域名下某应用)的推流记录 禁止直播流推送 禁止某条流的推送,可以预设某个时刻将流恢复 恢复直播流推送 恢复某条流的推送 设置 NotifyUrl 设置直播流信息推送到的 URL 地址 SDK 支持 Java、Python、PHP、.Net 语言版本的服务端SDK。 支持 iOS、Android 推流 SDK。 支持 iOS、Android 播放器 SDK。 支持 WEB 播放器(Html5 与 Flash)。

2019-12-01 23:11:42 0 浏览量 回答数 0

问题

img src标签如何指向并一个mongodb 的图片字段?

蛮大人123 2019-12-01 19:53:52 1250 浏览量 回答数 1

问题

<img src指向一个mongodb 的图片字段如何做?

落地花开啦 2019-12-01 19:47:27 1126 浏览量 回答数 1

回答

常用方法: 1、 通过sys模块获取程序参数 import sys def usage(): '''usage''' print 'Usage: %s %s %s %s' % (sys.argv[0], 'tokenid', 'Subject', 'Content') sys.exit() def main(): if len(sys.argv) != 4: usage() else: print(sys.argv[0]) print(sys.argv[1]) print(sys.argv[2]) print(sys.argv[3]) if name == "__main__": main() 运行脚本: D:Pythonmodules>python os_modules.pyUsage: os_modules.py tokenid Subject Content D:Pythonmodules>python os_modules.py aa bb ccos_modules.pyaabbccpython的sys模块默认是把第一个参数默认是程序本省,从第二个参数起都是代码后面跟着的参数,通过sys.arg[n]就可以获得传入到程序中的参数\ sys.stdinstdoutstderr 功能:stdin , stdout , 以及stderr 变量包含与标准I/O 流对应的流对象. 如果需要更好地控制输出,而print 不能满足你的要求, 它们就是你所需要的. 你也可以替换它们, 这时候你就可以重定向输出和输入到其它设备( device ), 或者以非标准的方式处理它们 sys.stdout 与print 当我们在 Python 中打印对象调用 print obj 时候,事实上是调用了sys.stdout.write(obj+'n'),print 将你需要的内容打印到了控制台,然后追加了一个换行符,print 会调用 sys.stdout 的 write 方法 以下两行在事实上等价 import syssys.stdout.write("hello")print('hello') hellohello import syssys.stdout.write("hello n")print('hello') hello hello sys.stdin 与 raw_input import sysa = raw_input('raw_input_name: ')print(a)print 'stdin_name: ',b = sys.stdin.readline()print(b) raw_input_name: ;LIJUNJIANG ;LIJUNJIANG stdin_name: AAAA AAAA 从控制台重定向到文件 Import sysf_handler=open('out.log', 'w')sys.stdout=f_handlerprint 'hello' 在当前文件下新生成一个文件out.log,文件内容为hello, 捕获sys.exit(n)调用 功能:执行到主程序末尾,解释器自动退出,但是如果需要中途退出程序,可以调用sys.exit函数,带有一个可选的整数参数返回给调用它的程序,表示你可以在主程序中捕获对sys.exit的调用。(0是正常退出,其他为异常) def exitfunc(): print "hello world" sys.exitfunc = exitfunc # 设置捕获时调用的函数print "This exit test"sys.exit(1) # 退出自动调用exitfunc()后,程序依然退出了print "there" # 不会被 print结果:This exit testhello world exitfunc()函数,及当执行sys.exit(1)的时候,调用exitfunc()函数 sys.exit(1)后面的内容就不会执行了,因为程序已经退出

xuning715 2019-12-02 01:10:13 0 浏览量 回答数 0

问题

Python熊猫合并数据流而不需要复制列

kun坤 2019-12-29 21:55:21 0 浏览量 回答数 1

回答

python可以做shell脚本吗? 首先介绍一个函数: os.system(command) 这个函数可以调用shell运行命令行command并且返回它的返回值。试一下在 python的解释器里输入os.system(”ls -l”),就可以看到”ls”列出了当前目录下的文件。可以说,通过这个函数,python就拥有了shell的所有能力。呵呵。。不过,通常这条命令不需要用到。因为shell常用的那些命令在python中通常有对应而且同样简洁的写法。 shell中最常用的是ls命令,python对应的写法是:os.listdir(dirname),这个函数返回字符串列表,里面是所有的文件名,不过不包含”.”和”..”。如果要遍历整个目录的话就会比较复杂一点。我们等下再说吧。先在解释器里试一下: os.listdir(”/”) [’tmp’, ‘misc’, ‘opt’, ‘root’, ‘.autorelabel’, ’sbin’, ’srv’, ‘.autofsck’, ‘mnt’, ‘usr’, ‘var’, ‘etc’, ’selinux’, ‘lib’, ‘net’, ‘lost found’, ’sys’, ‘media’, ‘dev’, ‘proc’, ‘boot’, ‘home’, ‘bin’] 就像这样,接下去所有命令都可以在python的解释器里直接运行观看结果。 对应于cp命令的是:shutil.copy(src,dest),这个函数有两个参数,参数src是指源文件的名字,参数dest则是目标文件或者目标目录的名字。 如果dest是一个目录名,就会在那个目录下创建一个相同名字的文件。与shutil.copy函数相类似的是 shutil.copy2(src,dest),不过copy2还会复制最后存取时间和最后更新时间。 不过,shell的cp命令还可以复制目录,python的shutil.copy却不行,第一个参数只能是一个文件。这怎么办?其实,python还有个shutil.copytree(src,dst[,symlinks]) 。参数多了一个symlinks,它是一个布尔值,如果是True的话就创建符号链接。 移动或者重命名文件和目录呢?估计被聪明的朋友猜到了,shutil.move(src,dst),呵呵。。与mv命令类似,如果src和dst在同一个文件系统上,shutil.move只是简单改一下名字,如果src和dst在不同的文件系统上,shutil.move会先把src复制到dst,然后删除src文件。看到现在,大多数朋友应该已经对 python的能力有点眉目了,接下来我就列个表,介绍一下其它的函数: os.chdir(dirname)把当前工作目录切换到dirname下 os.getcwd()返回当前的工作目录路径 os.chroot(dirname)把dirname作为进程的根目录。和*nix下的chroot命令类似 os.chmod(path,mode)更改path的权限位。mode可以是以下值(使用or)的组合: os.S_ISUIDos.S_ISGIDos.S_ENFMTos.S_ISVTXos.S_IREADos.S_IWRITEos.S_IEXECos.S_IRWXUos.S_IRUSRos.S_IWUSRos.S_IXUSRos.S_IRWXGos.S_IRGRPos.S_IWGRPos.S_IXGRPos.S_IRWXOos.S_IROTHos.S_IWOTHos.S_IXOTH 具体它们是什么含义,就不仔细说了,基本上就是R代表读,W代表写,X代表执行权限。USR 代表用户,GRP代表组,OTH代表其它。 os.chown(path,uid,gid)改变文件的属主。uid和gid为-1的时候不改变原来的属主。 os.link(src,dst)创建硬连接 os.mkdir(path,[mode])创建目录。mode的意义参见os.chmod(),默认是0777 os.makedirs(path,[mode])和os.mkdir()类似,不过会先创建不存在的父目录。 os.readlink(path)返回path这个符号链接所指向的路径 os.remove(path)删除文件,不能用于删除目录 os.rmdir(path)删除文件夹,不能用于删除文件 os.symlink(src,dst)创建符号链接 shutil.rmtree(path[,ignore_errors[,onerror]]) 删除文件夹介绍了这么多,其实只要查一下os和shutil两个模块的文档就有了,呵呵。。真正编写 shell脚本的时候还需要注意: 1.环境变量。python的环境变量保存在os.environ这个字典里,可以用普通字典的方法修改它,使用system启动其它程序的时候会自动被继承。比如: os.environ[”fish”]=”nothing”不过也要注意,环境变量的值只能是字符串。和shell有些不同的是,python没有 export环境变量这个概念。为什么没有呢?因为python没有必要有:-) 2.os.path这个模块里包含了很多关于路径名处理的函数。在shell里路径名处理好像不是很重要,但是在python里经常需要用到。最常用的两个是分离和合并目录名和文件名: os.path.split(path) -> (dirname,basename)这个函数会把一个路径分离为两部分,比如:os.path.split(”/foo /bar.dat”)会返回(”/foo”,”bar.dat”) os.path.join(dirname,basename)这个函数会把目录名和文件名组合成一个完整的路径名,比如:os.path.join(”/foo”,”bar.dat”)会返回”/foo/bar.dat”。这个函数和os.path.split()刚好相反。 还有这些函数: os.path.abspath(path)把path转成绝对路径 os.path.expanduser(path)把path中包含的”~”和”~user”转换成用户目录 os.path.expandvars(path)根据环境变量的值替换path中包含的”$name”和”${name}”,比如环境变量 FISH=nothing,那os.path.expandvars(”$FISH/abc”)会返回”nothing/abc” os.path.normpath(path)去掉path中包含的”.”和”..” os.path.splitext(path)把path分离成基本名和扩展名。比如:os.path.splitext(”/foo /bar.tar.bz2″)返回(’/foo/bar.tar’, ‘.bz2′)。要注意它和os.path.split()的区别 3.在os模块有一个很好用的函数叫os.stat()没有介绍,因为os.path模块里包含了一组和它具有同样功能的函数,但是名字更好记一点。 os.path.exists(path)判断文件或者目录是否存在 os.path.isfile(判断path所指向的是否是一个普通文件,而不是目录 os.path.isdir(path) 判断path所指向的是否是一个目录,而不是普通文件 os.path.islink(path)判断path所指向的是否是一个符号链接 os.path.ismount(path)判断path所指向的是否是一个挂接点(mount point) os.path.getatime(path)返回path所指向的文件或者目录的最后存取时间。 os.path.getmtime(path)返回path所指向的文件或者目录的最后修改时间 os.path.getctime(path)返回path所指向的文件的创建时间 os.path.getsize(path返回path所指向的文件的大小 4.应用python编写shell脚本经常要用到os,shutil,glob(正则表达式的文件名),tempfile(临时文件),pwd(操作/etc/passwd文件),grp(操作/etc/group文件),commands(取得一个命令的输出)。前面两个已经基本上介绍完了,后面几个很简单,看一下文档就可以了。 5.sys.argv是一个列表,保存了python程序的命令行参数。其中 sys.argv[0]是程序本身的名字。不能光说不练,接下来我们就编写一个用于复制文件的简单脚本。前两天叫我写脚本的同事有个几万个文件的目录,他想复制这些文件到其它的目录,又不能直接复制目录本身。他试了一下”cp src/* dest/”结果报了一个命令行太长的错误,让我帮他写一个脚本。操起python来:import sys,os.path,shutilfor f in os.listdir(sys.argv[1]):shutil.copy(os.path.join(sys.argv[1],f),sys.argv[2]) 再试一下linuxapp版里的帖子——把一个文件夹下的所有文件重命名成 10001~10999。可以这样写:import os.path,sysdirname=sys.argv[1]i=10001for f in os.listdir(dirname):src=os.path.join(dirname,f)if os.path.isdir(src):continueos.rename(src,str(i)) i =1 os.chkdir(path) 转换到目录path 下。 os.system('md a') 可以直接创建目录。 os.name字符串指示你正在使用的平台。比如对于Windows,它是'nt',而对于Linux/Unix用户,它是'posix'。● os.getcwd()函数得到当前工作目录,即当前Python脚本工作的目录路径。● os.getenv()和os.putenv()函数分别用来读取和设置环境变量。● os.listdir()返回指定目录下的所有文件和目录名。● os.remove()函数用来删除一个文件。● os.system()函数用来运行shell命令。● os.linesep字符串给出当前平台使用的行终止符。例如,Windows使用'rn',Linux使用'n'而Mac使用'r'。● os.path.split()函数返回一个路径的目录名和文件名。 os.path.split('/home/swaroop/byte/code/poem.txt') ('/home/swaroop/byte/code', 'poem.txt')● os.path.isfile()和os.path.isdir()函数分别检验给出的路径是一个文件还是目录。类似地,os.path.exists()函数用来检验给出的路径是否真地存在。 文件重定向 已有PY文件new1.py ,在命令行下输入:new1>new.txt 可以将new1运行的结果输出到文件new.txt,这称为流重定向。

元芳啊 2019-12-02 01:04:36 0 浏览量 回答数 0

回答

" AES 确实是复杂而有效的加密算法. 这里我帮你分析一下加密结果不同的原因.其实代码都是没问题的, 关键问题是在填充(padding)算法和分段大小(segment size). 首先, 密文反馈模式(CFB)模式是基于流的加密方案, 为了制作可以处理任意位情况错误的流密码, CFB 通常要设定一个分段大小, 用来表示通过 AES 加密明文的时候每次会处理多少个字节的明文. 这个过程的细节可以参考: 维基百科:密文反馈(CFB) 和 stackexchange: What is segment size when using Cipher FeedBack mode. 而根据这个 stackoverflow: Encrypt in python and decrypt in Java with AES-CFB,Python 的 PyCrypto 模块默认使用的 segment_size 是 8, Java 则默认采用segment_size为128.所以要使 Python 获的和 Java 一样的加密结果, 必须让 Python 和 Java 使用相同的segment_size. 假设我们让 Python 适应 Java, 使用 segment_size=128, 那么可以这样设置: generator = AES.new(key, AES.MODE_CFB, iv, segment_size=128) 然后我们用这个segment_size去解密一下 Java 的加密结果bRfM0cD63D96il8ZeXhJEw==: import base64 from Crypto.Cipher import AES key = b'B31F2A75FBF94099' iv = b'1234567890123456' PADDING = '\0' def decrypt_aes(cryptedStr): # 注意这里 segment_size=128 generator = AES.new(key, AES.MODE_CFB, iv, segment_size=128) cryptedStr = base64.b64decode(cryptedStr) recovery = generator.decrypt(cryptedStr) print(recovery) decryptedStr = recovery.rstrip(PADDING.encode('utf-8')) return decryptedStr decrypt_aes('bRfM0cD63D96il8ZeXhJEw==') 会发现打印了下面内容: b'\xe4\xbd\xa0\xe5\xa5\xbd\n\n\n\n\n\n\n\n\n\n' 而这正是"你好\n\n\n\n\n\n\n\n\n\n".encode('utf-8'), 也就是你好在UTF-8转码后的字节串值. 我们得到了正确的明文, 提取明文只要rstrip('\n')即可. 但是, 问题还没有这么简单, 因为 Java 这边使用的padding算法是PKCS5Padding. 参考:Java中的AES算法,加密恰好一个block的时候返回两个block,为什么?: 这种 padding模式下,如果末位分组缺少 n个字节,就补足 n个 n. 也就是: If numberOfBytes(clearText) mod 16 == 15, PM = M + 0x01 If numberOfBytes(clearText) mod 16 == 14, PM = M + 0x0202 If numberOfBytes(clearText) mod 16 == 13, PM = M + 0x030303 ... If numberOfBytes(clearText) mod 16 == 1, PM = M + (0x0F0F0F...0F0F) |---15个0x0F---| 所以在设计encrypt_aes方法的时候还要注意使用正确的padding方法, 这里可以这样写对于byte的padding方法: BLOCK_SIZE = 16 def pad_byte(b): bytes_num_to_pad = BLOCK_SIZE - (len(b) % BLOCK_SIZE) byte_to_pad = bytes([bytes_num_to_pad]) padding = byte_to_pad * bytes_num_to_pad padded = b + padding return padded 然后加密方法就要这样写了: def encrypt_aes(sourceStr): # 注意这里 segment_size=128 generator = AES.new(key, AES.MODE_CFB, iv, segment_size=128) # 注意这里先将明文通过 utf-8 转码成为字节串再调用 padding 算法 padded = pad_byte(sourceStr.encode('utf-8')) crypt = generator.encrypt(padded) cryptedStr = base64.b64encode(crypt) return cryptedStr sourceStr = '你好' print(encrypt_aes(sourceStr)) # b'bRfM0cD63D96il8ZeXhJEw==' 最后我们就得到了和 Java 一样的加密结果b'bRfM0cD63D96il8ZeXhJEw=='. 这里我只描述一下如何让 Python 得到和 Java 一样的结果, 如果有兴趣同样也可以对 Java 的方法改进一下让它输出和 Python 一样的结果."

因为相信,所以看见。 2020-05-27 16:24:34 0 浏览量 回答数 0

问题

如何使用python列表有效地重命名dataframe索引?

kun坤 2019-12-25 21:47:59 3 浏览量 回答数 1

问题

【精品问答】Python二级考试题库

珍宝珠 2019-12-01 22:03:38 1146 浏览量 回答数 2

回答

使用 subprocess.check_output() 函数。例如: import subprocess out_bytes = subprocess.check_output(['netstat','-a']) 这段代码执行一个指定的命令并将执行结果以一个字节字符串的形式返回。 如果你需要文本形式返回,加一个解码步骤即可。例如: out_text = out_bytes.decode('utf-8') 如果被执行的命令以非零码返回,就会抛出异常。 下面的例子捕获到错误并获取返回码: try: out_bytes = subprocess.check_output(['cmd','arg1','arg2']) except subprocess.CalledProcessError as e: out_bytes = e.output # Output generated before error code = e.returncode # Return code 默认情况下,check_output() 仅仅返回输入到标准输出的值。 如果你需要同时收集标准输出和错误输出,使用 stderr 参数: out_bytes = subprocess.check_output(['cmd','arg1','arg2'], stderr=subprocess.STDOUT) 如果你需要用一个超时机制来执行命令,使用 timeout 参数: try: out_bytes = subprocess.check_output(['cmd','arg1','arg2'], timeout=5) except subprocess.TimeoutExpired as e: ... 通常来讲,命令的执行不需要使用到底层shell环境(比如sh、bash)。 一个字符串列表会被传递给一个低级系统命令,比如 os.execve() 。 如果你想让命令被一个shell执行,传递一个字符串参数,并设置参数 shell=True . 有时候你想要Python去执行一个复杂的shell命令的时候这个就很有用了,比如管道流、I/O重定向和其他特性。例如: out_bytes = subprocess.check_output('grep python | wc > out', shell=True) 需要注意的是在shell中执行命令会存在一定的安全风险,特别是当参数来自于用户输入时。 这时候可以使用 shlex.quote() 函数来将参数正确的用双引用引起来。 讨论

哦哦喔 2020-04-17 17:26:55 0 浏览量 回答数 0

问题

Apache Flink:Python流API中的Kafka连接器,“无法加载用户类”

flink小助手 2019-12-01 19:23:57 2796 浏览量 回答数 3

回答

API 访问 访问地址 目前已部署 5 个地域,后续会根据需要开通其它地域的管理 API。各个区域的访问地址如下表: 地域 访问地址 华东1(杭州) csb.cn-hangzhou.aliyuncs.com 华东2(上海) csb.cn-shanghai.aliyuncs.com 华北2(北京) csb.cn-beijing.aliyuncs.com 华南1(深圳) csb.cn-shenzhen.aliyuncs.com 中国香港 csb.cn-hongkong.aliyuncs.com 访问权限 目前只开通白名单用户有权访问管理 API,需要使用此管理 API 的用户,请您管理 CSB 接口人。 流量控制 目前配置和管理 API 的流量控制:每 API 50tpm、每用户每 API 5tpm。 说明 如果此流控阈值不满足您的要求,请您联系 CSB 接口人。 API 访问方法 请使用最新的 1.1.5 版本的管理 API SDK。 此 SDK 是访问 CSB 管理 API 的 SDK,不能用于访问 CSB 上发布的业务服务。CSB上的业务服务访问,请参见 SDK 参考。 Java SDK 使用方法参见 阿里云 SDK 说明 。 CSB 管理 API SDK 的 Maven 依赖(详细信息可查看Maven 仓库): com.aliyun aliyun-java-sdk-csb 1.1.5 示例代码: public static void main(String[] args) { try { // 创建DefaultAcsClient实例并初始化,设置对应regoin的endPoint DefaultProfile.addEndpoint("CSB", "cn-hangzhou", "CSB", "csb.cn-hangzhou.aliyuncs.com"); DefaultProfile profile = DefaultProfile.getProfile( "cn-hangzhou", // The region ID "", // The AccessKey ID of the RAM account ""); // The AccessKey Secret of the RAM account IAcsClient client = new DefaultAcsClient(profile); FindProjectListRequest request = new FindProjectListRequest(); //设置业务参数 request.setCsbId(227L); request.setPageNum(1); FindProjectListResponse response = client.getAcsResponse(request); System.out.println(gson.toJson(response)); } catch (ServerException e) { e.printStackTrace(); } catch (ClientException e) { e.printStackTrace(); } } 示例输出结果: { "code": 200, "message": "success", "requestId": "40FE0129-41AE-4464-9F0F-56328872623F", "data": { "currentPage": 1, "pageNumber": 1, "total": 1, "projectList": [ { "apiNum": 11, "csbId": 227, "deleteFlag": 0, "description": "asdcdfdsdfddddddsa在sc", "gmtCreate": 1511164185000, "gmtModified": 1531134184000, "id": 420, "ownerId": "******", "projectName": "group2", "projectOwnerEmail": "group2ddd", "projectOwnerName": "group2ddd", "projectOwnerPhoneNum": "group2dddqq", "status": 1, "userId": "******" } ] } } SDK 源代码: https://github.com/aliyun/aliyun-openapi-java-sdk/tree/master/aliyun-java-sdk-csb Python SDK SDK 地址: https://pypi.python.org/pypi/aliyun-python-sdk-csb/1.1.5 SDK 源代码: https://github.com/aliyun/aliyun-openapi-python-sdk/tree/master/aliyun-python-sdk-csb PHP SDK SDK 源代码: https://github.com/aliyun/aliyun-openapi-php-sdk/tree/master/aliyun-php-sdk-csb FAQ 专有云里有些 API,在公有云的管理 API 列表里没有? 目前只开放了用户需要的部分 API,根据需要,后续可增加开放 API 的类别和数量。 管理 API 支持 HTTP 方式访问吗? 出于安全考虑,管理 API 只支持 HTTPS 方式访问。 管理 API 提供哪些 SDK? 目前只提供了 Java、Python 和 PHP,如果您有其它语言的需求,请您联系 CSB 接口人。

保持可爱mmm 2020-03-28 17:48:22 0 浏览量 回答数 0

问题

【精品问答】媒体处理

montos 2020-04-08 19:12:05 4 浏览量 回答数 1

回答

调用 DescribeWorkflow 查询单个工作流的详细信息。 请求信息 请求行 RequestLine GET /gs/workflow/{workflowName} HTTP/1.1 特有请求头 RequestHead 无,请参考 公共请求头部。 请求体 RequestBody 无 返回信息 返回行 ResponseLine HTTP/1.1 200 OK 特有返回头 ResponseHead 无,请参考 公共返回头部。 返回体 ResponseBody { "create_time": "2020-01-15 16:30:25 +0800 CST", "duration": "1h15m33.529968361s", "finish_time": "0001-01-01 00:00:00 +0000 UTC", "input_data_size": "0", "job_name": "wgs-gpu-97xfn", "job_namespace": "1171330362041663", "output_data_size": "0", "status": "Running", "total_bases": "0", "total_reads": "0", "user_input_data": "{"wgs_oss_region":"cn-shenzhen","wgs_fastq_first_name":"fastq/huada/MGISEQ-200019SZ0002402","wgs_fastq_second_name":"fastq/huada/MGISEQ-200019SZ0002402","wgs_bucket_name":"gene-shenzhen","wgs_vcf_file_name":"output/vcf/huada.vcf","wgs_bam_file_name":"output/bam/huada.bam","wgs_reference_file":"hg19","wgs_service":"g"}" } 表 1. 返回体解释 名称 类型 描述 create_time String 工作流创建时间。 duration String 工作流经过时长。 finish_time String 任务结束时间。 input_data_size String 输入数据大小。 job_name String 工作流名称。 job_namespace String 工作流所在命名空间。 output_data_size String 输出数据大小。 status String 工作流当前状态。 total_bases String 碱基对个数。 total_reads String Reads 个数。 user_input_data String 用户输入参数。 示例 请求示例(Python) #!/usr/bin/env python #coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest import os client = AcsClient(os.environ['accessKeyID'], os.environ['accessKeySecret'], 'cn-beijing') request = CommonRequest() request.set_accept_format('json') request.set_method('GET') request.set_protocol_type('https') # https | http request.set_domain('cs.cn-beijing.aliyuncs.com') request.set_version('2015-12-15') request.add_query_param('RegionId', "cn-beijing") request.add_header('Content-Type', 'application/json') request.set_uri_pattern('/gs/workflow/wgs-gpu-97xfn') response = client.do_action_with_exception(request) print(response)

1934890530796658 2020-03-31 21:19:39 0 浏览量 回答数 0

问题

盘点年度 Python 类库 Top 10

珍宝珠 2020-01-09 13:39:35 77 浏览量 回答数 1

问题

批量计算

黄一刀 2020-04-04 03:16:01 71 浏览量 回答数 1

回答

使用os.system("cmd")特点是执行的时候程序会打出cmd在linux上执行的信息。import osos.system("ls")使用Popen模块产生新的process现在大部分人都喜欢使用Popen。Popen方法不会打印出cmd在linux上执行的信息。的确,Popen非常强大,支持多种参数和模式。使用前需要from subprocess import Popen, PIPE。但是Popen函数有一个缺陷,就是它是一个阻塞的方法。如果运行cmd时产生的内容非常多,函数非常容易阻塞住。解决办法是不使用wait()方法,但是也不能获得执行的返回值了。Popen原型是:subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0) 参数bufsize:指定缓冲。参数executable用于指定可执行程序。一般情况下我们通过args参数来设置所要运行的程序。如果将参数shell设为 True,executable将指定程序使用的shell。在windows平台下,默认的shell由COMSPEC环境变量来指定。参数stdin, stdout, stderr分别表示程序的标准输入、输出、错误句柄。他们可以是PIPE,文件描述符或文件对象,也可以设置为None,表示从父进程继承。参数preexec_fn只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。参数Close_sfs:在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管 道。我们不能将close_fds设置为True同时重定向子进程的标准输入、输出与错误(stdin, stdout, stderr)。如果参数shell设为true,程序将通过shell来执行。参数cwd用于设置子进程的当前目录。参数env是字典类型,用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。参数Universal_newlines:不同操作系统下,文本的换行符是不一样的。如:windows下用’/r/n’表示换,而Linux下用 ‘/n’。如果将此参数设置为True,Python统一把这些换行符当作’/n’来处理。参数startupinfo与createionflags只在windows下用效,它们将被传递给底层的CreateProcess()函数,用 于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等。subprocess.PIPE在创建Popen对象时,subprocess.PIPE可以初始化stdin, stdout或stderr参数,表示与子进程通信的标准流。subprocess.STDOUT创建Popen对象时,用于初始化stderr参数,表示将错误通过标准输出流输出。Popen的方法:Popen.poll() 用于检查子进程是否已经结束。设置并返回returncode属性。Popen.wait() 等待子进程结束。设置并返回returncode属性。Popen.communicate(input=None)与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。可选参数input指定发送到子进程的参数。 Communicate()返回一个元组:(stdoutdata, stderrdata)。注意:如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。同样,如 果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE。Popen.send_signal(signal) 向子进程发送信号。Popen.terminate()停止(stop)子进程。在windows平台下,该方法将调用Windows API TerminateProcess()来结束子进程。Popen.kill()杀死子进程。Popen.stdin 如果在创建Popen对象是,参数stdin被设置为PIPE,Popen.stdin将返回一个文件对象用于策子进程发送指令。否则返回None。Popen.stdout 如果在创建Popen对象是,参数stdout被设置为PIPE,Popen.stdout将返回一个文件对象用于策子进程发送指令。否则返回 None。Popen.stderr 如果在创建Popen对象是,参数stdout被设置为PIPE,Popen.stdout将返回一个文件对象用于策子进程发送指令。否则返回 None。Popen.pid 获取子进程的进程ID。Popen.returncode 获取进程的返回值。如果进程还没有结束,返回None。例如:p = Popen("cp -rf a/* b/", shell=True, stdout=PIPE, stderr=PIPE) p.wait() if p.returncode != 0: print "Error." return -1使用commands.getstatusoutput方法这个方法也不会打印出cmd在linux上执行的信息。这个方法唯一的优点是,它不是一个阻塞的方法。即没有Popen函数阻塞的问题。使用前需要import commands。例如:status, output = commands.getstatusoutput("ls") 还有只获得output和status的方法:commands.getoutput("ls") commands.getstatus("ls")

xuning715 2019-12-02 01:10:25 0 浏览量 回答数 0

回答

本篇主要是介绍如何将渲染软件 Blender 创建成 BatchCompute 的 App,并通过此 App 提交 Blender 渲染作业。 Blender 是目前最流行的一款开源的跨平台全能三维动画制作软件,提供从建模、动画、材质、渲染、到音频处理、视频剪辑等一系列动画短片制作解决方案。 具体介绍可以看这里:https://www.blender.org/features/。 准备工作 (1) 开通服务 开通批量计算服务(BatchCompute): https://help.aliyun.com/document_detail/127644.html 开通对象存储服务(OSS): https://oss.console.aliyun.com 开通MNS服务: https://mns.console.aliyun.com 开通容器服务: https://cr.console.aliyun.com 如果已经开通,请忽略此步骤。 (2) 地域的选择 本篇例子所有阿里云服务都需要使用相同的地域。 本篇例子使用地域: 华南1(深圳) (3) 准备OSS Bucket 请到OSS控制台 创建一个Bucket。 本篇例子假设创建的 bucket 名称为:blender-demo, 地域在华南1(深圳)。 注意: 使用批量计算时,地域需要和 OSS bucket 的地域相同。 注意: 实际操作时,需要将例子中的bucket 名称修改为您自己创建的真实的bucket名称。 制作 Blender Docker 镜像 (1) 创建一个Dockerfile文件 文件名:Dockerfile, 内容如下: FROM ubuntu:latest MAINTAINER your-name 更新源 RUN apt update 清除缓存 RUN apt autoclean 安装 RUN apt install python python-pip curl pulseaudio blender -y 启动时运行这个命令 CMD ["/bin/bash"] (2) build docker build -t ubuntu-blender ./ 等待完成,然后使用下面的命令查看是否有 ubuntu-blender docker images (3) check docker run -t ubuntu-blender blender -v 显示: Blender 2.79 (sub 0) 记住此版本信息,下面要用到。 Docker镜像上传 您需要将 ubuntu-blender 上传到 BatchCompute 支持Registry。 BatchCompute支持2种Registry:阿里云的 CR(Container Registry)和阿里云的OSS。 选择一种即可,推荐第一种: CR。 如何上传,请参考这2篇文档: docker 镜像上传到CR docker 镜像上传到OSS 假设已经上传到CR(地域:华南1-深圳),名称为: registry.cn-shenzhen.aliyuncs.com/batchcompute_test/blender:1.0 创建 App BatchCompute 提交作业,需要配置很多参数。BatchCompute 提供的 App 模板机制,让用户很方便预设参数默认值,提交作业时,只需填写少量参数即可。 下面我们来创建一个 Blender 渲染 App。 (1) 开始创建 App 打开批量计算控制台: https://batchcompute.console.aliyun.com 1 填写基本信息 Docker 镜像名称,填写您已经上传到CR的镜像名称,如: registry.cn-shenzhen.aliyuncs.com/batchcompute_test/blender:1.0 2 运行时参数 只需修改 实例类型为 8核16GB规格,其他的默认即可。 3 (2) 命令行和参数配置 4 命令行填写: python -c "import os;import sys;sys.path.append('/home/scripts/'); from framer import parseFrames; frames=parseFrames('${frames}'); framestr=','.join(map(lambda x:str(x), frames)); s='blender -b /home/input/${scene_file_path} -o /home/output/result/${output_name_format} -F ${format} -f %s' % framestr; print('exec: %s' % s); os.system(s);" ${..} 都是变量,可以作为输入和输出参数。在使用此App提交作业的时候,传入的参数将替换掉这些变量。 参考文档 Blender 2.79 命令行参数 输入参数 注意: 实际操作时,需要将例子中的bucket 名称修改为您自己创建的真实的bucket名称。 名称 默认值 允许覆盖 本地目录绝对路径 备注 scripts_oss_folder oss://blender-demo/scripts/ 否 /home/scripts/ 输入oss目录路径,该路径将挂载到虚拟机的/home/scripts/,应该包含要渲染的 framer.py 文件, 如: oss://bucket/scripts/ input_oss_folder 是 /home/input/ 输入oss目录路径,该路径将挂载到虚拟机的/home/input/,应该包含要渲染的.blend文件, 如: oss://bucket/input/ scene_file_path 是 渲染场景文件的路径,相对于input_oss_folder的目录路径, 如:a.blend 或者 folder_name/a.blend frames 是 支持连续帧:”1-10”, 支持多帧(逗号隔开,无空格):”1,3,5-10” format PNG 是 渲染输出格式,支持: TGA,RAWTGA,JPEG,IRIS,IRIZ,AVIRAW,AVIJPEG,PNG,BMP output_name_format ####.png 是 输出文件名,#会被替代为帧序号,不足位补零。举例: test_###.png 变成 test_001.png,可以在前面加目录名: test/test_###.png 输出参数 名称 默认值 允许覆盖 本地目录绝对路径 备注 output_oss_folder 是 /home/output/ 输出oss目录路径, 如: oss://bucket/output/。 环境变量 环境变量可以不用配置, 直接提交即可。 提交渲染作业 在App列表中可以看到已经创建好的 ubuntu-blender, 点击”提交作业”。 5 (1) 准备工作 在提交作业前,还有一些准备工作。 手动上传分帧器 分帧器python代码(见附录),上传到您的OSS目录下,比如: oss://blender-demo/scripts/framer.py 手动上传blender场景文件 Blender 官网提供了好多 demo 文件: https://www.blender.org/download/demo-files/ 本例子需要下载 2.79 版本(注意:要和镜像中安装的Blender版本相同。不同版本的可能渲染不出来) 6 素材下载后,解压得到目录: splash279/ 将整个目录上传到 oss://blender-demo/input/ 下面,即:oss://blender-demo/input/splash279/。 (2) 开始提交作业 7 实例类型要选大一点的,比如: 8核16GB。 实例数量本例子填 2 个。 (3) 参数配置 注意: 实际操作时,需要将例子中的 bucket 名称修改为您自己创建的真实的bucket名称。 8 输入: 参数 值 说明 input_oss_folder oss://blender-demo/input/ 场景文件所在OSS目录 scene_file_path splash279/splash279.blend 场景文件名 frames 1-4 渲染1到4帧 format PNG 渲染输出格式,默认即可 output_name_format ####.png 渲染输出文件名,默认即可 scripts_oss_folder 设置了默认值,且不允许覆盖,可以不用填。 输出: 参数 值 说明 input_oss_folder oss://blender-demo/output/ 输出OSS目录, 渲染结果图片将保存到此目录的 result/ 子目录下 Loggin(日志目录配置): 参数 值 说明 StdoutPath oss://blender-demo/log/ stdout日志输出到此 StderrPath oss://blender-demo/log/ stderr日志输出到此 填好后点击提交即可。 查看作业状态和结果 (1) 查看作业状态 9 (2) 查看结果 oss://blender-demo/output/result/ 10 (3) 渲染时长和实例规格参考 实例规格 节点数 渲染帧数 时长 ecs.sn1ne.2xlarge(8核16GB) 2 1-4 9-12分钟 ecs.sn1ne.4xlarge (16核/32GB) 2 1-4 4-6分钟 6. 附录 分帧器代码(python): framer.py: #!/usr/bin/python -- coding: UTF-8 -- import os import math import sys import re NOTHING_TO_DO = 'Nothing to do, exit' def _calcRange(a,b, id, step): start = min(id * step + a, b) end = min((id+1) * step + a-1, b) return (start, end) def _parseContinuedFrames(render_frames, total_nodes, id=None, return_type='list'): ''' 解析连续帧, 如: 1-10 ''' [a,b]=render_frames.split('-') a=int(a) b=int(b) #print(a,b) step = int(math.ceil((b-a+1)*1.0/total_nodes)) #print('step:', step) mod = (b-a+1) % total_nodes #print('mod:', mod) if mod==0 or id < mod: (start, end) = _calcRange(a,b, id, step) #print('--->',start, end) return (start, end) if return_type!='list' else range(start, end+1) else: a1 = step * mod + a #print('less', a1, b, id) (start, end) = _calcRange(a1 ,b, id-mod, step-1) #print('--->',start, end) return (start, end) if return_type!='list' else range(start, end+1) def _parseIntermittentFrames(render_frames, total_nodes, id=None): ''' 解析不连续帧, 如: 1,3,8-10,21 ''' a1=render_frames.split(',') a2=[] for n in a1: a=n.split('-') a2.append(range(int(a[0]),int(a[1])+1) if len(a)==2 else [int(a[0])]) a3=[] for n in a2: a3=a3+n #print('a3',a3) step = int(math.ceil(len(a3)*1.0/total_nodes)) #print('step',step) mod = len(a3) % total_nodes #print('mod:', mod) if mod==0 or id < mod: (start, end) = _calcRange(0, len(a3)-1, id, step) #print(start, end) a4= a3[start: end+1] #print('--->', a4) return a4 else: #print('less', step * mod , len(a3)-1, id) (start, end) = _calcRange( step * mod ,len(a3)-1, id-mod, step-1) if start > len(a3)-1: print(NOTHING_TO_DO) sys.exit(0) #print(start, end) a4= a3[start: end+1] #print('--->', a4) return a4 def parseFrames(render_frames, return_type='list', id=None, total_nodes=None): ''' @param render_frames {string}: 需要渲染的总帧数列表范围,可以用"-"表示范围,不连续的帧可以使用","隔开, 如: 1,3,5-10 @param return_type {string}: 取值范围[list,range]。 list样例: [1,2,3], range样例: (1,3)。 注意: render_frames包含","时有效,强制为list。 @param id, 节点ID,从0开始。 正式环境不要填写,将从环境变量 BATCH_COMPUTE_DAG_INSTANCE_ID 中取得。 @param total_nodes, 总共的节点个数。正式环境不要填写,将从环境变量 BATCH_COMPUTE_DAG_INSTANCE_COUNT 中取得。 ''' if id==None: id=os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID'] if type(id)==str: id = int(id) if total_nodes==None: total_nodes = os.environ['BATCH_COMPUTE_DAG_INSTANCE_COUNT'] if type(total_nodes)==str: total_nodes = int(total_nodes) if re.match(r'^(\d+)-(\d+)$',render_frames): # 1-2 # continued frames return _parseContinuedFrames(render_frames, total_nodes, id, return_type) else: # intermittent frames return _parseIntermittentFrames(render_frames, total_nodes, id)

1934890530796658 2020-03-28 20:42:58 0 浏览量 回答数 0

回答

调用 StartWorkflow 创建一个新的基因工作流。 请求信息 请求行 RequestLine POST /gs/workflow HTTP/1.1 特有请求头 RequestHead 无,请参考 公共请求头部。 请求体 RequestBody 这里以mapping为例 { "workflow_type": "mapping", "service": "s" (#SLA: [n: normal|s: silver|g: gold|p: platinum]) "mapping_oss_region": "cn-shenzhen", "mapping_fastq_first_filename": "MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_1.fq.gz", "mapping_fastq_second_filename": "MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_2.fq.gz", "mapping_bucket_name": "gene-shenzhen", "mapping_fastq_path": "fastq/MGISEQ2000", "mapping_reference_path": "reference/hg19", [Optional] "mapping_is_mark_dup": "true", "mapping_bam_out_path": "output/bamDirName", "mapping_bam_out_filename": "abc.bam" } 这里以WGS为例 { "workflow_type": "wgs", "service": "s" (#SLA: [n: normal|s: silver|g: gold|p: platinum]), "wgs_oss_region": "cn-shenzhen", "wgs_fastq_first_filename": "MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_1.fq.gz", "wgs_fastq_second_filename": "MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_2.fq.gz", "wgs_bucket_name": "gene-shenzhen", "wgs_fastq_path": "fastq/MGISEQ2000", "wgs_reference_path": "reference/hg19", "wgs_vcf_out_path": "output/vcf", "wgs_vcf_out_filename": "abc.vcf" } 表 1. 请求体解释 名称 类型 描述 workflow_type String 工作流类型,可选值:wgs 或 mapping。 service String SLA 类型,可选值:n、s、g、p。 mapping_oss_region String mapping oss 数据的存放 region。 mapping_fastq_first_filename String mapping 的第一个 fastq 文件名。 mapping_fastq_second_filename String mapping 的第二个 fastq 文件名。 mapping_bucket_name String 存放 mapping 的 bucket 名称。 mapping_fastq_path String mapping 的 fastq 文件路径。 mapping_reference_path String mapping 的 reference 文件位置。 mapping_is_mark_dup String 是否进行 dup。 mapping_bam_out_path String bam 文件输出路径。 mapping_bam_out_filename String bam 文件输出名称。 wgs_oss_region String wgs oss 数据的存放 region。 wgs_fastq_first_filename String wgs 的第一个 fastq 文件名。 wgs_fastq_second_filename String wgs 的第二个 fastq 文件名。 wgs_bucket_name String 存放 wgs 的 bucket 名称。 wgs_fastq_path String wgs 的 fastq 文件路径。 wgs_reference_path String wgs 的 fastq 文件路径。 wgs_vcf_out_path String wgs 的 vcf 输出路径。 wgs_vcf_out_filename String wgs 的 vcf 输出文件名称。 返回信息 返回行 ResponseLine HTTP/1.1 200 OK 特有返回头 ResponseHead 无,请参考 公共返回头部。 返回体 ResponseBody 这里以mapping为例 { JobName: mapping-gpu-66xv7 } 这里以 wgs 为例 { JobName: wgs-gpu-tvltf } 表 2. 返回体解释 名称 类型 描述 JobName String 工作流名称 示例 请求示例(Python) #!/usr/bin/env python coding=utf-8 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest import os os.environ.setdefault('DEBUG', 'sdk') client = AcsClient(os.environ['accessKeyID'], os.environ['accessKeySecret'], 'cn-beijing') request = CommonRequest() request.set_accept_format('json') request.set_method('POST') request.set_protocol_type('https') # https | http request.set_domain('cs.cn-beijing.aliyuncs.com') request.set_version('2015-12-15') request.add_query_param('RegionId', "cn-shenzhen") request.add_header('Content-Type', 'application/json') request.set_uri_pattern('/gs/workflow') body = '''{"cli_version":"v1.0.1-882299b","wgs_bucket_name":"my-test-shenzhen","wgs_fastq_first_name":"MGISEQ/MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_1.fq.gz","wgs_fastq_second_name":"MGISEQ/MGISEQ2000_PCR-free_NA12878_1_V100003043_L01_2.fq.gz","wgs_oss_region":"cn-shenzhen","wgs_reference_file":"hs37d5","wgs_service":"g","wgs_vcf_File_name":"vcf/MGISEQ_NA12878_hs37d5_13.vcf","workflow_type":"WGS"}''' request.set_content(body.encode('utf-8')) response = client.do_action_with_exception(request)

1934890530796658 2020-03-31 21:19:56 0 浏览量 回答数 0

回答

Cromwell 是 Broad Institute 开发的工作流管理系统,当前已获得阿里云批量计算服务的支持。通过 Cromwell 可以将 WDL 描述的 workflow 转化为批量计算的作业(Job)运行。用户将为作业运行时实际消耗的计算和存储资源付费,不需要支付资源之外的附加费用。本文将介绍如何使用 Cromwell 在阿里云批量计算服务上运行工作流。 准备工作 A) 开通批量计算服务 要使用批量计算服务,请根据官方文档里面的指导开通批量计算和其依赖的相关服务,如OSS等。 注意:创建 OSS Bucket 的区域,需要和使用批量计算的区域一致。 B) 下载 Cromwell Cromwell 官方下载 注意:为了确保所有的特性可用,建议下载45及之后的最新版本。 C) 开通 ECS 作为 Cromwell server 当前批量计算提供了 Cromwell server 的 ECS 镜像,用户可以用此镜像开通一台 ECS 作为 server。镜像中提供了 Cromwell 官网要求的基本配置和常用软件。在此镜像中,Cromwell 的工作目录位于/home/cromwell,上一步下载的 Crowwell jar 包可以放置在 /home/cromwell/cromwell 目录下。 注意:用户也可以自己按照 Cromwell 官方的要求自己搭建 Cromwell server, 上面的镜像只是提供了方便的方式,不是强制要求。 使用 Cromwell 配置文件 Cromwell 运行的配置文件,包括: Cromwell 公共配置。 批量计算相关配置,包含了批量计算作为后端需要的存储、计算等资源配置。 关于配置参数的详细介绍请参考 Cromwell 官方文档。如下是一个批量计算配置文件的例子 bcs.conf: include required(classpath("application")) database { profile = "slick.jdbc.MySQLProfile$" db { driver = "com.mysql.jdbc.Driver" url = "jdbc:mysql://localhost/db_cromwell?rewriteBatchedStatements=true&useSSL=false&allowPublicKeyRetrieval=true" user = "user_cromwell" #Your mysql password password = "" connectionTimeout = 5000 } } workflow-options { workflow-log-dir = "/home/cromwell/cromwell/logs/" } call-caching { # Allows re-use of existing results for jobs you've already run # (default: false) enabled = false # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users): # (default: true) invalidate-bad-cache-results = true } docker { hash-lookup { enabled = false # Set this to match your available quota against the Google Container Engine API #gcr-api-queries-per-100-seconds = 1000 # Time in minutes before an entry expires from the docker hashes cache and needs to be fetched again #cache-entry-ttl = "20 minutes" # Maximum number of elements to be kept in the cache. If the limit is reached, old elements will be removed from the cache #cache-size = 200 # How should docker hashes be looked up. Possible values are "local" and "remote" # "local": Lookup hashes on the local docker daemon using the cli # "remote": Lookup hashes on docker hub and gcr method = "remote" #method = "local" alibabacloudcr { num-threads = 5 #aliyun CR credentials auth { #endpoint = "cr.cn-shanghai.aliyuncs.com" access-id = "" access-key = "" } } } } engine { filesystems { oss { auth { endpoint = "oss-cn-shanghai.aliyuncs.com" access-id = "" access-key = "" } } } } backend { default = "BCS" providers { BCS { actor-factory = "cromwell.backend.impl.bcs.BcsBackendLifecycleActorFactory" config { root = "oss://your-bucket/cromwell_dir" region = "cn-shanghai" access-id = "" access-key = "" filesystems { oss { auth { endpoint = "oss-cn-shanghai.aliyuncs.com" access-id = "" access-key = "" } caching { # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs # Possible values: "copy", "reference". Defaults to "copy" # "copy": Copy the output files # "reference": DO NOT copy the output files but point to the original output files instead. # Will still make sure than all the original output files exist and are accessible before # going forward with the cache hit. duplication-strategy = "reference" } } } default-runtime-attributes { failOnStderr: false continueOnReturnCode: 0 autoReleaseJob: true cluster: "OnDemand ecs.sn1.medium img-ubuntu-vpc" #cluster: cls-6kihku8blloidu3s1t0006 vpc: "192.168.0.0/16" } } } } } 如果使用前面章节中的镜像开通 ECS 作为 Cromwell server,配置文件位于 /home/cromwell/cromwell/bcs_sample.conf,只需要填写自己的配置即可使用 Cromwell。 注意:Cromwell 可以在公网环境(如本地服务器、配置了公网 IP 的阿里云 ECS 等)运行,也可以在阿里云 VPC 环境下运行。在 VPC 环境下使用时,有如下几处要修改为 VPC 内网下的配置: OSS 的内网 endpoint : engine.filesystems.oss.auth.endpoint = "oss-cn-shanghai-internal.aliyuncs.com" backend.providers.BCS.config.filesystems.oss.auth.endpoint = "oss-cn-shanghai-internal.aliyuncs.com" 添加批量计算的内网 endpoint: backend.providers.BCS.config.user-defined-region = "cn-shanghai-vpc" backend.providers.BCS.config.user-defined-domain = "batchcompute-vpc.cn-shanghai.aliyuncs.com" 添加容器镜像服务的内网 endpoint: docker.hash-lookup.alibabacloudcr.auth.endpoint = "cr-vpc.cn-shanghai.aliyuncs.com" 运行模式 Cromwell支持两种模式: run 模式 server 模式 关于两种模式的详细描述,请参考 Cromwell 官网文档。下面重点介绍这两种模式下如何使用批量计算。 A) run模式 run模式适用于本地运行一个单独的 WDL 文件描述的工作流,命令行如下:java -Dconfig.file=bcs.conf -jar cromwell.jar run echo.wdl --inputs echo.inputs WDL 文件:描述详细的工作流。工作流中每个 task 对应批量计算的一个作业(Job)。 inputs文件:是 WDL 中定义的工作流的输入信息inputs 文件是用来描述 WDL 文件中定义的工作流及其 task 的输入文件。如下所示: { "workflow_name.task_name.input1": "xxxxxx" } 运行成功后,WDL 文件中描述的工作流中的一个 task 会作为批量计算的一个作业(Job)来提交。此时登录批量计算的控制台就可以看到当前的 Job 状态。 show_bcs_job 当 workflow 中所有的 task 对应的作业运行完成后,工作流运行完成。 B) server 模式 启动 server 相比 run 模式一次运行只能处理一个 WDL 文件,server 模式可以并行处理多个 WDL 文件。关于 server 模式的更多信息,请参考 Cromwell 官方文档。可以采用如下命令行启动 server:java -Dconfig.file=bsc.conf -jar cromwell.jar serverserver 启动成功后,就可以接收来自 client 的工作流处理请求。下面分别介绍如何使用 API 和 CLI 的方式向 server 提交工作流。 使用 API 提交工作流 server 启动后,可以通过浏览器访问 Cromwell Server,比如 Server 的 IP 为39.105.xxx.yyy,则在浏览器中输入http://39.105.xxx.yyy:8000,通过如下图所示的界面提交任务:cromwell_server更多API接口及用法,请参考 Cromwell 官网文档。 使用 CLI 提交工作流[推荐] 除了可以使用 API 提交工作流以外,Cromwell 官方还提供了一个开源的 CLI 命令行工具 widder。可以使用如下的命令提交一个工作流: python widdler.py run echo.wdl echo.inputs -o bcs_workflow_tag:tagxxx -S localhost 其中-o key:value是用于设置option,批量计算提供了 bcs_workflow_tag:tagxxx 选项,用于配置作业输出目录的tag(下一节查看运行结果中会介绍)。 如果使用前面章节中的镜像开通 ECS 作为 Cromwell server,镜像中已经安装了 widdler,位于 /home/cromwell/widdler。可以使用如下的命令提交工作流: widdler run echo.wdl echo.inputs -o bcs_workflow_tag:tagxxx -S localhost 更多命令用法可使用widdler -h命令查看,或参考官方文档。 查看运行结果 工作流运行结束后,输出结果被上传到了配置文件或 WDL 中定义的 OSS 路径下。在OSS路径上面的目录结构如下: cromwell_output_dir如上图所示,在配置文件中的config.root目录下有如下输出目录: 第一层:workflowname 工作流的名称 第二层:通过上一节中 CLI 命令的-o设置的目录tag 第三层:workflow id,每次运行会生成一个 第四层:workflow 中每个 task 的运行输出,比如上图中的 workflow 15e45adf-6dc7-4727-850c-89545faf81b0 有两个 task,每个task对应的目录命名是call-taskname,目录中包含三部分内容: 批量计算的日志,包括 bcs-stdout 和 bcs-stderr 当前 task 的输出,比如图中的 output1/output2 等 当前 task 执行的 stdout 和 stderr 4. 使用建议 在使用过程中,关于 BCS 的配置,有如下的建议供参考: 使用集群 批量计算提供了两种使用集群的方式: 自动集群 固定集群 A) 自动集群 在config配置文件中指定默认的资源类型、实例类型以及镜像类型,在提交批量计算 Job 时就会使用这些配置自动创建集群,比如: default-runtime-attributes { cluster : "OnDemand ecs.sn1ne.large img-ubuntu-vpc" } 如果在某些 workflow 中不使用默认集群配置,也可以通过inputs文件中指定 workflow 中某个 task 的对应的批量计算的集群配置(将 cluster_config 作为 task 的一个输入),比如: { "workflow_name.task_name.cluster_config": "OnDemand ecs.sn2ne.8xlarge img-ubuntu-vpc" } 然后在 task 中重新设置运行配置: task task_demo { String cluster_config runtime { cluster: cluster_config } } 就会覆盖默认配置,使用新的配置信息创建集群。 B) 固定集群 使用自动集群时,需要创建新集群,会有一个等待集群的时间。如果对于启动时间有要求,或者有了大量的作业提交,可以考虑使用固定集群。比如: default-runtime-attributes { cluster : "cls-xxxxxxxxxx" } 注意:使用固定集群时,如果使用完毕,请及时释放集群,否则集群中的实例会持续收费。 Cromwell Server 配置建议 大压力作业时,建议使用较高配置的机器作为 Cromwell Server,比如ecs.sn1ne.8xlarge等32核64GB的机器。 大压力作业时,修改 Cromwell Server 的最大打开文件数。比如在ubuntu下可以通过修改/etc/security/limits.conf文件,比如修改最大文件数为100万: root soft nofile 1000000 root hard nofile 1000000 * soft nofile 1000000 * hard nofile 1000000 确认 Cromwell Server 有配置数据库,防止作业信息丢失。 设置 bcs.conf 里面的并发作业数,比如 system.max-concurrent-workflows = 1000 开通批量计算相关配额 如果有大压力场景,可能需要联系批量计算服务开通对应的配额,比如: 一个用户所有作业的数量(包括完成的、运行的、等待的等多种状态下); 同时运行的作业的集群的数量(包括固定集群和自动集群); 使用 NAS 使用 NAS 时要注意以下几点: NAS 必须在 VPC 内使用,要求添加挂载点时,必须指定 VPC; 所以要求在 runtime 中必须包含: VPC 信息 mounts 信息 下面的例子可供参考: runtime { cluster: cluster_config mounts: "nas://1f****04-xkv88.cn-beijing.nas.aliyuncs.com:/ /mnt/ true" vpc: "192.168.0.0/16 vpc-2zexxxxxxxx1hxirm" } 高级特性支持 Glob Cromwell 支持使用 glob 来指定工作流中多个文件作为 task 的输出,比如: task globber { command <<< for i in seq 1 5 do mkdir out-$i echo globbing is my number $i best hobby out-$i/$i.txt done output { Array[File] outFiles = glob("out-/.txt") } } workflow test { call globber } 当 task 执行结束时,通过 glob 指定的多个文件会作为输出,上传到 OSS 上。 Call Caching Call Caching 是 Cromwell 提供的高级特性,如果检测到工作流中某个 task (对应一个批量计算的 job )和之前已经执行过的某个 task 具有相同的输入和运行时等条件,则不需要再执行,直接取之前的运行结果,这样可以为客户节省时间和费用。一个常见的场景是如果一个工作流有 n 个 task,当执行到中间某一个 task 时由于某些原因失败了,排除了错误之后,再次提交这个工作流运行后,Cromwell 判断如果满足条件,则已经完成的几个 task 不需要重新执行,只需要从出错的 task 开始继续运行。 配置 Call Caching 要在 BCS 后端情况下使用 Call Caching 特性,需要如下配置项: database { profile = "slick.jdbc.MySQLProfile$" db { driver = "com.mysql.jdbc.Driver" url = "jdbc:mysql://localhost/db_cromwell?rewriteBatchedStatements=true&useSSL=false" user = "user_cromwell" password = "xxxxx" connectionTimeout = 5000 } } call-caching { # Allows re-use of existing results for jobs you have already run # (default: false) enabled = true # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users): # (default: true) invalidate-bad-cache-results = true } docker { hash-lookup { enabled = true # How should docker hashes be looked up. Possible values are local and remote # local: Lookup hashes on the local docker daemon using the cli # remote: Lookup hashes on alibab cloud Container Registry method = remote alibabacloudcr { num-threads = 10 auth { access-id = "xxxx" access-key = "yyyy" } } } } engine { filesystems { oss { auth { endpoint = "oss-cn-shanghai.aliyuncs.com" access-id = "xxxx" access-key = "yyyy" } } } } backend { default = "BCS" providers { BCS { actor-factory = "cromwell.backend.impl.bcs.BcsBackendLifecycleActorFactory" config { #其他配置省略 filesystems { oss { auth { endpoint = "oss-cn-shanghai.aliyuncs.com" access-id = "xxxx" access-key = "yyyy" } caching { # When a cache hit is found, the following duplication strategy will be followed to use the cached outputs # Possible values: copy, reference. Defaults to copy # copy: Copy the output files # reference: DO NOT copy the output files but point to the original output files instead. # Will still make sure than all the original output files exist and are accessible before # going forward with the cache hit. duplication-strategy = "reference" } } } default-runtime-attributes { failOnStderr: false continueOnReturnCode: 0 cluster: "OnDemand ecs.sn1.medium img-ubuntu-vpc" vpc: "192.168.0.0/16" } } } } } database 配置:Cromwell 将 workflow 的执行元数据存储在数据库中,所以需要添加数据库配置,详细情况参考Cromwell 官网指导。 call-caching 配置:Call Caching 的开关配置等; docker.hash-lookup 配置: 设置 Hash 查找开关及阿里云 CR 等信息,用于查找镜像的 Hash 值。 backend.providers.BCS.config.filesystems.oss.caching 配置:设置 Call Caching命中后,使用原来输出的方式,批量计算在这里支持 reference 模式,不需要拷贝原有的结果,节省时间和成本。 命中条件 使用批量计算作为后端时,Cromwell 通过如下条件判断一个 task 是否需要重新执行: 条件 解释 inputs task 的输入,比如 OSS 上的样本文件 command task 定义中的命令行 continueOnReturnCode 公共运行时参数,可以继续执行的返回码 docker 公共运行时参数,后端的Docker配置 failOnStderr 公共运行时参数,stderr非空时是否失败 imageId 批量计算后端运行时参数,标识作业运行的 ECS 镜像,如果使用的官方镜像如img-ubuntu-vpc可不用填写此项 userData 批量计算后端,用户自定义数据 如果一个 task 的上述参数未发生改变,Cromwell 会判定为不需要执行的 task,直接获取上次执行的结果,并继续工作流的执行。

1934890530796658 2020-03-28 20:47:14 0 浏览量 回答数 0

回答

1、RMI 利用java.rmi包实现,基于Java远程方法协议(Java Remote Method Protocol) 和java的原生序列化。 2、Hessian 是一个轻量级的remoting onhttp工具,使用简单的方法提供了RMI的功能。 基于HTTP协议,采用二进制编解码。 3、protobuf-rpc-pro 是一个Java类库,提供了基于 Google 的 Protocol Buffers 协议的远程方法调用的框架。基于 Netty 底层的 NIO 技术。支持 TCP 重用/ keep-alive、SSL加密、RPC 调用取消操作、嵌入式日志等功能。 4、Thrift 是一种可伸缩的跨语言服务的软件框架。它拥有功能强大的代码生成引擎,无缝地支持C + +,C#,Java,Python和PHP和Ruby。thrift允许你定义一个描述文件,描述数据类型和服务接口。依据该文件,编译器方便地生成RPC客户端和服务器通信代码。 最初由facebook开发用做系统内个语言之间的RPC通信,2007年由facebook贡献到apache基金 ,现在是apache下的opensource之一 。支持多种语言之间的RPC方式的通信:php语言client可以构造一个对象,调用相应的服务方法来调用java语言的服务,跨越语言的C/S RPC调用。底层通讯基于SOCKET。 5、Avro 出自Hadoop之父Doug Cutting, 在Thrift已经相当流行的情况下推出Avro的目标不仅是提供一套类似Thrift的通讯中间件,更是要建立一个新的,标准性的云计算的数据交换和存储的Protocol。支持HTTP,TCP两种协议。 6、Dubbo Dubbo是 阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成。

剑曼红尘 2020-03-15 15:38:04 0 浏览量 回答数 0

回答

敢用自己的名字做软件名字的,都有非常强大的自信。比如,垠语言什么的。 awk的命名得自于它的三个创始人姓别的首字母,都是80来岁的老爷爷了。当然也有四个人的组合:流行的GoF设计模式。但对于我这游戏爱好者来说,想到的竟然是三位一体,果然是不争气啊。 它长的很像C,为什么这么有名,除了它强大的功能,我们姑且认为a这个字母比较靠前吧。awk比sed简单,它更像一门编程语言。 打印某一列 下面,这几行代码的效果基本是相同的:打印文件中的第一列。 这可能是awk最常用的功能了:打印文件中的某一列。它智能的去切分你的数据,不管是空格,还是TAB,大概率是你想要的。 对于csv这种文件来说,分隔的字符是,。AWK使用-F参数去指定。以下代码打印csv文件中的第1和第2列。 awk -F "," '{print $1,$2}' file 由此,我们可以看出一个基本的awk命令的组成部分。 一般的开发语言,数组下标是以0开始的,但awk的列$是以1开始的,而0指的是原始字符串。 网络状态统计 本小节,采用awk统计netstat命令的一些网络状态,来看一下awk语言的基本要素。netstat的输出类似于: 其中,第6列,标明了网络连接所处于的网络状态。我们先给出awk命令,看一下统计结果。 netstat -ant | awk ' \ BEGIN{print "State","Count" } \ /^tcp/ \ { rt[$6]++ } \ END{ for(i in rt){print i,rt[i]} }' 输出结果为: State Count LAST_ACK 1 LISTEN 64 CLOSE_WAIT 43 ESTABLISHED 719 SYN_SENT 5 TIME_WAIT 146 下面这张图会配合以上命令详细说明,希望你能了解awk的精髓。 乍一看,好吓人的命令,但是很简单。awk和我们通常的程序不太一样,它分为四个部分。 1、BEGIN 开头部分,可选的。用来设置一些参数,输出一些表头,定义一些变量等。上面的命令仅打印了一行信息而已。 2、END 结尾部分,可选的。用来计算一些汇总逻辑,或者输出这些内容。上面的命令,使用简单的for循环,输出了数组rt中的内容。 3、Pattern 匹配部分,依然可选。用来匹配一些需要处理的行。上面的命令,只匹配tcp开头的行,其他的不进入处理。 4、Action 模块。主要逻辑体,按行处理,统计打印,都可以。 注意点 1、awk的主程序部分使用单引号‘包围,而不能是双引号 2、awk的列开始的index是0,而不是1 例子 我们从几个简单的例子,来看下awk的作用。 1、输出Recv-Q不为0的记录 netstat -ant | awk '$2 > 0 {print}' 2、外网连接数,根据ip分组 netstat -ant | awk '/^tcp/{print $4}' | awk -F: '!/^:/{print $1}' | sort | uniq -c 3、打印RSS物理内存占用 top -b -n 1 | awk 'NR>7{rss+=$6}END{print rss} 4、过滤(去掉)空白行 awk 'NF' file 5、打印奇数行 awk 'a=!a' file 6、输出行数 awk 'END{print NR}' file 这些命令,是需要了解awk的一些内部变量的,接下来我们来介绍。 内置变量 FS 下面的两个命令是等价的 。 awk -F ':' '{print $3}' file awk 'BEGIN{FS=":"}{print $3}' file BEGIN块中的FS,就是内部变量,可以直接指定或者输出。如果你的文件既有用,分隔的,也有用:分割的,FS甚至可以指定多个分隔符同时起作用。 FS="[,:|]" 其他 OFS 指定输出内容的分割符,列数非常多的时候,简化操作。相似命令: awk -F ':' '{print $1,"-",$2,"-",$4}' file awk 'BEGIN{FS=":";OFS="-"}{print $1,$2,$4}' file NF 列数。非常有用,比如,过滤一些列数不满足条件的内容。 awk -F, '{if(NF==3){print}}' file NR 行号,例如,下面两个命令是等价的。 cat -n file awk '{print NR,$0}' file RS 记录分隔标志 ORS 指定记录输出的分隔标志 FILENAME 当前处理的文件名称,在一次性处理多个文件时非常有用 编程语言特性 数学运算 从上面的代码可以看出,awk可以做一些简单的运算。它的语言简洁,不需要显示的定义变量的类型。 比如上面的rt[$6]++,就已经默认定义了一个叫做rt的hash(array?),里面的key是网络状态,而value是可以进行运算的(+-*/%)。 包含一些内置的数学运算(有限) int log sqrt exp sin cos atan2 rand srand 字符串操作 类似其他语言,awk也内置了很多字符串操作函数。它本来就是处理字符串的,所以必须强大。 length(str) #获取字符串长度 split(input-string,output-array,separator) substr(input-string, location, length) 语言特性 awk是个小型的编程语言,看它的基本语法,如果你需要复杂一点的逻辑,请自行深入了解,包括一些时间处理函数: # logic if(x=a){} if(x=a){}else{} while(x=a){break;continue;} do{}while(x=a) for(;;){} # array arr[key] = value for(key in arr){arr[key]} delete arr[key] asort(arr) #简单排序 据说,awk可以胜任所有的文本操作。因为它本身就是一门语言啊。 End 曾经使用awk编写过复杂的日志处理和统计程序。虽然比写sed舒畅了很多,但还是备受煎熬。更加上现在有各种nawk,gawk版本之间的区别,所以业务复杂度一增长,就习惯性的转向更加简洁、工具更全的python。 awk处理一些简单的文本还是极其方便的,最常用的还是打印某一列之类的,包括一些格式化输出。对于awk,要简单的滚瓜烂熟,复杂的耳熟能详,毕竟有些大牛,就喜欢写这种脚本呢。 注明:转载于小姐妹养的狗

剑曼红尘 2020-04-01 11:18:23 0 浏览量 回答数 0

回答

本文介绍了如何使用 Serverless 工作流提供长流程分布式事务保证,帮助用户聚焦于自身业务逻辑。 简介 复杂的业务场景例如电商网站、酒店、航班预定这类涉及订单管理的应用通常要访问多个远程服务,并且对操作事务性语义(即所有步骤全部成功或全部失败,不存在中间状态)有较高要求。在流量较小、数据存储集中的应用中,事务性可以通过关系型数据库提供的 ACID 特性满足。然而在大流量场景下,为了高可用和可扩展性,业务通常选择向微服务的分布式架构方向演进。在这样的架构中提供多步骤事务性的保证通常需要引入队列和数据库来持久化消息以及展现流程状态,这类系统的开发和运维会给业务方带来额外的成本和负担。而使用 Serverless 工作流提供长流程分布式事务保证会帮您解决这些问题。 场景描述 假设某应用为其用户提供预定火车票、航班和酒店的功能,要求三个步骤保证事务性。该功能需要三个远程调用实现(例如预定火车票需要调用 12306 接口),如果三个调用都成功则该订单成功。然而实际上任何一个远程调用都有可能会失败,因此该应用需要对不同的失败场景做出相应的补偿逻辑,回退已完成操作。如下图所示: 如果预定火车票(BuyTrainTicket)成功,而预定航班(ReserveFlight)失败,则需要取消已经购买的火车票 (CancelTrainTicket),并告知用户订单失败。 如果预定火车票(BuyTrainTicket)和预定航班(ReserveFlight)均成功,但是预订酒店(ReserveHotel) 失败,则需要取消已经预定的航班(CancelFlight)和火车票(CancelTrainTicket),并告知用户订单失败。 longtxn-saga_train_flight_hotel Serverless 工作流实现 下文的示例将 FC 函数编排成一个 Serverless 工作流流程从而实现了一个可靠的多步骤长流程,该示例分为 3 步: 创建 FC 函数 创建流程 执行并查看结果 步骤 1:创建 FC 函数(模拟上面提到的3个操作:预定火车票、预定航班、预定酒店) 创建下面的 Python2.7 的函数,关于创建的详细步骤,可以参见 FC 文档,建议命名: Service: fnf-demo Function: Operation Operation 函数模拟各操作(例如预定航班、预定酒店)的实现,根据输入决定该操作执行结果(成功或失败)。 import json import logging import uuid def handler(event, context): evt = json.loads(event) logger = logging.getLogger() id = uuid.uuid4() op = "operation" if 'operation' in evt: op = evt['operation'] if op in evt: result = evt[op] if result == False: logger.info("%s failed" % op) exit() logger.info("%s succeeded, id %s" % (op, id)) return '{"%s":"success", "%s_txnID": "%s"}' % (op, op, id) 步骤 2:创建流程 使用 Serverless 工作流控制台创建下面的流程。 配置流程 RAM 角色 { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "fnf.aliyuncs.com" ] } } ], "Version": "1" } 流程定义 version: v1 type: flow steps: - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: BuyTrainTicket inputMappings: - target: operation source: buy_train_ticket - target: buy_train_ticket source: $input.buy_train_ticket_result catch: - errors: - FC.Unknown goto: OrderFailed - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveFlight inputMappings: - target: operation source: reserve_flight - target: reserve_flight source: $input.reserve_flight_result catch: # 捕获 ReserveFlight task 抛出的 FC.Unknown 错误,跳转到 CancelTrainTicket。 - errors: - FC.Unknown goto: CancelTrainTicket - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveHotel inputMappings: - target: operation source: reserve_hotel - target: reserve_hotel source: $input.reserve_hotel_result retry: # 对 FC.Unknown 类型的错误最多指数退避重试 3 次,初始间隔 1s,后续间隔 = 上次间隔 * 2。 - errors: - FC.Unknown intervalSeconds: 1 maxAttempts: 3 multiplier: 2 catch: # 捕获 ReserveHotel task 抛出的 FC.Unknown 错误,跳转到 CancelFlight。 - errors: - FC.Unknown goto: CancelFlight - type: succeed name: OrderSucceeded - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelFlight inputMappings: - target: operation source: cancel_flight - target: reserve_flight_txnID source: $local.reserve_flight_txnID - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelTrainTicket inputMappings: - target: operation source: cancel_train_ticket - target: reserve_flight_txnID source: $local.reserve_flight_txnID - type: fail name: OrderFailed 步骤 3:执行并查看结果 在控制台上对创建好的流程(Flow)开始一个新的执行(Execution)。StartExecution API 要求传入 JSON 格式的输入。下面的 JSON 对象可以模拟每个步骤的成功或失败(例如 "reserve_hotel_result":"fail" 代表模拟预定酒店这步失败)。StartExecution 是一个异步 API,调用结束后,Serverless 工作流会返回一个执行名字用来查询流程执行状态。 { "buy_train_ticket_result":"success", "reserve_flight_result":"success", "reserve_hotel_result":"fail" } 流程执行开始后,在 Serverless 工作流控制台单击进入该执行并查看执行过程和结果。可以看到,由于 "reserve_hotel_result":"fail" 和 ReserveHotel 函数调用失败,Serverless 工作流按照流程定义,依次取消航班(CancelFlight)、取消火车票(CancelTrainTicket)。Serverless 工作流每个步骤转换有持久化的保证,因此网络中断或进程崩溃等失败场景不会影响流程事务性的保证。 Screen Shot 2019-06-26 at 12.14.50 PM 流程执行会产生执行历史事件(event),这些事件可以通过控制台或者 SDK/CLI 调用 GetExecutionHistory API 查询。 Screen Shot 2019-06-26 at 12.17.26 PM 错误处理和重试 上面示例中的预定航班、预定酒店等远程调用都有可能受到网络或服务错误等原因导致调用失败,而增加对瞬时错误的重试可以提高订单流程成功率。Serverless 工作流在任务(Task)类型的步骤(Step)自带重试功能,如预定酒店这个步骤用下面的写法可以实现对 FC.Unknown 类型的错误指数退避。假设重试到达最大次数后 ReserveHotel 都无法成功,按照该步骤中 catch 的定义,ReserveHotel 函数抛出的 FC.Unknown 错误会被捕获并将跳转到 CancelFlight 执行定义好的补偿逻辑。 - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveHotel inputMappings: - target: operation source: reserve_hotel retry: # 对 FC.Unknown 类型的错误最多指数退避重试3次,初始间隔1s,后续间隔 = 上次间隔 * 2。 - errors: - FC.Unknown intervalSeconds: 1 maxAttempts: 3 multiplier: 2 catch: # 捕获 ReserveHotel task 抛出的 FC.Unknown 错误,跳转到 CancelFlight。 - errors: - FC.Unknown goto: CancelFlight 下图可以看到加入重试之后预订酒店(ReserveHotel)任务执行了多次直到最大重试数。Screen Shot 2019-06-26 at 12.19.55 PM 步骤间的数据传递 预定酒店失败后需要取消航班和火车票,这两部分别需要用到预定航班和预定火车票返回的交易 ID (txnID),下面的 inputMapping 对象描述了如何将之前步骤产生的输出传入 CancelFlight 这个步骤中。 - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelFlight inputMappings: - target: operation source: cancel_flight - target: reserve_flight_txnID source: $local.reserve_flight_txnID 流程执行各步骤结束的输出都会被放在 StepExited 事件详情(EventDetail)的 local 对象中。 { "input":{ "operation":"reserve_hotel", "reserve_hotel_result":"fail" }, "local":{ "buy_train_ticket":"success", "buy_train_ticket_txnID":"d37412b3-bb68-4d04-9d90-c8c15643d45e", "reserve_flight_result":"success", "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55" }, "resourceArn":"acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation", "cause":"{"errorMessage":"Process exited unexpectedly before completing request (duration: 12ms, maxMemoryUsage: 9.18MB)"}", "error":"FC.Unknown", "retryCount":3, "goto":"CancelFlight" } 结合上面的 EventDetail 和 inputMappings 的映射之后,传入到 CancelFlight 步骤的输入变成如下 JSON 对象,这样 CancelFlight 函数的输入会包含 reserve_flight_txnID 字段。 "input":{ "operation":"cancel_flight", "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55" }

1934890530796658 2020-03-27 10:47:41 0 浏览量 回答数 0

回答

推荐工具 1、Linx: Linx 是一款低代码的 IDE 和服务器。IT 专业人员使用 Linx 可以快速创建自定义的自动化业务流程、集成应用程序、公开 Web 服务,并有效地处理高工作负载。 易用使用的拖放式界面。 超过 100 种预建功能和服务,可实现快速开发。 直接从 IDE 一键部署到任何本地或远程 Linx 服务器。 输入和输出包括几乎所有的 SQL 和 NoSQL 数据库、大量文件格式(文本和二进制)或 REST 和 SOAP Web 服务。 使用分步逻辑进行现场调试。 通过计时器、目录事件或消息队列将后端流程自动化,或者公开 Web 服务,并通过 HTTP 请求调用 API。 下载链接:https://linx.software/lowcode-application-designer/?utm=99 2、Buddy Buddy 是一款面向 Web 开发人员的智能 CI/CD 工具,旨在降低 DevOps 的入门门槛。它使用交付管道来构建、测试和部署软件。这些管道由 100 多个随时可用的动作创建的,这些动作可以以任何方式进行安排,就像你用砖头建造房子一样。 15 分钟的配置,清晰易懂的用户界面 / 用户体验。 基于变更集的快速部署。 构建在具有缓存依赖项的独立容器中运行。 支持所有流行语言、框架和任务管理器。 Docker/Kubernetes 动作专用名册。 与 AWS、Google、DigitalOcean、Azure、Shopify、WordPress 等集成。 支持并行和 YAML 配置。 下载链接:https://buddy.works/ IDE (集成开发环境) 3、NetBeans NetBeans 是一款流行的免费开源 IDE。它可以用来开发桌面、移动和 Web 应用程序。 支持快速和智能代码编辑。 简单高效的项目管理流程。 快速用户界面开发。 帮助编写无 Bug 代码。 NetBeans IDE 为 C/C++ 和 PHP 开发人员提供了卓越的支持。 它可以安装在任何支持 Java 的操作系统上,从 Windows 到 Linux,再到 Mac OSX 系统。 下载链接:https://netbeans.org/downloads/index.html 4、Cloud9 IDE Cloud9 IDE 是一款在线集成软件开发环境。它支持许多编程语言,如 C、C++、PHP、Ruby、Perl、Python、JavaScript 和 Node.js。 允许克隆整个开发环境。 命令行想到的内置终端。 代码完成建议的功能可以帮助软件开发人员更快地编写代码并避免输入错误。 调试器可帮助开发人员设置断点,并检查任何 JS/Node.js 应用的变量。 只需拖动任何文件或终端即可创建多个拆分视图。 开发人员可以选择广泛的默认运行程序来执行应用程序,如 Ruby、Pythn、PHP/Apache。 下载链接:https://c9.io/pricing

有只黑白猫 2020-01-20 17:21:34 0 浏览量 回答数 0
阿里云大学 云服务器ECS com域名 网站域名whois查询 开发者平台 小程序定制 小程序开发 国内短信套餐包 开发者技术与产品 云数据库 图像识别 开发者问答 阿里云建站 阿里云备案 云市场 万网 阿里云帮助文档 免费套餐 开发者工具 企业信息查询 小程序开发制作 视频内容分析 企业网站制作 视频集锦 代理记账服务 2020阿里巴巴研发效能峰会 企业建站模板 云效成长地图 高端建站