MyEMS Open-Source Energy Management System: Core Code Walkthrough 003

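Summary: The core functions of this MyEMS script are: 1. **Main loop**: aggregates energy consumption data every 5 minutes. 2. **Multiprocessing**: spaces are processed in parallel by a pool of worker processes, which improves throughput. 3. **Data aggregation**: energy consumption is totaled by energy category and by hour, after a common time range across all data sources has been determined. 4. **Error handling and logging**: exceptions are caught and recorded at every key step to keep the service stable. The script demonstrates database access, multiprocess programming, and data processing in practice, and helps enterprises pursue low-carbon development.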

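The MyEMS open-source energy management system is suited to collecting, analyzing, and reporting electricity, water, gas, and other energy data for buildings, factories, shopping malls, hospitals, and campuses. Optional features include photovoltaics, energy storage, charging stations, microgrids, equipment control, fault diagnosis, work order management, and AI-based optimization. It is developed and maintained by an experienced professional team with long-term support. Open source in support of low-carbon development for enterprise groups, industrial parks, and energy operators!
Official website: https://myems.io
MyEMS open-source community edition download: https://gitee.com/myems/myems
In this installment:
Space energy input category aggregation algorithm: myems/myems-aggregation/space_energy_input_category.py
The code is listed at the bottom.
This code is a Python script for energy data processing. Its main job is to compute and aggregate the hourly energy consumption of a building space, broken down by energy category. It connects to the MySQL databases to fetch the meters, virtual meters, offline meters, combined equipments, equipments, shopfloors, stores, tenants, and child spaces associated with each space, and computes the energy consumption of these entities over a given time period. A detailed walkthrough follows: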

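Overall flow

1. Main loop: the script runs continuously and performs the aggregation periodically (every 5 minutes).
2. Get the space list: fetch the list of all spaces from the system database.
3. Multiprocessing: hand the spaces to a pool of worker processes (sized by config.pool_size) so that they are aggregated in parallel.
4. Data aggregation: for each space, aggregate the energy data of all entities associated with it.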
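
The flow above can be sketched in a few lines. This is a minimal illustration rather than the project's code: `run_cycle`, `demo_worker`, and the `pool_factory` parameter are hypothetical names, and the database access is left out so that the shape of one cycle (shuffle, map over a pool, collect error strings) is easy to see.

```python
import random
from multiprocessing import Pool


def run_cycle(spaces, worker, pool_size, pool_factory=Pool):
    """Run one aggregation cycle and return the error strings reported by workers.

    `run_cycle` and `pool_factory` are hypothetical names used for illustration.
    """
    # Copy before shuffling so the caller's list is left untouched.
    shuffled = list(spaces)
    random.shuffle(shuffled)

    # A fixed-size pool: `pool_size` processes share all spaces between them.
    pool = pool_factory(processes=pool_size)
    try:
        results = pool.map(worker, shuffled)
    finally:
        pool.close()
        pool.join()

    # Workers return None on success or a non-empty string describing the error.
    return [result for result in results if result]


def demo_worker(space):
    """Stand-in for the real worker: fails for spaces that have no name."""
    if not space.get("name"):
        return "Error: space %s has no name" % space["id"]
    return None
```

In a long-running service, a call such as `run_cycle(space_list, worker, config.pool_size)` would sit inside a `while True:` loop followed by `time.sleep(300)`. Passing the pool class as a parameter is only a convenience for trying the sketch with `multiprocessing.dummy.Pool`, which offers the same interface on threads.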

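Detailed steps

1. Connect to the databases: connect to the system database and the energy database.
2. Get the space list: query the system database for the ID and name of every space.
3. Shuffle the space list: randomize the order so that spaces are not processed in the same order on every cycle.
4. Aggregate in multiple processes: use Python's multiprocessing library to run the worker for every space in a process pool.
5. Aggregation procedure (executed for each space):
   • Get all meters, virtual meters, offline meters, combined equipments, equipments, shopfloors, stores, tenants, and child spaces associated with the space.
   • Determine the start and end datetime of the aggregation.
   • Fetch each entity's energy data for that period from the energy database.
   • Determine the time range that is common to the energy data of all entities.
   • Within the common time range, aggregate the energy data by energy category and by hour.
   • Save the aggregated data back to the energy database.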
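
The time-related steps above are easier to follow with a small example. The sketch below is an assumption-laden illustration, not the script's actual implementation: `next_start`, `common_time_slot`, and `aggregate` are hypothetical helpers, and the common range is taken here as the overlap between the first and last timestamps of every data source.

```python
from datetime import datetime, timedelta
from decimal import Decimal


def next_start(last_saved, configured_start, minutes_to_count=60):
    """Resume one slot after the last saved record, else use the configured start."""
    if isinstance(last_saved, datetime):
        return last_saved.replace(second=0, microsecond=0) + timedelta(minutes=minutes_to_count)
    return configured_start.replace(minute=0, second=0, microsecond=0)


def common_time_slot(series_list):
    """Return the (start, end) range covered by every series, or None.

    Each series is a dict keyed by the slot's start datetime.
    """
    if not series_list or any(not series for series in series_list):
        return None
    start = max(min(series) for series in series_list)
    end = min(max(series) for series in series_list)
    return (start, end) if start <= end else None


def aggregate(meter_series, category_series, slot, minutes_to_count=60):
    """Sum values per time slot and per energy category inside `slot`.

    meter_series:    list of (energy_category_id, {datetime: value})
    category_series: list of {datetime: {energy_category_id: value}}
    """
    start, end = slot
    totals = {}
    current = start
    while current <= end:
        per_category = {}
        # Meters carry a single energy category each.
        for category_id, series in meter_series:
            value = series.get(current)
            if value is not None:
                per_category[category_id] = per_category.get(category_id, Decimal(0)) + value
        # Equipments, stores, tenants, etc. are already broken down by category.
        for series in category_series:
            for category_id, value in series.get(current, {}).items():
                per_category[category_id] = per_category.get(category_id, Decimal(0)) + value
        totals[current] = per_category
        current += timedelta(minutes=minutes_to_count)
    return totals
```

Restricting the sum to the common range means a slot is only written once every source has reported for it. Slots outside the range are left for a later cycle, which picks them up because the start datetime is derived from the last saved record.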

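Key points

  • Error handling: every key step is wrapped in try/except. On failure the database cursor and connection are closed and the error is reported, so one failing space does not bring down the service.
  • Logging: the main loop records errors through the logger object. The worker cannot receive the logger as a parameter, so it returns None or an error string, which the main loop then logs.
  • Configuration management: database connection settings and other options (pool size, start datetime, minutes per time slot) come from the config module.
  • Performance: spaces are processed in parallel by a multiprocessing pool, and each space is aggregated incrementally, starting from the time slot after its last saved record.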
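
The error-handling convention in the points above can be shown in isolation. This is a sketch under stated assumptions: it uses the standard library's `sqlite3` in place of MySQL so that it runs anywhere, and `fetch_space_meters`, the `connect` parameter, and the `demo_meters` table are hypothetical names that do not exist in MyEMS.

```python
import sqlite3


def fetch_space_meters(connect, space_id):
    """Return (meter_list, error_string); exactly one of the two is None.

    `connect` is any zero-argument callable returning a DB-API connection.
    """
    cnx = None
    cursor = None
    try:
        cnx = connect()
        cursor = cnx.cursor()
        # Hypothetical, simplified table used only for this illustration.
        cursor.execute("SELECT id, name FROM demo_meters WHERE space_id = ?", (space_id,))
        rows = cursor.fetchall()
        return [{"id": row[0], "name": row[1]} for row in rows], None
    except Exception as e:
        # Report the failure as a string instead of raising, so a caller in
        # another process can collect it and pass it to its own logger.
        return None, "Error in fetch_space_meters " + str(e)
    finally:
        # Always release the cursor and the connection, on success or failure.
        if cursor is not None:
            cursor.close()
        if cnx is not None:
            cnx.close()
```

Returning the error as plain data keeps the worker free of any logging dependency, and the `finally` clause covers the cleanup that would otherwise have to be repeated in every `except` branch.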

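Conclusion

This script is a complex data processing tool for managing and aggregating the energy consumption data of building spaces. It demonstrates database operations, multiprocess programming, and data processing in a real application.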
    ```python
import random
import time
from datetime import datetime, timedelta
from decimal import Decimal
from multiprocessing import Pool

import mysql.connector

import config


########################################################################################################################
# PROCEDURES
# Step 1: get all spaces
# Step 2: Create multiprocessing pool to call worker in parallel

def main(logger):

    while True:
        # the outermost while loop
        ################################################################################################################
        # Step 1: get all spaces
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 1.1 of space_energy_input_category.main " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        print("Connected to MyEMS System Database")

        space_list = list()
        try:
            cursor_system_db.execute(" SELECT id, name "
                                     " FROM tbl_spaces "
                                     " ORDER BY id ")
            rows_spaces = cursor_system_db.fetchall()

            if rows_spaces is None or len(rows_spaces) == 0:
                print("There isn't any spaces ")
                # sleep and continue the outer loop to reconnect the database
                time.sleep(60)
                continue

            for row in rows_spaces:
                space_list.append({"id": row[0], "name": row[1]})

        except Exception as e:
            logger.error("Error in step 1.2 of space_energy_input_category.main " + str(e))
            # sleep and continue the outer loop to reconnect the database
            time.sleep(60)
            continue
        finally:
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()

        print("Got all spaces in MyEMS System Database")

        # shuffle the space list for randomly calculating the meter hourly value
        random.shuffle(space_list)

        ################################################################################################################
        # Step 2: Create multiprocessing pool to call worker in parallel
        ################################################################################################################
        p = Pool(processes=config.pool_size)
        error_list = p.map(worker, space_list)
        p.close()
        p.join()

        for error in error_list:
            if error is not None and len(error) > 0:
                logger.error(error)

        print("go to sleep 300 seconds...")
        time.sleep(300)
        print("wake from sleep, and continue to work...")
    # end of outer while

########################################################################################################################
# PROCEDURES:
# Step 1: get all input meters associated with the space
# Step 2: get all input virtual meters associated with the space
# Step 3: get all input offline meters associated with the space
# Step 4: get all combined equipments associated with the space
# Step 5: get all equipments associated with the space
# Step 6: get all shopfloors associated with the space
# Step 7: get all stores associated with the space
# Step 8: get all tenants associated with the space
# Step 9: get all child spaces associated with the space
# Step 10: determine start datetime and end datetime to aggregate
# Step 11: for each meter in list, get energy input data from energy database
# Step 12: for each virtual meter in list, get energy input data from energy database
# Step 13: for each offline meter in list, get energy input data from energy database
# Step 14: for each combined equipment in list, get energy input data from energy database
# Step 15: for each equipment in list, get energy input data from energy database
# Step 16: for each shopfloor in list, get energy input data from energy database
# Step 17: for each store in list, get energy input data from energy database
# Step 18: for each tenant in list, get energy input data from energy database
# Step 19: for each child space in list, get energy input data from energy database
# Step 20: determine common time slot to aggregate
# Step 21: aggregate energy data in the common time slot by energy categories and hourly
# Step 22: save energy data to energy database
#
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter

def worker(space):
    ####################################################################################################################
    # Step 1: get all input meters associated with the space
    ####################################################################################################################
    print("Step 1: get all input meters associated with the space " + str(space['name']))

    cnx_system_db = None
    cursor_system_db = None
    try:
        cnx_system_db = mysql.connector.connect(**config.myems_system_db)
        cursor_system_db = cnx_system_db.cursor()
    except Exception as e:
        error_string = "Error in step 1.1 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    meter_list = list()
    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_meters m, tbl_spaces_meters sm "
                                 " WHERE m.id = sm.meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND sm.space_id = %s ",
                                 (space['id'],))
        rows_meters = cursor_system_db.fetchall()

        if rows_meters is not None and len(rows_meters) > 0:
            for row in rows_meters:
                meter_list.append({"id": row[0],
                                   "name": row[1],
                                   "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 1.2 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 2: get all input virtual meters associated with the space
    ####################################################################################################################
    print("Step 2: get all input virtual meters associated with the space")
    virtual_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_virtual_meters m, tbl_spaces_virtual_meters sm "
                                 " WHERE m.id = sm.virtual_meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND sm.space_id = %s ",
                                 (space['id'],))
        rows_virtual_meters = cursor_system_db.fetchall()

        if rows_virtual_meters is not None and len(rows_virtual_meters) > 0:
            for row in rows_virtual_meters:
                virtual_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 2 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 3: get all input offline meters associated with the space
    ####################################################################################################################
    print("Step 3: get all input offline meters associated with the space")

    offline_meter_list = list()

    try:
        cursor_system_db.execute(" SELECT m.id, m.name, m.energy_category_id "
                                 " FROM tbl_offline_meters m, tbl_spaces_offline_meters sm "
                                 " WHERE m.id = sm.offline_meter_id "
                                 "       AND m.is_counted = 1 "
                                 "       AND sm.space_id = %s ",
                                 (space['id'],))
        rows_offline_meters = cursor_system_db.fetchall()

        if rows_offline_meters is not None and len(rows_offline_meters) > 0:
            for row in rows_offline_meters:
                offline_meter_list.append({"id": row[0],
                                           "name": row[1],
                                           "energy_category_id": row[2]})

    except Exception as e:
        error_string = "Error in step 3 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 4: get all combined equipments associated with the space
    ####################################################################################################################
    print("Step 4: get all combined equipments associated with the space")

    combined_equipment_list = list()

    try:
        cursor_system_db.execute(" SELECT e.id, e.name "
                                 " FROM tbl_combined_equipments e, tbl_spaces_combined_equipments se "
                                 " WHERE e.id = se.combined_equipment_id "
                                 "       AND e.is_input_counted = 1 "
                                 "       AND se.space_id = %s ",
                                 (space['id'],))
        rows_combined_equipments = cursor_system_db.fetchall()

        if rows_combined_equipments is not None and len(rows_combined_equipments) > 0:
            for row in rows_combined_equipments:
                combined_equipment_list.append({"id": row[0],
                                                "name": row[1]})

    except Exception as e:
        error_string = "Error in step 4 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 5: get all equipments associated with the space
    ####################################################################################################################
    print("Step 5: get all equipments associated with the space")

    equipment_list = list()

    try:
        cursor_system_db.execute(" SELECT e.id, e.name "
                                 " FROM tbl_equipments e, tbl_spaces_equipments se "
                                 " WHERE e.id = se.equipment_id "
                                 "       AND e.is_input_counted = 1 "
                                 "       AND se.space_id = %s ",
                                 (space['id'],))
        rows_equipments = cursor_system_db.fetchall()

        if rows_equipments is not None and len(rows_equipments) > 0:
            for row in rows_equipments:
                equipment_list.append({"id": row[0],
                                       "name": row[1]})

    except Exception as e:
        error_string = "Error in step 5 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 6: get all shopfloors associated with the space
    ####################################################################################################################
    print("Step 6: get all shopfloors associated with the space")

    shopfloor_list = list()

    try:
        cursor_system_db.execute(" SELECT s.id, s.name "
                                 " FROM tbl_shopfloors s, tbl_spaces_shopfloors ss "
                                 " WHERE s.id = ss.shopfloor_id "
                                 "       AND s.is_input_counted = 1 "
                                 "       AND ss.space_id = %s ",
                                 (space['id'],))
        rows_shopfloors = cursor_system_db.fetchall()

        if rows_shopfloors is not None and len(rows_shopfloors) > 0:
            for row in rows_shopfloors:
                shopfloor_list.append({"id": row[0],
                                       "name": row[1]})

    except Exception as e:
        error_string = "Error in step 6 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 7: get all stores associated with the space
    ####################################################################################################################
    print("Step 7: get all stores associated with the space")

    store_list = list()

    try:
        cursor_system_db.execute(" SELECT s.id, s.name "
                                 " FROM tbl_stores s, tbl_spaces_stores ss "
                                 " WHERE s.id = ss.store_id "
                                 "       AND s.is_input_counted = 1 "
                                 "       AND ss.space_id = %s ",
                                 (space['id'],))
        rows_stores = cursor_system_db.fetchall()

        if rows_stores is not None and len(rows_stores) > 0:
            for row in rows_stores:
                store_list.append({"id": row[0],
                                   "name": row[1]})

    except Exception as e:
        error_string = "Error in step 7 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 8: get all tenants associated with the space
    ####################################################################################################################
    print("Step 8: get all tenants associated with the space")

    tenant_list = list()

    try:
        cursor_system_db.execute(" SELECT t.id, t.name "
                                 " FROM tbl_tenants t, tbl_spaces_tenants st "
                                 " WHERE t.id = st.tenant_id "
                                 "       AND t.is_input_counted = 1 "
                                 "       AND st.space_id = %s ",
                                 (space['id'],))
        rows_tenants = cursor_system_db.fetchall()

        if rows_tenants is not None and len(rows_tenants) > 0:
            for row in rows_tenants:
                tenant_list.append({"id": row[0],
                                    "name": row[1]})

    except Exception as e:
        error_string = "Error in step 8 of space_energy_input_category.worker " + str(e)
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 9: get all child spaces associated with the space
    ####################################################################################################################
    print("Step 9: get all child spaces associated with the space")

    child_space_list = list()

    try:
        cursor_system_db.execute(" SELECT id, name "
                                 " FROM tbl_spaces "
                                 " WHERE is_input_counted = 1 "
                                 "       AND parent_space_id = %s ",
                                 (space['id'],))
        rows_child_spaces = cursor_system_db.fetchall()

        if rows_child_spaces is not None and len(rows_child_spaces) > 0:
            for row in rows_child_spaces:
                child_space_list.append({"id": row[0],
                                         "name": row[1]})

    except Exception as e:
        error_string = "Error in step 9 of space_energy_input_category.worker " + str(e)
        print(error_string)
        return error_string
    finally:
        if cursor_system_db:
            cursor_system_db.close()
        if cnx_system_db:
            cnx_system_db.close()

    if (meter_list is None or len(meter_list) == 0) and \
            (virtual_meter_list is None or len(virtual_meter_list) == 0) and \
            (offline_meter_list is None or len(offline_meter_list) == 0) and \
            (combined_equipment_list is None or len(combined_equipment_list) == 0) and \
            (equipment_list is None or len(equipment_list) == 0) and \
            (shopfloor_list is None or len(shopfloor_list) == 0) and \
            (store_list is None or len(store_list) == 0) and \
            (tenant_list is None or len(tenant_list) == 0) and \
            (child_space_list is None or len(child_space_list) == 0):
        print("This is an empty space ")
        return None

    ####################################################################################################################
    # Step 10: determine start datetime and end datetime to aggregate
    ####################################################################################################################
    print("Step 10: determine start datetime and end datetime to aggregate")
    cnx_energy_db = None
    cursor_energy_db = None
    try:
        cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
        cursor_energy_db = cnx_energy_db.cursor()
    except Exception as e:
        error_string = "Error in step 10.1 of space_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    try:
        query = (" SELECT MAX(start_datetime_utc) "
                 " FROM tbl_space_input_category_hourly "
                 " WHERE space_id = %s ")
        cursor_energy_db.execute(query, (space['id'],))
        row_datetime = cursor_energy_db.fetchone()
        start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
        start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)

        if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
            # replace second and microsecond with 0
            # note: do not replace minute in case of calculating in half hourly
            start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
            # start from the next time slot
            start_datetime_utc += timedelta(minutes=config.minutes_to_count)

        end_datetime_utc = datetime.utcnow().replace(second=0, microsecond=0, tzinfo=None)

        print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
              + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])

    except Exception as e:
        error_string = "Error in step 10.2 of space_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 11: for each meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_meter_hourly = dict()
    try:
        if meter_list is not None and len(meter_list) > 0:
            for meter in meter_list:
                meter_id = str(meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_meter_hourly "
                         " WHERE meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_meter_hourly[meter_id] = None
                else:
                    energy_meter_hourly[meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
    except Exception as e:
        error_string = "Error in step 11 of space_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 12: for each virtual meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_virtual_meter_hourly = dict()
    if virtual_meter_list is not None and len(virtual_meter_list) > 0:
        try:
            for virtual_meter in virtual_meter_list:
                virtual_meter_id = str(virtual_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_virtual_meter_hourly "
                         " WHERE virtual_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_virtual_meter_hourly[virtual_meter_id] = None
                else:
                    energy_virtual_meter_hourly[virtual_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
        except Exception as e:
            error_string = "Error in step 12 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 13: for each offline meter in list, get energy input data from energy database
    ####################################################################################################################
    energy_offline_meter_hourly = dict()
    if offline_meter_list is not None and len(offline_meter_list) > 0:
        try:
            for offline_meter in offline_meter_list:
                offline_meter_id = str(offline_meter['id'])

                query = (" SELECT start_datetime_utc, actual_value "
                         " FROM tbl_offline_meter_hourly "
                         " WHERE offline_meter_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_offline_meter_hourly[offline_meter_id] = None
                else:
                    energy_offline_meter_hourly[offline_meter_id] = dict()
                    for row_energy_value in rows_energy_values:
                        energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]

        except Exception as e:
            error_string = "Error in step 13 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 14: for each combined equipment in list, get energy input data from energy database
    ####################################################################################################################
    energy_combined_equipment_hourly = dict()
    if combined_equipment_list is not None and len(combined_equipment_list) > 0:
        try:
            for combined_equipment in combined_equipment_list:
                combined_equipment_id = str(combined_equipment['id'])
                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_combined_equipment_input_category_hourly "
                         " WHERE combined_equipment_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (combined_equipment_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_combined_equipment_hourly[combined_equipment_id] = None
                else:
                    energy_combined_equipment_hourly[combined_equipment_id] = dict()
                    for row_value in rows_energy_values:
                        current_datetime_utc = row_value[0]
                        if current_datetime_utc not in energy_combined_equipment_hourly[combined_equipment_id]:
                            energy_combined_equipment_hourly[combined_equipment_id][current_datetime_utc] = dict()
                        energy_category_id = row_value[1]
                        actual_value = row_value[2]
                        energy_combined_equipment_hourly[combined_equipment_id][current_datetime_utc][
                            energy_category_id] = actual_value
        except Exception as e:
            error_string = "Error in step 14 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 15: for each equipment in list, get energy input data from energy database
    ####################################################################################################################
    energy_equipment_hourly = dict()
    if equipment_list is not None and len(equipment_list) > 0:
        try:
            for equipment in equipment_list:
                equipment_id = str(equipment['id'])
                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_equipment_input_category_hourly "
                         " WHERE equipment_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (equipment_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_equipment_hourly[equipment_id] = None
                else:
                    energy_equipment_hourly[equipment_id] = dict()
                    for row_value in rows_energy_values:
                        current_datetime_utc = row_value[0]
                        if current_datetime_utc not in energy_equipment_hourly[equipment_id]:
                            energy_equipment_hourly[equipment_id][current_datetime_utc] = dict()
                        energy_category_id = row_value[1]
                        actual_value = row_value[2]
                        energy_equipment_hourly[equipment_id][current_datetime_utc][energy_category_id] = \
                            actual_value
        except Exception as e:
            error_string = "Error in step 15 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 16: for each shopfloor in list, get energy input data from energy database
    ####################################################################################################################
    energy_shopfloor_hourly = dict()
    if shopfloor_list is not None and len(shopfloor_list) > 0:
        try:
            for shopfloor in shopfloor_list:
                shopfloor_id = str(shopfloor['id'])

                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_shopfloor_input_category_hourly "
                         " WHERE shopfloor_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (shopfloor_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_shopfloor_hourly[shopfloor_id] = None
                else:
                    energy_shopfloor_hourly[shopfloor_id] = dict()
                    for row_energy_value in rows_energy_values:
                        current_datetime_utc = row_energy_value[0]
                        if current_datetime_utc not in energy_shopfloor_hourly[shopfloor_id]:
                            energy_shopfloor_hourly[shopfloor_id][current_datetime_utc] = dict()
                        energy_category_id = row_energy_value[1]
                        actual_value = row_energy_value[2]
                        energy_shopfloor_hourly[shopfloor_id][current_datetime_utc][energy_category_id] = actual_value
        except Exception as e:
            error_string = "Error in step 16 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 17: for each store in list, get energy input data from energy database
    ####################################################################################################################
    energy_store_hourly = dict()
    if store_list is not None and len(store_list) > 0:
        try:
            for store in store_list:
                store_id = str(store['id'])

                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_store_input_category_hourly "
                         " WHERE store_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (store_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_store_hourly[store_id] = None
                else:
                    energy_store_hourly[store_id] = dict()
                    for row_energy_value in rows_energy_values:
                        current_datetime_utc = row_energy_value[0]
                        if current_datetime_utc not in energy_store_hourly[store_id]:
                            energy_store_hourly[store_id][current_datetime_utc] = dict()
                        energy_category_id = row_energy_value[1]
                        actual_value = row_energy_value[2]
                        energy_store_hourly[store_id][current_datetime_utc][energy_category_id] = actual_value
        except Exception as e:
            error_string = "Error in step 17 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 18: for each tenant in list, get energy input data from energy database
    ####################################################################################################################
    energy_tenant_hourly = dict()
    if tenant_list is not None and len(tenant_list) > 0:
        try:
            for tenant in tenant_list:
                tenant_id = str(tenant['id'])

                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_tenant_input_category_hourly "
                         " WHERE tenant_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (tenant_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_tenant_hourly[tenant_id] = None
                else:
                    energy_tenant_hourly[tenant_id] = dict()
                    for row_energy_value in rows_energy_values:
                        current_datetime_utc = row_energy_value[0]
                        if current_datetime_utc not in energy_tenant_hourly[tenant_id]:
                            energy_tenant_hourly[tenant_id][current_datetime_utc] = dict()
                        energy_category_id = row_energy_value[1]
                        actual_value = row_energy_value[2]
                        energy_tenant_hourly[tenant_id][current_datetime_utc][energy_category_id] = actual_value
        except Exception as e:
            error_string = "Error in step 18 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 19: for each child space in list, get energy input data from energy database
    ####################################################################################################################
    energy_child_space_hourly = dict()
    if child_space_list is not None and len(child_space_list) > 0:
        try:
            for child_space in child_space_list:
                child_space_id = str(child_space['id'])

                query = (" SELECT start_datetime_utc, energy_category_id, actual_value "
                         " FROM tbl_space_input_category_hourly "
                         " WHERE space_id = %s "
                         "       AND start_datetime_utc >= %s "
                         "       AND start_datetime_utc < %s "
                         " ORDER BY start_datetime_utc ")
                cursor_energy_db.execute(query, (child_space_id, start_datetime_utc, end_datetime_utc,))
                rows_energy_values = cursor_energy_db.fetchall()
                if rows_energy_values is None or len(rows_energy_values) == 0:
                    energy_child_space_hourly[child_space_id] = None
                else:
                    energy_child_space_hourly[child_space_id] = dict()
                    for row_energy_value in rows_energy_values:
                        current_datetime_utc = row_energy_value[0]
                        if current_datetime_utc not in energy_child_space_hourly[child_space_id]:
                            energy_child_space_hourly[child_space_id][current_datetime_utc] = dict()
                        energy_category_id = row_energy_value[1]
                        actual_value = row_energy_value[2]
                        energy_child_space_hourly[child_space_id][current_datetime_utc][energy_category_id] \
                            = actual_value
        except Exception as e:
            error_string = "Error in step 19 of space_energy_input_category.worker " + str(e)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            print(error_string)
            return error_string

    ####################################################################################################################
    # Step 20: determine common time slot to aggregate
    ####################################################################################################################

    common_start_datetime_utc = start_datetime_utc
    common_end_datetime_utc = end_datetime_utc

    print("Getting common time slot of energy values for all meters")
    if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
        for meter_id, energy_hourly in energy_meter_hourly.items():
            if energy_hourly is None or len(energy_hourly) == 0:
                common_start_datetime_utc = None
                common_end_datetime_utc = None
                break
            else:
                if common_start_datetime_utc < min(energy_hourly.keys()):
                    common_start_datetime_utc = min(energy_hourly.keys())
                if common_end_datetime_utc > max(energy_hourly.keys()):
                    common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all virtual meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
            for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all offline meters")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
            for meter_id, energy_hourly in energy_offline_meter_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all combined equipments")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_combined_equipment_hourly is not None and len(energy_combined_equipment_hourly) > 0:
            for combined_equipment_id, energy_hourly in energy_combined_equipment_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all equipments")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_equipment_hourly is not None and len(energy_equipment_hourly) > 0:
            for equipment_id, energy_hourly in energy_equipment_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all shopfloors")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_shopfloor_hourly is not None and len(energy_shopfloor_hourly) > 0:
            for shopfloor_id, energy_hourly in energy_shopfloor_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all stores")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_store_hourly is not None and len(energy_store_hourly) > 0:
            for store_id, energy_hourly in energy_store_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all tenants")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_tenant_hourly is not None and len(energy_tenant_hourly) > 0:
            for tenant_id, energy_hourly in energy_tenant_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    print("Getting common time slot of energy values for all child spaces")
    if common_start_datetime_utc is not None and common_end_datetime_utc is not None:
        if energy_child_space_hourly is not None and len(energy_child_space_hourly) > 0:
            for child_space_id, energy_hourly in energy_child_space_hourly.items():
                if energy_hourly is None or len(energy_hourly) == 0:
                    common_start_datetime_utc = None
                    common_end_datetime_utc = None
                    break
                else:
                    if common_start_datetime_utc < min(energy_hourly.keys()):
                        common_start_datetime_utc = min(energy_hourly.keys())
                    if common_end_datetime_utc > max(energy_hourly.keys()):
                        common_end_datetime_utc = max(energy_hourly.keys())

    if (energy_meter_hourly is None or len(energy_meter_hourly) == 0) and \
            (energy_virtual_meter_hourly is None or len(energy_virtual_meter_hourly) == 0) and \
            (energy_offline_meter_hourly is None or len(energy_offline_meter_hourly) == 0) and \
            (energy_combined_equipment_hourly is None or len(energy_combined_equipment_hourly) == 0) and \
            (energy_equipment_hourly is None or len(energy_equipment_hourly) == 0) and \
            (energy_shopfloor_hourly is None or len(energy_shopfloor_hourly) == 0) and \
            (energy_store_hourly is None or len(energy_store_hourly) == 0) and \
            (energy_tenant_hourly is None or len(energy_tenant_hourly) == 0) and \
            (energy_child_space_hourly is None or len(energy_child_space_hourly) == 0):
        # There isn't any energy data
        print("There isn't any energy data")
        # continue the for space loop to the next space
        print("continue the for space loop to the next space")
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        return None

    print("common_start_datetime_utc: " + str(common_start_datetime_utc))
    print("common_end_datetime_utc: " + str(common_end_datetime_utc))

    ####################################################################################################################
    # Step 21: aggregate energy data in the common time slot by energy categories and hourly
    ####################################################################################################################

    print("Step 21: aggregate energy data in the common time slot by energy categories and hourly")
    aggregated_values = list()
    try:
        current_datetime_utc = common_start_datetime_utc
        while common_start_datetime_utc is not None \
                and common_end_datetime_utc is not None \
                and current_datetime_utc <= common_end_datetime_utc:
            aggregated_value = dict()
            aggregated_value['start_datetime_utc'] = current_datetime_utc
            aggregated_value['meta_data'] = dict()

            if meter_list is not None and len(meter_list) > 0:
                for meter in meter_list:
                    meter_id = str(meter['id'])
                    energy_category_id = meter['energy_category_id']
                    actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if virtual_meter_list is not None and len(virtual_meter_list) > 0:
                for virtual_meter in virtual_meter_list:
                    virtual_meter_id = str(virtual_meter['id'])
                    energy_category_id = virtual_meter['energy_category_id']
                    actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if offline_meter_list is not None and len(offline_meter_list) > 0:
                for offline_meter in offline_meter_list:
                    offline_meter_id = str(offline_meter['id'])
                    energy_category_id = offline_meter['energy_category_id']
                    actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, Decimal(0.0))
                    aggregated_value['meta_data'][energy_category_id] = \
                        aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if combined_equipment_list is not None and len(combined_equipment_list) > 0:
                for combined_equipment in combined_equipment_list:
                    combined_equipment_id = str(combined_equipment['id'])
                    meta_data_dict = \
                        energy_combined_equipment_hourly[combined_equipment_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if equipment_list is not None and len(equipment_list) > 0:
                for equipment in equipment_list:
                    equipment_id = str(equipment['id'])
                    meta_data_dict = energy_equipment_hourly[equipment_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if shopfloor_list is not None and len(shopfloor_list) > 0:
                for shopfloor in shopfloor_list:
                    shopfloor_id = str(shopfloor['id'])
                    meta_data_dict = energy_shopfloor_hourly[shopfloor_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if store_list is not None and len(store_list) > 0:
                for store in store_list:
                    store_id = str(store['id'])
                    meta_data_dict = energy_store_hourly[store_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if tenant_list is not None and len(tenant_list) > 0:
                for tenant in tenant_list:
                    tenant_id = str(tenant['id'])
                    meta_data_dict = energy_tenant_hourly[tenant_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            if child_space_list is not None and len(child_space_list) > 0:
                for child_space in child_space_list:
                    child_space_id = str(child_space['id'])
                    meta_data_dict = energy_child_space_hourly[child_space_id].get(current_datetime_utc, None)
                    if meta_data_dict is not None and len(meta_data_dict) > 0:
                        for energy_category_id, actual_value in meta_data_dict.items():
                            aggregated_value['meta_data'][energy_category_id] = \
                                aggregated_value['meta_data'].get(energy_category_id, Decimal(0.0)) + actual_value

            aggregated_values.append(aggregated_value)

            current_datetime_utc += timedelta(minutes=config.minutes_to_count)

    except Exception as e:
        error_string = "Error in step 21 of space_energy_input_category.worker " + str(e)
        if cursor_energy_db:
            cursor_energy_db.close()
        if cnx_energy_db:
            cnx_energy_db.close()
        print(error_string)
        return error_string

    ####################################################################################################################
    # Step 22: save energy data to energy database
    ####################################################################################################################
    print("Step 22: save energy data to energy database")

    while len(aggregated_values) > 0:
        insert_100 = aggregated_values[:100]
        aggregated_values = aggregated_values[100:]
        try:
            add_values = (" INSERT INTO tbl_space_input_category_hourly "
                          "             (space_id, "
                          "              energy_category_id, "
                          "              start_datetime_utc, "
                          "              actual_value) "
                          " VALUES  ")

            for aggregated_value in insert_100:
                for energy_category_id, actual_value in aggregated_value['meta_data'].items():
                    add_values += " (" + str(space['id']) + ","
                    add_values += " " + str(energy_category_id) + ","
                    add_values += "'" + aggregated_value['start_datetime_utc'].isoformat()[0:19] + "',"
                    add_values += str(actual_value) + "), "
            print("add_values:" + add_values)
            # trim ", " at the end of string and then execute
            cursor_energy_db.execute(add_values[:-2])
            cnx_energy_db.commit()

        except Exception as e:
            error_string = "Error in step 22 of space_energy_input_category.worker " + str(e)
            print(error_string)
            if cursor_energy_db:
                cursor_energy_db.close()
            if cnx_energy_db:
                cnx_energy_db.close()
            return error_string

    if cursor_energy_db:
        cursor_energy_db.close()
    if cnx_energy_db:
        cnx_energy_db.close()
    return None

```
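
Steps 20 and 21 repeat one pattern for every entity type: narrow the time window to the hours that every entity has data for, then sum values per energy category hour by hour. The sketch below condenses that pattern into two helpers. The names `common_time_slot` and `aggregate_hourly` are hypothetical and do not appear in MyEMS. It assumes each source is shaped like `energy_store_hourly` (entity id mapped to `{hour: {energy_category_id: value}}`, or to `None` when no rows were found) and uses a fixed one-hour step in place of `config.minutes_to_count`. Meters, whose category comes from the meter list rather than the row, are left out of the aggregation helper to keep it short.

```python
from datetime import datetime, timedelta
from decimal import Decimal


def common_time_slot(start, end, *sources):
    """Narrow [start, end] to the hours covered by every entity in every source.

    Returns (None, None) as soon as one entity has no data, which mirrors
    the early `break` in Step 20.
    """
    for source in sources:
        for energy_hourly in (source or {}).values():
            if not energy_hourly:
                return None, None
            start = max(start, min(energy_hourly))
            end = min(end, max(energy_hourly))
    return start, end


def aggregate_hourly(start, end, step, sources):
    """Sum values per energy category for each time slot in [start, end]."""
    aggregated = []
    current = start
    while start is not None and end is not None and current <= end:
        totals = {}
        for source in sources:
            for energy_hourly in source.values():
                for category_id, value in energy_hourly.get(current, {}).items():
                    totals[category_id] = totals.get(category_id, Decimal(0)) + value
        aggregated.append({'start_datetime_utc': current, 'meta_data': totals})
        current += step
    return aggregated


# Made-up sample data: one store and one tenant that overlap only at h1.
h0 = datetime(2024, 1, 1, 0)
h1 = h0 + timedelta(hours=1)
h2 = h0 + timedelta(hours=2)
stores = {'1': {h0: {1: Decimal('2.0')}, h1: {1: Decimal('3.0')}}}
tenants = {'7': {h1: {1: Decimal('1.5'), 2: Decimal('4.0')}, h2: {2: Decimal('1.0')}}}

start, end = common_time_slot(h0, h0 + timedelta(hours=3), stores, tenants)
result = aggregate_hourly(start, end, timedelta(hours=1), [stores, tenants])
print(start, end, result)
```

With this sample data only `h1` is covered by both entities, so the result holds a single hour whose category 1 total is the store value plus the tenant value.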

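Step 22 builds its `INSERT` statement by string concatenation and writes 100 hours per statement. One possible alternative, shown only as a sketch, is to flatten the aggregated values into tuples and pass them as parameters through the DB-API `cursor.executemany` call, which `mysql.connector` cursors provide. The helpers `flatten_rows`, `batches`, and `save_rows` are hypothetical names, not part of MyEMS, and this variant batches by 100 rows rather than by 100 hours. A side effect of this design is that hours with an empty `meta_data` contribute no rows, so no statement is ever sent with an empty `VALUES` list.

```python
from datetime import datetime
from decimal import Decimal

INSERT_SQL = (" INSERT INTO tbl_space_input_category_hourly "
              "     (space_id, energy_category_id, start_datetime_utc, actual_value) "
              " VALUES (%s, %s, %s, %s) ")


def flatten_rows(space_id, aggregated_values):
    """Turn Step 21 output into (space_id, category_id, start, value) tuples."""
    rows = []
    for aggregated_value in aggregated_values:
        # Same timestamp formatting as the original Step 22.
        start = aggregated_value['start_datetime_utc'].isoformat()[0:19]
        for category_id, actual_value in aggregated_value['meta_data'].items():
            rows.append((space_id, category_id, start, actual_value))
    return rows


def batches(rows, size=100):
    """Yield consecutive slices of at most `size` rows."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]


def save_rows(cnx, cursor, rows, size=100):
    """Insert rows batch by batch; cnx and cursor are assumed to be an open
    mysql.connector connection and cursor. Returns the number of batches."""
    count = 0
    for batch in batches(rows, size):
        cursor.executemany(INSERT_SQL, batch)
        cnx.commit()
        count += 1
    return count


# Made-up sample: one hour with two categories, one hour with no data.
sample = [
    {'start_datetime_utc': datetime(2024, 1, 1, 0),
     'meta_data': {1: Decimal('4.5'), 2: Decimal('4.0')}},
    {'start_datetime_utc': datetime(2024, 1, 1, 1), 'meta_data': {}},
]
rows = flatten_rows(5, sample)
print(rows)
```

In the worker this would be called as `save_rows(cnx_energy_db, cursor_energy_db, flatten_rows(space['id'], aggregated_values))`, inside the same `try`/`except` that Step 22 already uses.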