当前位置: 首页 > news >正文

【从零开始】5. 向量数据库选型与搭建

书接上回…既然 Python 环境搭好了那么马上就可以开展工作了,首先需要回顾一下我们的最终实现效果是什么。按照预定计划是要做一个基于微信小程序的生成式人工智能机器人。

好,以此为目标先看看手头上有什么资源。

呃…目前手头上只有一张 1060 显卡,显卡虽然有 6G 显存但是在大模型面前还是不太够看。除了显卡外就是 16G 内存和一块 i5 的 CPU。相比于这些,磁盘空间就比较富裕一共有 4 T(256G SSD + 4T HDD)。基于这种配置下想通过大数据做模型微调是不太实际的了(主要是算力不够)。要达到较为理想的效果,剩下的就

只能“手搓”一个 RAG(检索增强生成)应用这个方法了(之所以“手搓”也是为了减少不必要的功能,减少资源消耗)。

既然考虑做 RAG 应用,那么向量数据库就是必不可少的了。现在可用资源吃紧向量数据库的选型也要考虑多方因素。业界推荐的 Milvus 只能存储向量数据而不能存储原文,因此使用 Milvus 时还需搭配其他存储库进行映射,这就不在考虑范围内了。当然了,某些特殊的向量机是可以将词向量或句子向量重新转换成文本的,但是一般不建议这样做。这里面涉及到低维度与高维度的转换开销问题,还不如直接通过字段关联映射来得快速直接。

那有没有一步到位的解决方案呢?

答案是肯定的,就目前所知支持向量的存储库就有 Elasticsearch、Lucene、Cassandra 、MongoDB、Clickhouse、PostgreSQL等。由于本次 RAG 数据主要来自文本,因此最终选择 Elasticsearch(以下简称“es”) 作为向量存储库。(关于 es 的安装可以参考另一片文章《【Docker】Elasticsearch 8.12 安装与搭建》这里就不过多叙述了)。

接下来将 es 的操作封装成 Python 工具类中(顺便将这个 RAG 项目也建起来吧)。项目结构如下图:

brain-mix
|-- resources
|   `-- config
|       `-- elastic_cnf.yml
|-- test
|   `-- unit-test
|       `-- elastic_util
|           |-- batch_insert_test.py
|           |-- delete_by_body_test.py
|           |-- delete_by_id_test.py
|           |-- find_and_create_index_test.py
|           |-- find_by_body_nopaging_test.py
|           |-- find_by_body_test.py
|           |-- find_by_id_test.py
|           |-- find_by_sql_test.py
|           |-- insert_test.py
|           `-- refresh_index_test.py
`-- utils|-- elastic_util.py`-- yaml_util.py

其中 resources 文件夹中将存放项目的配置文件,数据源文件(jsonl、csv 等)。test 文件夹将存放各种测试文件,譬如压力测试、单元测试等。而 utils 文件夹中将存放工具类,其中 elastic_util.py 就是本次实现的 es 工具类。为了方便配置信息的读取也封装了一个 yaml 文件读取工具类,下面就先看看 yaml_util.py 的内容:

import yamlclass YamlConfig:_instance = Nonedef __init__(self, path):"""初始化 YamlConfig 实例.此构造函数设置YAML文件的路径并加载其内容在`config`属性中。参数:path (str): YAML配置文档路径"""self.path = pathself.config = self.load_config()def __new__(cls, path):"""一个静态方法,用于实例化 YamlConfig 的单例对象.由于 YamlConfig 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.参数:path (str): YAML配置文档路径.返回:YamlConfig: YamlConfig 的单例对象."""if cls._instance is None:cls._instance = super().__new__(cls)cls._instance.path = pathcls._instance.config = cls._instance.load_config()return cls._instancedef load_config(self):"""读取YAML配置文档的内容。读取并解析YAML配置文档,返回解析后的内容。返回:dict: 解析后的YAML配置文档内容。"""with open(self.path, 'r', encoding='utf-8') as file:return yaml.safe_load(file)def get_value(self, key):"""通过key获取YAML配置文档中的值。参数:key (str): 键名,可能包含多个部分,例如a.b.c。返回:object: 通过key获取的值,可能是None。"""key_parts = key.split('.')value = self.configfor part in key_parts:value = value.get(part)if value is None:breakreturn value

这里的设计思路是按需加载,加载的时候会将配置文件内容加载到内存中,然后需要时直接从内存中获取,这样就不用每次都读取 yml 文件了。有了这个 yaml 文件读取工具后,在 elastic_util.py 中读取配置就变得简单起来了,如下图:

from yaml_util import YamlConfig 
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkimport os
project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))class ElasticUtil:instance = Noneinit_flag = False# 读取 elasticsearch 配置elastic_config = YamlConfig(os.path.join(project_dir, 'resources', 'config', 'elastic_cnf.yml'))def __init__(self):"""初始化 ElasticUtil 实例。此构造函数检查类的初始化标志。如果尚未初始化,则调用私有方法`__elastic_init_model` 来初始化 Elasticsearch 客户端,并将初始化标志设置为 True。"""if not ElasticUtil.init_flag:self.es = Noneself.__elastic_init_model()ElasticUtil.init_flag = Truedef __new__(cls, *args, **kwargs):"""一个静态方法,用于实例化 elastic_util 的单例对象.由于 elastic_util 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建."""if cls.instance is None:cls.instance = super().__new__(cls)return cls.instancedef __elastic_init_model(self) -> None:"""初始化Elasticsearch的client对象.该函数读取YAML配置文件,获取Elasticsearch的host、username、password、max_retries、max_size、timeout等配置项。然后使用这些配置项实例化Elasticsearch的client对象,并将其赋值给全局变量`es`。"""host = ElasticUtil.elastic_config.get_value('es.host')username = ElasticUtil.elastic_config.get_value('es.username')password = ElasticUtil.elastic_config.get_value('es.password')max_retries = ElasticUtil.elastic_config.get_value('es.max-retries')max_size = ElasticUtil.elastic_config.get_value('es.max-size')timeout = ElasticUtil.elastic_config.get_value('es.timeout')self.es = Elasticsearch(host,basic_auth=(username, password),max_retries=max_retries,connections_per_node=max_size,request_timeout=timeout)def insert(self, name, data) -> dict:"""插入单个文档到Elasticsearch索引中。参数:name (str): Elasticsearch索引的名称。data (dict): 要插入的文档数据。返回:dict: 插入操作的结果。该函数在指定的Elasticsearch索引中插入一个文档。如果索引不存在,则抛出异常。"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.index(index=name, body=data)return response["_shards"]["successful"],response['_id']def batch_insert(self, name, datas) -> int:"""批量插入文档到Elasticsearch索引中。该函数将多个文档插入到Elasticsearch索引中。参数:name (str): Elasticsearch索引的名称。datas (list): 要插入的文档列表,列表中的每个元素必须是字典类型。返回:None"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")if not all(isinstance(doc, dict) for doc in datas):raise TypeError("datas 中的所有元素必须是字典类型")actions = [{"_index": name,"_source": doc}for doc in datas]response = bulk(self.es, actions)return response[0]def refresh_index(self, name) -> None:"""重新刷新Elasticsearch索引,以便于最近插入的文档能够被搜索到。参数:name (str): Elasticsearch索引的名称。"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")self.es.indices.refresh(index=name)def delete_by_body(self, name, body) -> None:"""根据给定的搜索体从Elasticsearch索引中删除文档。参数:name (str): Elasticsearch索引的名称。body (dict): 用于查找要删除的文档的搜索体。返回:None"""if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")self.es.delete_by_query(index=name, query=body,refresh=True)def delete_by_id(self, name, id) -> dict:"""通过ID在Elasticsearch中删除文档。参数:name (str): Elasticsearch索引的名称。id (str): 要删除的文档的ID。返回:dict: 删除操作的结果。"""if id == '' or name == '':raise TypeError("params cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")return self.es.delete(index=name, id=id,refresh=True)def find_by_id(self, name, id) -> dict:"""通过ID在Elasticsearch中查找文档。参数:name (str): Elasticsearch索引的名称。id (str): 文档的ID。返回:dict: 文档的详细信息。"""if id == '' or name == '':raise TypeError("params cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")return self.es.get(index=name, id=id)def find_by_body(self, name, body) -> list:"""通过给定的body在Elasticsearch中搜索并返回结果。参数:name (str): Elasticsearch索引的名称。body (dict): 搜索的body。返回:list: 搜索响应的结果列表。该函数使用Elasticsearch的search API执行搜索操作,并将所有的结果都return回去。"""if name == '':raise TypeError("index cannot be empty")if body == {}:raise KeyError("body cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.search(index=name, body=body)return response['hits']['hits']def find_by_body_nopaging(self, name, body) -> list:"""通过给定的body在Elasticsearch中搜索并返回结果,且不分页。参数:name (str): Elasticsearch索引的名称。body (dict): 搜索的body。返回:list: 搜索响应的结果列表。该函数使用Elasticsearch的search API执行搜索操作,并使用scroll API来获取所有的结果。"""if name == '':raise TypeError("index cannot be empty")if body == {}:raise KeyError("body cannot be empty")if not self.es.indices.exists(index=name):raise Exception(f"Index {name} does not exist")response = self.es.search(index=name, scroll='1m', body=body)# 获取 scroll_id 和初始结果scroll_id = response['_scroll_id']hits = response['hits']['hits']# 处理初始结果all_hits = hits# 循环获取剩余结果while len(hits) > 0:response = self.es.scroll(scroll_id=scroll_id, scroll='1m')hits = response['hits']['hits']all_hits.extend(hits)# 清除 scrollself.es.clear_scroll(scroll_id=scroll_id)return all_hitsdef find_and_create_index(self, yaml_key, mapping) -> str:"""通过name从配置文件中获取对应的index_name,然后判断index是否存在,不存在则创建,最后返回index_name。参数:name (str): 在配置文件中配置的index name。mapping (dict): index的mapping。返回:str: 创建的index_name。"""if yaml_key == '':raise TypeError("yaml_key cannot be empty")index_name = ElasticUtil.elastic_config.get_value(yaml_key)if not self.es.indices.exists(index=index_name) and mapping is not None:self.es.indices.create(index=index_name, body=mapping)return index_namedef find_by_sql(self, sql, fetch_size=100) -> list:"""执行Elasticsearch的SQL查询。参数:sql (str): Elasticsearch的SQL语句。fetch_size (int): 一次从Elasticsearch获取的文档数量。返回:list: JSON字符串列表,每个字符串表示一个文档。该函数执行Elasticsearch的SQL查询,并将结果以JSON字符串列表的形式返回。"""return self.es.sql.query(format="json", query=sql, fetch_size=fetch_size)def update(self, name, data, id) -> dict:"""更新Elasticsearch中的文档。参数:name (str): Elasticsearch索引的名称。data (dict): 包含更新字段及其新值的数据字典。id (str): 要更新的文档的ID。返回:dict: 更新操作的结果。该函数在指定的Elasticsearch索引中通过文档ID更新文档。返回更新操作的结果。"""return self.es.update(index=name, id=id, body=data)

elastic_util 做成单例,并在 elastic_util 中提供了多种操作模式:

  • 插入类:insert、batch_insert
  • 删除类:delete_by_body、delete_by_id
  • 查询类:find_by_id、find_by_body、find_by_body_nopaging、find_by_sql
  • 更新类:update
  • 辅助类:refresh_index、find_and_create_index

这些函数都有对应的单元测试用例, 在路径 ${project_path}/test/unit-test/elastic_util 下。以 insert 函数的单元测试用例为例:

import unittestimport os
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import sys
sys.path.append(os.path.join(project_dir, 'utils'))from elastic_util import ElasticUtil # type: ignore class TestElasticUtilInsert(unittest.TestCase):def setUp(self):"""初始化ElasticUtil实例。此函数在每次测试之前运行,用于初始化测试中使用的ElasticUtil实例。"""self.elastic = ElasticUtil()self.index_name = 'test_index'self.data = {'key': 'value'}self.elastic.es.indices.create(index=self.index_name)def tearDown(self):self.elastic.es.options(ignore_status=404).indices.delete(index=self.index_name)def test_insert_success(self):"""测试向Elasticsearch索引中插入文档是否成功。该测试函数向Elasticsearch索引中插入一个文档,然后使用get API来检查文档是否插入成功。"""_,insert_id = self.elastic.insert(self.index_name, self.data)# 检查数据是否插入成功result = self.elastic.es.get(index=self.index_name, id=insert_id)self.assertEqual(result['_source'], self.data)def test_insert_failure_index_not_exists(self):"""测试向不存在的Elasticsearch索引中插入文档是否失败。该测试函数尝试向不存在的Elasticsearch索引中插入一个文档,并检查是否抛出异常。"""name = 'non_existent_index'self.elastic.es.options(ignore_status=404).indices.delete(index=name)  # 删除索引with self.assertRaises(Exception):self.elastic.insert(name, self.data)def test_insert_failure_elasticsearch_connection_error(self):"""测试Elasticsearch连接出错时插入文档是否失败。该测试函数模拟Elasticsearch连接错误,然后尝试向Elasticsearch索引中插入一个文档,并检查是否抛出异常。"""original_es = self.elastic.esself.elastic.es = Nonewith self.assertRaises(Exception):self.elastic.insert(self.index_name, self.data)self.elastic.es = original_esdef test_insert_failure_data_format_error(self):"""测试插入格式错误的数据时是否抛出异常。该测试函数尝试插入一个无效格式的数据到Elasticsearch索引中,并检查是否抛出异常。"""data = 'invalid data'with self.assertRaises(Exception):self.elastic.insert(self.index_name, data)if __name__ == '__main__':unittest.main()

为了保证单元测试的质量,每个单元测试中都应包含 setUp 和 tearDown 两个函数(部分代码中因为需要预设数据因此没有 tearDown 函数),对测试数据进行销毁。

注意:这里没有使用 Mock,主要是因为这次测试的是工具类,需要将真实数据插入 es 库看到效果。Mock 只会对处理逻辑进行模拟并没有真正的将数据插入到 es 中,因此没有使用 Mock 来测试。

至此,向量数据库(Elasticsearch)搭建完成。

项目地址:https://github.com/yzh0623/brain-mix

(未完待续…)


http://www.mrgr.cn/news/68382.html

相关文章:

  • 代码随想录 | Day38 | 动态规划 :01背包应用 目标和一和零
  • SpringBoot框架在资产管理中的应用
  • 【数据集】【YOLO】【目标检测】道路结冰数据集 1527 张,YOLO目标检测实战训练教程!
  • 物联网核心安全系列——物联网安全需求
  • sql server 文件和文件组介绍
  • 高校实验室安全巡检系统设计与实现(源码+定制+开发)高校实验室巡检系统、实验室安全管理平台、实验室安全监控系统、智能实验室巡查系统、高校实验室风险管理
  • 在python中,什么是库?
  • 微服务架构面试内容整理-Archaius
  • 丹摩征文活动|FLUX.1图像生成模型:AI工程师的创新实践
  • 157页全面介绍票据业务
  • yocto下编译perf失败的解决方法
  • wireshark工具使用
  • AIDOVECL数据集:包含超过15000张AI生成的车辆图像数据集,目的解决旨在解决眼水平分类和定位问题。
  • VSCode 多工程联合调试
  • Java 8 Optional 详解
  • 简化招标流程:电子招标软件的关键作用
  • vue搭建项目之后的步骤操作
  • 科研——统计 Markdown 字符数量的插件
  • 贝塞尔曲线的超集即对应的数学模型
  • API返回值:代码界的“快递包裹”说明
  • 旅游社交小程序ssm+论文源码调试讲解
  • 深入理解封装与接口:Java程序设计的核心思想与最佳实践
  • C#中日期和时间的处理
  • java开发程序员职业发展的一些思考
  • 【华为机试题】 [Python] 光伏场地建设规划
  • 【算法设计与分析】期末复习