Ray分布式计算框架实践
一、简介
- 背景:随着数据规模的增长,传统的单机处理能力已经难以满足需求。分布式计算成为了处理大规模数据集的关键技术。
- Ray是什么:Ray是由UC Berkeley RISELab开发的一个用于构建分布式应用的开源框架,旨在简化并行和分布式系统的开发。
- 主要特性:易于使用的API、支持Python/C++/Java等多语言、动态任务调度、低延迟、支持GPU等异构资源。
二、Ray原理和能力
2.1 Task - 无状态计算单元
在Ray中,Task是一种最基本的并行执行单位,它代表了可以被远程执行的一段代码。每个Task都是无状态的,这意味着它们不依赖于任何外部状态,并且每次执行的结果只取决于输入参数。这样的设计使得Tasks非常适合用于数据处理、数值计算等场景,在这些场景下任务之间没有共享状态的需求。
ray.init() # 使用ray.remote装饰器定义一个Task
@ray.remote
def add(a, b):
return a + b
# 创建并运行Task
result = add.remote(1, 2)
print(ray.get(result))
# 输出: 3
2.2 Actor - 有状态计算单元
Actor是一种有状态的计算单元。与无状态的Task不同,Actors可以维护自己的状态,并且可以在多个方法调用之间保持这些状态信息。这使得Actors非常适合需要长期运行的服务、共享资源或者需要保持某些状态的应用场景,如在线学习、模拟器等。
import ray
# 初始化Ray
ray.init()
# 使用ray.remote装饰器定义一个Actor
@ray.remote
class Counter:
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
return self.count
# 创建一个Counter类型的Actor实例
counter_actor = Counter.remote()
# 调用Actor的方法
result = ray.get(counter_actor.increment.remote())
print(result) # 输出: 1
# 再次调用increment方法
result = ray.get(counter_actor.increment.remote())
print(result) # 输出: 2
2.3 Data,Train, Tune, Serve
Data
Ray Datasets是一个用于处理大规模数据集的库,它提供了类似Pandas DataFrame的API,并且可以轻松地在分布式环境中进行数据转换和计算。
示例代码:
from typing import Dict
import numpy as np
import ray
# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
length = batch["petal length (cm)"]
width = batch["petal width (cm)"]
batch["petal area (cm^2)"] = length * width
return batch
transformed_ds = ds.map_batches(compute_area)
# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
print(batch)
# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")
Train
Ray Train 是 Ray 的一部分,旨在简化分布式训练过程。它可以与多种流行的深度学习框架(如TensorFlow, PyTorch等)集成。
示例代码:
详见Ray官方文档。
Tune
Ray Tune 提供了一套强大的超参数优化工具,支持随机搜索、网格搜索、贝叶斯优化等多种策略。
示例代码:
from ray import train, tune
def objective(config): # ①
score = config["a"] ** 2 + config["b"]
return {"score": score}
search_space = { # ②
"a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
"b": tune.choice([1, 2, 3]),
}
tuner = tune.Tuner(objective, param_space=search_space) # ③
results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)
Serve
Ray Serve 可以快速地将机器学习模型部署为微服务,并提供高可用性和自动伸缩的能力。
示例代码:
import requests
from starlette.requests import Request
from typing import Dict
from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier
from ray import serve
# Train model.
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])
@serve.deployment
class BoostingModel:
def __init__(self, model):
self.model = model
self.label_list = iris_dataset["target_names"].tolist()
async def __call__(self, request: Request) -> Dict:
payload = (await request.json())["vector"]
print(f"Received http request with data {payload}")
prediction = self.model.predict([payload])[0]
human_name = self.label_list[prediction]
return {"result": human_name}
# Deploy model.
serve.run(BoostingModel.bind(model), route_prefix="/iris")
# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get(
"localhost:8000/iris", json=sample_request_input)
print(response.text)
2.4 调度与资源管理
Ray 提供了强大的资源管理功能,允许用户在创建任务或Actor时指定所需的硬件资源(如CPU、GPU等)。这使得Ray能够更有效地调度任务,并确保每个任务都有足够的资源来运行。下面是如何在Ray中指定硬件资源的示例。
指定CPU和GPU资源
可以在定义Actor类时,通过@ray.remote
装饰器中的num_cpus
和num_gpus
参数来指定Actor实例所需的资源。
示例代码:
import ray
# 初始化Ray
ray.init(num_cpus=4, num_gpus=2)
# 定义一个需要1个CPU和1个GPU的Actor
@ray.remote(num_cpus=1, num_gpus=1)
class GpuWorker:
def __init__(self):
import tensorflow as tf
self.gpus = tf.config.list_physical_devices('GPU')
def run(self):
if self.gpus:
print(f"Using GPU: {self.gpus[0]}")
else:
print("No GPU available")
return "GpuWorker task completed"
# 创建Actor实例
gpu_worker = GpuWorker.remote()
# 调用Actor的方法
result = ray.get(gpu_worker.run.remote())
print(result) # 输出: Using GPU: <some_gpu>, GpuWorker task completed
三、工程实践KubeRay
- 概述:KubeRay是在Kubernetes上运行Ray集群的一种方式,提供了更灵活的资源管理和更高的可扩展性。
- RayCluster:定义了如何在Kubernetes中启动一个Ray集群。
- RayJob:提供了一种便捷的方式来提交Ray作业到Kubernetes环境。
- RayService:允许用户通过HTTP接口访问服务化的Ray应用。
3.1 RayService使用
RayService 管理以下组件:
- RayCluster:在 Kubernetes 集群中管理资源。
- Ray Serve 应用程序:管理用户的应用程序。
RayService 提供的功能包括:
- Kubernetes 原生支持 Ray 集群和 Ray Serve 应用程序:通过使用 Kubernetes 配置定义 Ray 集群及其 Ray Serve 应用程序后,可以使用
kubectl
来创建集群及其应用程序。 - Ray Serve 应用程序的就地更新:有关详细信息,请参阅 RayService文档。
- Ray 集群的零停机升级:有关详细信息,请参阅 RayService文档。
- 高可用性服务:有关详细信息,请参阅 RayService 文档的高可用性部分。