开发者社区> 问答> 正文

从csv文件读取输入时,如何在Cassandra中插入数据时达到50k / s的吞吐量?

我的目标是提高向 Cassandra 写入版本化数据的吞吐量。我使用了并发读写,并且增大了代码每次从文件中读取的块大小。我的机器有 16GB 内存和 8 个核心;我也已经修改了 Cassandra 的 yaml 配置文件,把并发读写数调到 10k。计时后发现,10000 次写入/读取只需不到一秒。我的最小可行代码如下:

import json
import logging
import os
import sys
from datetime import datetime
from hashlib import sha256

import pandas as pd
from cassandra import ConsistencyLevel, WriteTimeout
from cassandra.cluster import (EXEC_PROFILE_DEFAULT, BatchStatement, Cluster,

                           ExecutionProfile)

from cassandra.concurrent import (execute_concurrent,

                              execute_concurrent_with_args)

from cassandra.query import SimpleStatement, dict_factory

class PythonCassandraExample:

def __init__(self, file_to_be_versioned, working_dir=None, mode='append'):
    """Set up an unconnected versioning helper.

    No connection is opened here; ``cluster``, ``session``, ``keyspace`` and
    ``log`` stay ``None`` until the corresponding setup methods are called.

    :param file_to_be_versioned: path of the CSV file whose rows are versioned
    :param working_dir: directory that receives the JSON patch files.
        ``None`` (the default) means the current working directory *at call
        time*; the previous ``os.getcwd()`` default was evaluated only once,
        when the function was defined.
    :param mode: ``'replace'`` skips cloning the previous version and enables
        delete-patch detection; any other value (default ``'append'``) clones.
    """
    self.cluster = None
    self.session = None
    self.keyspace = None
    self.log = None
    self.mode = mode
    self.file_to_be_versioned = file_to_be_versioned
    # In-memory buffers of JSON lines, one list per patch kind.
    self.insert_patch = []
    self.delete_patch = []
    self.update_patch = []
    self.working_dir = os.getcwd() if working_dir is None else working_dir

def __del__(self):
    """Shut down the cluster connection when the helper is finalized.

    ``cluster`` is ``None`` until a session has been created (and may be
    missing entirely if construction failed), so there may be nothing to
    shut down.
    """
    cluster = getattr(self, "cluster", None)
    if cluster is not None:
        cluster.shutdown()

def createsession(self):
    """Connect to Cassandra on ``localhost`` and keep the cluster and session.

    The default execution profile returns rows through ``dict_factory`` and
    uses ``request_timeout=6000``; the cluster is created with
    ``connect_timeout=50``. The session is bound to ``self.keyspace``.
    """
    default_profile = ExecutionProfile(
        row_factory=dict_factory,
        request_timeout=6000,
    )
    profiles = {EXEC_PROFILE_DEFAULT: default_profile}
    cluster = Cluster(
        ['localhost'],
        connect_timeout=50,
        execution_profiles=profiles,
    )
    self.cluster = cluster
    self.session = cluster.connect(self.keyspace)

def getsession(self):
    """Return whatever is currently stored in ``self.session``."""
    current_session = self.session
    return current_session

# Send log records to the console so failures can be diagnosed.
def setlogger(self):
    """Configure the root logger at INFO with a stream handler and keep it.

    Each call adds one more ``StreamHandler`` to the root logger and stores
    the root logger in ``self.log``.
    """
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    self.log = logging.getLogger()
    self.log.setLevel('INFO')
    self.log.addHandler(console)
# Report a failed request through the instance logger

def handle_error(self, exception):
    """Log a failed Cassandra request at ERROR level.

    :param exception: the exception object describing the failure
    """
    # The previous text ("Failed to fetch user info") was unrelated to the
    # insert/select statements this reporter is used for.
    self.log.error("Cassandra request failed: %s", exception)

def createkeyspace(self, keyspace):
    """Create ``keyspace`` when it is missing and switch the session to it.

    The statement uses ``IF NOT EXISTS``, so an existing keyspace is kept
    as-is (nothing is dropped). Replication is ``SimpleStrategy`` with a
    replication factor of 2.

    :param keyspace: name of the keyspace to create and select
    :return: None
    """
    logger = self.log
    session = self.session
    logger.info("creating keyspace...")
    create_cql = """
            CREATE KEYSPACE IF NOT EXISTS %s
            WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }
            """ % keyspace
    session.execute(create_cql)
    logger.info("setting keyspace...")
    self.keyspace = keyspace
    session.set_keyspace(self.keyspace)

def create_table_and_set_version(self, table_name):
    """Ensure the versioning table exists and determine the versions to use.

    Sets ``self.table_name`` (lower-cased), ``self.version`` (the highest
    version already stored, or 0 when the table is missing or empty) and
    ``self.target_version`` (``self.version + 1``), then creates the table
    if it does not exist yet.

    :param table_name: name of the table inside the current keyspace
    """
    self.table_name = table_name.lower()
    table_select_query = "SELECT table_name FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'".format(
        self.keyspace, self.table_name)
    self.log.info(table_select_query)
    table_exists = self.session.execute(table_select_query).one()
    self.log.info("Table exists: {}".format(table_exists))
    self.version = 0
    if table_exists:
        self.log.info(
            "Datapackage already exists! Checking the last version")
        # `version` is the partition key of this table, so a plain
        # "SELECT version ... LIMIT 1" returns whichever partition happens
        # to come first, not the newest one. DISTINCT yields one row per
        # partition; the latest version is the maximum of those.
        version_rows = self.session.execute(
            "SELECT DISTINCT version FROM {}".format(self.table_name))
        stored_versions = [row['version'] for row in version_rows]
        if stored_versions:
            self.version = max(stored_versions)
        self.log.info(
            "The version fetched is: {} of type: {}".format(
                self.version, type(self.version)
            )
        )
    else:
        self.log.info("Table didn't exist!")
    self.target_version = int(self.version) + 1
    c_sql = "CREATE TABLE IF NOT EXISTS {} (id varchar, version int, row_hash varchar, PRIMARY KEY((version), id))".format(
        self.table_name
    )
    self.session.execute(c_sql)
    self.log.info("DP Table Created !!!")
    self.log.info("Current and candidate versions are: {}, {}".format(
        self.version, self.target_version))

def push_to_update_patch(self, update_patch_file, last_patch=False):
    """Append the buffered update lines to ``update_patch_file`` and clear them.

    The buffer is written when it holds at least 10000 lines, or when
    ``last_patch`` is ``True`` and the buffer is non-empty; otherwise
    nothing happens.

    :param update_patch_file: path of the JSON-lines file to append to
    :param last_patch: pass ``True`` to force out a partially filled buffer
    """
    pending = self.update_patch
    buffer_full = len(pending) >= 10000
    final_flush = last_patch is True and len(pending) > 0
    if not (buffer_full or final_flush):
        return
    with open(update_patch_file, mode='a') as json_file:
        json_file.writelines(pending)
    del pending[:]

def push_to_insert_patch(self, insert_patch_file, last_patch=False):
    """Append the buffered insert lines to ``insert_patch_file`` and clear them.

    The buffer is written when it holds at least 10000 lines, or when
    ``last_patch`` is ``True`` and the buffer is non-empty; otherwise
    nothing happens.

    :param insert_patch_file: path of the JSON-lines file to append to
    :param last_patch: pass ``True`` to force out a partially filled buffer
    """
    pending = self.insert_patch
    buffer_full = len(pending) >= 10000
    # The final flush must look at the insert buffer itself; it used to test
    # the update buffer, which dropped trailing inserts whenever no updates
    # were pending.
    final_flush = last_patch is True and len(pending) > 0
    if not (buffer_full or final_flush):
        return
    with open(insert_patch_file, mode='a') as json_file:
        json_file.writelines(pending)
    del pending[:]

def push_to_delete_patch(self, delete_patch_file, last_patch=False):
    """Append the buffered delete lines to ``delete_patch_file`` and clear them.

    The buffer is written when it holds at least 10000 lines, or when
    ``last_patch`` is ``True`` and the buffer is non-empty; otherwise
    nothing happens.

    :param delete_patch_file: path of the JSON-lines file to append to
    :param last_patch: pass ``True`` to force out a partially filled buffer
    """
    queued = self.delete_patch
    count = len(queued)
    should_write = count >= 10000 or (last_patch is True and count > 0)
    if should_write:
        with open(delete_patch_file, mode='a') as json_file:
            json_file.writelines(queued)
        del queued[:]

def push_to_patch(self, key, value, mode='update'):
    """Queue one patch line for ``mode`` and let that buffer flush if needed.

    The line is a JSON object with ``id`` (``str(key)``) and ``data``
    (``value`` serialized to a JSON string), terminated by a newline. The
    target file is ``version_<target_version>_<mode>.json`` inside
    ``self.working_dir``. A ``mode`` other than ``'update'``, ``'insert'``
    or ``'delete'`` is silently ignored.

    :param key: row identifier; must not be ``None``
    :param value: row payload; must not be ``None``
    :param mode: which patch buffer receives the line
    :raises ValueError: if ``key`` or ``value`` is ``None``
    """
    if key is None or value is None:
        raise ValueError(
            "Key or value or both not specified for making a patch. Exiting now."
        )
    # One JSON document per line; the payload is nested as a JSON string.
    record = {"id": str(key), "data": json.dumps(value, default=str)}
    line = json.dumps(record, default=str) + "\n"
    patch_name = "version_{}_{}.json".format(self.target_version, mode)
    json_patch_file = os.path.join(self.working_dir, patch_name)
    if mode == 'update':
        sink, flush = self.update_patch, self.push_to_update_patch
    elif mode == 'insert':
        sink, flush = self.insert_patch, self.push_to_insert_patch
    elif mode == 'delete':
        sink, flush = self.delete_patch, self.push_to_delete_patch
    else:
        return
    sink.append(line)
    flush(json_patch_file)

def clone_version(self):
    """Copy every row of the current version into the target version.

    Nothing is cloned in ``'replace'`` mode or when no version exists yet
    (``self.version == 0``). Otherwise each ``(id, row_hash)`` of the
    current version is re-inserted under ``self.target_version``.

    The previous implementation only collected the insert statements and
    never executed them; they are now run in batches so memory stays bounded.
    """
    if self.mode == 'replace':
        return
    self.log.info("Cloning version")
    start_time = datetime.utcnow()
    if self.version == 0:
        return
    insert_sql = self.session.prepare(
        "INSERT INTO  {} ({}, {}, {}) VALUES (?,?,?)".format(
            self.table_name, "id", "version", "row_hash"
        )
    )
    current_version_query = "SELECT id, row_hash FROM {} WHERE version={}".format(
        self.table_name, self.version
    )

    def run_batch(statements):
        # Execute the queued (statement, params) pairs and report failures
        # without aborting the remaining rows.
        results = execute_concurrent(
            self.session, statements, concurrency=100, raise_on_first_error=False
        )
        for success, result in results:
            if not success:
                # result is the exception raised for that statement
                self.log.error("Error has occurred while cloning a row")
                self.handle_error(result)
        del statements[:]

    batch = []
    for current_version_row in self.session.execute(current_version_query):
        batch.append(
            (
                insert_sql,
                (
                    current_version_row['id'],
                    self.target_version,
                    current_version_row['row_hash'],
                ),
            )
        )
        if len(batch) >= 1000:
            run_batch(batch)
    if batch:
        run_batch(batch)
    self.log.info(
        "Time taken to clone the version is: {}".format(
            datetime.utcnow() - start_time
        )
    )

def insert_data(self, generate_diff=False, key_column="column_test_3", chunksize=10000):
    """Load the CSV into the target version and record the patch files.

    For every CSV row an id (SHA-256 of the ``key_column`` value) and a row
    hash (SHA-256 of the row serialized as JSON) are computed. Each row is
    inserted under ``self.target_version`` and compared with the same id in
    ``self.version``: a missing id is recorded as an ``insert`` patch, a
    different hash as an ``update`` patch. When ``generate_diff`` is set,
    the mode is ``'replace'`` and a previous version exists, ids present in
    the previous version but absent from the target one are recorded as
    ``delete`` patches. All patch buffers are flushed at the end.

    :param generate_diff: enable delete-patch detection (see above)
    :param key_column: CSV column whose value identifies a row
    :param chunksize: number of CSV rows read per pandas chunk
    """
    self.generate_diff = generate_diff
    overall_start = datetime.utcnow()
    insert_sql = self.session.prepare(
        "INSERT INTO  {} ({}, {}, {}) VALUES (?,?,?)".format(
            self.table_name, "id", "version", "row_hash"
        )
    )
    select_sql = self.session.prepare(
        "SELECT id, version, row_hash FROM {} WHERE  version=? AND id=?".format(
            self.table_name
        )
    )
    inserts = []      # (statement, params) pairs writing the target version
    checks = []       # (statement, params) pairs looking up the old version
    candidates = []   # row info aligned index-by-index with `checks`
    chunk_counter = 0
    chunks = pd.read_csv(self.file_to_be_versioned, chunksize=chunksize)
    for chunk_counter, df in enumerate(chunks, start=1):
        chunk_start = datetime.utcnow()
        if chunk_counter % 100 == 0:
            self.log.info(chunk_counter)
        for row in df.to_dict(orient='records'):
            key = sha256(str(row[key_column]).encode("utf-8")).hexdigest()
            row_json = json.dumps(row, default=str)
            row_hash = sha256(row_json.encode("utf-8")).hexdigest()
            candidates.append({"row": row, "row_hash": row_hash, "id": key})
            checks.append((select_sql, (self.version, key)))
            inserts.append((insert_sql, (key, self.target_version, row_hash)))
        if len(checks) >= 1000:
            self._flush_patch_checks(checks, candidates, concurrency=10000)
        if len(inserts) >= 1000:
            self._flush_inserts(inserts, concurrency=10000)
        self.log.info("This chunk got over in {}".format(
            datetime.utcnow() - chunk_start))

    # Whatever is left over from a final chunk that stayed below the thresholds.
    if checks:
        self._flush_patch_checks(checks, candidates, concurrency=1000)
    if inserts:
        self._flush_inserts(inserts, concurrency=1000)
        self.log.info(chunk_counter)

    # Check the delete patch. Value comparisons (== / !=) are used here;
    # identity tests against literals are not reliable.
    if self.generate_diff and self.mode == 'replace' and self.version != 0:
        self.log.info("We got to find the delete patch!")
        # The table only has id, version and row_hash columns.
        current_version_query = "SELECT id, row_hash FROM {} WHERE version={}".format(
            self.table_name, self.version
        )
        for current_version_row in self.session.execute(current_version_query):
            # Rows are accessed by key, matching the dict-style access used
            # for result rows elsewhere in this code.
            row_id = current_version_row["id"]
            row_check = self.session.execute(
                select_sql, (self.target_version, row_id)
            ).one()
            if row_check is not None:  # row exists in both versions
                continue
            self.push_to_patch(row_id, row_id, mode="delete")

    # Measured from the start of the call, not from the last chunk.
    print("Complete insert's duration is: {}".format(
        datetime.utcnow() - overall_start)
    )

    # Force out whatever is still buffered for each patch kind.
    final_flushers = (
        ('update', self.push_to_update_patch),
        ('insert', self.push_to_insert_patch),
        ('delete', self.push_to_delete_patch),
    )
    for patch_mode, flush in final_flushers:
        json_patch_file = os.path.join(
            self.working_dir,
            "version_{}_{}.json".format(self.target_version, patch_mode)
        )
        flush(json_patch_file, last_patch=True)


def _flush_patch_checks(self, checks, candidates, concurrency):
    """Run the queued look-ups and classify each candidate row as insert/update.

    ``checks[i]`` must be the look-up for ``candidates[i]``; pairing by
    position relies on ``execute_concurrent`` returning results in statement
    order (per the driver documentation). Both lists are emptied afterwards.
    """
    results = execute_concurrent(
        self.session, checks, concurrency=concurrency, raise_on_first_error=False
    )
    for candidate, (success, result) in zip(candidates, results):
        if not success:
            # result is an Exception here; report it and move on instead of
            # indexing into it.
            self.log.error("Error has occurred in patch-check cql")
            self.handle_error(result)
            continue
        existing = result.one()
        if existing is None:
            self.push_to_patch(candidate["id"], candidate["row"], mode='insert')
        elif existing["row_hash"] != candidate["row_hash"]:
            self.push_to_patch(existing["id"], candidate["row"], mode='update')
    del checks[:]
    del candidates[:]


def _flush_inserts(self, inserts, concurrency):
    """Run the queued insert statements, log failures, and empty the queue."""
    results = execute_concurrent(
        self.session, inserts, concurrency=concurrency, raise_on_first_error=False
    )
    for success, result in results:
        if not success:
            # result will be an Exception
            self.log.error("Error has occurred in insert cql")
            self.handle_error(result)
    del inserts[:]

# Script entry point: version the CSV into Cassandra end to end.
if __name__ == '__main__':
    example1 = PythonCassandraExample(
        file_to_be_versioned="hundred_million_eleven_columns.csv"
    )
    # Connect and set up logging before any method that logs is called.
    example1.createsession()
    example1.setlogger()
    example1.createkeyspace('sat_athena_one')
    example1.create_table_and_set_version('five_hundred_rows')
    # Carry the previous version forward, then apply the CSV on top of it.
    example1.clone_version()
    example1.insert_data(generate_diff=True)

我有一个100M行和11列的csv文件。用于生成此类文件的脚本是:

import csv
import sys
import os
import pandas as pd

# Generator script for the benchmark CSV. NOTE(review): this snippet is cut
# off after the DataFrame is built and its block indentation was lost in the
# paste, so it is documented as-is rather than completed.

# Output path; the file is opened in append mode below.
file_name = "hundred_million_eleven_columns.csv"
# Rows buffered in memory until a batch of 100000 is reached.
rows_list = []
chunk_counter = 1
# Column names handed to the DictWriter as fieldnames.
headers = [

"column_test_1",
"column_test_2",
"column_test_3",
"column_test_4",
"column_test_5",
"column_test_6",
"column_test_7",
"column_test_8",
"column_test_9",
"column_test_10",
"column_test_11",

]

# Checked before opening so the header is written only for a new file.
file_exists = os.path.isfile(file_name)
with open(file_name, 'a') as csvfile:

writer = csv.DictWriter(csvfile, delimiter=',',
                        lineterminator='\n', fieldnames=headers)
if not file_exists:
    writer.writeheader()  # file doesn't exist yet, write a header

for i in range(100000000):

# Eleven consecutive integers starting at i. NOTE(review): despite the name
# this is a list, not a dict keyed by column name.
dict1 = [
    i, i+1, i+2, i+3, i+4, i+5, i+6, i+7, i+8, i+9, i+10
]
# Buffer the row for the current batch.
rows_list.append(dict1)
# Every 100000 buffered rows are wrapped in a DataFrame; what happens to it
# next is not visible in this snippet (presumably written out and the buffer
# reset; TODO confirm against the full script).
if len(rows_list) == 100000:
    df = pd.DataFrame(rows_list)

展开
收起
一码平川MACHEL 2019-01-23 16:55:23 2615 0
1 条回答
写回答
取消 提交回答
  • 由于写入操作几乎不受 IO 限制(not IO bound),理想的 concurrent_writes 数取决于系统的核心数;(8 × 核心数)是一个不错的经验法则。

    64在8core机器中是合适的。

    concurrent_reads:64
    concurrent_writes:64
    concurrent_counter_writes:64
    建议采用这个上限,因为除了普通 IO 之外还有许多其他 IO 操作,例如:写提交日志(commit log)、缓存、压缩(compaction)、复制,以及物化视图(如果存在)。

    一些经验法则

    disk_optimization_strategy: ssd  // 如果您的磁盘是机械硬盘(HDD),则该值应设为 spinning
    使用专用的提交日志磁盘。ssd推荐。
    更多磁盘=更好的性能

    2019-07-17 23:26:45
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
大批量处理excel文件到ODPS中方案 立即下载
HBase2.0重新定义小对象实时存取 立即下载
低代码开发师(初级)实战教程 立即下载