当前位置: 首页 > news >正文

大模型系列11-ray

大模型系列11-ray

  • Plasma
    • PlasmaStore
      • 启动监听
      • 处理请求 ProcessMessage
        • PlasmaCreateRequest请求
        • PlasmaCreateRetryRequest请求
        • PlasmaGetRequest请求
        • PlasmaReleaseRequest
        • PlasmaDeleteRequest
        • PlasmaSealRequest
      • ObjectLifecycleManager
        • GetObject
        • SealObject
      • ObjectStoreRunner
      • PlasmaStoreRunner
      • ObjectStatsCollector
        • OnObjectRefIncreased
      • LocalObject
      • ObjectManager
    • PlasmaClient
    • Plasma编译依赖项
      • plasma_store_server_lib
        • 代码文件
        • 依赖库
      • PlasmaClient
        • 代码文件
        • 依赖库
      • ray_common
        • 依赖库
      • plasma_fbs [done]
        • 代码文件
        • 无依赖库
      • 其它
  • Ray编译运行demo
    • 编译
    • 安装
    • 如何查看当前目录的ray包
      • bazel build基础教程
    • 生成wheel并测试
      • 编译问题
        • 安装 bazel
        • 安装bazel 6.5.0
        • 报错 pack不是class template
        • 报错:error: unused variable 'tag_key' [-Werror=unused-variable]
        • 报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include <filesystem>
        • 报错:sed: cannot rename python/ray/serve/generated/sedEFwvtV: Permission denied
  • Ray
    • 设计哲学

Plasma

PlasmaStore

启动监听

acceptor启动异步接收新的连接请求,

void PlasmaStore::DoAccept() {acceptor_.async_accept(socket_,boost::bind(&PlasmaStore::ConnectClient, this, boost::asio::placeholders::error));
}

在 Boost.Asio 中,异步操作(如 async_accept)不会自动重新启动,这是由其设计理念决定的。Boost.Asio 的异步操作通常是一次性的,即使一个操作完成了,如果你希望继续处理更多类似的操作,你必须显式地再次调用相应的函数。因此,需要在ConnectClient中再次 DoAccept。

void PlasmaStore::ConnectClient(const boost::system::error_code &error) {if (!error) {// Accept a new local client and dispatch it to the node manager.auto new_connection = Client::Create(// NOLINTNEXTLINE : handler must be of boost::AcceptHandler type.boost::bind(&PlasmaStore::ProcessMessage, this, ph::_1, ph::_2, ph::_3),std::move(socket_));}if (error != boost::asio::error::operation_aborted) {// We're ready to accept another client.DoAccept();}
}

下面是一个使用boost::asio构建tcp异步服务器监听的例子

#include <boost/asio.hpp>
#include <iostream>using boost::asio::ip::tcp;void handle_accept(tcp::socket socket, const boost::system::error_code& error) {if (!error) {std::string message = "Hello from server!\n";boost::asio::write(socket, boost::asio::buffer(message));std::cout << "Sent message to client." << std::endl;} else {std::cerr << "Error: " << error.message() << std::endl;}
}int main() {try {boost::asio::io_context io_context;tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), 12345));std::cout << "Server is listening on port 12345..." << std::endl;while (true) {tcp::socket socket(io_context);acceptor.async_accept(socket, [&socket](const boost::system::error_code& error) {handle_accept(std::move(socket), error);});io_context.run();}} catch (std::exception& e) {std::cerr << "Error: " << e.what() << std::endl;}return 0;
}

处理请求 ProcessMessage

Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,fb::MessageType type,const std::vector<uint8_t> &message)
PlasmaCreateRequest请求

从请求的 message 字符串中解析成 Request类型,获取相应的请求的属性,如object_id,object_size等

    const auto &object_id = GetCreateRequestObjectId(message);const auto &request = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(input);const size_t object_size = request->data_size() + request->metadata_size();

如果是 try_immediately == true,则原地执行 TryRequestImmediately,并将结果通过 client->SendFd 返回;否则,将其加入到 create_request_queue_ 队列中,并立刻尝试原地处理,如果本次请求处理有错误,则会触发retry with timeout,该重试请求之后的请求会因为该请求在等待retry中而被自动排队到后面(某个请求被retry,则create_timer_为true),对于排队的请求,会向客户端发送SendUnfinishedCreateReply的回复。

PlasmaCreateRetryRequest请求

客户端收到Unfinished的回复后,可以继续重试,当服务器端收到重试请求后,直接从当前的回复中查找该object_id是否已经完成,如果完成则回复object的location信息,如果未完成,继续回复 SendUnfinishedCreateReply。

PlasmaGetRequest请求

ReadGetRequest 目标是将收到的input字符串解析为PlasmaGetRequest,会拿到其属性,并将请求入队到 get_request_queue_

PlasmaReleaseRequest

调用 ReleaseObject 将 object_id 从client中移除。它会检查 object_lifecycle_mgr_ 中是否有该object_id,如果没有,则直接返回false;如果有,则调用 RemoveFromClientObjectIds 将该object_id移除,这会通过 client->MarkObjectAsUnused(object_id)将object从client的object_ids中移除,同时会将 object_lifecycle_mgr_ 中的object_id的引用计数减1。

PlasmaDeleteRequest

逐个遍历收到的object_id,调用 object_lifecycle_mgr_->DeleteObject将它们移除。 object_lifecycle_mgr_ 内部管理所有的 object_id以及object entry对象。

PlasmaSealRequest

逐个遍历收到的object,调用 object_lifecycle_mgr_.SealObject(object_ids[i]) 将它们seal掉,同时通过 add_object_callback_ 来告知。另外,对pending的get请求标识sealed:get_request_queue_.MarkObjectSealed

MarkObjectSealed

  • 从 object_get_requests_ 获取指定object_id 对应的 get_requests
  • 遍历get_requests,对每个request,将其object_id对应的plasma_object填充(通过object_lifecycle_mgr_.GetObject拿到的entry来填充),调用 object_satisfied_callback_ 来通知外部。如果对于这个get_request,它的所有的object_ids都满足之后,则通过 all_objects_satisfied_callback_ 来通知该get_request。

ObjectLifecycleManager

GetObject

从 object_store_ 获取对象 LocalObject

SealObject

向 object_store_->SealObject(object_id) 发起Seal对象,如果成功,则返回对应的entry。有个状态收集器来通过 stats_collector_->OnObjectSealed(*entry) 来感知各种对象的变化

ObjectStoreRunner

隶属于object_manager.h文件,它创建一个 store_thread_ 线程,启动PlasmaStoreRunner,注意完全单线程逻辑

PlasmaStoreRunner

检查 系统内存, plasma_directory, fallback_directory等信息,然后启动PlasmaStore的run loop,等待外部客户端连接。

构造函数和参数

class PlasmaStoreRunner {public:PlasmaStoreRunner(std::string socket_name,int64_t system_memory,bool hugepages_enabled,std::string plasma_directory,std::string fallback_directory);void Start(ray::SpillObjectsCallback spill_objects_callback,std::function<void()> object_store_full_callback,ray::AddObjectCallback add_object_callback,ray::DeleteObjectCallback delete_object_callback);

主要启动调用

store_.reset(new PlasmaStore(main_service_,*allocator_,*fs_monitor_,socket_name_,RayConfig::instance().object_store_full_delay_ms(),spill_objects_callback,object_store_full_callback,add_object_callback,delete_object_callback));
store_->Start();                             
main_service_.run();                             

ObjectStatsCollector

ObjectStatsCollector 是分布式系统中一个 轻量级但非常重要的组件,专注于对象生命周期的跟踪,空间的使用情况等。

OnObjectRefIncreased

Ray 的设计在引用计数变化的关键节点(0→1 和 1→2)进行了状态更新,主要是为了在性能与资源使用之间取得平衡:

  • 引用计数从 0 到 1:对象引用计数为 0 时,通常意味着它是空闲的,可能被系统标记为可逐出(evictable)或不活跃。当引用计数变为 1 时,说明它开始被使用。标志对象从空闲变为活跃。
    增加正在使用的统计,减少可逐出的统计。
    如果对象封存且由 Worker 创建,标记为可溢出对象,以支持后续的溢出策略(例如,将这些对象移到磁盘以节省内存)
    这是对象从“潜在可清理状态”变为“不可清理状态”的分界点,系统需要在此刻更新相关状态。

  • 引用计数从 1 到 2
    标志对象的重要性提升,进入多任务共享状态。
    溢出操作会涉及磁盘 I/O,代价较高。引用计数增加意味着对象可能正在高频使用,因此通过及时更新“可溢出对象”统计,系统可以避免在不适当的时机溢出对象。

  if (obj.GetRefCount() == 1) {num_objects_in_use_++;num_bytes_in_use_ += kObjectSize;if (kSource == plasma::flatbuf::ObjectSource::CreatedByWorker && kSealed) {num_objects_spillable_++;num_bytes_spillable_ += kObjectSize;}if (kSealed) {num_objects_evictable_--;num_bytes_evictable_ -= kObjectSize;}}// object ref count bump from 1 to 2if (obj.GetRefCount() == 2 &&kSource == plasma::flatbuf::ObjectSource::CreatedByWorker && kSealed) {num_objects_spillable_--;num_bytes_spillable_ -= kObjectSize;}

LocalObject

唯一标识一个对象,可定位该对象,以及

  • object_info: 对象的信息如数据长度,owner地址信息
  • allocation: 对象所隶属的mmaped chunk的起始位置,包括fd,map_size, offset等信息
  • ref_count: 这个对象的引用计数
  • state: 对象状态,open or sealed
  • source: object来源,用于debug
  • create_time & construct_duration: 创建时间戳以及耗时

ObjectManager

ObjectManager 是 Ray 分布式系统中用于管理 对象的存储、传输和生命周期 的核心组件。它在节点之间协调对象的流动,并与对象存储(如 Plasma Store)交互,确保对象的创建、删除、拉取和推送操作得以高效进行。

  • 支持将对象从一个节点推送(push)到另一个节点(Push 和 PushLocalObject 方法)。
    当节点请求对象时,从其他节点或磁盘拉取(pull)所需的对象(Pull 和 SendPullRequest 方法)。
  • 支持溢出和逐出策略,通过溢出和逐出策略管理内存使用,确保内存资源不会耗尽。通过与 Plasma Store 的交互实现对象溢出(spillable)到磁盘,或者逐出(evictable)以释放内存资源。提供 IsPlasmaObjectSpillable 方法检查对象是否可以溢出。
  • 使用异步 I/O 和多线程(如 rpc_threads_ 和 buffer_pool_),保证高吞吐量和低延迟。通过 buffer_pool_ 管理对象的内存分配和释放,动态调整拉取和推送操作的频率以适应节点的内存状况(UpdatePullsBasedOnAvailableMemory 方法)。对大对象进行分片,支持多线程传输以提升效率(PushObjectInternal 和 SendObjectChunk 方法)。在接收端,处理分片的写入和拼接(ReceiveObjectChunk 方法)。
  • 当对象被创建时,调用 HandleObjectAdded,将对象信息注册到本地记录(如 local_objects_),并通知其他模块。

对象的推送(Push)
如果节点上的某个对象需要传输到其他节点,调用 Push 方法:检查对象是本地的(PushLocalObject)还是在磁盘上(PushFromFilesystem)。对对象进行分片,并通过 RPC 将分片逐一发送(PushObjectInternal 和 SendObjectChunk)。
对象的拉取(Pull)
如果本节点需要一个远程对象,调用 Pull 方法:向目标节点发送拉取请求(SendPullRequest)。
监听对象的位置信息更新,并根据最新信息决定拉取策略。

ObjectManager 和 PlasmaStore的核心区别
ObjectManager 是全局管理者,负责在分布式系统中调度和协调对象的使用。PlasmaStore 是本地存储引擎,专注于高效地管理单节点的共享内存对象。任务请求一个对象,ObjectManager 检查对象是否在本地 PlasmaStore 中。如果存在,直接通过共享内存访问;如果么有,则ObjectManager 向其他节点发送拉取请求,远程节点通过 PlasmaStore 提供数据。
在这里插入图片描述

PlasmaClient

Plasma编译依赖项

plasma_store_server_lib

代码文件
    srcs = ["src/ray/object_manager/plasma/create_request_queue.cc","src/ray/object_manager/plasma/dlmalloc.cc","src/ray/object_manager/plasma/eviction_policy.cc","src/ray/object_manager/plasma/get_request_queue.cc","src/ray/object_manager/plasma/object_lifecycle_manager.cc","src/ray/object_manager/plasma/object_store.cc","src/ray/object_manager/plasma/plasma_allocator.cc","src/ray/object_manager/plasma/stats_collector.cc","src/ray/object_manager/plasma/store.cc","src/ray/object_manager/plasma/store_runner.cc",],hdrs = ["src/ray/object_manager/common.h","src/ray/object_manager/plasma/allocator.h","src/ray/object_manager/plasma/create_request_queue.h","src/ray/object_manager/plasma/eviction_policy.h","src/ray/object_manager/plasma/get_request_queue.h","src/ray/object_manager/plasma/object_lifecycle_manager.h","src/ray/object_manager/plasma/object_store.h","src/ray/object_manager/plasma/plasma_allocator.h","src/ray/object_manager/plasma/stats_collector.h","src/ray/object_manager/plasma/store.h","src/ray/object_manager/plasma/store_runner.h","src/ray/thirdparty/dlmalloc.c",],
依赖库
        ":plasma_client",":stats_lib","//src/ray/common:network",

PlasmaClient

代码文件
srcs = ["src/ray/object_manager/common.cc","src/ray/object_manager/plasma/client.cc","src/ray/object_manager/plasma/connection.cc","src/ray/object_manager/plasma/malloc.cc","src/ray/object_manager/plasma/plasma.cc","src/ray/object_manager/plasma/protocol.cc","src/ray/object_manager/plasma/shared_memory.cc",] + select({"@platforms//os:windows": [],"//conditions:default": ["src/ray/object_manager/plasma/fling.cc",],}),hdrs = ["src/ray/object_manager/common.h","src/ray/object_manager/plasma/client.h","src/ray/object_manager/plasma/common.h","src/ray/object_manager/plasma/compat.h","src/ray/object_manager/plasma/connection.h","src/ray/object_manager/plasma/malloc.h","src/ray/object_manager/plasma/plasma.h","src/ray/object_manager/plasma/plasma_generated.h","src/ray/object_manager/plasma/protocol.h","src/ray/object_manager/plasma/shared_memory.h",] + select({"@platforms//os:windows": [],"//conditions:default": ["src/ray/object_manager/plasma/fling.h",],}),
依赖库
        ":plasma_fbs",":ray_common","//src/ray/protobuf:common_cc_proto","//src/ray/util","@msgpack",

ray_common

依赖库
ray_cc_library(name = "ray_common",deps = [":stats_metric","//src/ray/common:asio","//src/ray/common:constants","//src/ray/common:event_stats","//src/ray/common:file_system_monitor","//src/ray/common:grpc_util","//src/ray/common:id","//src/ray/common:memory_monitor","//src/ray/common:network","//src/ray/common:ray_config","//src/ray/common:ray_syncer","//src/ray/common:status","//src/ray/common:task_common","//src/ray/common:test_util","//src/ray/protobuf:gcs_cc_proto","@com_google_googletest//:gtest",],
)

plasma_fbs [done]

代码文件
src/ray/object_manager/plasma/plasma.fbs
无依赖库

其它

        "//src/ray/protobuf:common_cc_proto","//src/ray/util",

Ray编译运行demo

编译

经过6个多小时的奋战,./build.sh终于编译成功,其实回过头来只有两件事情:

  • 安装bazel6.5
  • 安装gcc-9

官网参考:https://docs.ray.io/en/latest/ray-contribute/development.html

下载源代码,然后在根目录下执行 ./build.sh

(pytorch_gpu) ➜  /mnt/c/workspace/llm/ray ./build.sh
  • 报错说,找不到’/root/bin/bazel’,于是参照下面的编译问题来安装 bazel,要求安装bazel6.5版本。
  • 进一步执行./build.sh报错说 pack 不是class template,需要升级到c++17,升级g++到9
  • 切换思路,决策使用小模块编译方案,只编译特定模块,找到根目录的BUILD.bazel文件/mnt/c/workspace/llm/ray/cpp/BUILD.bazel,执行编译grpc lib
    • 编译grpc:在ray源代码的根目录下执行,bazel build //:grpc_common_lib
      在这里插入图片描述
      报错:error: unused variable ‘tag_key’ [-Werror=unused-variable]
  • 重新编译,启用copt参数: bazel build //:grpc_common_lib --copt=-Wno-error=unused-variable。报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include <filesystem>。 同样需要安装gcc-9
  • 安装g+±9
sudo add-apt-repository ppa:ubuntu-toolchain-r/test
apt install gcc-9 ## 自动安装gcc-9和g++-9
  • 替换默认的g++ 7.5.0
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 50
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-9 50然后输入g++ --version 可以看到变成9.4.0版本其它命令
update-alternatives --query g++
update-alternatives --list gcc
sudo update-alternatives --config gccbazel info cxx # 查看当前bazel是否使用的g++-9

运行单个目录,更容易缩小问题范围
bazel build //cpp:ray_cpp_pkg --verbose_failures

./build.sh还失败,程序被莫名杀死,dmesg查看内存不足导致,调整机器提供给WSL的内存,直接搜索 wsl 即可进行设置内存,内存设置完毕最终编译成功
在这里插入图片描述

安装

安装nvm

curl https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash  
nvm install 14
nvm use 14

编译dashboard

cd python/ray/dashboard/client
npm ci
npm run build
cd ../..

安装ray

# Install Ray.
cd python/
# Install required dependencies.
pip install -r requirements.txt
# You may need to set the following two env vars if you have a macOS ARM64(M1) platform.
# See https://github.com/grpc/grpc/issues/25082 for more details.
# export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1
# export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1
pip install -e . --verbose  # Add --user if you see a permission denied error.

命令 pip install -e . --verbose 是一个用于安装当前目录下 Python 包的命令

  • -e (可编辑模式):以“可编辑模式”安装当前目录下的 Python 包。这意味着,包的源代码不会被复制到 Python 的 site-packages 目录,而是创建一个指向当前目录的符号链接。
    适用于开发环境,当你对代码进行修改时,这些修改会立即反映在安装的包中。
  • . : 表示安装当前目录下的包,当前目录应该包含 setup.py 文件或 pyproject.toml 文件,以指定包的构建和安装配置

在这里插入图片描述
验证ray
python3 -c "import ray"

如何查看当前目录的ray包

如果你已经以开发模式安装了当前目录的包,可以用 pip list 查看安装的包:
pip list | grep ray,还有一个是 pip show ray
在这里插入图片描述

如果当前目录包含 setup.py,可以执行以下命令来查看定义的包名称:
python setup.py --name

site-packages中有指向ray包的软链接
cat /root/anaconda3/envs/pytorch_gpu/lib/python3.11/site-packages/ray.egg-link
在这里插入图片描述

bazel build基础教程

bazel build //main:hello-world

  • 在当前目录下从项目根目录的 main 子目录开始查找。:hello-world表示 main 目录下的一个构建目标,在main目录下有对应的 BUILD 文件定义。

  • bazel build:
    命令告诉 Bazel 构建指定目标。
    构建结果通常存放在 bazel-out/ 目录下。

教程链接:https://bazel.build/reference/be/c-cpp?hl=zh-cn

生成wheel并测试

组装轮子

cd ~/ray/python
python3 setup.py bdist_wheel

那么最终的轮子就在~/ray/python/dist里了

安装轮子:

cd dist
pip3 install *.whl

测试模块是否工作正常:

cd ~/ray
python3 -m pytest -v python/ray/tests/test_mini.py

编译问题

执行编译期间遇到了很多问题,一一解决

安装 bazel

问题1: FileNotFoundError: [Errno 2] No such file or directory: ‘/root/bin/bazel’
查看bazel的安装文档: https://bazel.build/install/ubuntu?hl=zh-cn
安装bazel

sudo apt install apt-transport-https curl gnupg -y
curl -fsSL https://bazel.build/bazel-release.pub.gpg | gpg --dearmor >bazel-archive-keyring.gpg本句的替换: wget https://bazel.build/bazel-release.pub.gpg -O bazel-release.pub.gpg  gpg --dearmor -o bazel-archive-keyring.gpg bazel-release.pub.gpg
sudo mv bazel-archive-keyring.gpg /usr/share/keyrings
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/bazel-archive-keyring.gpg] https://storage.googleapis.com/bazel-apt stable jdk1.8" | sudo tee /etc/apt/sources.list.d/bazel.listsudo apt update && sudo apt install bazel -y
安装bazel 6.5.0

ERROR: The project you’re trying to build requires Bazel 6.5.0 (specified in /mnt/c/workspace/llm/ray/.bazelversion), but it wasn’t found in /usr/bin.

You can install the required Bazel version via apt:

  sudo apt update && sudo apt install bazel-6.5.0
报错 pack不是class template

需要使用c++17编译

ray_cpp_lib/ray/api/msgpack_adaptor.h:27:8: error: 'pack' is not a class templatestruct pack<std::any> {
报错:error: unused variable ‘tag_key’ [-Werror=unused-variable]

重新编译,启用copt参数: bazel build //:grpc_common_lib --copt=-Wno-error=unused-variable

报错:main/test-filesystem.cc:2:10: fatal error: filesystem: No such file or directory #include

安装gcc 9

报错:sed: cannot rename python/ray/serve/generated/sedEFwvtV: Permission denied

sudo chmod -R u+w python/ray/serve/generated/

Ray

设计哲学

不需要将o1以及o2的数据返回给Driver层,只提前设计好框架,数据在workers之间自动按需流转
在这里插入图片描述


http://www.mrgr.cn/news/77956.html

相关文章:

  • Mysql--架构篇--体系结构(连接层,SQL层,存储引擎层,文件存储层)
  • 操作系统之文件的逻辑结构
  • Web无障碍
  • 有关Redis的相关概述
  • Ubuntu 下载安装 elasticsearch7.17.9
  • 第四、五章补充:线代本质合集(B站:小崔说数)
  • 疑难Tips:NextCloud域名访问登录时卡住,显示违反内容安全策略
  • k8s网络服务
  • C#设计模式——抽象工厂模式(重点)
  • Vue3响应式原理
  • Springboot项目搭建-Maven打包编译
  • 演示如何使用 `nn.CrossEntropyLoss` 来计算交叉熵损失,计算损失值的演示代码,和讲解 ,CrossEntropyLoss 损失数值等于零的原因
  • hugo文章支持数学公式
  • oracle 12c查看执行过的sql及当前正在执行的sql
  • 【计算机网络】多路转接之select
  • 新华三嵌入式面试题及参考答案
  • 海信Java后端开发面试题及参考答案
  • 第三十九篇 ShuffleNet V1、V2模型解析
  • Optional类
  • Leetcode 51 N Queens
  • 高频面试题(含笔试高频算法整理)基本总结回顾16
  • pinia的使用
  • 【c++篇】掌握动态内存的奥妙
  • Modern Effective C++ item 15:尽可能的使用constexpr
  • 活着就好20241125
  • 禁用达梦DEM的agent