Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

Cassandra

连接数据库

Python可以使用cassandra-driver库连接Cassandra数据库:

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()

CRUD操作

接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。

创建(Create)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("""
    CREATE TABLE Employees (
        id int PRIMARY KEY,
        name text,
        age int,
        address text,
        salary decimal
    )
""")
print("Table created successfully")
cluster.shutdown()

读取(Retrieve)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
rows = session.execute('SELECT id, name, address, salary FROM Employees')
for row in rows:
    print("ID = ", row.id)
    print("NAME = ", row.name)
    print("ADDRESS = ", row.address)
    print("SALARY = ", row.salary)
cluster.shutdown()

更新(Update)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")
cluster.shutdown()

删除(Delete)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")
cluster.shutdown()

Redis

连接数据库

Python可以使用redis-py库连接Redis数据库:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print("Opened Redis database successfully")

CRUD操作

接下来,我们将展示在Redis中如何进行基本的CRUD操作。

创建(Create)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:name', 'John')
r.set('employee:1:age', '30')
r.set('employee:1:address', 'New York')
r.set('employee:1:salary', '1000.00')
print("Keys created successfully")

读取(Retrieve)

r = redis.Redis(host='localhost', port=6379, db=0)
print("NAME = ", r.get('employee:1:name').decode('utf-8'))
print("AGE = ", r.get('employee:1:age').decode('utf-8'))
print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))

更新(Update)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:salary', '25000.00')
print("Key updated successfully")

删除(Delete)

r = redis.Redis(host='localhost', port=6379, db=0)
r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
print("Keys deleted successfully")

ElasticSearch

连接数据库

Python可以使用elasticsearch库连接ElasticSearch数据库:

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print("Opened ElasticSearch database successfully")

CRUD操作

接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。

创建(Create)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
employee = {
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)
print("Document created successfully")

读取(Retrieve)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
for field, details in res['_source'].items():
    print(f"{field.upper()} = ", details)

更新(Update)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.update(index='employees', doc_type='employee', id=1, body={
    'doc': {
        'salary': 25000.00
    }
})
print("Document updated successfully")

删除(Delete)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.delete(index='employees', doc_type='employee', id=1)
print("Document deleted successfully")

Neo4j

连接数据库

Python可以使用neo4j库连接Neo4j数据库:

from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
print("Opened Neo4j database successfully")
driver.close()

CRUD操作

接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。

创建(Create)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
print("Node created successfully")
driver.close()

读取(Retrieve)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
    for record in result:
        print("ID = ", record["n"]["id"])
        print("NAME = ", record["n"]["name"])
        print("ADDRESS = ", record["n"]["address"])
        print("SALARY = ", record["n"]["salary"])
driver.close()

更新(Update)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
print("Node updated successfully")
driver.close()

删除(Delete)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
print("Node deleted successfully")
driver.close()

InfluxDB

连接数据库

Python可以使用InfluxDB-Python库连接InfluxDB数据库:

from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086)
print("Opened InfluxDB database successfully")
client.close()

CRUD操作

接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。

创建(Create)

client = InfluxDBClient(host='localhost', port=8086)
json_body = [
    {
        "measurement": "employees",
        "tags": {
            "id": "1"
        },
        "fields": {
            "name": "John",
            "age": 30,
            "address": "New York",
            "salary": 1000.00
        }
    }
]
client.write_points(json_body)
print("Point created successfully")
client.close()

读取(Retrieve)

client = InfluxDBClient(host='localhost', port=8086)
result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')
for point in result.get_points():
    print("ID = ", point['id'])
    print("NAME = ", point['name'])
    print("AGE = ", point['age'])
    print("ADDRESS = ", point['address'])
    print("SALARY = ", point['salary'])
client.close()

更新(Update)

InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。

删除(Delete)

同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。

client = InfluxDBClient(host='localhost', port=8086)
# 删除整个系列
client.query('DROP SERIES FROM "employees"')
# 删除某个时间段的数据
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')
print("Series deleted successfully")
client.close()

Snowflake

连接数据库

Python可以使用snowflake-connector-python库连接Snowflake数据库:

from snowflake.connector import connect
con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
print("Opened Snowflake database successfully")
con.close()

CRUD操作

接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。

创建(Create)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("""
CREATE TABLE EMPLOYEES (
    ID INT,
    NAME STRING,
    AGE INT,
    ADDRESS STRING,
    SALARY FLOAT
)
""")
cur.execute("""
INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
(1, 'John', 30, 'New York', 1000.00)
""")
print("Table created and row inserted successfully")
con.close()

读取(Retrieve)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
rows = cur.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("AGE = ", row[2])
    print("ADDRESS = ", row[3])
    print("SALARY = ", row[4])
con.close()

更新(Update)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
print("Row updated successfully")
con.close()

删除(Delete)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
print("Row deleted successfully")
con.close()

Amazon DynamoDB

连接数据库

Python可以使用boto3库连接Amazon DynamoDB:

import boto3
dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
                          aws_access_key_id='Your AWS Access Key',
                          aws_secret_access_key='Your AWS Secret Key')
print("Opened DynamoDB successfully")

CRUD操作

接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。

创建(Create)

table = dynamodb.create_table(
    TableName='Employees',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'
        },
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'N'
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)
table.put_item(
   Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
)
print("Table created and item inserted successfully")

读取(Retrieve)

table = dynamodb.Table('Employees')
response = table.get_item(
   Key={
        'id': 1,
    }
)
item = response['Item']
print(item)

更新(Update)

table = dynamodb.Table('Employees')
table.update_item(
    Key={
        'id': 1,
    },
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={
        ':val1': 25000.00
    }
)
print("Item updated successfully")

删除(Delete)

table = dynamodb.Table('Employees')
table.delete_item(
    Key={
        'id': 1,
    }
)
print("Item deleted successfully")

Microsoft Azure CosMos DB

连接数据库

Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:

from azure.cosmos import CosmosClient, PartitionKey, exceptions
url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
client = CosmosClient(url, credential=key)
database_name = 'testDB'
database = client.get_database_client(database_name)
container_name = 'Employees'
container = database.get_container_client(container_name)
print("Opened CosMos DB successfully")

CRUD操作

接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。

创建(Create)

database = client.create_database_if_not_exists(id=database_name)
container = database.create_container_if_not_exists(
    id=container_name, 
    partition_key=PartitionKey(path="/id"),
    offer_throughput=400
)
container.upsert_item({
    'id': '1',
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
})
print("Container created and item upserted successfully")

读取(Retrieve)

for item in container.read_all_items():
    print(item)

更新(Update)

for item in container.read_all_items():
    if item['id'] == '1':
        item['salary'] = 25000.00
        container.upsert_item(item)
print("Item updated successfully")

删除(Delete)

for item in container.read_all_items():
    if item['id'] == '1':
        container.delete_item(item, partition_key='1')
print("Item deleted successfully")


相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。 &nbsp;
目录
相关文章
|
12天前
|
IDE 开发工具 开发者
Python类型注解:提升代码可读性与健壮性
Python类型注解:提升代码可读性与健壮性
186 102
|
2月前
|
存储 运维 关系型数据库
从MySQL到云数据库,数据库迁移真的有必要吗?
本文探讨了企业在业务增长背景下,是否应从 MySQL 迁移至云数据库的决策问题。分析了 MySQL 的优势与瓶颈,对比了云数据库在存储计算分离、自动化运维、多负载支持等方面的优势,并提出判断迁移必要性的五个关键问题及实施路径,帮助企业理性决策并落地迁移方案。
|
4月前
|
人工智能 运维 关系型数据库
数据库运维:mysql 数据库迁移方法-mysqldump
本文介绍了MySQL数据库迁移的方法与技巧,重点探讨了数据量大小对迁移方式的影响。对于10GB以下的小型数据库,推荐使用mysqldump进行逻辑导出和source导入;10GB以上可考虑mydumper与myloader工具;100GB以上则建议物理迁移。文中还提供了统计数据库及表空间大小的SQL语句,并讲解了如何使用mysqldump导出存储过程、函数和数据结构。通过结合实际应用场景选择合适的工具与方法,可实现高效的数据迁移。
709 1
|
6月前
|
索引 Python
Python的变量和简单类型
本文介绍了Python中变量命名规则、常用变量类型及字符串操作。变量命名需遵循字母、数字和下划线组合,不能以数字开头且不可与关键字冲突。字符串支持单引号、双引号或三引号定义,涵盖基本输出、转义字符、索引、拼接等操作。此外,还详细解析了字符串方法如`islower()`、`upper()`、`count()`等,帮助理解字符串处理技巧。
134 15
|
25天前
|
弹性计算 关系型数据库 数据库
云数据库RDS数据库迁移上云
阿里云RDS是一种安全稳定、高性价比的在线数据库服务,支持弹性伸缩,帮助用户轻松部署与扩展数据库。提供实例创建、白名单设置、数据库与账号管理、便捷连接等功能,简化运维操作,保障数据安全。
|
8天前
|
存储 数据库 开发者
Python SQLite模块:轻量级数据库的实战指南
本文深入讲解Python内置sqlite3模块的实战应用,涵盖数据库连接、CRUD操作、事务管理、性能优化及高级特性,结合完整案例,助你快速掌握SQLite在小型项目中的高效使用,是Python开发者必备的轻量级数据库指南。
89 0
|
2月前
|
存储 关系型数据库 MySQL
MySQL数据库中进行日期比较的多种方法介绍。
以上方法提供了灵活多样地处理和对比MySQL数据库中存储地不同格式地日子信息方式。根据实际需求选择适当方式能够有效执行所需操作并保证性能优化。
235 10
|
2月前
|
安全 JavaScript Java
Python中None与NoneType的真相:从单例对象到类型系统的深度解析
本文通过10个真实场景,深入解析Python中表示“空值”的None与NoneType。从单例模式、函数返回值,到类型注解、性能优化,全面揭示None在语言设计与实际编程中的核心作用,帮助开发者正确高效地处理“无值”状态,写出更健壮、清晰的Python代码。
169 3
|
3月前
|
SQL Oracle 关系型数据库
比较MySQL和Oracle数据库系统,特别是在进行分页查询的方法上的不同
两者的性能差异将取决于数据量大小、索引优化、查询设计以及具体版本的数据库服务器。考虑硬件资源、数据库设计和具体需求对于实现优化的分页查询至关重要。开发者和数据库管理员需要根据自身使用的具体数据库系统版本和环境,选择最合适的分页机制,并进行必要的性能调优来满足应用需求。
125 11
|
2月前
|
缓存 数据可视化 Linux
Python文件/目录比较实战:排除特定类型的实用技巧
本文通过四个实战案例,详解如何使用Python比较目录差异并灵活排除特定文件,涵盖基础比较、大文件处理、跨平台适配与可视化报告生成,助力开发者高效完成目录同步与数据校验任务。
98 0

推荐镜像

更多