Ray Python framework runs out of memory

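I created a simple remote function with Ray that uses very little memory. However, after running for a short time, memory usage increases steadily and I get a RayOutOfMemoryError exception.

The following code is a very simple example of this problem. The "result_transformed" numpy array is sent to the workers, and each worker can do work on it. My simplified calc_similarity function does nothing, but it still runs out of memory. I have added much longer sleep times to that method to simulate doing more work, but it still eventually runs out of memory.

I am running on an 8-core Intel 9900K with 32 GB of RAM and Ubuntu 19.10. Python is Intel Python Distribution 3.7.4; numpy is 1.17.4 (with Intel MKL).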

import numpy as np
from time import sleep
import ray
import psutil

@ray.remote
def calc_similarity(sims, offset):
    # Fake some work for 100 ms.
    sleep(0.10)
    return True

if __name__ == "__main__":
    # Initialize RAY to use all of the processors.
    num_cpus = psutil.cpu_count(logical=False)
    ray.init(num_cpus=num_cpus)

    num_docs = 1000000
    num_dimensions = 300
    chunk_size = 128
    sim_pct = 0.82

    # Initialize the array
    index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32)
    index_array = np.arange(num_docs).reshape(1, num_docs)
    index_array_id = ray.put(index_array)

    calc_results = []

    for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)):
        size = min(chunk_size, num_docs - start_doc_no)
        # Get the query vector out of the index.
        query_vector = index[start_doc_no:start_doc_no+size]
        # Calculate the matrix multiplication.
        result_transformed = np.matmul(index, query_vector.T).T
        # Serialize the result matrix out for each client.
        result_id = ray.put(result_transformed)

        # Simulate multi-threading extracting the results of a cosine similarity calculation
        for offset in range(chunk_size):
            calc_results.append(calc_similarity.remote(sims=result_id, offset=offset ))
            # , index_array=index_array_id))
        res = ray.get(calc_results)
        calc_results.clear()

Any help or guidance would be greatly appreciated.
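
For scale, the size of each object written by ray.put inside the loop can be estimated from the array shapes alone. This is a rough sketch, assuming the result of np.matmul stays float32 (4 bytes per element) and ignoring any serialization overhead that Ray adds:

```python
import math

# Values taken from the script above.
num_docs = 1_000_000
chunk_size = 128
float32_itemsize = 4  # assumption: the matmul result stays np.float32

# np.matmul(index, query_vector.T).T has shape (chunk_size, num_docs)
# for every full chunk.
bytes_per_chunk = chunk_size * num_docs * float32_itemsize

# One ray.put call is made per chunk of documents.
num_chunks = math.ceil(num_docs / chunk_size)

print(f"bytes per ray.put: {bytes_per_chunk:,}")
print(f"approx. MiB per ray.put: {bytes_per_chunk / 2**20:.0f}")
print(f"number of ray.put calls: {num_chunks}")
```

Each full chunk is therefore roughly 488 MiB, and the loop issues 7,813 such puts, so the object store can only ever hold a handful of these arrays at once.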

祖安文状元 2020-02-21 17:36:13
1 answer
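  • Currently, Ray only partially supports reference counting (full reference counting will be released soon). In short, when the object_id passed to a remote function is not serialized, it is reference counted the same way Python objects are reference counted. This means that once result_transformed is garbage collected by Python, the copy of result_transformed in the plasma store should be unpinned, and it should then be evicted when objects are evicted in LRU order. (To be clear, pinned objects that still have a nonzero reference count are not evicted.)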

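    I also suspected some odd reference counting, such as circular references. When I ran this script, I could verify that result_transformed was evicted, so I don't think result_transformed itself is the problem. There are many possible causes. In my case, I found that IPython creates references to Python objects when it is used for input (In). (For example, when you display the value of an object, Out[number] can hold a reference to your object.)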
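
The effect of a hidden reference like IPython's output history can be reproduced without Ray. The following is a minimal sketch in plain Python, where `out_cache` is a hypothetical stand-in for IPython's `Out` dictionary and `Payload` is a hypothetical stand-in for an object ID or array; neither name comes from Ray or IPython.

```python
import gc
import weakref


class Payload:
    """Hypothetical stand-in for a large object (an object ID, an array, ...)."""


# Hypothetical stand-in for IPython's output history (`Out`).
out_cache = {}

obj = Payload()
alive = weakref.ref(obj)  # weak reference: observes the object without keeping it alive

out_cache[1] = obj        # models a cell whose displayed value gets cached
del obj                   # the user's own name is gone...
gc.collect()
still_alive_with_cache = alive() is not None  # ...but the cache still holds it

out_cache.clear()         # drop the hidden reference
gc.collect()
alive_after_clear = alive() is not None

print(still_alive_with_cache, alive_after_clear)
```

As long as any such reference exists, Python cannot collect the object, so anything pinned on its behalf stays pinned too.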

    In [2]: import psutil 
       ...: import gc 
       ...: import ray 
       ...: from time import sleep 
       ...: import numpy as np 
       ...: @ray.remote 
       ...: def calc_similarity(sims, offset): 
       ...:     # Fake some work for 100 ms. 
       ...:     sleep(0.10) 
       ...:     return True 
       ...:  
       ...: if __name__ == "__main__": 
       ...:     # Initialize RAY to use all of the processors. 
       ...:     num_cpus = psutil.cpu_count(logical=False) 
       ...:     ray.init(num_cpus=num_cpus) 
       ...:  
       ...:     num_docs = 1000000 
       ...:     num_dimensions = 300 
       ...:     chunk_size = 128 
       ...:     sim_pct = 0.82 
       ...:  
       ...:     # Initialize the array 
       ...:     index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32) 
       ...:     index_array = np.arange(num_docs).reshape(1, num_docs) 
       ...:     index_array_id = ray.put(index_array) 
       ...:  
       ...:     calc_results = [] 
       ...:     i = 0 
       ...:     for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)): 
       ...:         i += 1 
       ...:         size = min(chunk_size, num_docs - start_doc_no)
       ...:         # Get the query vector out of the index. 
       ...:         query_vector = index[start_doc_no:start_doc_no+size] 
       ...:         # Calculate the matrix multiplication. 
       ...:         result_transformed = np.matmul(index, query_vector.T).T 
       ...:         # Serialize the result matrix out for each client. 
       ...:         result_id = ray.put(result_transformed) 
       ...:         if i == 1: 
       ...:             # The binary form of the first result_id is stored in first_object_id.
       ...:             # This lets us verify whether this object id is evicted after filling up
       ...:             # the plasma store with some large numpy arrays.
       ...:             # If this object id is not evicted, it is pinned, meaning it is
       ...:             # not properly reference counted.
       ...:             first_object_id = result_id.binary() 
       ...:         # Simulate multi-threading extracting the results of a cosine similarity calculation 
       ...:         for offset in range(chunk_size): 
       ...:             calc_results.append(calc_similarity.remote(sims=result_id, offset=offset )) 
       ...:             # , index_array=index_array_id)) 
       ...:         res = ray.get(calc_results) 
       ...:         calc_results.clear() 
       ...:         print('ref count to result_id {}'.format(len(gc.get_referrers(result_id)))) 
       ...:         print('Total number of ref counts in a ray cluster. {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
       ...:         if i == 5: 
       ...:             break 
       ...:     # It should contain the object id of the  
       ...:     print('first object id: {}'.format(first_object_id)) 
       ...:     print('fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.') 
       ...:     print('because if the data_transformed is garbage collected properly, it should be unpinned from plasma store') 
       ...:     print('and when plasma store is filled by numpy array, first_object_id should be evicted.') 
       ...:     for _ in range(40): 
       ...:         import numpy as np 
       ...:         ray.put(np.zeros(500 * 1024 * 1024, dtype=np.uint8)) 
       ...:     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts())) 
       ...:     # this should fail as first_object_id is already evicted 
       ...:     print(ray.get(ray.ObjectID(first_object_id))) 
    
    [ray] Forcing OMP_NUM_THREADS=1 to avoid performance degradation with many workers (issue #6998). You can override this by explicitly setting OMP_NUM_THREADS.
    2020-02-12 00:10:11,932 INFO resource_spec.py:212 -- Starting Ray with 4.35 GiB memory available for workers and up to 2.19 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
    2020-02-12 00:10:12,273 INFO services.py:1080 -- View the Ray dashboard at localhost:8265
    2020-02-12 00:10:18,522 WARNING worker.py:289 -- OMP_NUM_THREADS=1 is set, this may slow down ray.put() for large objects (issue #6998).
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008002000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008003000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008004000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008005000000): {'local': 1, 'submitted': 0}}
    ref count to result_id 1
    Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    first object id: b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x80\x02\x00\x00\x00'
    fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.
    because if the data_transformed is garbage collected properly, it should be unpinned from plasma store
    and when plasma store is filled by numpy array, first_object_id should be evicted.
    total ref count from a ray cluster after eviction: {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
    2020-02-12 00:10:57,108 WARNING worker.py:1515 -- Local object store memory usage:
    num clients with quota: 0
    quota map size: 0
    pinned quota map size: 0
    allocated bytes: 2092865189
    allocation limit: 2347285708
    pinned bytes: 520000477
    (global lru) capacity: 2347285708
    (global lru) used: 67.0078%
    (global lru) num objects: 4
    (global lru) num evictions: 41
    (global lru) bytes evicted: 21446665725
    
    2020-02-12 00:10:57,112 WARNING worker.py:1072 -- The task with ID ffffffffffffffffffffffff0100 is a driver task and so the object created by ray.put could not be reconstructed.
    ---------------------------------------------------------------------------
    UnreconstructableError                    Traceback (most recent call last)
    <ipython-input-1-184e5836123c> in <module>
         63     print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
         64     # this should fail as first_object_id is already evicted
    ---> 65     print(ray.get(ray.ObjectID(first_object_id)))
         66 
    
    ~/work/ray/python/ray/worker.py in get(object_ids, timeout)
       1517                     raise value.as_instanceof_cause()
       1518                 else:
    -> 1519                     raise value
       1520 
       1521         # Run post processors.
    
    UnreconstructableError: Object ffffffffffffffffffffffff0100008002000000 is lost (either LRU evicted or deleted by user) and cannot be reconstructed. Try increasing the object store memory available with ray.init(object_store_memory=<bytes>) or setting object store limits with ray.remote(object_store_memory=<bytes>). See also: https://ray.readthedocs.io/en/latest/memory-management.html
    
    2020-02-22 10:27:48
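
The pinning and eviction behaviour described in this answer can be pictured with a small toy model. This is an illustration only, not Ray's plasma store: `ToyStore` and its methods are hypothetical names, sizes are abstract units, and eviction order is simply oldest-first (a simplification of LRU, since this toy never records reads).

```python
from collections import OrderedDict


class ToyStore:
    """Toy object store with pinning and oldest-first eviction (not Ray's code)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.objects = OrderedDict()  # object id -> size, oldest first
        self.pinned = set()           # ids that still have live references
        self.evicted = []             # ids evicted so far, in order

    def used(self):
        return sum(self.objects.values())

    def put(self, object_id, size):
        # Evict unpinned objects, oldest first, until the new object fits.
        for victim in list(self.objects):
            if self.used() + size <= self.capacity:
                break
            if victim not in self.pinned:
                del self.objects[victim]
                self.evicted.append(victim)
        if self.used() + size > self.capacity:
            raise MemoryError("store is full of pinned objects")
        self.objects[object_id] = size
        self.pinned.add(object_id)    # new objects start out referenced

    def unpin(self, object_id):
        # Models the last Python reference to the object going away.
        self.pinned.discard(object_id)


store = ToyStore(capacity=4)
store.put("a", 2)
store.put("b", 2)
store.unpin("a")       # "a" is now eligible for eviction
store.put("c", 2)      # evicts "a", keeps the pinned "b"
evicted_after_c = list(store.evicted)

try:
    store.put("d", 2)  # "b" and "c" are both pinned, so nothing can be evicted
    out_of_memory = False
except MemoryError:
    out_of_memory = True

print(evicted_after_c, out_of_memory)
```

In this model, memory is only exhausted when references keep objects pinned, which is why the answer checks whether the first object ID can still be fetched after the store is filled.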