C++ 实现线程安全的map(OpenHarmony源码实现版)

简介: C++ 实现线程安全的map(OpenHarmony源码实现版)

概述


STL容器不是线程安全的。比如对于vector,即使写方(生产者)是单线程写入,但是并发读的时候,由于潜在的内存重新申请和对象复制问题,会导致读方(消费者)的迭代器失效。实际表现也就是招致了core dump。另外一种情况,如果是多个写方,并发的push_back(),也会导致core dump。但可以通过固定vector的大小(调用resize())避免动态扩容(无push_back)来做到lock-free。


c++的map的并发操作也是不安全的,c++里边有红黑树实现的std::map和hash表 unordered_map。在《C++并发编程实战》一书中的162页提供了一个细粒度锁的MAP数据结构,使用了 boost的shared_mutex (C++14已经支持,C++11没有),那上面的实现代码挺长的,这里给出个OpenHarmony源码实现的safe_map,代码精简,值得学习。


接上篇欣赏了OpenHarmony源码中实现的ThreadPool的实现,链接在这里:


c++的ThreadPool,OpenHarmony源码实现版赏析和使用


源码实现


源码位置:code-v3.0-LTS\OpenHarmony\utils\native\base\include\safe_map.h


/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef UTILS_BASE_SAFE_MAP_H
#define UTILS_BASE_SAFE_MAP_H
#include <map>
#include <mutex>
namespace OHOS {
template <typename K, typename V>
class SafeMap {
public:
    SafeMap() {}
    ~SafeMap() {}
    SafeMap(const SafeMap& rhs)
    {
        map_ = rhs.map_;
    }
    SafeMap& operator=(const SafeMap& rhs)
    {
        if (&rhs != this) {
            map_ = rhs.map_;
        }
        return *this;
    }
    V& operator[](const K& key)
    {
        return map_[key];
    }
    // when multithread calling size() return a tmp status, some threads may insert just after size() call
    int Size()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return map_.size();
    }
    // when multithread calling Empty() return a tmp status, some threads may insert just after Empty() call
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return map_.empty();
    }
    bool Insert(const K& key, const V& value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto ret = map_.insert(std::pair<K, V>(key, value));
        return ret.second;
    }
    void EnsureInsert(const K& key, const V& value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto ret = map_.insert(std::pair<K, V>(key, value));
        // find key and cannot insert
        if (!ret.second) {
            map_.erase(ret.first);
            map_.insert(std::pair<K, V>(key, value));
            return;
        }
        return;
    }
    bool Find(const K& key, V& value)
    {
        bool ret = false;
        std::lock_guard<std::mutex> lock(mutex_);
        auto iter = map_.find(key);
        if (iter != map_.end()) {
            value = iter->second;
            ret = true;
        }
        return ret;
    }
    bool FindOldAndSetNew(const K& key, V& oldValue, const V& newValue)
    {
        bool ret = false;
        std::lock_guard<std::mutex> lock(mutex_);
        if (map_.size() > 0) {
            auto iter = map_.find(key);
            if (iter != map_.end()) {
                oldValue = iter->second;
                map_.erase(iter);
                map_.insert(std::pair<K, V>(key, newValue));
                ret = true;
            }
        }
        return ret;
    }
    void Erase(const K& key)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        map_.erase(key);
    }
    void Clear()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        map_.clear();
        return;
    }
private:
    std::mutex mutex_;
    std::map<K, V> map_;
};
} // namespace OHOS
#endif


源码欣赏


使用模板语法template <typename K, typename V>让这个map的实现更通用。这是c++模板泛型的强大之处,不用针对每个类型都实现一遍,复用性更强。且模板是在编译期检查的,也降低的出错的可能性。内部实现上,倒是没啥特别的,就是对相应的操作加了锁。锁使用的RAII模型的std::lock_guard写法,这种很常见也很常用。


自定义实现了几个常用的操作方法如Find,Erase和Clear,每个里面的操作都相应的加了锁。操作符重载实现了[]和赋值=操作。注意这两处的地方没有用锁,你知道为什么吗?如果多个线程只访问容器但不更改其结构,则不需要对容器进行同步。另外一个原因是,对于map,如果只是通过[]的方式修改而不是新插入,则多线程下也不会core dump。


单元测试


源码中同样有safe_map的单元测试,单元测试框架使用的是google的gtest。看来gtest还是很强大的,华为也选择使用了它。以下给出源码,可以熟悉下gtest单元测试的用法。


/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "safe_map.h"
#include <array>
#include <future>
#include <gtest/gtest.h>
#include <iostream>
#include <thread>
#include <iostream>
#include <chrono> // std::chrono::seconds
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::sleep_for
using namespace testing::ext;
using namespace OHOS;
using namespace std;
class UtilsSafeMap : public testing::Test {
};
/*
 * @tc.name: testUtilsCopyAndAssign001
 * @tc.desc: single thread test the normal feature insert and erase and EnsureInsert
 */
HWTEST_F(UtilsSafeMap, testUtilsCopyAndAssign001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    // insert new
    demoData.Insert("A", 1);
    ASSERT_FALSE(demoData.IsEmpty());
    ASSERT_EQ(demoData.Size(), 1);
    SafeMap<string, int> newdemo = demoData;
    int tar = -1;
    ASSERT_TRUE(newdemo.Find("A", tar));
    ASSERT_EQ(1, tar);
    tar = -1;
    SafeMap<string, int> newdemo2;
    newdemo2 = demoData;
    ASSERT_TRUE(newdemo2.Find("A", tar));
    ASSERT_EQ(1, tar);
}
/*
 * @tc.name: testUtilsoperator001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsoperator001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    // insert new
    demoData.Insert("A", 1);
    ASSERT_FALSE(demoData.IsEmpty());
    ASSERT_EQ(demoData.Size(), 1);
    ASSERT_EQ(demoData["A"], 1);
    SafeMap<string, int> newdemo = demoData;
    ASSERT_EQ(newdemo["A"], 1);
    int tar = -1;
    newdemo["B"] = 6;
    ASSERT_TRUE(newdemo.Find("B", tar));
    ASSERT_EQ(6, tar);
    SafeMap<string, int> newdemo2;
    newdemo2 = newdemo;
    ASSERT_EQ(newdemo2["A"], 1);
}
/*
 * @tc.name: testUtilsNormalFeatureInsert001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureInsert001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    ASSERT_TRUE(demoData.IsEmpty());
    // insert new
    demoData.Insert("A", 1);
    ASSERT_FALSE(demoData.IsEmpty());
    ASSERT_EQ(demoData.Size(), 1);
    // insert copy one should fail
    ASSERT_FALSE(demoData.Insert("A", 2));
    ASSERT_EQ(demoData.Size(), 1);
}
/*
 * @tc.name: testUtilsNormalFeatureEnsureInsert001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureEnsureInsert001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    ASSERT_TRUE(demoData.IsEmpty());
    demoData.Insert("A", 1);
    demoData.EnsureInsert("B", 2);
    ASSERT_FALSE(demoData.IsEmpty());
    ASSERT_EQ(demoData.Size(), 2);
    // insert copy one and new one
    demoData.EnsureInsert("B", 5);
    demoData.EnsureInsert("C", 6);
    ASSERT_EQ(demoData.Size(), 3);
}
/*
 * @tc.name: testUtilsNormalFeatureFind001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureFind001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    ASSERT_TRUE(demoData.IsEmpty());
    demoData.Insert("A", 1);
    demoData.Insert("B", 10000);
    demoData.EnsureInsert("B", 2);
    demoData.EnsureInsert("C", 6);
    ASSERT_FALSE(demoData.IsEmpty());
    ASSERT_EQ(demoData.Size(), 3);
    int i = 0;
    ASSERT_TRUE(demoData.Find("A", i));
    ASSERT_EQ(i, 1);
    ASSERT_TRUE(demoData.Find("B", i));
    ASSERT_EQ(i, 2);
    ASSERT_TRUE(demoData.Find("C", i));
    ASSERT_EQ(i, 6);
}
/*
 * @tc.name: testUtilsNormalFeatureFindAndSet001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureFindAndSet001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    ASSERT_TRUE(demoData.IsEmpty());
    demoData.Insert("A", 1);
    demoData.EnsureInsert("B", 2);
    int oldvalue = 0;
    int newvalue = 3;
    ASSERT_TRUE(demoData.FindOldAndSetNew("A", oldvalue, newvalue));
    // old value
    ASSERT_EQ(oldvalue, 1);
    newvalue = 4;
    ASSERT_TRUE(demoData.FindOldAndSetNew("B", oldvalue, newvalue));
    // old value
    ASSERT_EQ(oldvalue, 2);
    int i = -1;
    ASSERT_TRUE(demoData.Find("A", i));
    // new value
    ASSERT_EQ(i, 3);
    ASSERT_TRUE(demoData.Find("B", i));
    // new value
    ASSERT_EQ(i, 4);
}
/*
 * @tc.name: testUtilsNormalFeatureEraseAndClear001
 * @tc.desc: SafeMap
 */
HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureEraseAndClear001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    ASSERT_TRUE(demoData.IsEmpty());
    demoData.Insert("A", 1);
    demoData.EnsureInsert("B", 2);
    ASSERT_EQ(demoData.Size(), 2);
    demoData.Erase("A");
    ASSERT_EQ(demoData.Size(), 1);
    demoData.Clear();
    ASSERT_EQ(demoData.Size(), 0);
}
/*
 * @tc.name: testUtilsConcurrentWriteAndRead001
 * @tc.desc: 100 threads test in writein to the same key of the map, while read at same time  and no throw
 */
const int THREAD_NUM = 100;
HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndRead001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    std::thread threads[THREAD_NUM];
    std::thread checkThread[THREAD_NUM];
    ASSERT_NO_THROW({
        auto lamfuncInsert = [](SafeMap<string, int>& data, const string& key,
            const int& value, const std::chrono::system_clock::time_point& absTime) {
            std::this_thread::sleep_until(absTime);
            data.EnsureInsert(key, value);
        };
        auto lamfuncCheck = [](SafeMap<string, int>& data, const string& key,
            std::chrono::system_clock::time_point absTime) {
            std::this_thread::sleep_until(absTime);
            thread_local int i = -1;
            data.Find(key, i);
        };
        using std::chrono::system_clock;
        std::time_t timeT = system_clock::to_time_t(system_clock::now());
        timeT += 2;
        string key("A");
        for (int i = 0; i < THREAD_NUM; ++i) {
            threads[i] = std::thread(lamfuncInsert, std::ref(demoData), key, i, system_clock::from_time_t(timeT));
            checkThread[i] = std::thread(lamfuncCheck, std::ref(demoData), key, system_clock::from_time_t(timeT));
        }
        std::this_thread::sleep_for(std::chrono::seconds(3));
        for (auto& t : threads) {
            t.join();
        }
        for (auto& t : checkThread) {
            t.join();
        }
    });
}
/*
 * @tc.name: testUtilsConcurrentWriteAndFind001
 * @tc.desc: 100 threads test in writein to the corresponding key of the map,
 * while read at same time  and check the results
 */
HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndFind001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    std::thread threads[THREAD_NUM];
    std::vector<std::future<int>> vcfi;
    ASSERT_NO_THROW({
        auto lamfuncInsert = [](SafeMap<string, int>& data, const string& key,
            const int& value, const std::chrono::system_clock::time_point& absTime) {
            std::this_thread::sleep_until(absTime);
            data.EnsureInsert(key, value);
        };
        auto lamfuncCheckLoop = [](SafeMap<string, int>& data, const string& key,
            std::chrono::system_clock::time_point absTime) {
            std::this_thread::sleep_until(absTime);
            thread_local int i = -1;
            while (!data.Find(key, i)) {
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            }
            return i;
        };
        using std::chrono::system_clock;
        std::time_t timeT = system_clock::to_time_t(system_clock::now());
        timeT += 2;
        string key("A");
        for (int i = 0; i < THREAD_NUM; ++i) {
            threads[i] = std::thread(lamfuncInsert, std::ref(demoData),
                key + std::to_string(i), i, system_clock::from_time_t(timeT));
            vcfi.push_back(std::async(std::launch::async, lamfuncCheckLoop,
                std::ref(demoData), key + std::to_string(i), system_clock::from_time_t(timeT)));
        }
        std::this_thread::sleep_for(std::chrono::seconds(4));
        for (auto& t : threads) {
            t.join();
        }
        vector<int> result;
        for (auto& t : vcfi) {
            result.push_back(t.get());
        }
        std::sort(result.begin(), result.end());
        for (int i = 0; i < THREAD_NUM; ++i) {
            ASSERT_EQ(i, result[i]);
        }
    });
}
/*
 * @tc.name: testUtilsConcurrentWriteAndFindAndSet001
 * @tc.desc: 100 threads test in writein to the corresponding key of the map,
 * while findandfix at same time  and check the results
 */
HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndFindAndSet001, TestSize.Level1)
{
    SafeMap<string, int> demoData;
    std::thread threads[THREAD_NUM];
    std::vector<std::future<int>> vcfi;
    ASSERT_NO_THROW({
        auto lamfuncInsert = [](SafeMap<string, int>& data, const string& key,
            const int& value, const std::chrono::system_clock::time_point& absTime) {
            std::this_thread::sleep_until(absTime);
            data.EnsureInsert(key, value);
        };
        auto lamfuncCheckLoop = [](SafeMap<string, int>& data, const string& key,
            const int& newvalue, std::chrono::system_clock::time_point absTime) {
            std::this_thread::sleep_until(absTime);
            thread_local int i = -1;
            while (!data.FindOldAndSetNew(key, i, newvalue)) {
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            }
            return i;
        };
        using std::chrono::system_clock;
        std::time_t timeT = system_clock::to_time_t(system_clock::now());
        timeT += 2;
        string key("A");
        for (int i = 0; i < THREAD_NUM; ++i) {
            threads[i] = std::thread(lamfuncInsert, std::ref(demoData),
                key + std::to_string(i), i, system_clock::from_time_t(timeT));
            vcfi.push_back(std::async(std::launch::async, lamfuncCheckLoop,
                std::ref(demoData), key + std::to_string(i), i + 1, system_clock::from_time_t(timeT)));
        }
        std::this_thread::sleep_for(std::chrono::seconds(4));
        for (auto& t : threads) {
            t.join();
        }
        vector<int> result;
        for (auto& t : vcfi) {
            result.push_back(t.get());
        }
        std::sort(result.begin(), result.end());
        for (int i = 0; i < THREAD_NUM; ++i) {
            ASSERT_EQ(i, result[i]);
        }
        int t = 0;
        result.clear();
        for (int i = 0; i < THREAD_NUM; ++i) {
            t = -1;
            ASSERT_TRUE(demoData.Find("A" + std::to_string(i), t));
            result.push_back(t);
        }
        std::sort(result.begin(), result.end());
        for (int i = 0; i < THREAD_NUM; ++i) {
            ASSERT_EQ(i + 1, result[i]);
        }
    });
}


引用


C++线程安全map (低效率) | 码农家园


C++11:基于std::queue和std::mutex构建一个线程安全的队列_10km的博客-CSDN博客_std::queue 线程安全


C++11:基于std::unordered_map和共享锁构建线程安全的map_10km的博客-CSDN博客_c++ map 无锁


如何设计并实现一个线程安全的 Map ?(上篇) - JavaShuo


c++线程安全的map_clh01s的博客-CSDN博客_c++ 线程安全的map


C++ 20 线程安全的Map_学习好烦啊的博客-CSDN博客_c++ map 线程安全


c++ map 多线程_线程安全原理简析及HashMap多线程并发5种场景异常分析_sumlao的博客-CSDN博客


C++ STL容器如何解决线程安全的问题? - 腾讯云开发者社区-腾讯云


RISC-V MCU中文社区_致力于RISC-V技术的推广,提供一个交流学习的开放平台

相关文章
|
2月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
76 0
|
2月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
60 0
|
25天前
|
存储 C++ 容器
【C++】map、set基本用法
本文介绍了C++ STL中的`map`和`set`两种关联容器。`map`用于存储键值对,每个键唯一;而`set`存储唯一元素,不包含值。两者均基于红黑树实现,支持高效的查找、插入和删除操作。文中详细列举了它们的构造方法、迭代器、容量检查、元素修改等常用接口,并简要对比了`map`与`set`的主要差异。此外,还介绍了允许重复元素的`multiset`和`multimap`。
29 3
【C++】map、set基本用法
|
19天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
50 12
|
23天前
|
C语言 C++ 容器
【c++丨STL】string模拟实现(附源码)
本文详细介绍了如何模拟实现C++ STL中的`string`类,包括其构造函数、拷贝构造、赋值重载、析构函数等基本功能,以及字符串的插入、删除、查找、比较等操作。文章还展示了如何实现输入输出流操作符,使自定义的`string`类能够方便地与`cin`和`cout`配合使用。通过这些实现,读者不仅能加深对`string`类的理解,还能提升对C++编程技巧的掌握。
47 5
|
25天前
|
存储 算法 C++
【C++】unordered_map(set)
C++中的`unordered`容器(如`std::unordered_set`、`std::unordered_map`)基于哈希表实现,提供高效的查找、插入和删除操作。哈希表通过哈希函数将元素映射到特定的“桶”中,每个桶可存储一个或多个元素,以处理哈希冲突。主要组成部分包括哈希表、哈希函数、冲突处理机制、负载因子和再散列,以及迭代器。哈希函数用于计算元素的哈希值,冲突通过开链法解决,负载因子控制哈希表的扩展。迭代器支持遍历容器中的元素。`unordered_map`和`unordered_set`的插入、查找和删除操作在理想情况下时间复杂度为O(1),但在冲突较多时可能退化为O(n)。
21 5
|
25天前
|
存储 C++ 容器
【C++】map的模拟实现
C++中的`map`是STL中的一种关联容器,存储键值对且键唯一。`map`基于红黑树实现,自动按键排序,支持动态调整、复杂数据类型、丰富的成员函数及双向迭代器。插入、查找等操作保证了对数时间复杂度,适用于需要快速查找和有序存储的场景。
22 3
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
47 7
|
1月前
|
消息中间件 存储 安全
|
2月前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。