我用ray创建了一个简单的远程功能,该功能占用很少的内存。但是,在短时间运行后,内存稳定增加,并且出现RayOutOfMemoryError异常。
以下代码是此问题的非常简单的示例。“ result_transformed” numpy数组被发送给工作程序,每个工作程序都可以在此工作。我简化的calc_similarity函数什么也不做,但是仍然用完内存。我为该方法添加了更长的睡眠时间,以模拟执行更多的工作,但最终耗尽了内存。
我在具有32GB RAM和Ubuntu 19.10的8核Intel 9900K上运行,Python是:Intel Python Distribution 3.7.4 numpy是1.17.4(使用Intel MKL)
import numpy as np
from time import sleep
import ray
import psutil
@ray.remote
def calc_similarity(sims, offset):
# Fake some work for 100 ms.
sleep(0.10)
return True
if __name__ == "__main__":
# Initialize RAY to use all of the processors.
num_cpus = psutil.cpu_count(logical=False)
ray.init(num_cpus=num_cpus)
num_docs = 1000000
num_dimensions = 300
chunk_size = 128
sim_pct = 0.82
# Initialize the array
index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32)
index_array = np.arange(num_docs).reshape(1, num_docs)
index_array_id = ray.put(index_array)
calc_results = []
for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)):
size = min( chunk_size, num_docs - (start_doc_no) + 1 )
# Get the query vector out of the index.
query_vector = index[start_doc_no:start_doc_no+size]
# Calculate the matrix multiplication.
result_transformed = np.matmul(index, query_vector.T).T
# Serialize the result matrix out for each client.
result_id = ray.put(result_transformed)
# Simulate multi-threading extracting the results of a cosine similarity calculation
for offset in range(chunk_size):
calc_results.append(calc_similarity.remote(sims=result_id, offset=offset ))
# , index_array=index_array_id))
res = ray.get(calc_results)
calc_results.clear()
任何帮助/指导将不胜感激。
目前,Ray支持部分引用计数。(完整的参考计数将很快发布)。简而言之,当传递给远程函数的object_id未序列化时,引用引用的计数方式与引用Python的计数方式相同。这意味着,如果result_transformed是由Python收集的垃圾,result_transformed则应取消固定血浆存储中的,并在将对象清除为LRU时将其清除。(为清楚起见,不会清除具有某些引用计数的固定对象)。
我还假设存在一些奇怪的引用计数,例如循环引用。result_transformed运行该脚本时,我可以验证它是否被驱逐了。因此,我认为result_transformed本身不是问题。可能存在许多问题。就我而言,我发现当我将ipython用作输入(IN)时,它会创建对python对象的引用。(例如,当您看到某个对象的值时,OUT [number]可以引用您的对象)。
In [2]: import psutil
...: import gc
...: import ray
...: from time import sleep
...: import numpy as np
...: @ray.remote
...: def calc_similarity(sims, offset):
...: # Fake some work for 100 ms.
...: sleep(0.10)
...: return True
...:
...: if __name__ == "__main__":
...: # Initialize RAY to use all of the processors.
...: num_cpus = psutil.cpu_count(logical=False)
...: ray.init(num_cpus=num_cpus)
...:
...: num_docs = 1000000
...: num_dimensions = 300
...: chunk_size = 128
...: sim_pct = 0.82
...:
...: # Initialize the array
...: index = np.random.random((num_docs, num_dimensions)).astype(dtype=np.float32)
...: index_array = np.arange(num_docs).reshape(1, num_docs)
...: index_array_id = ray.put(index_array)
...:
...: calc_results = []
...: i = 0
...: for count, start_doc_no in enumerate(range(0, num_docs, chunk_size)):
...: i += 1
...: size = min( chunk_size, num_docs - (start_doc_no) + 1 )
...: # Get the query vector out of the index.
...: query_vector = index[start_doc_no:start_doc_no+size]
...: # Calculate the matrix multiplication.
...: result_transformed = np.matmul(index, query_vector.T).T
...: # Serialize the result matrix out for each client.
...: result_id = ray.put(result_transformed)
...: if i == 1:
...: # The first result_id binary number should be stored in result_id_special
...: # In this way, we can verify if this object id is evicted after filling up our
...: # plasma store by some random numpy array
...: # If this object id is not evicted, that means it is pinned, meaning if is
...: # not properly reference counted.
...: first_object_id = result_id.binary()
...: # Simulate multi-threading extracting the results of a cosine similarity calculation
...: for offset in range(chunk_size):
...: calc_results.append(calc_similarity.remote(sims=result_id, offset=offset ))
...: # , index_array=index_array_id))
...: res = ray.get(calc_results)
...: calc_results.clear()
...: print('ref count to result_id {}'.format(len(gc.get_referrers(result_id))))
...: print('Total number of ref counts in a ray cluster. {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
...: if i == 5:
...: break
...: # It should contain the object id of the
...: print('first object id: {}'.format(first_object_id))
...: print('fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.')
...: print('because if the data_transformed is garbage collected properly, it should be unpinned from plasma store')
...: print('and when plasma store is filled by numpy array, first_object_id should be evicted.')
...: for _ in range(40):
...: import numpy as np
...: ray.put(np.zeros(500 * 1024 * 1024, dtype=np.uint8))
...: print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
...: # this should fail as first_object_id is already evicted
...: print(ray.get(ray.ObjectID(first_object_id)))
[ray] Forcing OMP_NUM_THREADS=1 to avoid performance degradation with many workers (issue #6998). You can override this by explicitly setting OMP_NUM_THREADS.
2020-02-12 00:10:11,932 INFO resource_spec.py:212 -- Starting Ray with 4.35 GiB memory available for workers and up to 2.19 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-02-12 00:10:12,273 INFO services.py:1080 -- View the Ray dashboard at localhost:8265
2020-02-12 00:10:18,522 WARNING worker.py:289 -- OMP_NUM_THREADS=1 is set, this may slow down ray.put() for large objects (issue #6998).
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008002000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008003000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008004000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008005000000): {'local': 1, 'submitted': 0}}
ref count to result_id 1
Total number of ref counts in a ray cluster. {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
first object id: b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x80\x02\x00\x00\x00'
fill up plasma store by big numpy arrays. This should evict the first_object_id from the plasma store.
because if the data_transformed is garbage collected properly, it should be unpinned from plasma store
and when plasma store is filled by numpy array, first_object_id should be evicted.
total ref count from a ray cluster after eviction: {ObjectID(ffffffffffffffffffffffff0100008006000000): {'local': 1, 'submitted': 0}, ObjectID(ffffffffffffffffffffffff0100008001000000): {'local': 1, 'submitted': 0}}
2020-02-12 00:10:57,108 WARNING worker.py:1515 -- Local object store memory usage:
num clients with quota: 0
quota map size: 0
pinned quota map size: 0
allocated bytes: 2092865189
allocation limit: 2347285708
pinned bytes: 520000477
(global lru) capacity: 2347285708
(global lru) used: 67.0078%
(global lru) num objects: 4
(global lru) num evictions: 41
(global lru) bytes evicted: 21446665725
2020-02-12 00:10:57,112 WARNING worker.py:1072 -- The task with ID ffffffffffffffffffffffff0100 is a driver task and so the object created by ray.put could not be reconstructed.
---------------------------------------------------------------------------
UnreconstructableError Traceback (most recent call last)
<ipython-input-1-184e5836123c> in <module>
63 print('total ref count from a ray cluster after eviction: {}'.format(ray.worker.global_worker.core_worker.get_all_reference_counts()))
64 # this should fail as first_object_id is already evicted
---> 65 print(ray.get(ray.ObjectID(first_object_id)))
66
~/work/ray/python/ray/worker.py in get(object_ids, timeout)
1517 raise value.as_instanceof_cause()
1518 else:
-> 1519 raise value
1520
1521 # Run post processors.
UnreconstructableError: Object ffffffffffffffffffffffff0100008002000000 is lost (either LRU evicted or deleted by user) and cannot be reconstructed. Try increasing the object store memory available with ray.init(object_store_memory=<bytes>) or setting object store limits with ray.remote(object_store_memory=<bytes>). See also: https://ray.readthedocs.io/en/latest/memory-management.html
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。