【HuggingFace】基于检索策略的隐私政策标注应用
数据源:https://github.com/EnlightenedAI/CAPP-130
注意在数据源的链接中原作者自己也训练了一个用以标注隐私政策信息的模型,有兴趣可以去看一下。
hugging-face上的Space:https://huggingface.co/spaces/ThornRugal/ChinesePrivacyPolicyMark
注意由于我没有花钱租GPU,hugging-face上的这个应用使用的CPU版本的,跑起来很慢,尽量不要输入整个隐私政策文档。有条件的可以直接clone到本地使用GPU跑。
现在注册网站都需要同意隐私政策才可以注册,然而隐私政策条文过于复杂冗余让人难以阅读而且用户急于进入应用,因此有很多人都是直接勾选“已经阅读并同意xx政策”。然而这一之前也有迪士尼在合同中暗藏“不允许以任何理由在法律上控告迪士尼公司”的“霸王条款”。虽然不是隐私政策条款,但是也侧面证明了,在注册账号前详细查看各个条款是有必要的。在这一数据集中,包含130个来自流行应用的中文隐私政策,这些政策由法律专家进行标注,甚至还有许多的重写句子。该数据集旨在帮助用户理解和总结隐私政策,以保护个人隐私信息。本次的任务,就是利用数据集做一个自动标注隐私数据的应用。
问题分析
在数据集中,标注者将以下多个标签打入隐私政策的句子中:
'使用','停止运营','共享、委托、转让、公开(披露)','存储方式','安全措施','摘要','数据、权限管理','数据收集','权限获取','潜在风险','特殊人群','联系方式','隐私政策的授权、修改与变更'
每句句子可能不包含任何标签,也可能包含多个标签。诚然,我们可以将它作为一个分类问题,一共13个子问题(是否有“使用”标签,是否有“停止运营”标签……),如果将他们使用一个单独的分类模型需要做13个输出,且会面临模型过于复杂、数据失衡、决策边界模糊等问题;而如果对13个子问题分别建模,则又会大大增加模型的维护成本。(数据源的提供者训练了一个可以用以检测“潜在风险”的二分类模型。)因此,笔者使用了一个更为简易的策略:直接将所有标注好的数据序列化后保存,有新数据时将其与保存的数据进行对比,将与它最接近的标签拿出作为预测结果。这样的策略尽管可能准确率上比作为分类任务时来的要差,但是胜在简单快捷,连fine-tune都不需要,速度与能耗上更为优秀。
标注策略
零、模型流程
我们集合所有标签非空的句子,将它们作为字典存储,对于每个键(句子),使用特征筛选模型将其序列化,变为多个向量。当有新的句子进来后,使用特征筛选模型序列化句子得到一个新的向量,筛选出离新最近的那n个向量获得n条备选句子。之后将新句子与n条备选句子输入句子相似度模型进行对比,再精筛出m条句子,将它们对应的标签融合作为输入语句的标签。
一、数据准备
首先我们将jsonl格式的语句转化为pandas.DataFrame格式
import pandas as pd
import osdata = pd.DataFrame(columns=["task","id","text","label","rewrite"])
file_dir = "./CAPP_130_Corpus/Annotation/Annotation/"
for file in os.listdir(file_dir):pth = file_dir+filetmp = pd.read_json(pth,lines=True)data = pd.concat([data,tmp],axis=0).reset_index(drop=True)data.to_excel("data_Excel_format.xlsx",index=None)
我们读取保存好的Excel文件,并筛选其中的一部分作为验证集。注意保存为Excel文件后list对象会被当做字符串,需要手动将其转化回来。将训练集以及总体数据的分别筛选标签非空的部分保存备用。
import warnings
warnings.filterwarnings("ignore")import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from itertools import islice
import faiss
import os
import ast
import jsondata_all = pd.read_excel("data_Excel_format.xlsx")
def get_not_empty_data(df,x_column="text",y_column="label"):df = df[df[y_column] != "[]"].reset_index(drop=True)res_dict = {}for idx in df.index:if df.loc[idx,x_column] not in res_dict:res_dict[df.loc[idx,x_column]] = ast.literal_eval(df.loc[idx,y_column])else:res_dict[df.loc[idx,x_column]] += ast.literal_eval(df.loc[idx,y_column])res_dict = {k:list(set(v)) for k,v in res_dict.items()}df_dict = pd.DataFrame({"x":res_dict.keys(),"y":res_dict.values()})return df_dictdf_dict_all = get_not_empty_data(data_all)
df_dict_all.to_excel("data_not_empty.xlsx",index=None)
train_tasks = {f"rw_{i+1}.jsonl" for i in range(int(data_all["task"].nunique()*0.8))}
x_train = data_all[data_all["task"].isin(train_tasks)]["text"].values
y_train = data_all[data_all["task"].isin(train_tasks)]["label"].valuesx_valid = data_all[~data_all["task"].isin(train_tasks)]["text"].values
y_valid = data_all[~data_all["task"].isin(train_tasks)]["label"].valuesdf_train = pd.DataFrame({"x":x_train,"y":y_train})
df_valid = pd.DataFrame({"x":x_valid,"y":y_valid})
df_dict_train = get_not_empty_data(df_train,x_column="x",y_column="y")
二、特征比对(粗筛)
使用特征提取模型将各个句子进行向量化并保存,通过向量对比的方式粗筛出前n个相似的句子。
import torch.nn.functional as F
import torch
from transformers import AutoModel, AutoTokenizerEncoding_model = 'jinaai/jina-embeddings-v2-base-zh'
model = AutoModel.from_pretrained(Encoding_model, trust_remote_code=True, torch_dtype=torch.bfloat16)
model.to("cuda")
x_train = df_dict_train["x"].values
y_train = df_dict_train["y"].values
vec = np.empty(shape=[0,768],dtype="float32")
bsize = 256
with torch.no_grad():for i in range(0,len(x_train),bsize):tmp = model.encode(x_train[i:i+bsize])vec = np.concatenate([vec,tmp])index = faiss.IndexFlatIP(768)
faiss.normalize_L2(vec)
index.add(vec)
faiss.write_index(index,"train_index.faiss")
index = faiss.read_index("train_index.faiss")# 最后一位代表无法匹配
x_train = np.array(x_train.tolist()+["None"])
y_train = np.array([str(i) for i in y_train]+["['None']"])def string_to_list(s):s = ast.literal_eval("["+s+"]")res = []for i in s:if i != ["None"]:res += ireturn set(res)bsize=128
unassigned = len(y_train)-1recall_rate = []
misjudgement_rate = []
params = []
with torch.no_grad():for threshold in [0.5,0.55,0.6,0.65,0.7,0.75,0.8,0.85]:for n_nearest in [5,10,20]:res = pd.DataFrame(columns=["key","key_pred","y","y_pred"])for i in range(0,len(x_valid),bsize):vec = model.encode(x_valid[i:i+bsize])faiss.normalize_L2(vec)scores, indexes = index.search(vec,n_nearest)mask = scores<thresholdscores[mask] = unassignedindexes[mask] = unassignedtmp = pd.DataFrame({"key":x_valid[i:i+bsize],"key_pred":["\n".join(i) for i in x_train[indexes]],"y":y_valid[i:i+bsize],"y_pred":[",".join(y_train[indexes[i]]) for i in range(indexes.shape[0])]})res = pd.concat([res,tmp]).reset_index(drop=True)res.to_excel(f"for_validation_{n_nearest}_{int(threshold*100)}.xlsx",index=None)y_true = res["y"].map(ast.literal_eval)y_pred = res["y_pred"].map(string_to_list)cnt_all = 0cnt_recall = 0cnt_misjudge = 0cnt_pred = 0for i,x in enumerate(y_true):cnt_pred+=len(y_pred[i])if x:cnt_all += len(x)for c in y_pred[i]:if c not in x:cnt_misjudge+=1else:cnt_recall += 1else:cnt_misjudge += len(y_pred[i])print(f"threshold:{threshold} {n_nearest} nearest recall_rate:{cnt_recall/cnt_all} misjudgement_rate:{cnt_misjudge/cnt_pred}")recall_rate.append(cnt_recall/cnt_all)misjudgement_rate.append(cnt_misjudge/cnt_pred)params.append((threshold,n_nearest))
对于粗筛,我们有以下2个参数:1、threshold:阈值。当某个句子低于阈值时,哪怕离新句子是最为接近的,也会被筛去。2、n_nearest:需要筛选的最接近的n条句子。(最多筛选n条,比threshold小的句子会被筛去,一个也不剩就是没有标签的句子)
使用训练集中的句子作为筛选池,在验证集上进行测试得到如下结果:
import matplotlib as mpl
from matplotlib import pyplot as plt
plt.plot(recall_rate,label="recall rate")
plt.plot(misjudgement_rate,label="misjudgementrate")
plt.legend()
plt.show()
效果看上去不怎么好:尽管在threshold值较小时,各个标签的召回率很高,但是误判率却较高。提升阈值能某种程度上缓解误判率的问题,但是却会使得召回率下降。
然而在深入查看数据后,我发现此处的误判率会有“虚高”的问题:
上面的2张图分别是起点读书与全民K歌的隐私政策,相同的语句(“请注意,主动更开的个人信息可能包含您的个人身份信息、个人财产信息等敏感信息。”)却获得了不同的标签。在起点读书中被认为是潜在风险的语句到了全民K歌中却没有获得任何标签。因此,很有可能由于标注者的不同,每个句子的判定会有的差别。如果用户仅查看一个隐私政策的文本,实际上“误标”的语句代价相对较小,因此应当更为注重模型的召回率。
三、句子相似度计算(精筛)
在粗筛出n条相似句子之后,使用句子相似度模型对输入的句子和n条备选语句两两比对,找出最何时的那条语句。
similarity_model = 'Alibaba-NLP/gte-multilingual-base'
similarity_tokenizer = AutoTokenizer.from_pretrained(similarity_model)
similarity_model = AutoModel.from_pretrained(similarity_model, trust_remote_code=True).to("cuda")def calc_scores(x):return (x[:1] @ x[1:].T)def get_idxs(threshold,max_len,arr):res = np.where(arr >= threshold)[0]if len(res)<max_len:return resres = res[np.argsort(-arr[res])][:3]return resdef merge_set_to_list(set_list):res = set()for i in set_list:res = res | ireturn resdef get_predict_result(index,score,threshold,max_len):score = score.flatten()index = index.flatten()index_of_index = np.where(score >= threshold)[0]if len(index_of_index)>=max_len:index_of_index = index_of_index[np.argsort(-index[index_of_index])][:3]if len(index_of_index)==0:return {},[]res_index = index[index_of_index]res = merge_set_to_list([string_to_list(i) for i in y_train[res_index]])return res,x_train[res_index]bsize=8
index = faiss.read_index("train_index.faiss")
n_nearest = 5
recall_rate = []
misjudgement_rate = []
params = []
with torch.no_grad():for threshold in [0.7,0.75,0.8,0.85]:for max_result_len in [1,3]:res = pd.DataFrame(columns=["key","key_pred","y","y_pred"])for i in range(0,len(x_valid),bsize):sentences = x_valid[i:i+bsize]y_true = y_valid[i:i+bsize]vec = model.encode(sentences)faiss.normalize_L2(vec)scores, indexes = index.search(vec,n_nearest)x_pred = np.array([[sentences[j]]+s.tolist() for j,s in enumerate(x_train[indexes])])# for j in x_pred:# batch_dict = similarity_tokenizer(j.tolist(), max_length=768, padding=True, truncation=True, return_tensors='pt').to("cuda")batch_dict = similarity_tokenizer(x_pred.flatten().tolist(), max_length=768, padding=True, truncation=True, return_tensors='pt').to("cuda")outputs = similarity_model(**batch_dict)dimension=384embeddings = outputs.last_hidden_state[:, 0][:dimension]embeddings = F.normalize(embeddings, p=2, dim=1)embeddings = embeddings.view(len(x_pred),n_nearest+1,dimension).detach().cpu().numpy()scores = [calc_scores(embeddings[b]) for b in range(embeddings.shape[0])]pred = [get_predict_result(indexes[k],scores[k],threshold=threshold,max_len=max_result_len) for k in range(len(scores))]y_pred = [i[0] for i in pred]x_pred = [i[1] for i in pred]tmp = pd.DataFrame({"key":x_valid[i:i+bsize],"key_pred":x_pred,"y":y_valid[i:i+bsize],"y_pred":y_pred})res = pd.concat([res,tmp]).reset_index(drop=True)res.to_excel(f"for_validation_similar_{max_result_len}_{threshold*100}.xlsx",index=None)y_pred = res["y_pred"]y_true = res["y"].map(ast.literal_eval)cnt_all = 0cnt_recall = 0cnt_misjudge = 0cnt_pred = 0for i,x in enumerate(y_true):cnt_pred+=len(y_pred[i])if x:cnt_all += len(x)for c in y_pred[i]:if c not in x:cnt_misjudge+=1else:cnt_recall += 1else:cnt_misjudge += len(y_pred[i])print(f"threshold:{threshold} {max_result_len} top three recall_rate:{cnt_recall/cnt_all} misjudgement_rate:{cnt_misjudge/cnt_pred}")recall_rate.append(cnt_recall/cnt_all)misjudgement_rate.append(cnt_misjudge/cnt_pred)params.append((threshold,max_result_len))
精筛也有以下2个参数:1、threshold:阈值。当2个句子的相似度低于阈值时,备选项会被筛去。2、max_result_len:选取相似度前m条句子。(最多筛选max_result_len条,比threshold小的句子会被筛去,一个也不剩就是没有标签的句子)
网页应用
最终,使用gradio制成网页端:
import gradio as gr
import warnings
warnings.filterwarnings("ignore")import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from itertools import islice
import faiss
import os
import ast
import jsonimport torch.nn.functional as F
import torch
from transformers import AutoModel, AutoTokenizerEncoding_model = 'jinaai/jina-embeddings-v2-base-zh'
model = AutoModel.from_pretrained(Encoding_model, trust_remote_code=True, torch_dtype=torch.bfloat16)
model.to("cuda")similarity_model = 'Alibaba-NLP/gte-multilingual-base'
similarity_tokenizer = AutoTokenizer.from_pretrained(similarity_model)
similarity_model = AutoModel.from_pretrained(similarity_model, trust_remote_code=True).to("cuda")def get_not_empty_data(df,x_column="text",y_column="label"):df = df[df[y_column] != "[]"].reset_index(drop=True)res_dict = {}for idx in df.index:if df.loc[idx,x_column] not in res_dict:res_dict[df.loc[idx,x_column]] = ast.literal_eval(df.loc[idx,y_column])else:res_dict[df.loc[idx,x_column]] += ast.literal_eval(df.loc[idx,y_column])res_dict = {k:list(set(v)) for k,v in res_dict.items()}df_dict = pd.DataFrame({"x":res_dict.keys(),"y":res_dict.values()})return df_dictdata_all = pd.read_excel("data_Excel_format.xlsx")
df_dict_all = get_not_empty_data(data_all)
x_dict = df_dict_all["x"].values
y_dict = df_dict_all["y"].valuesdef calc_scores(x):return (x[:1] @ x[1:].T)def get_idxs(threshold,max_len,arr):res = np.where(arr >= threshold)[0]if len(res)<max_len:return resres = res[np.argsort(-arr[res])][:3]return resdef merge_set_to_list(set_list):res = set()for i in set_list:res = res | ireturn resdef get_predict_result(index,score,threshold,max_len):score = score.flatten()index = index.flatten()index_of_index = np.where(score >= threshold)[0]if len(index_of_index)>=max_len:index_of_index = index_of_index[np.argsort(-index[index_of_index])][:3]if len(index_of_index)==0:return {},[]res_index = index[index_of_index]res = merge_set_to_list([set(i) for i in y_dict[res_index]])return res,x_dict[res_index]# vec = np.empty(shape=[0,768],dtype="float32")
# bsize = 256
# with torch.no_grad():
# for i in range(0,len(x),bsize):
# tmp = model.encode(x[i:i+bsize])
# vec = np.concatenate([vec,tmp])# index = faiss.IndexFlatIP(768)
# faiss.normalize_L2(vec)
# index.add(vec)
# faiss.write_index(index,"all_index.faiss")
index = faiss.read_index("all_index.faiss")def predict_label(x,threshold=0.85,n_nearest=10,max_result_len=3):bsize=1y_pred = []with torch.no_grad():for i in range(0,len(x),bsize):sentences = x[i:i+bsize]vec = model.encode(sentences)faiss.normalize_L2(vec)scores, indexes = index.search(vec,n_nearest)x_pred = np.array([[sentences[j]]+s.tolist() for j,s in enumerate(x_dict[indexes])])batch_dict = similarity_tokenizer(x_pred.flatten().tolist(), max_length=768, padding=True, truncation=True, return_tensors='pt').to("cuda")outputs = similarity_model(**batch_dict)dimension=768embeddings = outputs.last_hidden_state[:, 0][:dimension]embeddings = F.normalize(embeddings, p=2, dim=1)embeddings = embeddings.view(len(x_pred),n_nearest+1,dimension).detach().cpu().numpy()scores = [calc_scores(embeddings[b]) for b in range(embeddings.shape[0])]pred = [get_predict_result(indexes[k],scores[k],threshold=threshold,max_len=max_result_len) for k in range(len(scores))]y_pred.append([i[0] for i in pred])return y_predCSS_Content = """
<!DOCTYPE html>
<html lang="en">
<head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> #custom_id { border: 2px solid red; padding: 10px; background-color: lightgray;}</style>
</head>
</html>
<span style="color: red;line-height:1;">红色字体:潜在风险</span><br>
<span style="color: blue;line-height:1;">蓝色字体:权限获取</span><br>
<span style="color: purple;line-height:1;">紫色字体:数据收集</span><br>
<span style="color: green;line-height:1;">绿色字体:数据、权限管理</span><br>
<span style="color: brown;line-height:1;">棕色字体:共享、委托、转让、公开(披露)</span><br>
""" color_dict = {"潜在风险":"red","权限获取":"blue","数据收集":"purple","数据、权限管理":"green","共享、委托、转让、公开(披露)":"brown"}def generate_HTML(text,threshold=0.85,n_nearest=10,max_result_len=3):sentences = text.split("\n")sentences = [i for i in map(lambda x:x.split("。"),sentences)]res = CSS_Contentfor paragraph in sentences:tmp_res = []pred_label = predict_label(paragraph,threshold,n_nearest,max_result_len)for i,x in enumerate(pred_label):pre = "<span"if len(x[0])>0:for j in color_dict.keys(): #color dict重要性递减,所以只取第一个标签的颜色if j in x[0]:pre += f' style="color: {color_dict[j]};line-height:1;"'breaktmp_res.append(pre+">"+paragraph[i]+"</span>")res += "。".join(tmp_res)res += "<br>"return reswith gr.Blocks() as demo: with gr.Row(): input_text = gr.Textbox(lines=25,label="输入")with gr.Row():threshold = gr.Slider(minimum=0.5,maximum=0.85,value=0.75,step=0.05,interactive=True,label="相似度阈值")n_nearest = gr.Slider(minimum=3,maximum=10,value=10,step=1,interactive=True,label="粗筛语句数量")max_result_len = gr.Slider(minimum=1,maximum=5,value=3,step=1,interactive=True,label="精筛语句数量")with gr.Row():submit_button = gr.Button("检测") with gr.Row():output_text = gr.HTML(CSS_Content)output_text.elem_id="custom_id"submit_button.click(fn=generate_HTML, inputs=[input_text,threshold,n_nearest,max_result_len], outputs=output_text)demo.launch()
在处理输入的文本时,我们将文本首先按照换行符切分,之后再按照句号切分,以此保证标注后的文本能够恢复成原本的样式。我们将句子输入模型中,得到每个句子的标签,之后根据预先设定好的标签对应颜色,生成带有颜色样式的HTML代码。而在处理输入时,我们还将3个参数作为用户输入的一部分:
1、threshold:精筛时的阈值,低于阈值的备选句子被舍去
2、粗筛语句数量:粗筛时是啊选出输入句子最近的备选句子数量
3、精筛语句数量:精筛时选取最多多少句子的标签
最终,能够输出带有颜色标签的隐私政策。