Jetpack之DataStore解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 前言在正式讲解DataStore之前,我们先回顾下大家熟知的SharedPreferences(以下简称SP),众所周知SP有一些缺点,如调用getXXX()获取数据,可能会阻塞主线程;无法保证类型安全;加载的数据会一直留在内存中,浪费内存;apply方法无法获取到操作成功或失败的结果。SharedPreferences回顾getXXX()可能会阻塞主线程请看以下SP获取value值的使用代码: 

前言

在正式讲解DataStore之前,我们先回顾下大家熟知的SharedPreferences(以下简称SP),众所周知SP有一些缺点,如调用getXXX()获取数据,可能会阻塞主线程;无法保证类型安全;加载的数据会一直留在内存中,浪费内存;apply方法无法获取到操作成功或失败的结果。

SharedPreferences回顾

getXXX()可能会阻塞主线程

请看以下SP获取value值的使用代码:

    val sp = context.getSharedPreferences("sp_file", Context.MODE_PRIVATE) 
    sp.getString("name", "sp")

看下getSharedPreferences的实现:

@Override
    public SharedPreferences getSharedPreferences(File file, int mode) {
        SharedPreferencesImpl sp;
        synchronized (ContextImpl.class) {
            final ArrayMap<File, SharedPreferencesImpl> cache = getSharedPreferencesCacheLocked();
            sp = cache.get(file);
            if (sp == null) {
                checkMode(mode);
                if (getApplicationInfo().targetSdkVersion >= android.os.Build.VERSION_CODES.O) {
                	...
                }
                sp = new SharedPreferencesImpl(file, mode);
                cache.put(file, sp);
                return sp;
            }
        }
        if ((mode & Context.MODE_MULTI_PROCESS) != 0 ||
            getApplicationInfo().targetSdkVersion < android.os.Build.VERSION_CODES.HONEYCOMB) {
            sp.startReloadIfChangedUnexpectedly();
        }
        return sp;
    }

里面调用了getSharedPreferencesCacheLocked方法获取一个以File为key的ArrayMap:

private ArrayMap<File, SharedPreferencesImpl> getSharedPreferencesCacheLocked() {
        if (sSharedPrefsCache == null) {
            sSharedPrefsCache = new ArrayMap<>();
        }

        final String packageName = getPackageName();
        ArrayMap<File, SharedPreferencesImpl> packagePrefs = sSharedPrefsCache.get(packageName);
        if (packagePrefs == null) {
            packagePrefs = new ArrayMap<>();
            sSharedPrefsCache.put(packageName, packagePrefs);
        }

        return packagePrefs;
    }

很简单,就是根据应用包名获取一个ArrayMap,并缓存到一个static的sSharedPrefsCache对象中。接着根据file获取对应的SharedPreferencesImpl,如果为空,则创建一个新的对象并缓存到内存中。

SharedPreferencesImpl构造方法会调用startLoadFromDisk方法,看下改方法实现:

private void startLoadFromDisk() {
        synchronized (mLock) {
            mLoaded = false;
        }
        new Thread("SharedPreferencesImpl-load") {
            public void run() {
                loadFromDisk();
            }
        }.start();
    }

启动一个子线层,调用了loadFromDisk方法,该方法就是读取对应SP文件,把键值对缓存到内存中。

接着看下getString方法的实现:

    public String getString(String key, @Nullable String defValue) {
        synchronized (mLock) {
            awaitLoadedLocked();
            String v = (String)mMap.get(key);
            return v != null ? v : defValue;
        }
    }
    
    private void awaitLoadedLocked() {
        if (!mLoaded) {
            BlockGuard.getThreadPolicy().onReadFromDisk();
        }
        while (!mLoaded) {
            try {
                mLock.wait();
            } catch (InterruptedException unused) {
            }
        }
        if (mThrowable != null) {
            throw new IllegalStateException(mThrowable);
        }
    }

看到会在一个同步块代码中调用awaitLoadedLocked方法,该方法调用了wait方法。如果读取一个较大的文件,这时又调用了getString方法,如果此时文件还未读取完成,则会造成主线程阻塞。

SP不能保证类型安全

请看以下代码:

    val key = "name"
    val sp = context.getSharedPreferences("sp_file", Context.MODE_PRIVATE)
    sp.edit()?.apply {
        putInt(key, 10)
    }?.apply()
    sp.getString(key, "sp")

把一个Int类型的数据存储到SP中,紧接着调用getString获取同一个key的value值,这时变会出现类型转换异常。

apply方法可能会ANR 

为啥apply方法会发生ANR呢,我们看下它的实现:

public void apply() {
    final MemoryCommitResult mcr = commitToMemory();
    final Runnable awaitCommit = new Runnable() {
        @Override
        public void run() {
            try {
                mcr.writtenToDiskLatch.await();
            } catch (InterruptedException ignored) {
            }
        }
    };

    QueuedWork.addFinisher(awaitCommit);

    Runnable postWriteRunnable = new Runnable() {
        @Override
        public void run() {
            awaitCommit.run();
            QueuedWork.removeFinisher(awaitCommit);
        }
    };

    SharedPreferencesImpl.this.enqueueDiskWrite(mcr, postWriteRunnable);
    notifyListeners(mcr);

commitToMemory方法是把当前要提交的键值对缓存到内存,awaitCommit内调用了锁等待。每调用一次apply方法,就会把它添加到一个队列里。postWriteRunnable执行了awaitCommit的代码,接着移除当前队列里的awaitCommit。最后调用了enqueueDiskWrite方法。看下enqueueDiskWrite方法都做了什么:

private void enqueueDiskWrite(final MemoryCommitResult mcr,
final Runnable postWriteRunnable) {
    final boolean isFromSyncCommit = (postWriteRunnable == null);

    final Runnable writeToDiskRunnable = new Runnable() {
        @Override
        public void run() {
            synchronized (mWritingToDiskLock) {
                writeToFile(mcr, isFromSyncCommit);
            }
            synchronized (mLock) {
                mDiskWritesInFlight--;
            }
            if (postWriteRunnable != null) {
                postWriteRunnable.run();
            }
        }
    };
    if (isFromSyncCommit) {
        boolean wasEmpty = false;
        synchronized (mLock) {
            wasEmpty = mDiskWritesInFlight == 1;
        }
        if (wasEmpty) {
            writeToDiskRunnable.run();
            return;
        }
    }

    QueuedWork.queue(writeToDiskRunnable, !isFromSyncCommit);
}

这里又有一个writeToDiskRunnable,里面执行了SP的持久化写入,同时调用传进来的postWriteRunnable。最后调用了QueuedWork.queue方法:

public static void queue(Runnable work, boolean shouldDelay) {
    Handler handler = getHandler();

    synchronized (sLock) {
        sWork.add(work);

        if (shouldDelay && sCanDelay) {
            handler.sendEmptyMessageDelayed(QueuedWorkHandler.MSG_RUN, DELAY);
        } else {
            handler.sendEmptyMessage(QueuedWorkHandler.MSG_RUN);
        }
    }
}

private static class QueuedWorkHandler extends Handler {
    static final int MSG_RUN = 1;

    QueuedWorkHandler(Looper looper) {
        super(looper);
    }

    public void handleMessage(Message msg) {
        if (msg.what == MSG_RUN) {
            processPendingWork();
        }
    }
}

private static void processPendingWork() {
    synchronized (sProcessingWork) {
        LinkedList<Runnable> work;

        synchronized (sLock) {
            work = sWork;
            sWork = new LinkedList<>();

            // Remove all msg-s as all work will be processed now
            getHandler().removeMessages(QueuedWorkHandler.MSG_RUN);
        }

        if (work.size() > 0) {
            for (Runnable w : work) {
                w.run();
            }
        }
    }
}

queue方法把上面的runnable添加到一个队列中,这个队列里的任务会通过内部的HandlerThread执行添加到队列里的任务。看到这里也没发现会产生ANR。发现QueueWork里有一个waitToFinish方法:

/**
 * Trigger queued work to be processed immediately. The queued work is processed on a separate
 * thread asynchronous. While doing that run and process all finishers on this thread. The
 * finishers can be implemented in a way to check weather the queued work is finished.
 *
 * Is called from the Activity base class's onPause(), after BroadcastReceiver's onReceive,
 * after Service command handling, etc. (so async work is never lost)
 */
public static void waitToFinish() {
    long startTime = System.currentTimeMillis();
    boolean hadMessages = false;

    Handler handler = getHandler();

    synchronized (sLock) {
        if (handler.hasMessages(QueuedWorkHandler.MSG_RUN)) {
            // Delayed work will be processed at processPendingWork() below
            handler.removeMessages(QueuedWorkHandler.MSG_RUN);
        }

        // We should not delay any work as this might delay the finishers
        sCanDelay = false;
    }

    StrictMode.ThreadPolicy oldPolicy = StrictMode.allowThreadDiskWrites();
    try {
        processPendingWork();
    } finally {
        StrictMode.setThreadPolicy(oldPolicy);
    }

    try {
        while (true) {
            Runnable finisher;

            synchronized (sLock) {
                finisher = sFinishers.poll();
            }

            if (finisher == null) {
                break;
            }

            finisher.run();
        }
    } finally {
        sCanDelay = true;
    }

    synchronized (sLock) {
        long waitTime = System.currentTimeMillis() - startTime;

        if (waitTime > 0 || hadMessages) {
            mWaitTimes.add(Long.valueOf(waitTime).intValue());
            mNumWaits++;
        }
    }
}

通过该方法的注释可以看出来,它会在Activity的onPause,BroadcastReceive的onReceive,Service的command handing调用。实际上是在handleServiceArgs、handleStopService、handlePauseActivity、handleStopActivity方法调用的,其实是一个意思。该方法有同步语句块,它等待队列里的任务都执行完成,如果执行大量的文件写入操作,就会造成对应生命周期组件发生ANR。

DataStore解析

初始化

使用由preferencesDataStore创建的委托属性来创建Datastore<Preference>实例。

preferencesDataStore委托给PreferenceDataStoreSingletonDelegate来实现:

internal class PreferenceDataStoreSingletonDelegate internal constructor(
    private val name: String,
    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
    private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
    private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {

    private val lock = Any()

    @GuardedBy("lock")
    @Volatile
    private var INSTANCE: DataStore<Preferences>? = null

    override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
        return INSTANCE ?: synchronized(lock) {
            if (INSTANCE == null) {
                val applicationContext = thisRef.applicationContext

                INSTANCE = PreferenceDataStoreFactory.create(
                    corruptionHandler = corruptionHandler,
                    migrations = produceMigrations(applicationContext),
                    scope = scope
                ) {
                    applicationContext.preferencesDataStoreFile(name)
                }
            }
            INSTANCE!!
        }
    }
}

使用单例创建了一个DataStore<Preferences>实例并返回。name为文件名,produceMigrations用来做sp到datastore的迁移,scope协程作用域,在IO线程下创建。其中preferencesDataStoreFile(name)以name作为文件名创建了一个FIle,后缀名为.preferences_pb。

接着调用PreferenceDataStoreFactory工厂方法:

@JvmOverloads
public fun create(
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    migrations: List<DataMigration<Preferences>> = listOf(),
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
    produceFile: () -> File
): DataStore<Preferences> {
    val delegate = DataStoreFactory.create(
        serializer = PreferencesSerializer,
        corruptionHandler = corruptionHandler,
        migrations = migrations,
        scope = scope
    ) {
        val file = produceFile()
        file
    }
    return PreferenceDataStore(delegate)
}

这里根据delegate对象最后创建了DataStore<Preferences>实例。

看下DataStoreFactory工厂方法:

@JvmOverloads // Generate constructors for default params for java users.
public fun <T> create(
    serializer: Serializer<T>,
    corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
    migrations: List<DataMigration<T>> = listOf(),
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
    produceFile: () -> File
): DataStore<T> =
    SingleProcessDataStore(
        produceFile = produceFile,
        serializer = serializer,
        corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
        initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
        scope = scope
    )

没有什么特殊操作,调用了SingleProcessDataStore来创建DataStore。看下PreferenceDataStore(delegate):

internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :
    DataStore<Preferences> by delegate {
    override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
        Preferences {
            return delegate.updateData {
                val transformed = transform(it)
                (transformed as MutablePreferences).freeze()
                transformed
            }
        }
}

将PreferenceDataStore对象代理给了传进来的delegate,并调用了updateData方法。在方法里将transformed转换成了MutablePreferences并调用了freeze方法。

读数据

看下SingleProcessDataStore的代码:

   override val data: Flow<T> = flow {

        val currentDownStreamFlowState = downstreamFlow.value

        if (currentDownStreamFlowState !is Data) {
            // We need to send a read request because we don't have data yet.
            actor.offer(Message.Read(currentDownStreamFlowState))
        }

        emitAll(
            downstreamFlow.dropWhile {
                if (currentDownStreamFlowState is Data<T> ||
                    currentDownStreamFlowState is Final<T>
                ) {
                    // We don't need to drop any Data or Final values.
                    false
                } else {
                    // we need to drop the last seen state since it was either an exception or
                    // wasn't yet initialized. Since we sent a message to actor, we *will* see a
                    // new value.
                    it === currentDownStreamFlowState
                }
            }.map {
                when (it) {
                    is ReadException<T> -> throw it.readException
                    is Final<T> -> throw it.finalException
                    is Data<T> -> it.value
                    is UnInitialized -> error()
                }
            }
        )
    }

实现了DataStore接口,对data进行了赋值。当创建DataStore实例时,currentDownStreamFlowState是一个UnInitialized类型,会调用actor.offer方法发送一个Message.Read用于读取文件里面的数据。看下Message接受消息的地方:

private val actor = SimpleActor<SingleProcessDataStore.Message<T>>(
    scope = scope,
    onComplete = {
        it?.let {
            downstreamFlow.value = Final(it)
        }
        synchronized(SingleProcessDataStore.activeFilesLock) {
            SingleProcessDataStore.activeFiles.remove(file.absolutePath)
        }
    },
    onUndeliveredElement = { msg, ex ->
        if (msg is SingleProcessDataStore.Message.Update) {
            msg.ack.completeExceptionally()
        }
    }
) { msg ->
    when (msg) {
        is SingleProcessDataStore.Message.Read -> {
            handleRead(msg)
        }
        is SingleProcessDataStore.Message.Update -> {
            handleUpdate(msg)
        }
    }
}

这时msg是Message.Read,调用了handleRead(msg)方法:

private suspend fun handleRead(read: SingleProcessDataStore.Message.Read<T>) {
    when (val currentState = downstreamFlow.value) {
        is Data -> {
            // We already have data so just return...
        }
        is ReadException -> {
            if (currentState === read.lastState) {
                readAndInitOrPropagateFailure()
            }
        }
        UnInitialized -> {
            readAndInitOrPropagateFailure()
        }
        is Final -> error("Can't read in final state.") // won't happen
    }
}

downstreamFlow.value为UnInitialized,接着调用readAndInitOrPropagateFailure()方法,该方法里面调用了readAndInit()方法:

private suspend fun readAndInit() {
    val updateLock = Mutex()
    var initData = readDataOrHandleCorruption()
    val api = object : InitializerApi<T> {
        override suspend fun updateData(transform: suspend (t: T) -> T): T {
            return updateLock.withLock() {
                val newData = transform(initData)
                if (newData != initData) {
                    writeData(newData)
                    initData = newData
                }

                initData
            }
        }
    }
		initTasks?.forEach { it(api) }
    initTasks = null // Init tasks have run successfully, we don't need them anymore.
    updateLock.withLock {
        initializationComplete = true
    }

    downstreamFlow.value = Data(initData, initData.hashCode())
}

这里readDataOrHandleCorruption()方法调用了readData()方法,该方法会读取文件输入流,并转换为Preferences:

private suspend fun readData(): T {
    try {
        FileInputStream(file).use { stream ->
            return serializer.readFrom(stream)
        }
    } catch (ex: FileNotFoundException) {
        if (file.exists()) {
            throw ex
        }
        return serializer.defaultValue
    }
}
override suspend fun readFrom(input: InputStream): Preferences {
    val preferencesProto = PreferencesMapCompat.readFrom(input)

    val mutablePreferences = mutablePreferencesOf()

    preferencesProto.preferencesMap.forEach { (name, value) ->
        PreferencesSerializer.addProtoEntryToPreferences(name, value, mutablePreferences)
    }

    return mutablePreferences.toPreferences()
}

最后根据返回的initData数据创建了一个Data并赋值给了downstreamFlow.value,后续便可以在downstreamFlow读取到最新数据。

写数据

通过调用DataStore<Preferences>.edit方法将数据写入到文件里:

public suspend fun DataStore<Preferences>.edit(
    transform: suspend (MutablePreferences) -> Unit
): Preferences {
    return this.updateData {
        // It's safe to return MutablePreferences since we freeze it in
        // PreferencesDataStore.updateData()
        it.toMutablePreferences().apply { transform(this) }
    }
}

调用了delegate.updateData方法:

override suspend fun updateData(transform: suspend (t: T) -> T): T {
    val ack = CompletableDeferred<T>()
    val currentDownStreamFlowState = downstreamFlow.value
    val updateMsg =
        SingleProcessDataStore.Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
    actor.offer(updateMsg)
    return ack.await()
}

通过actor.offer发送了一个Message.Update消息,收到消息后,调用了handleUpdate(msg)方法:

private suspend fun handleUpdate(update: SingleProcessDataStore.Message.Update<T>) {
    update.ack.completeWith(
        runCatching {
            when (val currentState = downstreamFlow.value) {
                is Data -> {
                    // We are already initialized, we just need to perform the update
                    transformAndWrite(update.transform, update.callerContext)
                }
                is ReadException, is UnInitialized -> {
                    if (currentState === update.lastState) {
                        // we need to try to read again
                        readAndInitOrPropagateAndThrowFailure()

                        // We've successfully read, now we need to perform the update
                        transformAndWrite(update.transform, update.callerContext)
                    } else {
                        throw (currentState as ReadException).readException
                    }
                }
                is Final -> throw currentState.finalException // won't happen
            }
        }
    )
}

currentState为Data,调用了transformAndWrite(update.transform, update.callerContext)方法:

private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    callerContext: CoroutineContext
): T {
    val curDataAndHash = downstreamFlow.value as Data<T>
    curDataAndHash.checkHashCode()

    val curData = curDataAndHash.value
    val newData = withContext(callerContext) { transform(curData) }
    curDataAndHash.checkHashCode()
    return if (curData == newData) {
        curData
    } else {
        writeData(newData)
        downstreamFlow.value = Data(newData, newData.hashCode())
        newData
    }
}

如果当前Data和新的Data相等时,则直接返回,不等则调用writeData(newData):

internal suspend fun writeData(newData: T) {
    file.createParentDirectories()
    val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
    try {
        FileOutputStream(scratchFile).use { stream ->
            serializer.writeTo(newData, SingleProcessDataStore.UncloseableOutputStream(stream))
            stream.fd.sync().
        }

        if (!scratchFile.renameTo(file)) {
            throw IOException()
        }
    } catch (ex: IOException) {
        if (scratchFile.exists()) {
            scratchFile.delete() // Swallow failure to delete
        }
        throw ex
    }
}

创建输出流,把新数据写入到文件中。

总结

本文分析了SharedPreferences的缺点,接着从源码角度分析了DataStore,进行了俩者的优缺点对比。这里放一张官方的对比图:

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
安全 API Android开发
Jetpack架构组件库-Jetpack入门介绍
Jetpack架构组件库-Jetpack入门介绍
145 0
|
XML 存储 JSON
Android Jetpack组件 DataStore的使用和简单封装
Android Jetpack组件 DataStore的使用和简单封装
799 0
Android Jetpack组件 DataStore的使用和简单封装
|
存储 XML 安全
Jetpack DataStore 你总要了解一下吧?
一、DataStore 介绍 DataStore 是 Android Jetpack 中的一个组件,它是一个数据存储的解决方案,跟 SharedPreferences 一样,采用key-value形式存储。 DataStore 保证原子性,一致性,隔离性,持久性。尤其是,它解决了 SharedPreferences API 的设计缺陷。 Jetpack DataStore 是经过改进的新版数据存储解决方案,旨在取代 SharedPreferences,让应用能够以异步、事务方式存储数据。
1000 0
Jetpack DataStore 你总要了解一下吧?
|
4月前
|
XML 存储 API
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
Jetpack初尝试 NavController,LiveData,DataBing,ViewModel,Paging
|
5月前
深入了解 Jetpack Compose 中的 Modifier
深入了解 Jetpack Compose 中的 Modifier
|
XML JSON Java
Jetpack 系列之Paging3,看这一篇就够了~
Jetpack 系列之Paging3,看这一篇就够了~
3158 4
Jetpack 系列之Paging3,看这一篇就够了~
|
Java API 开发工具
Jetpack 之 LifeCycle 组件使用详解
LifeCycle 是一个可以感知宿主生命周期变化的组件。常见的宿主包括 Activity/Fragment、Service 和 Application。LifeCycle 会持有宿主的生命周期状态的信息,当宿主生命周期发生变化时,会通知监听宿主的观察者。
121 0
Jetpack 之 LifeCycle 组件使用详解
|
Java API Android开发
Jetpack 之 LifeCycle 组件原理解析
1. LifeCycle 是如何监听到 Activity/Fragment 生命周期变化的? 2. LifeCycle 如何将生命周期变化的事件分发给观察者的?
125 0
Jetpack 之 LifeCycle 组件原理解析
|
存储 XML 安全
Android Jetpack系列之DataStore
`Jetpack DataStore` 是一种改进的新数据存储解决方案,允许使用**协议缓冲区**存储键值对或类型化对象。`DataStore` **以异步、一致的事务方式存储数据,克服了 SharedPreferences(以下统称为SP)的一些缺点**。`DataStore`基于`Kotlin`协程和`Flow`实现,并且可以对`SP`数据进行迁移,旨在取代`SP`
549 0
|
SQL 数据库 开发者
Jetpack初识
Google为了开发者更好的设计软件的代码架构以及写出高质量的代码,推出Jetpack组件,将许多好用的代码进行封装,总之使用Jetpack可以使得我们利用更少的时间开发出更高质量的软件
110 0