使用python3抓取pinpoint应用信息入库

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 使用python3通过pinpoint api来获取pinpoint中应用基础信息、上下游链路,并入库

使用python3抓取pinpoint应用信息入库

Pinpoint是用Java编写的大型分布式系统的APM(应用程序性能管理)工具。 受Dapper的启发,Pinpoint提供了一种解决方案,通过在分布式应用程序中跟踪事务来帮助分析系统的整体结构以及它们中的组件之间的相互关系.

pinpoint api:

  • /applications.pinpoint 获取applications基本信息
  • /getAgentList.pinpoint 获取对应application agent信息
  • /getServerMapData.pinpoint 获取对应app 基本数据流信息

db.py

import mysql.connector
class MyDB(object):
    """docstring for MyDB"""
    def __init__(self, host, user, passwd , db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

        self.connect = None
        self.cursor = None
    def db_connect(self):
        """数据库连接
        """
        self.connect = mysql.connector.connect(host=self.host, user=self.user, passwd=self.passwd, database=self.db)
        return self
    def db_cursor(self):
        if self.connect is None:
            self.connect = self.db_connect()

        if not self.connect.is_connected():
            self.connect = self.db_connect()
        self.cursor = self.connect.cursor()
        return self
    def get_rows(self , sql):
        """ 查询数据库结果
        :param sql: SQL语句
        :param cursor: 数据库游标
        """

        self.cursor.execute(sql)
        return self.cursor.fetchall()
    def db_execute(self, sql):
        self.cursor.execute(sql)
        self.connect.commit()
    def db_close(self):
        """关闭数据库连接和游标
        :param connect: 数据库连接实例
        :param cursor: 数据库游标
        """
        if self.connect:
            self.connect.close()
        if self.cursor:
            self.cursor.close()

pinpoint.py:

 
# -*- coding: utf-8 -*-

'''
Copyright (c) 2018, mersap
All rights reserved.

摘    要: pinpoint.py
创 建 者: mersap
创建日期: 2019-01-17
'''

import sys
import requests
import time
import datetime
import json

sys.path.append('../Golf')
import db #db.py

PPURL = "https://pinpoint.*******.com"


From_Time = datetime.datetime.now() + datetime.timedelta(seconds=-60)
To_Time = datetime.datetime.now()
From_TimeStamp = int(time.mktime(From_Time.timetuple()))*1000
To_TimeStamp = int(time.mktime(datetime.datetime.now().timetuple()))*1000


class PinPoint(object):
    """docstring for PinPoint"""
    def __init__(self, db):
        self.db = db
        super(PinPoint, self).__init__()

    """获取pinpoint中应用"""
    def get_applications(self):
        '''return application dict
        '''
        applicationListUrl = PPURL + "/applications.pinpoint"
        res = requests.get(applicationListUrl)
        if res.status_code != 200:
            print("请求异常,请检查")
            return
        applicationLists = []
        for app in res.json():
            applicationLists.append(app)
        applicationListDict={}
        applicationListDict["applicationList"] = applicationLists
        return applicationListDict
    def getAgentList(self, appname):
        AgentListUrl = PPURL + "/getAgentList.pinpoint"
        param = {
            'application':appname
        }
        res = requests.get(AgentListUrl, params=param)
        if res.status_code != 200:
            print("请求异常,请检查")
            return
        return len(res.json().keys()),json.dumps(list(res.json().keys()))
        
    def update_servermap(self, appname , from_time=From_TimeStamp,
                         to_time=To_TimeStamp, serviceType='SPRING_BOOT'):
        '''更新app上下游关系
        :param appname: 应用名称
        :param serviceType: 应用类型
        :param from_time: 起始时间
        :param to_time: 终止时间
        :
        '''
        #https://pinpoint.*****.com/getServerMapData.pinpoint?applicationName=test-app&from=1547721493000&to=1547721553000&callerRange=1&calleeRange=1&serviceTypeName=TOMCAT&_=1547720614229
        param = {
            'applicationName':appname,
            'from':from_time,
            'to':to_time,
            'callerRange':1,
            'calleeRange':1,
            'serviceTypeName':serviceType
        }

        # serverMapUrl = PPURL + "/getServerMapData.pinpoint"
        serverMapUrl = "{}{}".format(PPURL, "/getServerMapData.pinpoint")
        res = requests.get(serverMapUrl, params=param)
        if res.status_code != 200:
            print("请求异常,请检查")
            return
        update_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
        links = res.json()["applicationMapData"]["linkDataArray"]
        for link in links :
            ###排除test的应用
            if link['sourceInfo']['applicationName'].startswith('test'):
                continue
            #应用名称、应用类型、下游应用名称、下游应用类型、应用节点数、下游应用节点数、总请求数、 错误请求数、慢请求数(本应用到下一个应用的数量)
            application = link['sourceInfo']['applicationName']
            serviceType = link['sourceInfo']['serviceType']
            to_application = link['targetInfo']['applicationName']
            to_serviceType = link['targetInfo']['serviceType']
            agents = len(link.get('fromAgent',' '))
            to_agents =  len(link.get('toAgent',' '))
            totalCount = link['totalCount']
            errorCount = link['errorCount']
            slowCount  = link['slowCount']

            sql = """
                REPLACE into application_server_map (application, serviceType, 
                agents, to_application, to_serviceType, to_agents, totalCount, 
                errorCount,slowCount, update_time, from_time, to_time) 
                VALUES ("{}", "{}", {}, "{}", "{}", {}, {}, {}, {},"{}","{}",
                "{}")""".format(
                    application, serviceType, agents, to_application, 
                    to_serviceType, to_agents, totalCount, errorCount,
                     slowCount, update_time, From_Time, To_Time)
            self.db.db_execute(sql)

    def update_app(self):
        """更新application
        """
        appdict = self.get_applications()
        apps = appdict.get("applicationList")
        update_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
        for app in apps:
            if app['applicationName'].startswith('test'):
                continue
            agents, agentlists = self.getAgentList(app['applicationName'])
            sql = """
                REPLACE  into application_list( application_name, 
                service_type, code, agents, agentlists, update_time) 
                VALUES ("{}", "{}", {}, {}, '{}', "{}");""".format(
                    app['applicationName'], app['serviceType'], 
                    app['code'], agents, agentlists, update_time)
            self.db.db_execute(sql)
        return True

    def update_all_servermaps(self):
        """更新所有应用数
        """
        appdict = self.get_applications()
        apps = appdict.get("applicationList")
        for app in apps:
            self.update_servermap(app['applicationName'], serviceType=app['serviceType'])
        ###删除7天前数据
        Del_Time = datetime.datetime.now() + datetime.timedelta(days=-7)

        sql = """delete from application_server_map where update_time <= "{}"
        """.format(Del_Time)
        self.db.db_execute(sql)
        return True


def connect_db():
    """ 建立SQL连接
    """
    mydb = db.MyDB(
            host="rm-*****.mysql.rds.aliyuncs.com",
            user="user",
            passwd="passwd",
            db="pinpoint"
            )
    mydb.db_connect()
    mydb.db_cursor()
    return mydb

def main():
    db = connect_db()
    pp = PinPoint(db)
    pp.update_app()
    pp.update_all_servermaps()
    db.db_close()


if __name__ == '__main__':
    main()
  • 附sql语句

CREATE TABLE `application_list` (
  `application_name` varchar(32) NOT NULL,
  `service_type` varchar(32) DEFAULT NULL COMMENT '服务类型',
  `code` int(11) DEFAULT NULL COMMENT '服务类型代码',
  `agents` int(11) DEFAULT NULL COMMENT 'agent个数',
  `agentlists` varchar(256) DEFAULT NULL COMMENT 'agent list',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`application_name`),
  UNIQUE KEY `Unique_App` (`application_name`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='pinpoint app list'

CREATE TABLE `application_server_map` (
  `application` varchar(32) NOT NULL COMMENT '应用名称',
  `serviceType` varchar(8) NOT NULL,
  `agents` int(2) NOT NULL COMMENT 'agent个数',
  `to_application` varchar(32) NOT NULL COMMENT '下游服务名称',
  `to_serviceType` varchar(32) DEFAULT NULL COMMENT '下游服务类型',
  `to_agents` int(2) DEFAULT NULL COMMENT '下游服务agent数量',
  `totalCount` int(8) DEFAULT NULL COMMENT '总请求数',
  `errorCount` int(8) DEFAULT NULL,
  `slowCount` int(8) DEFAULT NULL,
  `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  `from_time` datetime DEFAULT NULL,
  `to_time` datetime DEFAULT NULL,
  PRIMARY KEY (`application`,`to_application`),
  UNIQUE KEY `Unique_AppMap` (`application`,`to_application`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='应用链路数据'
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
机器学习/深度学习 存储 数据挖掘
Python图像处理实用指南:PIL库的多样化应用
本文介绍Python中PIL库在图像处理中的多样化应用,涵盖裁剪、调整大小、旋转、模糊、锐化、亮度和对比度调整、翻转、压缩及添加滤镜等操作。通过具体代码示例,展示如何轻松实现这些功能,帮助读者掌握高效图像处理技术,适用于图片美化、数据分析及机器学习等领域。
91 20
|
23天前
|
存储 算法 API
【01】整体试验思路,如何在有UID的情况下获得用户手机号信息,python开发之理论研究试验,如何通过抖音视频下方的用户的UID获得抖音用户的手机号-本系列文章仅供学习研究-禁止用于任何商业用途-仅供学习交流-优雅草卓伊凡
【01】整体试验思路,如何在有UID的情况下获得用户手机号信息,python开发之理论研究试验,如何通过抖音视频下方的用户的UID获得抖音用户的手机号-本系列文章仅供学习研究-禁止用于任何商业用途-仅供学习交流-优雅草卓伊凡
145 82
|
3月前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
24天前
|
算法 Serverless 数据处理
从集思录可转债数据探秘:Python与C++实现的移动平均算法应用
本文探讨了如何利用移动平均算法分析集思录提供的可转债数据,帮助投资者把握价格趋势。通过Python和C++两种编程语言实现简单移动平均(SMA),展示了数据处理的具体方法。Python代码借助`pandas`库轻松计算5日SMA,而C++代码则通过高效的数据处理展示了SMA的计算过程。集思录平台提供了详尽且及时的可转债数据,助力投资者结合算法与社区讨论,做出更明智的投资决策。掌握这些工具和技术,有助于在复杂多变的金融市场中挖掘更多价值。
47 12
|
23天前
|
存储 人工智能 程序员
通义灵码AI程序员实战:从零构建Python记账本应用的开发全解析
本文通过开发Python记账本应用的真实案例,展示通义灵码AI程序员2.0的代码生成能力。从需求分析到功能实现、界面升级及测试覆盖,AI程序员展现了需求转化、技术选型、测试驱动和代码可维护性等核心价值。文中详细解析了如何使用Python标准库和tkinter库实现命令行及图形化界面,并生成单元测试用例,确保应用的稳定性和可维护性。尽管AI工具显著提升开发效率,但用户仍需具备编程基础以进行调试和优化。
218 9
|
23天前
|
算法 安全 网络安全
基于 Python 的布隆过滤器算法在内网行为管理中的应用探究
在复杂多变的网络环境中,内网行为管理至关重要。本文介绍布隆过滤器(Bloom Filter),一种高效的空间节省型概率数据结构,用于判断元素是否存在于集合中。通过多个哈希函数映射到位数组,实现快速访问控制。Python代码示例展示了如何构建和使用布隆过滤器,有效提升企业内网安全性和资源管理效率。
50 9
|
23天前
|
数据采集 存储 前端开发
用Python抓取亚马逊动态加载数据,一文读懂
用Python抓取亚马逊动态加载数据,一文读懂
|
2月前
|
人工智能 开发者 Python
Chainlit:一个开源的异步Python框架,快速构建生产级对话式 AI 应用
Chainlit 是一个开源的异步 Python 框架,帮助开发者在几分钟内构建可扩展的对话式 AI 或代理应用,支持多种工具和服务集成。
259 9
|
3月前
|
算法 数据处理 Python
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
Savitzky-Golay滤波器是一种基于局部多项式回归的数字滤波器,广泛应用于信号处理领域。它通过线性最小二乘法拟合低阶多项式到滑动窗口中的数据点,在降噪的同时保持信号的关键特征,如峰值和谷值。本文介绍了该滤波器的原理、实现及应用,展示了其在Python中的具体实现,并分析了不同参数对滤波效果的影响。适合需要保持信号特征的应用场景。
224 11
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
|
2月前
|
存储 SQL 大数据
Python 在企业级应用中的两大硬伤
关系数据库和SQL在企业级应用中面临诸多挑战,如复杂SQL难以移植、数据库负担重、应用间强耦合等。Python虽是替代选择,但在大数据运算和版本管理方面存在不足。SPL(esProc Structured Programming Language)作为开源语言,专门针对结构化数据计算,解决了Python的这些硬伤。它提供高效的大数据运算能力、并行处理、高性能文件存储格式(如btx、ctx),以及一致的版本管理,确保企业级应用的稳定性和高性能。此外,SPL与Java无缝集成,适合现代J2EE体系应用,简化开发并提升性能。

热门文章

最新文章