Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!2

Cassandra

连接数据库

Python可以使用cassandra-driver库连接Cassandra数据库:

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()

CRUD操作

接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。

创建(Create)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("""
    CREATE TABLE Employees (
        id int PRIMARY KEY,
        name text,
        age int,
        address text,
        salary decimal
    )
""")
print("Table created successfully")
cluster.shutdown()

读取(Retrieve)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
rows = session.execute('SELECT id, name, address, salary FROM Employees')
for row in rows:
    print("ID = ", row.id)
    print("NAME = ", row.name)
    print("ADDRESS = ", row.address)
    print("SALARY = ", row.salary)
cluster.shutdown()

更新(Update)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")
cluster.shutdown()

删除(Delete)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")
cluster.shutdown()

Redis

连接数据库

Python可以使用redis-py库连接Redis数据库:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print("Opened Redis database successfully")

CRUD操作

接下来,我们将展示在Redis中如何进行基本的CRUD操作。

创建(Create)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:name', 'John')
r.set('employee:1:age', '30')
r.set('employee:1:address', 'New York')
r.set('employee:1:salary', '1000.00')
print("Keys created successfully")

读取(Retrieve)

r = redis.Redis(host='localhost', port=6379, db=0)
print("NAME = ", r.get('employee:1:name').decode('utf-8'))
print("AGE = ", r.get('employee:1:age').decode('utf-8'))
print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))

更新(Update)

r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:salary', '25000.00')
print("Key updated successfully")

删除(Delete)

r = redis.Redis(host='localhost', port=6379, db=0)
r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
print("Keys deleted successfully")

ElasticSearch

连接数据库

Python可以使用elasticsearch库连接ElasticSearch数据库:

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print("Opened ElasticSearch database successfully")

CRUD操作

接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。

创建(Create)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
employee = {
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)
print("Document created successfully")

读取(Retrieve)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
for field, details in res['_source'].items():
    print(f"{field.upper()} = ", details)

更新(Update)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.update(index='employees', doc_type='employee', id=1, body={
    'doc': {
        'salary': 25000.00
    }
})
print("Document updated successfully")

删除(Delete)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.delete(index='employees', doc_type='employee', id=1)
print("Document deleted successfully")

Neo4j

连接数据库

Python可以使用neo4j库连接Neo4j数据库:

from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
print("Opened Neo4j database successfully")
driver.close()

CRUD操作

接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。

创建(Create)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
print("Node created successfully")
driver.close()

读取(Retrieve)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
    for record in result:
        print("ID = ", record["n"]["id"])
        print("NAME = ", record["n"]["name"])
        print("ADDRESS = ", record["n"]["address"])
        print("SALARY = ", record["n"]["salary"])
driver.close()

更新(Update)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
print("Node updated successfully")
driver.close()

删除(Delete)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
print("Node deleted successfully")
driver.close()

InfluxDB

连接数据库

Python可以使用InfluxDB-Python库连接InfluxDB数据库:

from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086)
print("Opened InfluxDB database successfully")
client.close()

CRUD操作

接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。

创建(Create)

client = InfluxDBClient(host='localhost', port=8086)
json_body = [
    {
        "measurement": "employees",
        "tags": {
            "id": "1"
        },
        "fields": {
            "name": "John",
            "age": 30,
            "address": "New York",
            "salary": 1000.00
        }
    }
]
client.write_points(json_body)
print("Point created successfully")
client.close()

读取(Retrieve)

client = InfluxDBClient(host='localhost', port=8086)
result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')
for point in result.get_points():
    print("ID = ", point['id'])
    print("NAME = ", point['name'])
    print("AGE = ", point['age'])
    print("ADDRESS = ", point['address'])
    print("SALARY = ", point['salary'])
client.close()

更新(Update)

InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。

删除(Delete)

同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。

client = InfluxDBClient(host='localhost', port=8086)
# 删除整个系列
client.query('DROP SERIES FROM "employees"')
# 删除某个时间段的数据
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')
print("Series deleted successfully")
client.close()

Snowflake

连接数据库

Python可以使用snowflake-connector-python库连接Snowflake数据库:

from snowflake.connector import connect
con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
print("Opened Snowflake database successfully")
con.close()

CRUD操作

接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。

创建(Create)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("""
CREATE TABLE EMPLOYEES (
    ID INT,
    NAME STRING,
    AGE INT,
    ADDRESS STRING,
    SALARY FLOAT
)
""")
cur.execute("""
INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
(1, 'John', 30, 'New York', 1000.00)
""")
print("Table created and row inserted successfully")
con.close()

读取(Retrieve)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
rows = cur.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("AGE = ", row[2])
    print("ADDRESS = ", row[3])
    print("SALARY = ", row[4])
con.close()

更新(Update)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
print("Row updated successfully")
con.close()

删除(Delete)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
print("Row deleted successfully")
con.close()

Amazon DynamoDB

连接数据库

Python可以使用boto3库连接Amazon DynamoDB:

import boto3
dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
                          aws_access_key_id='Your AWS Access Key',
                          aws_secret_access_key='Your AWS Secret Key')
print("Opened DynamoDB successfully")

CRUD操作

接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。

创建(Create)

table = dynamodb.create_table(
    TableName='Employees',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'
        },
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'N'
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)
table.put_item(
   Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
)
print("Table created and item inserted successfully")

读取(Retrieve)

table = dynamodb.Table('Employees')
response = table.get_item(
   Key={
        'id': 1,
    }
)
item = response['Item']
print(item)

更新(Update)

table = dynamodb.Table('Employees')
table.update_item(
    Key={
        'id': 1,
    },
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={
        ':val1': 25000.00
    }
)
print("Item updated successfully")

删除(Delete)

table = dynamodb.Table('Employees')
table.delete_item(
    Key={
        'id': 1,
    }
)
print("Item deleted successfully")

Microsoft Azure CosMos DB

连接数据库

Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:

from azure.cosmos import CosmosClient, PartitionKey, exceptions
url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
client = CosmosClient(url, credential=key)
database_name = 'testDB'
database = client.get_database_client(database_name)
container_name = 'Employees'
container = database.get_container_client(container_name)
print("Opened CosMos DB successfully")

CRUD操作

接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。

创建(Create)

database = client.create_database_if_not_exists(id=database_name)
container = database.create_container_if_not_exists(
    id=container_name, 
    partition_key=PartitionKey(path="/id"),
    offer_throughput=400
)
container.upsert_item({
    'id': '1',
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
})
print("Container created and item upserted successfully")

读取(Retrieve)

for item in container.read_all_items():
    print(item)

更新(Update)

for item in container.read_all_items():
    if item['id'] == '1':
        item['salary'] = 25000.00
        container.upsert_item(item)
print("Item updated successfully")

删除(Delete)

for item in container.read_all_items():
    if item['id'] == '1':
        container.delete_item(item, partition_key='1')
print("Item deleted successfully")


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
1月前
|
存储 监控 安全
数据库多实例的部署与配置方法
【10月更文挑战第23天】数据库多实例的部署和配置需要综合考虑多个因素,包括硬件资源、软件设置、性能优化、安全保障等。通过合理的部署和配置,可以充分发挥多实例的优势,提高数据库系统的运行效率和可靠性。在实际操作中,要不断总结经验,根据实际情况进行调整和优化,以适应不断变化的业务需求。
|
2月前
|
存储 索引 Python
Python散列类型(1)
【10月更文挑战第9天】
|
8天前
|
弹性计算 关系型数据库 数据库
自建数据库迁移到云数据库实操
本课程详细介绍了自建数据库迁移到阿里云RDS的实操步骤。主要内容包括:创建实例资源、安全设置、配置自建的MySQL数据库、数据库的迁移、从自建数据库切换到RDS以及清理资源。通过这些步骤,学员可以掌握如何将自建数据库安全、高效地迁移到云端,并确保应用的正常运行。
70 26
|
24天前
|
弹性计算 安全 关系型数据库
活动实践 | 自建数据库迁移到云数据库
通过阿里云RDS,用户可获得稳定、安全的企业级数据库服务,无需担心数据库管理与维护。该方案使用RDS确保数据库的可靠性、可用性和安全性,结合ECS和DTS服务,实现自建数据库平滑迁移到云端,支持WordPress等应用的快速部署与运行。通过一键部署模板,用户能迅速搭建ECS和RDS实例,完成数据迁移及应用上线,显著提升业务灵活性和效率。
|
8天前
|
运维 关系型数据库 MySQL
自建数据库迁移到云数据库RDS
本次课程由阿里云数据库团队的凡珂分享,主题为自建数据库迁移至云数据库RDS MySQL版。课程分为四部分:1) 传统数据库部署方案及痛点;2) 选择云数据库RDS MySQL的原因;3) 数据库迁移方案和产品选型;4) 线上活动与权益。通过对比自建数据库的局限性,介绍了RDS MySQL在可靠性、安全性、性价比等方面的优势,并详细讲解了使用DTS(数据传输服务)进行平滑迁移的步骤。此外,还提供了多种优惠活动信息,帮助用户降低成本并享受云数据库带来的便利。
|
17天前
|
数据可视化 Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
通过这些思维导图和分析说明表,您可以更直观地理解和选择适合的数据可视化图表类型,帮助更有效地展示和分析数据。
59 8
|
1月前
|
Python
在 Python 中实现各种类型的循环判断
在 Python 中实现各种类型的循环判断
34 2
|
2月前
|
存储 数据安全/隐私保护 索引
|
23天前
|
安全 关系型数据库 MySQL
体验自建数据库迁移到云数据库RDS,领取桌面置物架!
「技术解决方案【Cloud Up 挑战赛】」正式开启!本方案旨在帮助用户将自建数据库平滑迁移至阿里云RDS MySQL,享受稳定、高效、安全的数据库服务,助力业务快速发展。完成指定任务即可赢取桌面置物架等奖励,限量供应,先到先得。活动时间:2024年12月3日至12月31日16点。
|
1月前
|
SQL Oracle 关系型数据库
Oracle数据库优化方法
【10月更文挑战第25天】Oracle数据库优化方法
55 7