操作流程
一、前提
1.本文描述不做特殊说明,默认都是使用RAM账户登陆控制台。更多创建或获取RAM用户信息操作,请参见准备RAM用户
- 请确保已具备使用OSS投递(新版)的权限。更多有关OSS投递权限的相关操作,请参考授权概述
- 请确保您已经有创建好的OSS投递任务。更多有关创建OSS投递任务的相关操作,请参考创建OSS投递任务(新版)
- 请确保您已经开通运行日志。更多关于运行日志的相关操作,请参考管理服务日志
二、数据投递概览查看投递任务进度落后状态
1.登录日志服务控制台。
2.在Project列表区域,单击目标Project。
3.点击左侧任务管理 -> 数据投递新版 -> 特定oss投递(新版)任务
4.浏览右侧数据投递概览 -> 进入落后
说明 :
进度落后的横轴表示时间,纵轴表示落后的秒数,右侧则是各个shard,图中天蓝色直线表示,shard#27在23/07/07 13:39时投递进度落后100.75k秒
三、internal-diagnostic_log中通过sql查询投递任务进度落后状态
说明:
创建OSS投递任务后,投递过程中的内部日志信息会存储在internal-diagnostic_log logstore中, 通过分析该日志,您可以观测投递的状态 。关于内部日志的详细字段信息,请参考日志类型
1.点击进度落后右上角 -> 预览查询语句
2.右侧弹出预览查询语句方框 -> 点击查询分析 ·
3.跳转至 Internal-diagnostic_log logstore , 右侧可选时间范围 -> 查询分析 , 下方预览图表的lags展示延迟情况
说明 : 该sql的中 (__topic__: etl_metrics and metric_type: ConnectorMetrics and "_etl_:connector_meta.action": ingest)and (project:your-project and job_name:job-test) 是搜索语句限定查询的范围,剩余sql为分析语句,用于聚合出精度精确到一分钟,某个shard的平均延时 |
表示shard#5 存在119795.0s的延迟
四、通过sdk的GetLogs接口对internal-diagnostic_log中的内部日志进行查询
1.安装python aliyun log sdk , 如果需要其他语言的sdk请参考SDK参考概述
2.使用第三步中的sql,使用GetLogs查询延迟日志
importtimefromaliyun.logimportLogClient, GetLogsRequestendpoint=''accessKeyId=''accessKeySecret=''project='test-project'logstore='internal-diagnostic_log'logClient=LogClient(endpoint, accessKeyId, accessKeySecret) end=time.time() start=end-60query='''(__topic__: etl_metricsand metric_type: ConnectorMetricsand "_etl_:connector_meta.action": ingest)and (project:test-project and job_name:your_oss_sink_job_name)| select *FROM ( select time_series(__time__, '1m', '%y/%m/%d %H:%i', '0') as dt, "_etl_:connector_meta.task_name" as task, avg("_etl_:connector_metrics.lags") as lags FROM log group by dt, task order by dt limit 100000 )where task is not null '''request=GetLogsRequest(project, logstore, start, end, '', query=query, line=3, offset=0, reverse=False) response=logClient.get_logs(request) logs=response.get_logs() forloginlogs: print(log.log_print())
五、通过sdk批量查看多个logstore的投递延迟情况
1 假设当前到project 内有多个投递任务 ,多个投递任务的运行日志保存在当前project下的internal-diagnostic_log (请确保已开通运行日志)
2 利用sdk批量查询oss投递任务的名称
fromaliyun.logimportLogClientendpoint=""project=""logstore=""accessKeyId=""accessKeySecret=""logClient=LogClient(endpoint, accessKeyId, accessKeySecret) # 利用sdk获取当前project下度所有导出任务listExportResponse=logClient.list_export(project) exportTaskList=listExportResponse.get_exports() # 将oss投递任务筛选出来oss_sink_export_list= [] forexportinexportTaskList: ifexport.get("configuration", {}).get("sink", {}).get("type") =="AliyunOSS": oss_sink_export_list.append(export) # 获取所有oss投递job度名称oss_sink_job_name_list= [] foross_sinkinoss_sink_export_list: oss_sink_job_name_list.append(oss_sink['name']) forjob_nameinoss_sink_job_name_list: print(job_name)
3 假设第2步中oss投递job名称列表是[job-168****360-*****1,job-168****360-*****2,job-168****360-*****3] , 那么查询的sql 可以是 :
(__topic__: etl_metrics and metric_type: ConnectorMetrics and"_etl_:connector_meta.action": ingest)and(project:changqi-test-0608and(job_name:job-168****360-*****1or job_name:job-168****360-*****2or job_name:job-168****360-*****2))|select*FROM(select time_series(__time__,'1m','%y/%m/%d %H:%i','0')as dt,"_etl_:connector_meta.task_name"as task, job_name as job_name, avg("_etl_:connector_metrics.lags")as lags FROM log groupby dt, job_name, task orderby dt limit100000)where task isnotnull
- 第3步中的sql可以利用sdk的Getlogs接口进行oss投递延时数据的查询
importtimefromaliyun.logimportLogClient, GetLogsRequestendpoint=''accessKeyId=''accessKeySecret=''project='test-project'logstore='internal-diagnostic_log'logClient=LogClient(endpoint, accessKeyId, accessKeySecret) end=time.time() start=end-60query='''(__topic__: etl_metricsand metric_type: ConnectorMetricsand "_etl_:connector_meta.action": ingest)and (project:test-project and (job_name:job-168****360-*****1 or job_name:job-168****360-*****2 or job_name:job-168****360-*****2))| select *FROM ( select time_series(__time__, '1m', '%y/%m/%d %H:%i', '0') as dt, "_etl_:connector_meta.task_name" as task, job_name as job_name, avg("_etl_:connector_metrics.lags") as lags FROM log group by dt, job_name, task, order by dt limit 100000 )where task is not null '''request=GetLogsRequest(project, logstore, start, end, '', query=query, line=3, offset=0, reverse=False) response=logClient.get_logs(request) logs=response.get_logs() forloginlogs: print(log.log_print())
异常信息说明
oss投递任务任务监控大盘异常
出现查询失败和无数据展示的原因可能是任务运行日志索引被删除或未更新,或者未开通任务运行日志采集。解决方案包括在CloudLens for SLS中重置索引和开通任务运行日志采集。任务运行日志存储在指定Project下的internal-diagnostic_log Logstore中,可通过日志服务控制台查看,如果您在使用的过程中有如下异常,可以参考以下文档解决: