Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

简介: Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

Cassandra

连接数据库

Python可以使用cassandra-driver库连接Cassandra数据库:

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()

CRUD操作

接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。

创建(Create)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("""
    CREATE TABLE Employees (
        id int PRIMARY KEY,
        name text,
        age int,
        address text,
        salary decimal
    )
""")
print("Table created successfully")
cluster.shutdown()

读取(Retrieve)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
rows = session.execute('SELECT id, name, address, salary FROM Employees')
for row in rows:
    print("ID = ", row.id)
    print("NAME = ", row.name)
    print("ADDRESS = ", row.address)
    print("SALARY = ", row.salary)
cluster.shutdown()

更新(Update)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")
cluster.shutdown()

删除(Delete)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")
cluster.shutdown()

Redis

连接数据库

Python可以使用redis-py库连接Redis数据库:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print("Opened Redis database successfully")

CRUD操作

接下来,我们将展示在Redis中如何进行基本的CRUD操作。

创建(Create)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:name', 'John')
r.set('employee:1:age', '30')
r.set('employee:1:address', 'New York')
r.set('employee:1:salary', '1000.00')
print("Keys created successfully")

读取(Retrieve)

r = redis.Redis(host='localhost', port=6379, db=0)
print("NAME = ", r.get('employee:1:name').decode('utf-8'))
print("AGE = ", r.get('employee:1:age').decode('utf-8'))
print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))

更新(Update)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:salary', '25000.00')
print("Key updated successfully")

删除(Delete)

r = redis.Redis(host='localhost', port=6379, db=0)
r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
print("Keys deleted successfully")

ElasticSearch

连接数据库

Python可以使用elasticsearch库连接ElasticSearch数据库:

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print("Opened ElasticSearch database successfully")

CRUD操作

接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。

创建(Create)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
employee = {
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)
print("Document created successfully")

读取(Retrieve)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
for field, details in res['_source'].items():
    print(f"{field.upper()} = ", details)

更新(Update)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.update(index='employees', doc_type='employee', id=1, body={
    'doc': {
        'salary': 25000.00
    }
})
print("Document updated successfully")

删除(Delete)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.delete(index='employees', doc_type='employee', id=1)
print("Document deleted successfully")

Neo4j

连接数据库

Python可以使用neo4j库连接Neo4j数据库:

from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
print("Opened Neo4j database successfully")
driver.close()

CRUD操作

接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。

创建(Create)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
print("Node created successfully")
driver.close()

读取(Retrieve)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
    for record in result:
        print("ID = ", record["n"]["id"])
        print("NAME = ", record["n"]["name"])
        print("ADDRESS = ", record["n"]["address"])
        print("SALARY = ", record["n"]["salary"])
driver.close()

更新(Update)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
print("Node updated successfully")
driver.close()

删除(Delete)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
print("Node deleted successfully")
driver.close()

InfluxDB

连接数据库

Python可以使用InfluxDB-Python库连接InfluxDB数据库:

from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086)
print("Opened InfluxDB database successfully")
client.close()

CRUD操作

接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。

创建(Create)

client = InfluxDBClient(host='localhost', port=8086)
json_body = [
    {
        "measurement": "employees",
        "tags": {
            "id": "1"
        },
        "fields": {
            "name": "John",
            "age": 30,
            "address": "New York",
            "salary": 1000.00
        }
    }
]
client.write_points(json_body)
print("Point created successfully")
client.close()

读取(Retrieve)

client = InfluxDBClient(host='localhost', port=8086)
result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')
for point in result.get_points():
    print("ID = ", point['id'])
    print("NAME = ", point['name'])
    print("AGE = ", point['age'])
    print("ADDRESS = ", point['address'])
    print("SALARY = ", point['salary'])
client.close()

更新(Update)

InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。

删除(Delete)

同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。

client = InfluxDBClient(host='localhost', port=8086)
# 删除整个系列
client.query('DROP SERIES FROM "employees"')
# 删除某个时间段的数据
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')
print("Series deleted successfully")
client.close()

Snowflake

连接数据库

Python可以使用snowflake-connector-python库连接Snowflake数据库:

from snowflake.connector import connect
con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
print("Opened Snowflake database successfully")
con.close()

CRUD操作

接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。

创建(Create)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("""
CREATE TABLE EMPLOYEES (
    ID INT,
    NAME STRING,
    AGE INT,
    ADDRESS STRING,
    SALARY FLOAT
)
""")
cur.execute("""
INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
(1, 'John', 30, 'New York', 1000.00)
""")
print("Table created and row inserted successfully")
con.close()

读取(Retrieve)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
rows = cur.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("AGE = ", row[2])
    print("ADDRESS = ", row[3])
    print("SALARY = ", row[4])
con.close()

更新(Update)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
print("Row updated successfully")
con.close()

删除(Delete)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
print("Row deleted successfully")
con.close()

Amazon DynamoDB

连接数据库

Python可以使用boto3库连接Amazon DynamoDB:

import boto3
dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
                          aws_access_key_id='Your AWS Access Key',
                          aws_secret_access_key='Your AWS Secret Key')
print("Opened DynamoDB successfully")

CRUD操作

接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。

创建(Create)

table = dynamodb.create_table(
    TableName='Employees',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'
        },
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'N'
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)
table.put_item(
   Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
)
print("Table created and item inserted successfully")

读取(Retrieve)

table = dynamodb.Table('Employees')
response = table.get_item(
   Key={
        'id': 1,
    }
)
item = response['Item']
print(item)

更新(Update)

table = dynamodb.Table('Employees')
table.update_item(
    Key={
        'id': 1,
    },
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={
        ':val1': 25000.00
    }
)
print("Item updated successfully")

删除(Delete)

table = dynamodb.Table('Employees')
table.delete_item(
    Key={
        'id': 1,
    }
)
print("Item deleted successfully")

Microsoft Azure CosMos DB

连接数据库

Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:

from azure.cosmos import CosmosClient, PartitionKey, exceptions
url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
client = CosmosClient(url, credential=key)
database_name = 'testDB'
database = client.get_database_client(database_name)
container_name = 'Employees'
container = database.get_container_client(container_name)
print("Opened CosMos DB successfully")

CRUD操作

接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。

创建(Create)

database = client.create_database_if_not_exists(id=database_name)
container = database.create_container_if_not_exists(
    id=container_name, 
    partition_key=PartitionKey(path="/id"),
    offer_throughput=400
)
container.upsert_item({
    'id': '1',
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
})
print("Container created and item upserted successfully")

读取(Retrieve)

for item in container.read_all_items():
    print(item)

更新(Update)

for item in container.read_all_items():
    if item['id'] == '1':
        item['salary'] = 25000.00
        container.upsert_item(item)
print("Item updated successfully")

删除(Delete)

for item in container.read_all_items():
    if item['id'] == '1':
        container.delete_item(item, partition_key='1')
print("Item deleted successfully")


目录
相关文章
|
3月前
|
SQL 关系型数据库 数据库
Python SQLAlchemy模块:从入门到实战的数据库操作指南
免费提供Python+PyCharm编程环境,结合SQLAlchemy ORM框架详解数据库开发。涵盖连接配置、模型定义、CRUD操作、事务控制及Alembic迁移工具,以电商订单系统为例,深入讲解高并发场景下的性能优化与最佳实践,助你高效构建数据驱动应用。
450 7
|
4月前
|
测试技术 开发者 Python
Python单元测试入门:3个核心断言方法,帮你快速定位代码bug
本文介绍Python单元测试基础,详解`unittest`框架中的三大核心断言方法:`assertEqual`验证值相等,`assertTrue`和`assertFalse`判断条件真假。通过实例演示其用法,帮助开发者自动化检测代码逻辑,提升测试效率与可靠性。
406 1
|
5月前
|
机器学习/深度学习 数据采集 数据挖掘
基于 GARCH -LSTM 模型的混合方法进行时间序列预测研究(Python代码实现)
基于 GARCH -LSTM 模型的混合方法进行时间序列预测研究(Python代码实现)
190 2
|
5月前
|
调度 Python
微电网两阶段鲁棒优化经济调度方法(Python代码实现)
微电网两阶段鲁棒优化经济调度方法(Python代码实现)
161 0
|
5月前
|
传感器 大数据 API
Python数字限制在指定范围内:方法与实践
在Python编程中,限制数字范围是常见需求,如游戏属性控制、金融计算和数据过滤等场景。本文介绍了五种主流方法:基础条件判断、数学运算、装饰器模式、类封装及NumPy数组处理,分别适用于不同复杂度和性能要求的场景。每种方法均有示例代码和适用情况说明,帮助开发者根据实际需求选择最优方案。
250 0
|
5月前
|
Python
Python字符串center()方法详解 - 实现字符串居中对齐的完整指南
Python的`center()`方法用于将字符串居中,并通过指定宽度和填充字符美化输出格式,常用于文本对齐、标题及表格设计。
|
4月前
|
人工智能 数据安全/隐私保护 异构计算
桌面版exe安装和Python命令行安装2种方法详细讲解图片去水印AI源码私有化部署Lama-Cleaner安装使用方法-优雅草卓伊凡
桌面版exe安装和Python命令行安装2种方法详细讲解图片去水印AI源码私有化部署Lama-Cleaner安装使用方法-优雅草卓伊凡
515 8
桌面版exe安装和Python命令行安装2种方法详细讲解图片去水印AI源码私有化部署Lama-Cleaner安装使用方法-优雅草卓伊凡
|
4月前
|
数据采集 关系型数据库 MySQL
python爬取数据存入数据库
Python爬虫结合Scrapy与SQLAlchemy,实现高效数据采集并存入MySQL/PostgreSQL/SQLite。通过ORM映射、连接池优化与批量提交,支持百万级数据高速写入,具备良好的可扩展性与稳定性。
|
5月前
|
存储 关系型数据库 MySQL
MySQL数据库中进行日期比较的多种方法介绍。
以上方法提供了灵活多样地处理和对比MySQL数据库中存储地不同格式地日子信息方式。根据实际需求选择适当方式能够有效执行所需操作并保证性能优化。
554 10
|
4月前
|
存储 数据库 开发者
Python SQLite模块:轻量级数据库的实战指南
本文深入讲解Python内置sqlite3模块的实战应用,涵盖数据库连接、CRUD操作、事务管理、性能优化及高级特性,结合完整案例,助你快速掌握SQLite在小型项目中的高效使用,是Python开发者必备的轻量级数据库指南。
385 0

推荐镜像

更多