Method
We detect drift on text data with the Maximum Mean Discrepancy (MMD) and Kolmogorov-Smirnov (K-S) detectors.
In this example we focus on detecting covariate drift Δp(x), since detecting drift in the distribution of predicted labels works the same way as for other data modalities (see the K-S and MMD drift examples on CIFAR-10).
Things become more involved, however, when we want to detect drift in the input data Δp(x).
When dealing with tabular or image data, we can apply a two-sample hypothesis test directly on the inputs, or on the output of a preprocessing step.
For example, using a randomly initialized encoder as suggested in Failing Loudly: An Empirical Study of Methods for Detecting Dataset Shift (which they call an Untrained AutoEncoder, or UAE).
This is not as straightforward for text, whether in raw string or tokenized format, because neither directly represents the semantics of the input.
Therefore we extract embeddings for the text and detect drift on those. This procedure has a significant impact on the type of drift we detect. Strictly speaking we are no longer detecting Δp(x), because the entire training procedure of the (pre)trained embeddings (objective function, training data, etc.) influences the embeddings we extract.
The library contains functionality to leverage pre-trained embeddings from HuggingFace's transformers package, but also allows you to easily use embeddings of your own choice. The examples in this post illustrate both options.
Note: As done in this post, it is recommended to pass the text data to the detectors as a list of strings (`List[str]`). This allows seamless integration with HuggingFace's transformers library. The exception is when custom embeddings are used; there it is important that the data is passed to the custom embedding model in a compatible format. In the last example, a `preprocess_batch_fn` is defined to convert the `list` into the `np.ndarray` expected by the custom TensorFlow embedding (see the sketch below).
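As a reference, here is a minimal sketch of what such a conversion step could look like; the name and the exact conversion are illustrative and assume a custom embedding model that accepts a plain NumPy array of strings:

```python
import numpy as np
from typing import List, Union

def preprocess_batch_fn(data: Union[List[str], np.ndarray]) -> np.ndarray:
    # convert the list of raw strings into the np.ndarray
    # expected by a custom TensorFlow embedding model
    if isinstance(data, list):
        data = np.array(data)
    return data
```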
Backend
The method works with both the PyTorch and TensorFlow frameworks for the statistical tests and preprocessing steps. Alibi Detect does, however, not install PyTorch for you. Check the PyTorch docs on how to do this.
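A CPU-only install can be as simple as the command below, but consult the PyTorch documentation for the command matching your OS, package manager and CUDA setup:

```python
!pip install torch
```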
Dataset
We use a binary sentiment classification dataset of movie reviews, with 25,000 reviews for training and 25,000 for testing. Install the nlp library to fetch the dataset:
```python
!pip install nlp
```
```python
import nlp
import numpy as np
import os
import tensorflow as tf
from transformers import AutoTokenizer

from alibi_detect.cd import KSDrift, MMDDrift
from alibi_detect.saving import save_detector, load_detector
```
Load tokenizer
```python
model_name = 'bert-base-cased'
tokenizer = AutoTokenizer.from_pretrained(model_name)
```
Load data
```python
def load_dataset(dataset: str, split: str = 'test'):
    # contains the train, test and unsupervised splits
    data = nlp.load_dataset(dataset)
    X, y = [], []
    for x in data[split]:
        X.append(x['text'])
        y.append(x['label'])
    X = np.array(X)
    y = np.array(y)
    return X, y
```
```python
# training set
X, y = load_dataset('imdb', split='train')
print(X.shape, y.shape)
```
Output:
```
(25000,) (25000,)
```
Let's take a look at a negative and a positive review:
```python
# 0 means negative, 1 means positive
labels = ['Negative', 'Positive']
print(labels[y[-1]])
print(X[-1])
```
Output:
```
Negative
This is one of the dumbest films, I've ever seen. It rips off nearly ever type of thriller and manages to make a mess of them all.<br /><br />There's not a single good line or character in the whole mess. If there was a plot, it was an afterthought and as far as acting goes, there's nothing good to say so Ill say nothing. I honestly cant understand how this type of nonsense gets produced and actually released, does somebody somewhere not at some stage think, 'Oh my god this really is a load of shite' and call it a day. Its crap like this that has people downloading illegally, the trailer looks like a completely different film, at least if you have download it, you haven't wasted your time or money Don't waste your time, this is painful.
```
```python
print(labels[y[2]])
print(X[2])
```
Output:
```
Positive
Brilliant over-acting by Lesley Ann Warren. Best dramatic hobo lady I have ever seen, and love scenes in clothes warehouse are second to none. The corn on face is a classic, as good as anything in Blazing Saddles. The take on lawyers is also superb. After being accused of being a turncoat, selling out his boss, and being dishonest the lawyer of Pepto Bolt shrugs indifferently "I'm a lawyer" he says. Three funny words. Jeffrey Tambor, a favorite from the later Larry Sanders show, is fantastic here too as a mad millionaire who wants to crush the ghetto. His character is more malevolent than usual. The hospital scene, and the scene where the homeless invade a demolition site, are all-time classics. Look for the legs scene and the two big diggers fighting (one bleeds). This movie gets better each time I see it (which is quite often).
```
We split the original data into a reference dataset and a dataset which should not be rejected under the null hypothesis (H0) of the statistical test. We also create imbalanced datasets and inject selected words into the reference set.
```python
def random_sample(X: np.ndarray, y: np.ndarray, proba_zero: float, n: int):
    if len(y.shape) == 1:
        # indices per label
        idx_0 = np.where(y == 0)[0]
        idx_1 = np.where(y == 1)[0]
    else:
        idx_0 = np.where(y[:, 0] == 1)[0]
        idx_1 = np.where(y[:, 1] == 1)[0]
    # how many instances to draw from label 0 and label 1
    n_0, n_1 = int(n * proba_zero), int(n * (1 - proba_zero))
    # randomly sample the instances
    idx_0_out = np.random.choice(idx_0, n_0, replace=False)
    idx_1_out = np.random.choice(idx_1, n_1, replace=False)
    # concatenate the sampled label 0 and label 1 instances
    X_out = np.concatenate([X[idx_0_out], X[idx_1_out]])
    y_out = np.concatenate([y[idx_0_out], y[idx_1_out]])
    return X_out.tolist(), y_out.tolist()


def padding_last(x: np.ndarray, seq_len: int) -> np.ndarray:
    try:  # try not to replace padding token
        last_token = np.where(x == 0)[0][0]
    except:  # no padding
        last_token = seq_len - 1
    return 1, last_token


def padding_first(x: np.ndarray, seq_len: int) -> np.ndarray:
    try:  # try not to replace padding token
        first_token = np.where(x == 0)[0][-1] + 2
    except:  # no padding
        first_token = 0
    return first_token, seq_len - 1


def inject_word(token: int, X: np.ndarray, perc_chg: float, padding: str = 'last'):
    seq_len = X.shape[1]
    n_chg = int(perc_chg * .01 * seq_len)
    X_cp = X.copy()
    for _ in range(X.shape[0]):
        if padding == 'last':
            first_token, last_token = padding_last(X_cp[_, :], seq_len)
        else:
            first_token, last_token = padding_first(X_cp[_, :], seq_len)
        if last_token <= n_chg:
            choice_len = seq_len
        else:
            choice_len = last_token
        idx = np.random.choice(np.arange(first_token, choice_len), n_chg, replace=False)
        X_cp[_, idx] = token
    return X_cp.tolist()
```
Reference, H0 and imbalanced datasets:
```python
# proba_zero = fraction with label 0 (=negative sentiment)
n_sample = 1000

# reference dataset
X_ref = random_sample(X, y, proba_zero=.5, n=n_sample)[0]

# H0 dataset
X_h0 = random_sample(X, y, proba_zero=.5, n=n_sample)[0]

# imbalanced datasets
n_imb = [.1, .9]
X_imb = {_: random_sample(X, y, proba_zero=_, n=n_sample)[0] for _ in n_imb}
```
Inject words into the reference dataset:
```python
# words to inject into the reviews
words = ['fantastic', 'good', 'bad', 'horrible']
perc_chg = [1., 5.]  # % of tokens to change in an instance

# input_ids: the numerical representation of the tokens
words_tf = tokenizer(words)['input_ids']
words_tf = [token[1:-1][0] for token in words_tf]
max_len = 100
tokens = tokenizer(X_ref, pad_to_max_length=True,
                   max_length=max_len, return_tensors='tf')
X_word = {}
for i, w in enumerate(words_tf):
    X_word[words[i]] = {}
    for p in perc_chg:
        x = inject_word(w, tokens['input_ids'].numpy(), p)
        dec = tokenizer.batch_decode(x, **dict(skip_special_tokens=True))
        X_word[words[i]][p] = dec
```
```python
tokens['input_ids']
```
```
<tf.Tensor: shape=(1000, 100), dtype=int32, numpy=
array([[  101,  1188,  1794, ...,     0,     0,     0],
       [  101,  1556,  5122, ...,  1307,  1800,   102],
       [  101,  3406,  4720, ...,  5674,  2723,   102],
       ...,
       [  101,  2082,  1122, ...,  1641,   107,   102],
       [  101,  1124,   118, ...,  1155,  1104,   102],
       [  101,  1249, 24017, ...,     0,     0,     0]], dtype=int32)>
```
Preprocessing
First we need to specify the type of embedding we want to extract from the BERT model. We can extract embeddings from the following outputs:
- pooler_output: last-layer hidden state of the first token of the sequence (the classification token, CLS), further processed by a linear layer and a Tanh activation. The linear layer weights are trained on the next-sentence-prediction (classification) objective during pre-training. Note: this output is usually not a good summary of the semantic content of the input; you are often better off averaging or pooling the sequence of hidden states for the whole input sequence.
- last_hidden_state: sequence of hidden states at the output of the last layer of the model, averaged over the tokens.
- hidden_state: hidden states of the model at the output of each layer, averaged over the tokens.
- hidden_state_cls: same as hidden_state, but using the output of the CLS token instead of the token average.
If hidden_state or hidden_state_cls is used as the embedding type, you also need to pass the layer numbers from which the embedding is extracted. As an example, we extract embeddings from the last 8 hidden states.
```python
from alibi_detect.models.tensorflow import TransformerEmbedding

emb_type = 'hidden_state'
n_layers = 8
layers = [-_ for _ in range(1, n_layers + 1)]

embedding = TransformerEmbedding(model_name, emb_type, layers)
```
Let's check what the embedding looks like:
```python
tokens = tokenizer(list(X[:5]), pad_to_max_length=True,
                   max_length=max_len, return_tensors='tf')
# embedding model
x_emb = embedding(tokens)
print(x_emb.shape)
```
Output:
```
(5, 768)
```
So the embedding space of the BERT model used by the drift detector consists of a 768-dimensional vector for each instance. Before running the statistical hypothesis test, we therefore first apply a dimensionality reduction step with an Untrained AutoEncoder (UAE). We use the embedding model as the input of the UAE, which then projects the embedding onto a lower-dimensional space.
```python
tf.random.set_seed(0)
```
```python
from alibi_detect.cd.tensorflow import UAE

# target dimensionality
enc_dim = 32
shape = (x_emb.shape[1],)

uae = UAE(input_layer=embedding, shape=shape, enc_dim=enc_dim)
```
Let's test this again:
```python
emb_uae = uae(tokens)
print(emb_uae.shape)
```
Output:
```
(5, 32)
```
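With the UAE in place, the tokenizer and projection model can be wrapped into a single preprocessing function and passed to a drift detector. The following is only a minimal sketch of that wiring, assuming alibi-detect's TensorFlow preprocess_drift helper; the p_val and batch_size settings are illustrative:

```python
from functools import partial

from alibi_detect.cd import KSDrift
from alibi_detect.cd.tensorflow import preprocess_drift

# tokenize the raw strings and project them with the UAE in one preprocessing step
preprocess_fn = partial(preprocess_drift, model=uae, tokenizer=tokenizer,
                        max_len=max_len, batch_size=32)

# K-S detector fitted on the reference data, using the preprocessing step above
cd = KSDrift(X_ref, p_val=.05, preprocess_fn=preprocess_fn)

# no drift should be flagged on the held-out H0 split
preds = cd.predict(X_h0)
print('Drift detected?', bool(preds['data']['is_drift']))
```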