searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Ray分布式计算框架实践

2024-09-24 10:07:38
151
0

Ray分布式计算框架实践

一、简介

  • 背景​:随着数据规模的增长,传统的单机处理能力已经难以满足需求。分布式计算成为了处理大规模数据集的关键技术。
  • Ray是什么​:Ray是由UC Berkeley RISELab开发的一个用于构建分布式应用的开源框架,旨在简化并行和分布式系统的开发。
  • 主要特性​:易于使用的API、支持Python/C++/Java等多语言、动态任务调度、低延迟、支持GPU等异构资源。

二、Ray原理和能力

2.1 Task - 无状态计算单元

在Ray中,Task是一种最基本的并行执行单位,它代表了可以被远程执行的一段代码。每个Task都是无状态的,这意味着它们不依赖于任何外部状态,并且每次执行的结果只取决于输入参数。这样的设计使得Tasks非常适合用于数据处理、数值计算等场景,在这些场景下任务之间没有共享状态的需求。

ray.init()  # 使用ray.remote装饰器定义一个Task 

@ray.remote 
def add(a, b): 
    return a + b  
# 创建并运行Task
result = add.remote(1, 2)
print(ray.get(result)) 
# 输出: 3

2.2 Actor - 有状态计算单元

Actor是一种有状态的计算单元。与无状态的Task不同,Actors可以维护自己的状态,并且可以在多个方法调用之间保持这些状态信息。这使得Actors非常适合需要长期运行的服务、共享资源或者需要保持某些状态的应用场景,如在线学习、模拟器等。

import ray

# 初始化Ray
ray.init()

# 使用ray.remote装饰器定义一个Actor
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count

# 创建一个Counter类型的Actor实例
counter_actor = Counter.remote()

# 调用Actor的方法
result = ray.get(counter_actor.increment.remote())
print(result)  # 输出: 1

# 再次调用increment方法
result = ray.get(counter_actor.increment.remote())
print(result)  # 输出: 2

2.3 Data,Train, Tune, Serve

Data

Ray Datasets是一个用于处理大规模数据集的库,它提供了类似Pandas DataFrame的API,并且可以轻松地在分布式环境中进行数据转换和计算。

示例代码:

from typing import Dict
import numpy as np
import ray

# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal length (cm)"]
    width = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = length * width
    return batch

transformed_ds = ds.map_batches(compute_area)

# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
    print(batch)

# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

Train

Ray Train 是 Ray 的一部分,旨在简化分布式训练过程。它可以与多种流行的深度学习框架(如TensorFlow, PyTorch等)集成。

示例代码:

详见Ray官方文档。

Tune

Ray Tune 提供了一套强大的超参数优化工具,支持随机搜索、网格搜索、贝叶斯优化等多种策略。

示例代码:

from ray import train, tune


def objective(config):  # ①
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {  # ②
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)  # ③

results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)

Serve

Ray Serve 可以快速地将机器学习模型部署为微服务,并提供高可用性和自动伸缩的能力。

示例代码:

import requests
from starlette.requests import Request
from typing import Dict

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

from ray import serve


# Train model.
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])


@serve.deployment
class BoostingModel:
    def __init__(self, model):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    async def __call__(self, request: Request) -> Dict:
        payload = (await request.json())["vector"]
        print(f"Received http request with data {payload}")

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model.
serve.run(BoostingModel.bind(model), route_prefix="/iris")

# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get(
    "localhost:8000/iris", json=sample_request_input)
print(response.text)

2.4 调度与资源管理

Ray 提供了强大的资源管理功能,允许用户在创建任务或Actor时指定所需的硬件资源(如CPU、GPU等)。这使得Ray能够更有效地调度任务,并确保每个任务都有足够的资源来运行。下面是如何在Ray中指定硬件资源的示例。

指定CPU和GPU资源

可以在定义Actor类时,通过@ray.remote装饰器中的num_cpusnum_gpus参数来指定Actor实例所需的资源。

示例代码:

import ray

# 初始化Ray
ray.init(num_cpus=4, num_gpus=2)

# 定义一个需要1个CPU和1个GPU的Actor
@ray.remote(num_cpus=1, num_gpus=1)
class GpuWorker:
    def __init__(self):
        import tensorflow as tf
        self.gpus = tf.config.list_physical_devices('GPU')
    
    def run(self):
        if self.gpus:
            print(f"Using GPU: {self.gpus[0]}")
        else:
            print("No GPU available")
        return "GpuWorker task completed"

# 创建Actor实例
gpu_worker = GpuWorker.remote()

# 调用Actor的方法
result = ray.get(gpu_worker.run.remote())
print(result)  # 输出: Using GPU: <some_gpu>, GpuWorker task completed

三、工程实践KubeRay

  • 概述​:KubeRay是在Kubernetes上运行Ray集群的一种方式,提供了更灵活的资源管理和更高的可扩展性。
  • RayCluster​:定义了如何在Kubernetes中启动一个Ray集群。
  • RayJob​:提供了一种便捷的方式来提交Ray作业到Kubernetes环境。
  • RayService​:允许用户通过HTTP接口访问服务化的Ray应用。

3.1 RayService使用

RayService 管理以下组件:

  • RayCluster​:在 Kubernetes 集群中管理资源。
  • Ray Serve 应用程序​:管理用户的应用程序。

RayService 提供的功能包括:

  • Kubernetes 原生支持 Ray 集群和 Ray Serve 应用程序​:通过使用 Kubernetes 配置定义 Ray 集群及其 Ray Serve 应用程序后,可以使用 kubectl 来创建集群及其应用程序。
  • Ray Serve 应用程序的就地更新​:有关详细信息,请参阅 RayService文档。
  • Ray 集群的零停机升级​:有关详细信息,请参阅 RayService文档。
  • 高可用性服务​:有关详细信息,请参阅 RayService 文档的高可用性部分。
0条评论
0 / 1000
张****鹏
1文章数
0粉丝数
张****鹏
1 文章 | 0 粉丝
张****鹏
1文章数
0粉丝数
张****鹏
1 文章 | 0 粉丝
原创

Ray分布式计算框架实践

2024-09-24 10:07:38
151
0

Ray分布式计算框架实践

一、简介

  • 背景​:随着数据规模的增长,传统的单机处理能力已经难以满足需求。分布式计算成为了处理大规模数据集的关键技术。
  • Ray是什么​:Ray是由UC Berkeley RISELab开发的一个用于构建分布式应用的开源框架,旨在简化并行和分布式系统的开发。
  • 主要特性​:易于使用的API、支持Python/C++/Java等多语言、动态任务调度、低延迟、支持GPU等异构资源。

二、Ray原理和能力

2.1 Task - 无状态计算单元

在Ray中,Task是一种最基本的并行执行单位,它代表了可以被远程执行的一段代码。每个Task都是无状态的,这意味着它们不依赖于任何外部状态,并且每次执行的结果只取决于输入参数。这样的设计使得Tasks非常适合用于数据处理、数值计算等场景,在这些场景下任务之间没有共享状态的需求。

ray.init()  # 使用ray.remote装饰器定义一个Task 

@ray.remote 
def add(a, b): 
    return a + b  
# 创建并运行Task
result = add.remote(1, 2)
print(ray.get(result)) 
# 输出: 3

2.2 Actor - 有状态计算单元

Actor是一种有状态的计算单元。与无状态的Task不同,Actors可以维护自己的状态,并且可以在多个方法调用之间保持这些状态信息。这使得Actors非常适合需要长期运行的服务、共享资源或者需要保持某些状态的应用场景,如在线学习、模拟器等。

import ray

# 初始化Ray
ray.init()

# 使用ray.remote装饰器定义一个Actor
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0
    
    def increment(self):
        self.count += 1
        return self.count

# 创建一个Counter类型的Actor实例
counter_actor = Counter.remote()

# 调用Actor的方法
result = ray.get(counter_actor.increment.remote())
print(result)  # 输出: 1

# 再次调用increment方法
result = ray.get(counter_actor.increment.remote())
print(result)  # 输出: 2

2.3 Data,Train, Tune, Serve

Data

Ray Datasets是一个用于处理大规模数据集的库,它提供了类似Pandas DataFrame的API,并且可以轻松地在分布式环境中进行数据转换和计算。

示例代码:

from typing import Dict
import numpy as np
import ray

# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal length (cm)"]
    width = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = length * width
    return batch

transformed_ds = ds.map_batches(compute_area)

# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
    print(batch)

# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")

Train

Ray Train 是 Ray 的一部分,旨在简化分布式训练过程。它可以与多种流行的深度学习框架(如TensorFlow, PyTorch等)集成。

示例代码:

详见Ray官方文档。

Tune

Ray Tune 提供了一套强大的超参数优化工具,支持随机搜索、网格搜索、贝叶斯优化等多种策略。

示例代码:

from ray import train, tune


def objective(config):  # ①
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {  # ②
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)  # ③

results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)

Serve

Ray Serve 可以快速地将机器学习模型部署为微服务,并提供高可用性和自动伸缩的能力。

示例代码:

import requests
from starlette.requests import Request
from typing import Dict

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

from ray import serve


# Train model.
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])


@serve.deployment
class BoostingModel:
    def __init__(self, model):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    async def __call__(self, request: Request) -> Dict:
        payload = (await request.json())["vector"]
        print(f"Received http request with data {payload}")

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model.
serve.run(BoostingModel.bind(model), route_prefix="/iris")

# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get(
    "localhost:8000/iris", json=sample_request_input)
print(response.text)

2.4 调度与资源管理

Ray 提供了强大的资源管理功能,允许用户在创建任务或Actor时指定所需的硬件资源(如CPU、GPU等)。这使得Ray能够更有效地调度任务,并确保每个任务都有足够的资源来运行。下面是如何在Ray中指定硬件资源的示例。

指定CPU和GPU资源

可以在定义Actor类时,通过@ray.remote装饰器中的num_cpusnum_gpus参数来指定Actor实例所需的资源。

示例代码:

import ray

# 初始化Ray
ray.init(num_cpus=4, num_gpus=2)

# 定义一个需要1个CPU和1个GPU的Actor
@ray.remote(num_cpus=1, num_gpus=1)
class GpuWorker:
    def __init__(self):
        import tensorflow as tf
        self.gpus = tf.config.list_physical_devices('GPU')
    
    def run(self):
        if self.gpus:
            print(f"Using GPU: {self.gpus[0]}")
        else:
            print("No GPU available")
        return "GpuWorker task completed"

# 创建Actor实例
gpu_worker = GpuWorker.remote()

# 调用Actor的方法
result = ray.get(gpu_worker.run.remote())
print(result)  # 输出: Using GPU: <some_gpu>, GpuWorker task completed

三、工程实践KubeRay

  • 概述​:KubeRay是在Kubernetes上运行Ray集群的一种方式,提供了更灵活的资源管理和更高的可扩展性。
  • RayCluster​:定义了如何在Kubernetes中启动一个Ray集群。
  • RayJob​:提供了一种便捷的方式来提交Ray作业到Kubernetes环境。
  • RayService​:允许用户通过HTTP接口访问服务化的Ray应用。

3.1 RayService使用

RayService 管理以下组件:

  • RayCluster​:在 Kubernetes 集群中管理资源。
  • Ray Serve 应用程序​:管理用户的应用程序。

RayService 提供的功能包括:

  • Kubernetes 原生支持 Ray 集群和 Ray Serve 应用程序​:通过使用 Kubernetes 配置定义 Ray 集群及其 Ray Serve 应用程序后,可以使用 kubectl 来创建集群及其应用程序。
  • Ray Serve 应用程序的就地更新​:有关详细信息,请参阅 RayService文档。
  • Ray 集群的零停机升级​:有关详细信息,请参阅 RayService文档。
  • 高可用性服务​:有关详细信息,请参阅 RayService 文档的高可用性部分。
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0