【从零开始】5. 向量数据库选型与搭建
书接上回…既然 Python 环境搭好了那么马上就可以开展工作了,首先需要回顾一下我们的最终实现效果是什么。按照预定计划是要做一个基于微信小程序的生成式人工智能机器人。
好,以此为目标先看看手头上有什么资源。
呃…目前手头上只有一张 1060 显卡,显卡虽然有 6G 显存但是在大模型面前还是不太够看。除了显卡外就是 16G 内存和一块 i5 的 CPU。相比于这些,磁盘空间就比较富裕一共有 4 T(256G SSD + 4T HDD)。基于这种配置下想通过大数据做模型微调是不太实际的了(主要是算力不够)。要达到较为理想的效果,剩下的就
只能“手搓”一个 RAG(检索增强生成)应用这个方法了(之所以“手搓”也是为了减少不必要的功能,减少资源消耗)。
既然考虑做 RAG 应用,那么向量数据库就是必不可少的了。现在可用资源吃紧向量数据库的选型也要考虑多方因素。业界推荐的 Milvus 只能存储向量数据而不能存储原文,因此使用 Milvus 时还需搭配其他存储库进行映射,这就不在考虑范围内了。当然了,某些特殊的向量机是可以将词向量或句子向量重新转换成文本的,但是一般不建议这样做。这里面涉及到低维度与高维度的转换开销问题,还不如直接通过字段关联映射来得快速直接。
那有没有一步到位的解决方案呢?
答案是肯定的,就目前所知支持向量的存储库就有 Elasticsearch、Lucene、Cassandra 、MongoDB、Clickhouse、PostgreSQL等。由于本次 RAG 数据主要来自文本,因此最终选择 Elasticsearch(以下简称“es”) 作为向量存储库。(关于 es 的安装可以参考另一片文章《【Docker】Elasticsearch 8.12 安装与搭建》这里就不过多叙述了)。
接下来将 es 的操作封装成 Python 工具类中(顺便将这个 RAG 项目也建起来吧)。项目结构如下图:
brain-mix
|-- resources
| `-- config
| `-- elastic_cnf.yml
|-- test
| `-- unit-test
| `-- elastic_util
| |-- batch_insert_test.py
| |-- delete_by_body_test.py
| |-- delete_by_id_test.py
| |-- find_and_create_index_test.py
| |-- find_by_body_nopaging_test.py
| |-- find_by_body_test.py
| |-- find_by_id_test.py
| |-- find_by_sql_test.py
| |-- insert_test.py
| `-- refresh_index_test.py
`-- utils|-- elastic_util.py`-- yaml_util.py
其中 resources 文件夹中将存放项目的配置文件,数据源文件(jsonl、csv 等)。test 文件夹将存放各种测试文件,譬如压力测试、单元测试等。而 utils 文件夹中将存放工具类,其中 elastic_util.py 就是本次实现的 es 工具类。为了方便配置信息的读取也封装了一个 yaml 文件读取工具类,下面就先看看 yaml_util.py 的内容:
import yamlclass YamlConfig:_instance = Nonedef __init__(self, path):"""初始化 YamlConfig 实例.此构造函数设置YAML文件的路径并加载其内容在`config`属性中。参数:path (str): YAML配置文档路径"""self.path = pathself.config = self.load_config()def __new__(cls, path):"""一个静态方法,用于实例化 YamlConfig 的单例对象.由于 YamlConfig 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.参数:path (str): YAML配置文档路径.返回:YamlConfig: YamlConfig 的单例对象."""if cls._instance is None:cls._instance = super().__new__(cls)cls._instance.path = pathcls._instance.config = cls._instance.load_config()return cls._instancedef load_config(self):"""读取YAML配置文档的内容。读取并解析YAML配置文档,返回解析后的内容。返回:dict: 解析后的YAML配置文档内容。"""with open(self.path, 'r', encoding='utf-8') as file:return yaml.safe_load(file)def get_value(self, key):"""通过key获取YAML配置文档中的值。参数:key (str): 键名,可能包含多个部分,例如a.b.c。返回:object: 通过key获取的值,可能是None。"""key_parts = key.split('.')value = self.configfor part in key_parts:value = value.get(part)if value is None:breakreturn value
这里的设计思路是按需加载,加载的时候会将配置文件内容加载到内存中,然后需要时直接从内存中获取,这样就不用每次都读取 yml 文件了。有了这个 yaml 文件读取工具后,在 elastic_util.py 中读取配置就变得简单起来了,如下图:
from yaml_util import YamlConfig
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkimport os
project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))class ElasticUtil:instance = Noneinit_flag = False# 读取 elasticsearch 配置elastic_config = YamlConfig(os.path.join(project_dir, 'resources', 'config', 'elastic_cnf.yml'))def __init__(self):"""初始化 ElasticUtil 实例。此构造函数检查类的初始化标志。如果尚未初始化,则调用私有方法`__elastic_init_model` 来初始化 Elasticsearch 客户端,并将初始化标志设置为 True。"""if not ElasticUtil.init_flag:self.es = Noneself.__elastic_init_model()ElasticUtil.init_flag = Truedef __new__(cls, *args, **kwargs):"""一个静态方法,用于实例化 elastic_util 的单例对象.由于 elastic_util 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建."""if cls.instance is None:cls.instance = super().__new__(cls)return cls.instancedef __elastic_init_model(self) -> None:"""初始化Elasticsearch的client对象.该函数读取YAML配置文件,获取Elasticsearch的host、username、password、max_retries、max_size、timeout等配置项。然后使用这些配置项实例化Elasticsearch的client对象,并将其赋值给全局变量`es`。"""host = ElasticUtil.elastic_config.get_value('es.host')username = ElasticUtil.elastic_config.get_value('es.username')password = ElasticUtil.elastic_config.get_value('es.password')max_retries = ElasticUtil.elastic_config.get_value('es.max-retries')max_size = ElasticUtil.elastic_config.get_value('es.max-size')timeout = ElasticUtil.elastic_config.get_value('es.timeout')self.es = Elasticsearch(host,basic_auth=(username, password),max_retries=max_retries,connections_per_node=max_size,request_timeout=timeout)def insert(self, name, data) -> dict:"""插入单个文档到Elasticsearch索引中。参数:name (str): Elasticsearch索引的名称。data (dict): 要插入的文档数据。返回:dict: 插入操作的结果。该函数在指定的Elasticsearch索引中插入一个文档。如果索引不存在,则抛出异常。"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.index(index=name, body=data)return response["_shards"]["successful"],response['_id']def batch_insert(self, name, datas) -> int:"""批量插入文档到Elasticsearch索引中。该函数将多个文档插入到Elasticsearch索引中。参数:name (str): Elasticsearch索引的名称。datas (list): 要插入的文档列表,列表中的每个元素必须是字典类型。返回:None"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")if not all(isinstance(doc, dict) for doc in datas):raise TypeError("datas 中的所有元素必须是字典类型")actions = [{"_index": name,"_source": doc}for doc in datas]response = bulk(self.es, actions)return response[0]def refresh_index(self, name) -> None:"""重新刷新Elasticsearch索引,以便于最近插入的文档能够被搜索到。参数:name (str): Elasticsearch索引的名称。"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")self.es.indices.refresh(index=name)def delete_by_body(self, name, body) -> None:"""根据给定的搜索体从Elasticsearch索引中删除文档。参数:name (str): Elasticsearch索引的名称。body (dict): 用于查找要删除的文档的搜索体。返回:None"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")self.es.delete_by_query(index=name, query=body,refresh=True)def delete_by_id(self, name, id) -> dict:"""通过ID在Elasticsearch中删除文档。参数:name (str): Elasticsearch索引的名称。id (str): 要删除的文档的ID。返回:dict: 删除操作的结果。"""if id == '' or name == '':raise TypeError("params cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")return self.es.delete(index=name, id=id,refresh=True)def find_by_id(self, name, id) -> dict:"""通过ID在Elasticsearch中查找文档。参数:name (str): Elasticsearch索引的名称。id (str): 文档的ID。返回:dict: 文档的详细信息。"""if id == '' or name == '':raise TypeError("params cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")return self.es.get(index=name, id=id)def find_by_body(self, name, body) -> list:"""通过给定的body在Elasticsearch中搜索并返回结果。参数:name (str): Elasticsearch索引的名称。body (dict): 搜索的body。返回:list: 搜索响应的结果列表。该函数使用Elasticsearch的search API执行搜索操作,并将所有的结果都return回去。"""if name == '':raise TypeError("index cannot be empty")if body == {}:raise KeyError("body cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.search(index=name, body=body)return response['hits']['hits']def find_by_body_nopaging(self, name, body) -> list:"""通过给定的body在Elasticsearch中搜索并返回结果,且不分页。参数:name (str): Elasticsearch索引的名称。body (dict): 搜索的body。返回:list: 搜索响应的结果列表。该函数使用Elasticsearch的search API执行搜索操作,并使用scroll API来获取所有的结果。"""if name == '':raise TypeError("index cannot be empty")if body == {}:raise KeyError("body cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.search(index=name, scroll='1m', body=body)# 获取 scroll_id 和初始结果scroll_id = response['_scroll_id']hits = response['hits']['hits']# 处理初始结果all_hits = hits# 循环获取剩余结果while len(hits) > 0:response = self.es.scroll(scroll_id=scroll_id, scroll='1m')hits = response['hits']['hits']all_hits.extend(hits)# 清除 scrollself.es.clear_scroll(scroll_id=scroll_id)return all_hitsdef find_and_create_index(self, yaml_key, mapping) -> str:"""通过name从配置文件中获取对应的index_name,然后判断index是否存在,不存在则创建,最后返回index_name。参数:name (str): 在配置文件中配置的index name。mapping (dict): index的mapping。返回:str: 创建的index_name。"""if yaml_key == '':raise TypeError("yaml_key cannot be empty")index_name = ElasticUtil.elastic_config.get_value(yaml_key)if not self.es.indices.exists(index=index_name) and mapping is not None:self.es.indices.create(index=index_name, body=mapping)return index_namedef find_by_sql(self, sql, fetch_size=100) -> list:"""执行Elasticsearch的SQL查询。参数:sql (str): Elasticsearch的SQL语句。fetch_size (int): 一次从Elasticsearch获取的文档数量。返回:list: JSON字符串列表,每个字符串表示一个文档。该函数执行Elasticsearch的SQL查询,并将结果以JSON字符串列表的形式返回。"""return self.es.sql.query(format="json", query=sql, fetch_size=fetch_size)def update(self, name, data, id) -> dict:"""更新Elasticsearch中的文档。参数:name (str): Elasticsearch索引的名称。data (dict): 包含更新字段及其新值的数据字典。id (str): 要更新的文档的ID。返回:dict: 更新操作的结果。该函数在指定的Elasticsearch索引中通过文档ID更新文档。返回更新操作的结果。"""return self.es.update(index=name, id=id, body=data)
elastic_util 做成单例,并在 elastic_util 中提供了多种操作模式:
- 插入类:insert、batch_insert
- 删除类:delete_by_body、delete_by_id
- 查询类:find_by_id、find_by_body、find_by_body_nopaging、find_by_sql
- 更新类:update
- 辅助类:refresh_index、find_and_create_index
这些函数都有对应的单元测试用例, 在路径 ${project_path}/test/unit-test/elastic_util 下。以 insert 函数的单元测试用例为例:
import unittestimport os
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import sys
sys.path.append(os.path.join(project_dir, 'utils'))from elastic_util import ElasticUtil # type: ignore class TestElasticUtilInsert(unittest.TestCase):def setUp(self):"""初始化ElasticUtil实例。此函数在每次测试之前运行,用于初始化测试中使用的ElasticUtil实例。"""self.elastic = ElasticUtil()self.index_name = 'test_index'self.data = {'key': 'value'}self.elastic.es.indices.create(index=self.index_name)def tearDown(self):self.elastic.es.options(ignore_status=404).indices.delete(index=self.index_name)def test_insert_success(self):"""测试向Elasticsearch索引中插入文档是否成功。该测试函数向Elasticsearch索引中插入一个文档,然后使用get API来检查文档是否插入成功。"""_,insert_id = self.elastic.insert(self.index_name, self.data)# 检查数据是否插入成功result = self.elastic.es.get(index=self.index_name, id=insert_id)self.assertEqual(result['_source'], self.data)def test_insert_failure_index_not_exists(self):"""测试向不存在的Elasticsearch索引中插入文档是否失败。该测试函数尝试向不存在的Elasticsearch索引中插入一个文档,并检查是否抛出异常。"""name = 'non_existent_index'self.elastic.es.options(ignore_status=404).indices.delete(index=name) # 删除索引with self.assertRaises(Exception):self.elastic.insert(name, self.data)def test_insert_failure_elasticsearch_connection_error(self):"""测试Elasticsearch连接出错时插入文档是否失败。该测试函数模拟Elasticsearch连接错误,然后尝试向Elasticsearch索引中插入一个文档,并检查是否抛出异常。"""original_es = self.elastic.esself.elastic.es = Nonewith self.assertRaises(Exception):self.elastic.insert(self.index_name, self.data)self.elastic.es = original_esdef test_insert_failure_data_format_error(self):"""测试插入格式错误的数据时是否抛出异常。该测试函数尝试插入一个无效格式的数据到Elasticsearch索引中,并检查是否抛出异常。"""data = 'invalid data'with self.assertRaises(Exception):self.elastic.insert(self.index_name, data)if __name__ == '__main__':unittest.main()
为了保证单元测试的质量,每个单元测试中都应包含 setUp 和 tearDown 两个函数(部分代码中因为需要预设数据因此没有 tearDown 函数),对测试数据进行销毁。
注意:这里没有使用 Mock,主要是因为这次测试的是工具类,需要将真实数据插入 es 库看到效果。Mock 只会对处理逻辑进行模拟并没有真正的将数据插入到 es 中,因此没有使用 Mock 来测试。
至此,向量数据库(Elasticsearch)搭建完成。
项目地址:https://github.com/yzh0623/brain-mix
(未完待续…)