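Preface
Before diving into DataStore, let's revisit the familiar SharedPreferences (SP below). SP has several well-known shortcomings: calling getXXX() to read data may block the main thread; type safety is not guaranteed; loaded data stays in memory for the lifetime of the process, which wastes memory; and apply() gives the caller no way to learn whether the write succeeded or failed.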
A Review of SharedPreferences
getXXX() may block the main thread
Consider the following code, which reads a value from SP:
val sp = context.getSharedPreferences("sp_file", Context.MODE_PRIVATE)
sp.getString("name", "sp")
Here is the implementation of getSharedPreferences:
@Override
public SharedPreferences getSharedPreferences(File file, int mode) {
SharedPreferencesImpl sp;
synchronized (ContextImpl.class) {
final ArrayMap<File, SharedPreferencesImpl> cache = getSharedPreferencesCacheLocked();
sp = cache.get(file);
if (sp == null) {
checkMode(mode);
if (getApplicationInfo().targetSdkVersion >= android.os.Build.VERSION_CODES.O) {
...
}
sp = new SharedPreferencesImpl(file, mode);
cache.put(file, sp);
return sp;
}
}
if ((mode & Context.MODE_MULTI_PROCESS) != 0 ||
getApplicationInfo().targetSdkVersion < android.os.Build.VERSION_CODES.HONEYCOMB) {
sp.startReloadIfChangedUnexpectedly();
}
return sp;
}
It calls getSharedPreferencesCacheLocked to obtain an ArrayMap keyed by File:
private ArrayMap<File, SharedPreferencesImpl> getSharedPreferencesCacheLocked() {
if (sSharedPrefsCache == null) {
sSharedPrefsCache = new ArrayMap<>();
}
final String packageName = getPackageName();
ArrayMap<File, SharedPreferencesImpl> packagePrefs = sSharedPrefsCache.get(packageName);
if (packagePrefs == null) {
packagePrefs = new ArrayMap<>();
sSharedPrefsCache.put(packageName, packagePrefs);
}
return packagePrefs;
}
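This is straightforward: it looks up an ArrayMap by the application's package name, creating one and caching it in the static sSharedPrefsCache if it does not exist yet. getSharedPreferences then looks up the SharedPreferencesImpl for the given file; if there is none, it creates a new instance and caches it in memory.
The SharedPreferencesImpl constructor calls startLoadFromDisk. Here is its implementation: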
private void startLoadFromDisk() {
synchronized (mLock) {
mLoaded = false;
}
new Thread("SharedPreferencesImpl-load") {
public void run() {
loadFromDisk();
}
}.start();
}
It starts a worker thread that calls loadFromDisk, which reads the corresponding SP file and caches its key-value pairs in memory.
Next, the implementation of getString:
public String getString(String key, @Nullable String defValue) {
synchronized (mLock) {
awaitLoadedLocked();
String v = (String)mMap.get(key);
return v != null ? v : defValue;
}
}
private void awaitLoadedLocked() {
if (!mLoaded) {
BlockGuard.getThreadPolicy().onReadFromDisk();
}
while (!mLoaded) {
try {
mLock.wait();
} catch (InterruptedException unused) {
}
}
if (mThrowable != null) {
throw new IllegalStateException(mThrowable);
}
}
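getString calls awaitLoadedLocked inside a synchronized block, and awaitLoadedLocked calls mLock.wait() in a loop until mLoaded becomes true. If the file is large and getString is called on the main thread before loading has finished, the main thread blocks until the load completes.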
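To make the blocking behaviour concrete, here is a minimal plain-Java sketch of the same wait/notify pattern. It is a simplified stand-in, not the Android class: BlockingPrefsModel, its loadMillis parameter, and the "from-disk" value are all hypothetical names made up for this illustration, and the sleep merely pretends to be a slow file parse.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for SharedPreferencesImpl; hypothetical, not the Android class. */
final class BlockingPrefsModel {
    private final Object lock = new Object();
    private final Map<String, Object> map = new HashMap<>();
    private boolean loaded = false;

    /** Starts a background "load" that takes roughly loadMillis to finish. */
    BlockingPrefsModel(long loadMillis) {
        Thread loader = new Thread(() -> {
            try {
                Thread.sleep(loadMillis); // pretend to parse a large file
            } catch (InterruptedException ignored) {
            }
            synchronized (lock) {
                map.put("name", "from-disk");
                loaded = true;
                lock.notifyAll(); // wake up any getter stuck in wait()
            }
        }, "prefs-load");
        loader.setDaemon(true);
        loader.start();
    }

    /** Blocks the calling thread until the background load has completed. */
    String getString(String key, String defValue) {
        synchronized (lock) {
            while (!loaded) {
                try {
                    lock.wait();
                } catch (InterruptedException ignored) {
                }
            }
            Object v = map.get(key);
            return v != null ? (String) v : defValue;
        }
    }

    public static void main(String[] args) {
        BlockingPrefsModel prefs = new BlockingPrefsModel(300);
        long start = System.nanoTime();
        String value = prefs.getString("name", "sp");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(value + " after ~" + elapsedMs + " ms");
    }
}
```

The getter cannot return before the loader thread has finished, so whichever thread calls it first pays the full load time. On Android that thread is often the main thread.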
SP cannot guarantee type safety
Consider the following code:
val key = "name"
val sp = context.getSharedPreferences("sp_file", Context.MODE_PRIVATE)
sp.edit()?.apply {
putInt(key, 10)
}?.apply()
sp.getString(key, "sp")
An Int is stored in SP, and getString is then called for the same key. Because getString casts the stored value to String, this throws a ClassCastException.
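The root cause is that every value lives in one untyped map and each getter simply casts. The plain-Java sketch below reproduces that under simplified assumptions; UntypedPrefsModel is a hypothetical class written for this illustration, not part of Android.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a preferences store backed by a single untyped map. */
final class UntypedPrefsModel {
    private final Map<String, Object> map = new HashMap<>();

    void putInt(String key, int value) {
        map.put(key, value); // stored as an Integer
    }

    String getString(String key, String defValue) {
        // The cast fails at runtime if the stored value is not a String.
        String v = (String) map.get(key);
        return v != null ? v : defValue;
    }

    public static void main(String[] args) {
        UntypedPrefsModel prefs = new UntypedPrefsModel();
        prefs.putInt("name", 10);
        try {
            prefs.getString("name", "sp");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Nothing at compile time ties the key "name" to a type, so the mismatch only surfaces when the read executes.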
apply() may cause an ANR
Why can apply() cause an ANR? Let's look at its implementation:
public void apply() {
final MemoryCommitResult mcr = commitToMemory();
final Runnable awaitCommit = new Runnable() {
@Override
public void run() {
try {
mcr.writtenToDiskLatch.await();
} catch (InterruptedException ignored) {
}
}
};
QueuedWork.addFinisher(awaitCommit);
Runnable postWriteRunnable = new Runnable() {
@Override
public void run() {
awaitCommit.run();
QueuedWork.removeFinisher(awaitCommit);
}
};
SharedPreferencesImpl.this.enqueueDiskWrite(mcr, postWriteRunnable);
notifyListeners(mcr);
}
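commitToMemory applies the pending key-value pairs to the in-memory map. awaitCommit waits on writtenToDiskLatch, and each call to apply() adds it to QueuedWork's list of finishers. postWriteRunnable runs awaitCommit and then removes it from that list. Finally, apply() calls enqueueDiskWrite. Here is what enqueueDiskWrite does: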
private void enqueueDiskWrite(final MemoryCommitResult mcr,
final Runnable postWriteRunnable) {
final boolean isFromSyncCommit = (postWriteRunnable == null);
final Runnable writeToDiskRunnable = new Runnable() {
@Override
public void run() {
synchronized (mWritingToDiskLock) {
writeToFile(mcr, isFromSyncCommit);
}
synchronized (mLock) {
mDiskWritesInFlight--;
}
if (postWriteRunnable != null) {
postWriteRunnable.run();
}
}
};
if (isFromSyncCommit) {
boolean wasEmpty = false;
synchronized (mLock) {
wasEmpty = mDiskWritesInFlight == 1;
}
if (wasEmpty) {
writeToDiskRunnable.run();
return;
}
}
QueuedWork.queue(writeToDiskRunnable, !isFromSyncCommit);
}
This defines another Runnable, writeToDiskRunnable, which persists the SP data to disk and then runs the postWriteRunnable that was passed in. Finally it calls QueuedWork.queue:
public static void queue(Runnable work, boolean shouldDelay) {
Handler handler = getHandler();
synchronized (sLock) {
sWork.add(work);
if (shouldDelay && sCanDelay) {
handler.sendEmptyMessageDelayed(QueuedWorkHandler.MSG_RUN, DELAY);
} else {
handler.sendEmptyMessage(QueuedWorkHandler.MSG_RUN);
}
}
}
private static class QueuedWorkHandler extends Handler {
static final int MSG_RUN = 1;
QueuedWorkHandler(Looper looper) {
super(looper);
}
public void handleMessage(Message msg) {
if (msg.what == MSG_RUN) {
processPendingWork();
}
}
}
private static void processPendingWork() {
synchronized (sProcessingWork) {
LinkedList<Runnable> work;
synchronized (sLock) {
work = sWork;
sWork = new LinkedList<>();
// Remove all msg-s as all work will be processed now
getHandler().removeMessages(QueuedWorkHandler.MSG_RUN);
}
if (work.size() > 0) {
for (Runnable w : work) {
w.run();
}
}
}
}
queue adds the runnable to a work list, and the queued work is executed on QueuedWork's internal HandlerThread. Nothing so far explains an ANR. However, QueuedWork also has a waitToFinish method:
/**
* Trigger queued work to be processed immediately. The queued work is processed on a separate
* thread asynchronous. While doing that run and process all finishers on this thread. The
* finishers can be implemented in a way to check weather the queued work is finished.
*
* Is called from the Activity base class's onPause(), after BroadcastReceiver's onReceive,
* after Service command handling, etc. (so async work is never lost)
*/
public static void waitToFinish() {
long startTime = System.currentTimeMillis();
boolean hadMessages = false;
Handler handler = getHandler();
synchronized (sLock) {
if (handler.hasMessages(QueuedWorkHandler.MSG_RUN)) {
// Delayed work will be processed at processPendingWork() below
handler.removeMessages(QueuedWorkHandler.MSG_RUN);
}
// We should not delay any work as this might delay the finishers
sCanDelay = false;
}
StrictMode.ThreadPolicy oldPolicy = StrictMode.allowThreadDiskWrites();
try {
processPendingWork();
} finally {
StrictMode.setThreadPolicy(oldPolicy);
}
try {
while (true) {
Runnable finisher;
synchronized (sLock) {
finisher = sFinishers.poll();
}
if (finisher == null) {
break;
}
finisher.run();
}
} finally {
sCanDelay = true;
}
synchronized (sLock) {
long waitTime = System.currentTimeMillis() - startTime;
if (waitTime > 0 || hadMessages) {
mWaitTimes.add(Long.valueOf(waitTime).intValue());
mNumWaits++;
}
}
}
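As its comment says, waitToFinish is called from Activity's onPause, after BroadcastReceiver's onReceive, and after Service command handling. Concretely, it is called from handleServiceArgs, handleStopService, handlePauseActivity, and handleStopActivity, which amounts to the same thing. The method runs any pending work on the calling thread inside a synchronized block, then runs every finisher, each of which waits for its write to reach disk. If many file writes are pending, the corresponding lifecycle callback is held up long enough to trigger an ANR.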
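The interplay between apply() and waitToFinish can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: QueuedWorkModel and its writeMillis parameter are hypothetical, the disk write is replaced by a sleep on a separate thread, and the real waitToFinish additionally runs pending work on the calling thread, which the model leaves out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of apply() plus QueuedWork.waitToFinish(); not Android code. */
final class QueuedWorkModel {
    private static final ConcurrentLinkedQueue<Runnable> finishers =
            new ConcurrentLinkedQueue<>();

    /** Models apply(): returns immediately while the slow write runs elsewhere. */
    static void apply(long writeMillis) {
        CountDownLatch writtenToDisk = new CountDownLatch(1);
        Runnable awaitCommit = () -> {
            try {
                writtenToDisk.await(); // blocks until the write has finished
            } catch (InterruptedException ignored) {
            }
        };
        finishers.add(awaitCommit);
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(writeMillis); // pretend to write the file
            } catch (InterruptedException ignored) {
            }
            writtenToDisk.countDown();
            finishers.remove(awaitCommit);
        }, "disk-writer");
        writer.setDaemon(true);
        writer.start();
    }

    /** Models waitToFinish(): the caller runs every finisher, so it waits for all writes. */
    static void waitToFinish() {
        Runnable finisher;
        while ((finisher = finishers.poll()) != null) {
            finisher.run();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        apply(300);
        System.out.println("apply returned after ms: " + (System.nanoTime() - start) / 1_000_000);
        waitToFinish(); // imagine this being called on the main thread from onPause
        System.out.println("waitToFinish returned after ms: " + (System.nanoTime() - start) / 1_000_000);
    }
}
```

apply() itself is cheap, but the cost is only deferred: whoever calls waitToFinish later inherits the remaining write time.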
DataStore Internals
Initialization
A DataStore<Preferences> instance is created through the property delegate returned by preferencesDataStore.
preferencesDataStore delegates the work to PreferenceDataStoreSingletonDelegate:
internal class PreferenceDataStoreSingletonDelegate internal constructor(
private val name: String,
private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {
private val lock = Any()
@GuardedBy("lock")
@Volatile
private var INSTANCE: DataStore<Preferences>? = null
override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
return INSTANCE ?: synchronized(lock) {
if (INSTANCE == null) {
val applicationContext = thisRef.applicationContext
INSTANCE = PreferenceDataStoreFactory.create(
corruptionHandler = corruptionHandler,
migrations = produceMigrations(applicationContext),
scope = scope
) {
applicationContext.preferencesDataStoreFile(name)
}
}
INSTANCE!!
}
}
}
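This creates a singleton DataStore<Preferences> instance and returns it. name is the file name, produceMigrations supplies the migrations from SP to DataStore, and scope is the coroutine scope, which by default runs on the IO dispatcher. preferencesDataStoreFile(name) creates a File named after name with the extension .preferences_pb.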
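The getValue implementation above is the classic double-checked locking idiom. For readers more used to Java, here is a minimal sketch of the same idiom; LazySingleton and its factory parameter are hypothetical names for this illustration and do not come from the DataStore library.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder that builds its value at most once, even under contention. */
final class LazySingleton<T> {
    private final Object lock = new Object();
    private final Supplier<T> factory;
    // volatile so that a fully constructed instance is visible to every thread
    private volatile T instance;

    LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;       // first check, without taking the lock
        if (local == null) {
            synchronized (lock) {
                local = instance; // second check, now holding the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazySingleton<Object> holder = new LazySingleton<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("factory calls: " + created.get());
    }
}
```

The fast path costs one volatile read; the lock is only taken while the instance is still missing.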
Next it calls the PreferenceDataStoreFactory factory method:
@JvmOverloads
public fun create(
corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
migrations: List<DataMigration<Preferences>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
produceFile: () -> File
): DataStore<Preferences> {
val delegate = DataStoreFactory.create(
serializer = PreferencesSerializer,
corruptionHandler = corruptionHandler,
migrations = migrations,
scope = scope
) {
val file = produceFile()
file
}
return PreferenceDataStore(delegate)
}
The delegate object is then wrapped to create the final DataStore<Preferences> instance.
Here is the DataStoreFactory factory method:
@JvmOverloads // Generate constructors for default params for java users.
public fun <T> create(
serializer: Serializer<T>,
corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
migrations: List<DataMigration<T>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
produceFile: () -> File
): DataStore<T> =
SingleProcessDataStore(
produceFile = produceFile,
serializer = serializer,
corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
scope = scope
)
Nothing special here: it constructs a SingleProcessDataStore as the DataStore. Now look at PreferenceDataStore(delegate):
internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :
DataStore<Preferences> by delegate {
override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
Preferences {
return delegate.updateData {
val transformed = transform(it)
(transformed as MutablePreferences).freeze()
transformed
}
}
}
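PreferenceDataStore delegates its DataStore implementation to the delegate passed in and overrides updateData. Inside the override, it casts transformed to MutablePreferences and calls freeze on it, so that the object can no longer be modified once it has been handed to DataStore.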
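The idea behind freeze can be sketched in plain Java as a mutable container with a one-way "frozen" flag. FreezablePrefs below is a hypothetical model written for this illustration; it only assumes that freezing means rejecting later writes, and it is not the library's MutablePreferences class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a preferences object that becomes read-only once frozen. */
final class FreezablePrefs {
    private final Map<String, Object> values = new HashMap<>();
    private final AtomicBoolean frozen = new AtomicBoolean(false);

    void set(String key, Object value) {
        if (frozen.get()) {
            throw new IllegalStateException("Preferences were modified after being frozen");
        }
        values.put(key, value);
    }

    Object get(String key) {
        return values.get(key);
    }

    /** One-way switch: after this call every set() throws. */
    void freeze() {
        frozen.set(true);
    }

    public static void main(String[] args) {
        FreezablePrefs prefs = new FreezablePrefs();
        prefs.set("name", "sp");
        prefs.freeze();
        try {
            prefs.set("name", "changed");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Freezing lets the store publish the same object it received from the transform without copying it, while still protecting itself from a caller that kept a reference.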
Reading data
Here is the relevant code in SingleProcessDataStore:
override val data: Flow<T> = flow {
val currentDownStreamFlowState = downstreamFlow.value
if (currentDownStreamFlowState !is Data) {
// We need to send a read request because we don't have data yet.
actor.offer(Message.Read(currentDownStreamFlowState))
}
emitAll(
downstreamFlow.dropWhile {
if (currentDownStreamFlowState is Data<T> ||
currentDownStreamFlowState is Final<T>
) {
// We don't need to drop any Data or Final values.
false
} else {
// we need to drop the last seen state since it was either an exception or
// wasn't yet initialized. Since we sent a message to actor, we *will* see a
// new value.
it === currentDownStreamFlowState
}
}.map {
when (it) {
is ReadException<T> -> throw it.readException
is Final<T> -> throw it.finalException
is Data<T> -> it.value
is UnInitialized -> error()
}
}
)
}
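SingleProcessDataStore implements the DataStore interface and defines data as shown. When the DataStore instance has just been created, currentDownStreamFlowState is UnInitialized, so collecting data calls actor.offer to send a Message.Read, which triggers reading the file. Here is where messages are received: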
private val actor = SimpleActor<SingleProcessDataStore.Message<T>>(
scope = scope,
onComplete = {
it?.let {
downstreamFlow.value = Final(it)
}
synchronized(SingleProcessDataStore.activeFilesLock) {
SingleProcessDataStore.activeFiles.remove(file.absolutePath)
}
},
onUndeliveredElement = { msg, ex ->
if (msg is SingleProcessDataStore.Message.Update) {
msg.ack.completeExceptionally()
}
}
) { msg ->
when (msg) {
is SingleProcessDataStore.Message.Read -> {
handleRead(msg)
}
is SingleProcessDataStore.Message.Update -> {
handleUpdate(msg)
}
}
}
Here msg is a Message.Read, so handleRead(msg) is called:
private suspend fun handleRead(read: SingleProcessDataStore.Message.Read<T>) {
when (val currentState = downstreamFlow.value) {
is Data -> {
// We already have data so just return...
}
is ReadException -> {
if (currentState === read.lastState) {
readAndInitOrPropagateFailure()
}
}
UnInitialized -> {
readAndInitOrPropagateFailure()
}
is Final -> error("Can't read in final state.") // won't happen
}
}
downstreamFlow.value is UnInitialized, so readAndInitOrPropagateFailure() is called, which in turn calls readAndInit():
private suspend fun readAndInit() {
val updateLock = Mutex()
var initData = readDataOrHandleCorruption()
val api = object : InitializerApi<T> {
override suspend fun updateData(transform: suspend (t: T) -> T): T {
return updateLock.withLock() {
val newData = transform(initData)
if (newData != initData) {
writeData(newData)
initData = newData
}
initData
}
}
}
initTasks?.forEach { it(api) }
initTasks = null // Init tasks have run successfully, we don't need them anymore.
updateLock.withLock {
initializationComplete = true
}
downstreamFlow.value = Data(initData, initData.hashCode())
}
Here readDataOrHandleCorruption() calls readData(), which reads the file's input stream and converts it into Preferences:
private suspend fun readData(): T {
try {
FileInputStream(file).use { stream ->
return serializer.readFrom(stream)
}
} catch (ex: FileNotFoundException) {
if (file.exists()) {
throw ex
}
return serializer.defaultValue
}
}
override suspend fun readFrom(input: InputStream): Preferences {
val preferencesProto = PreferencesMapCompat.readFrom(input)
val mutablePreferences = mutablePreferencesOf()
preferencesProto.preferencesMap.forEach { (name, value) ->
PreferencesSerializer.addProtoEntryToPreferences(name, value, mutablePreferences)
}
return mutablePreferences.toPreferences()
}
Finally, a Data is created from the returned initData and assigned to downstreamFlow.value, after which the latest data can be read from downstreamFlow.
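The actor used here boils down to "one consumer processes all messages in order, and each caller gets an acknowledgement". The plain-Java sketch below models that with a single-threaded executor and CompletableFuture instead of coroutines. ActorStoreModel, the Integer state, and the "initialize to 0" step are hypothetical simplifications for this illustration and not how DataStore is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

/** Hypothetical single-consumer store: every read and update runs on one thread. */
final class ActorStoreModel {
    private final ExecutorService actor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "store-actor");
        t.setDaemon(true);
        return t;
    });

    // Touched only on the actor thread; null plays the role of UnInitialized.
    private Integer state = null;

    private void ensureLoaded() {
        if (state == null) {
            state = 0; // stands in for reading the file on first use
        }
    }

    /** Models Message.Read: initializes if needed, then returns the current value. */
    CompletableFuture<Integer> read() {
        return CompletableFuture.supplyAsync(() -> {
            ensureLoaded();
            return state;
        }, actor);
    }

    /** Models Message.Update: the returned future plays the role of the ack. */
    CompletableFuture<Integer> update(UnaryOperator<Integer> transform) {
        return CompletableFuture.supplyAsync(() -> {
            ensureLoaded();
            state = transform.apply(state);
            return state;
        }, actor);
    }

    public static void main(String[] args) {
        ActorStoreModel store = new ActorStoreModel();
        store.update(v -> v + 1);
        store.update(v -> v + 1);
        System.out.println("state: " + store.read().join());
    }
}
```

Because only one thread ever touches the state, updates submitted concurrently cannot interleave, and no caller has to hold a lock while waiting for the result.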
Writing data
Data is written to the file by calling DataStore<Preferences>.edit:
public suspend fun DataStore<Preferences>.edit(
transform: suspend (MutablePreferences) -> Unit
): Preferences {
return this.updateData {
// It's safe to return MutablePreferences since we freeze it in
// PreferencesDataStore.updateData()
it.toMutablePreferences().apply { transform(this) }
}
}
This ends up in delegate.updateData:
override suspend fun updateData(transform: suspend (t: T) -> T): T {
val ack = CompletableDeferred<T>()
val currentDownStreamFlowState = downstreamFlow.value
val updateMsg =
SingleProcessDataStore.Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
actor.offer(updateMsg)
return ack.await()
}
It sends a Message.Update through actor.offer. When the message is received, handleUpdate(msg) is called:
private suspend fun handleUpdate(update: SingleProcessDataStore.Message.Update<T>) {
update.ack.completeWith(
runCatching {
when (val currentState = downstreamFlow.value) {
is Data -> {
// We are already initialized, we just need to perform the update
transformAndWrite(update.transform, update.callerContext)
}
is ReadException, is UnInitialized -> {
if (currentState === update.lastState) {
// we need to try to read again
readAndInitOrPropagateAndThrowFailure()
// We've successfully read, now we need to perform the update
transformAndWrite(update.transform, update.callerContext)
} else {
throw (currentState as ReadException).readException
}
}
is Final -> throw currentState.finalException // won't happen
}
}
)
}
When currentState is Data, transformAndWrite(update.transform, update.callerContext) is called:
private suspend fun transformAndWrite(
transform: suspend (t: T) -> T,
callerContext: CoroutineContext
): T {
val curDataAndHash = downstreamFlow.value as Data<T>
curDataAndHash.checkHashCode()
val curData = curDataAndHash.value
val newData = withContext(callerContext) { transform(curData) }
curDataAndHash.checkHashCode()
return if (curData == newData) {
curData
} else {
writeData(newData)
downstreamFlow.value = Data(newData, newData.hashCode())
newData
}
}
If the new data equals the current data, the current data is returned as is; otherwise writeData(newData) is called:
internal suspend fun writeData(newData: T) {
file.createParentDirectories()
val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
try {
FileOutputStream(scratchFile).use { stream ->
serializer.writeTo(newData, SingleProcessDataStore.UncloseableOutputStream(stream))
stream.fd.sync()
}
if (!scratchFile.renameTo(file)) {
throw IOException()
}
} catch (ex: IOException) {
if (scratchFile.exists()) {
scratchFile.delete() // Swallow failure to delete
}
throw ex
}
}
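writeData opens an output stream on a scratch file, writes the new data to it, syncs it to disk, and then renames the scratch file over the target file.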
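The write-to-scratch-then-rename pattern is easy to try outside Android. Below is a minimal plain-Java sketch of it; ScratchFileWriter, writeAtomically, and the ".tmp" suffix are hypothetical choices for this illustration, and it assumes a file system where renaming over an existing file succeeds (as on typical Linux and macOS setups).

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical helper that replaces a file's contents via a scratch file. */
final class ScratchFileWriter {
    static void writeAtomically(File target, byte[] bytes) throws IOException {
        File parent = target.getAbsoluteFile().getParentFile();
        if (parent != null) {
            parent.mkdirs(); // make sure the directory exists
        }
        File scratch = new File(target.getAbsolutePath() + ".tmp"); // assumed suffix
        try {
            try (FileOutputStream stream = new FileOutputStream(scratch)) {
                stream.write(bytes);
                stream.getFD().sync(); // force the bytes to storage before the rename
            }
            if (!scratch.renameTo(target)) {
                throw new IOException("Unable to rename " + scratch + " to " + target);
            }
        } catch (IOException ex) {
            if (scratch.exists()) {
                scratch.delete(); // best effort cleanup; failure is ignored
            }
            throw ex;
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("scratch-demo").toFile();
        File target = new File(dir, "settings.preferences_pb");
        writeAtomically(target, "v1".getBytes(StandardCharsets.UTF_8));
        writeAtomically(target, "v2".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(Files.readAllBytes(target.toPath()), StandardCharsets.UTF_8));
    }
}
```

Since the target file is only ever replaced by a fully written and synced scratch file, a crash in the middle of a write leaves the previous contents intact instead of a half-written file.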
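Summary
This article reviewed the shortcomings of SharedPreferences, then analyzed DataStore at the source level and compared the strengths and weaknesses of the two. Here is the official comparison chart: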