什么是 InfluxDB?
InfluxDB是一个高性能的时序数据库(Time-Series Database, TSDB),用于存储和分析时间序列数据的开源数据库,它非常适合于处理大量的时间戳数据,如金融市场数据、IoT 设备数据、监控数据等,尤其适合处理大量的时序数据和高频数据。
主要特性有:
- 内置HTTP接口,使用方便
- 数据可以打标记,查询可以很灵活
- 类SQL的查询语句
- 安装管理很简单,并且读写数据很高效
- 能够实时查询,数据在写入时被索引后就能够被立即查出
安装InfluxDB服务端(Windows下)
在InfluxDB官网文档中选择适用于 Windows 的 .zip 安装包
下载解压,找到 influxd.exe 文件,2.7这个版本没办法双击exe程序启动,只能用cmd或者power 启动InfluxDB 服务。
influx2.7 服务端和客户端已经分离,需要客户端的要另外下载,influxd.exe是服务端,influx.exe是客户端。
这样就启动成功了,默认情况下,InfluxDB 将在 localhost:8086 上运行。浏览器输入可以访问InfluxDB的web端管理界面,在 Web 界面中,初次进入需要注册账户,设置组织(organization)用于管理数据和权限、桶(bucket)是数据的存储单元 以及创建 API 密钥(token)。
如果不想用web界面 操作创建用户密码和数据库的话,也可以通过InfluxDB客户端程序,通过命令创建。
安装InfluxDB客户端
在官方文档中下载InfluxDB CLI下载解压后,按照Web界面的提示,初始化客户端基本配置,然后可以通过指令对数据库进行操作,本文不做介绍,我们着重使用Python来进行数据库的各种读写操作。
InfluxDB 的数据模型与结构
InfluxDB 的数据模型与传统的关系型数据库不同。它主要包括以下几个概念:
- Point(数据点): 数据点是 InfluxDB 中的数据单元,包括时间戳、测量、标签和字段。
- Bucket(桶):是数据的逻辑分区,用于存储不同的时间序列数据。桶类似于关系型数据库中的数据库或表空间。
- Organization(组织): 组织是用户、桶和权限的管理单位。一个组织可以包含多个用户和桶。
- Measurement(测量):类似于关系型数据库中的表(table),表示一类数据的集合。
- Tag(标签):用于索引数据,便于查询。类似于数据库中的索引字段,标签是键值对(key-value pair),值是字符串类型。
- Field(字段):存储实际的数据值,也是键值对。字段值可以是各种数据类型(整数、浮点数、字符串、布尔值)。字段不被索引。
- Timestamp(时间戳):每条记录的时间戳,用于标识数据的时间点。时间戳是数据的索引,所有的查询都是基于时间戳进行的。
下面代码中的金融类期权数据的示例数据模型,包含测量、标签和字段:
- Measurement(测量):options
- Tags(标签):contract, exchange, type
- contract:期权合约代码,如 AAPL210416C00125000
- exchange:交易所,如 NASDAQ
- type:期权类型,如 CALL 或 PUT
- Fields(字段):strike_price, market_price
- strike_price:执行价,如 125.00
- market_price:市场价格,如 5.50
- Timestamp(时间戳):数据点的时间
Python基本操作
使用 pip 安装 InfluxDB 的 Python 客户端库:
pip install influxdb-client
以下是一个简单的 Python 示例代码,用于将数据写入 InfluxDB:
from influxdb_client import InfluxDBClient, Point, WritePrecision from datetime import datetime from influxdb_client.client.write_api import SYNCHRONOUS # 配置 InfluxDB 客户端 url = "http://localhost:8086" token = "your_token" # 替换为你的 API Token org = "your_org" # 替换为你的组织名 bucket = "your_bucket" # 替换为你的桶名 # 初始化 InfluxDB 客户端 client = InfluxDBClient(url=url, token=token, org=org) write_api = client.write_api(write_options=SYNCHRONOUS) query_api = client.query_api() # 写入期权数据示例 def write_option_data(): option_contract = "AAPL210416C00125000" # 期权合约代码 exchange = "NASDAQ" # 交易所 option_type = "CALL" # 期权类型 strike_price = 125.00 # 执行价 market_price = 5.50 # 市场价格 point = Point("options") \ # 创建一个名为 options 的测量(measurement) .tag("contract", option_contract) \ # 添加期权合约代码作为标签(tag) .tag("exchange", exchange) \ # 添加交易所作为标签(tag)。 .tag("type", option_type) \ # 添加期权类型(CALL/PUT)作为标签(tag) .field("strike_price", strike_price) \ # 添加执行价作为字段(field) .field("market_price", market_price) \ # 添加市场价格作为字段(field) .time(datetime.utcnow(), WritePrecision.NS) # 设置数据点的时间戳为当前时间,并指定时间精度为纳秒(NS) write_api.write(bucket=bucket, org=org, record=point) # 写入数据 # 查询期权数据示例 def query_option_data(): # from(bucket:"{bucket}"): 从指定的 bucket 中查询数据。 # |> range(start: -1h): 查询最近一小时内的数据。 # |> filter(fn: (r) => r._measurement == "options"): 过滤数据,只保留测量名称为 options 的数据。 query = f'from(bucket:"{bucket}") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "options")' tables = query_api.query(query=query, org=org) # 使用字典保存字段值 data = {} print(tables) for table in tables: for record in table.records: print(record) field_name = record["_field"] field_value = record["_value"] data[field_name] = field_value if "contract" not in data: data["contract"] = record["contract"] if "type" not in data: data["type"] = record["type"] if "exchange" not in data: data["exchange"] = record["exchange"] # 打印结果 print( f"Contract: {data.get('contract')}, Type: {data.get('type')}, Exchange: {data.get('exchange')}, Strike Price: {data.get('strike_price')}, Market Price: {data.get('market_price')}" ) # 主函数 if __name__ == "__main__": # 写入示例数据 write_option_data() # 查询示例数据 query_option_data()
作者:花花木较瘦
链接:https://juejin.cn/post/7399453056972587046