Ray and Dask are two of the most popular frameworks for parallelizing and scaling Python computation. They are very helpful for speeding up data processing, hyperparameter tuning, reinforcement learning, model serving and many other scenarios.
On an Azure ML compute instance, we can easily install Ray and Dask to take advantage of parallel computing across all cores within the node. However, there is not yet an easy way in Azure Machine Learning to extend this to a multi-node cluster when the computing and ML problem require the power of more than one node. One would need to set up a separate environment using VMs or Kubernetes outside Azure ML to run multi-node Ray/Dask, which would mean losing the capabilities of Azure ML.
To address this gap, we have developed a library that can easily turn an Azure ML compute instance and compute cluster into a Ray and Dask cluster. The library does all the complex wiring and setup of a Ray cluster with Dask behind the scenes, while exposing a simple Ray context object for users to perform parallel Python computing tasks. In addition, it ships with high-performance APIs based on PyArrow to access Azure storage and a simple interface for users to install additional libraries.
The library also supports both interactive mode and job mode. Data scientists can perform fast interactive work with the cluster during the exploratory phase, then easily turn the code into job mode with minimal changes.
Check out the library repo at james-tn/ray-on-aml: Turning AML compute into Ray cluster (github.com) for details.
In this post, we'll walk through the steps to set up and use the library.
Installation of the library
1. Prepare compute environment
For interactive use from your compute instance, create a compute cluster in the same virtual network as your compute instance.
Checklist:
[ ] Azure Machine Learning Workspace
[ ] Virtual network/Subnet
[ ] Create Compute Instance in the Virtual Network
[ ] Create Compute Cluster in the same Virtual Network
Use the azureml_py38 conda environment from a (Jupyter) Notebook in Azure Machine Learning Studio.
2. Install the library
pip install --upgrade ray-on-aml
Installing this library will also install ray[default]==1.9.1, pyarrow>=5.0.0, dask[complete]==2021.12.0, adlfs==2021.10.0 and fsspec==2021.10.1.
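To confirm what was installed, you can print the versions from the notebook (a minimal sanity check; the versions will differ if you customize the defaults described below):

import ray, dask, pyarrow, fsspec

# Print the versions pulled in by ray-on-aml
print("ray:", ray.__version__)
print("dask:", dask.__version__)
print("pyarrow:", pyarrow.__version__)
print("fsspec:", fsspec.__version__)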
3. Use the cluster in interactive mode
Run the following in interactive mode from the compute instance's notebook. Notice the option ci_is_head, which lets your current compute instance (CI) act as the head node.
from azureml.core import Workspace
from ray_on_aml.core import Ray_On_AML

ws = Workspace.from_config()
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster", maxnode=5)
ray = ray_on_aml.getRay()  # may take around 7 or more minutes

# Note that by default, one of the nodes in the remote AML compute cluster is used as the head node
# and the remaining nodes are worker nodes. If you want to use your current compute instance as the
# head node and all nodes in the remote compute cluster as workers, simply specify
# ray = ray_on_aml.getRay(ci_is_head=True)
At this point, you have a Ray client object that you can use to perform various parallel computing tasks through the Ray API.
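As a quick smoke test, you can inspect the cluster's aggregate resources and run a trivial remote task using the standard Ray API (a minimal sketch):

# Show the CPU/memory resources aggregated across all nodes in the cluster
print(ray.cluster_resources())

@ray.remote
def square(x):
    return x * x

# Execute the task in parallel across the cluster and collect the results
print(ray.get([square.remote(i) for i in range(10)]))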
There are two arguments to Ray_On_AML() initialization that specify the base configuration for the library, with the following default values:
base_conda_dep = ['adlfs==2021.10.0', 'pip'],
base_pip_dep = ['ray[tune]==1.9.1', 'xgboost_ray==0.1.5', 'dask==2021.12.0', 'pyarrow>=5.0.0', 'fsspec==2021.10.1']
If you need to add more libraries to the cluster, you can pass them in list format, for example:
ray_on_aml =Ray_On_AML(ws=ws, compute_cluster ="dask-vnet-ct", additional_pip_packages=['torch', 'torchvision'])
Although it is possible, you should not change the default values of base_conda_dep and base_pip_dep, as doing so may break the package. Only do so when you need to customize the cluster's default configuration, such as the Ray version.
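For illustration, here is a hedged sketch of pinning a different Ray version by overriding base_pip_dep at initialization (the version numbers shown are examples only and must remain mutually compatible):

# Example only: override the base pip dependencies to pin a different Ray version.
# All version numbers here are illustrative and must be compatible with each other.
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster",
                        base_pip_dep=['ray[tune]==1.9.2', 'xgboost_ray==0.1.5',
                                      'dask==2021.12.0', 'pyarrow>=5.0.0',
                                      'fsspec==2021.10.1'])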
4. Use the cluster in job mode
For use in an AML job, simply include ray_on_aml as a pip dependency, then inside your script, do the following to get the Ray object:
from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:  # in the head node
    pass  # logic to use Ray for distributed ML training, tuning or distributed data transformation with Dask
else:
    print("in worker node")
Example scenarios
1. Perform big data analysis with Dask on Ray
from ray.util.dask import ray_dask_get
import dask
import dask.dataframe as dd
from adlfs import AzureBlobFileSystem

# Tell Dask to use Ray as its scheduler. This is a one-time call.
dask.config.set(scheduler=ray_dask_get)

# Retrieving an account key from the workspace key vault is only needed for private storage;
# it is not used by the public open dataset read below.
account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
account_name = "adlsgen7"

abfs = AzureBlobFileSystem(account_name="azureopendatastorage", container_name="isdweatherdatacontainer")

data = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2012/"], filesystem=abfs)
data_dask = data.to_dask().describe().compute()
print(data_dask)
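Since data.to_dask() returns a regular Dask DataFrame, any further Dask operation is also scheduled on the Ray cluster. A small follow-up sketch (the 'temperature' column is an assumption about the weather dataset and may need adjusting):

ddf = data.to_dask()
print(ddf.columns)
# 'temperature' is an assumed column name; replace it with a column present in the dataset
print(ddf["temperature"].mean().compute())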
2. Distributed hyperparameter tuning with ray.tune
import sklearn.datasets
import sklearn.metrics
from sklearn.model_selection import train_test_split
import xgboost as xgb
from ray import tune

def train_breast_cancer(config):
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)
    # Train the classifier
    results = {}
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False)
    # Return prediction accuracy
    accuracy = 1. - results["eval"]["error"][-1]
    tune.report(mean_accuracy=accuracy, done=True)

config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),
    "min_child_weight": tune.choice([1, 2, 3]),
    "subsample": tune.uniform(0.5, 1.0),
    "eta": tune.loguniform(1e-4, 1e-1)
}

analysis = tune.run(
    train_breast_cancer,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=10)
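tune.run returns an analysis object from which the best hyperparameters can be retrieved, for example:

# Get the hyperparameters of the best trial based on the reported accuracy
best_config = analysis.get_best_config(metric="mean_accuracy", mode="max")
print("Best hyperparameters found were:", best_config)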
3. Distributed XGBoost
from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=42
)

clf = RayXGBClassifier(
    n_jobs=10,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# The scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.
clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)
print(pred_ray.shape)

pred_proba_ray = clf.predict_proba(X_test)
print(pred_proba_ray.shape)

# It is also possible to pass a RayParams object
# to the fit/predict/predict_proba methods - it will override
# the n_jobs set during initialization
clf.fit(X_train, y_train, ray_params=RayParams(num_actors=10))
pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=10))
print(pred_ray.shape)
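To check the quality of the distributed model, you can score the held-out split with scikit-learn (a short follow-up sketch):

from sklearn.metrics import accuracy_score

# Evaluate the predictions from the distributed classifier on the held-out split
print("accuracy:", accuracy_score(y_test, pred_ray))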
4. Use in job mode with an AML job
from azureml.core import Workspace, Environment, Experiment, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config()
compute_cluster = 'worker-cpu-v3'
maxnode = 5
vm_size = 'STANDARD_DS3_V2'
vnet = 'rayvnet'
subnet = 'default'
exp = 'ray_on_aml_job'
ws_detail = ws.get_details()
ws_rg = ws_detail['id'].split("/")[4]
vnet_rg = None

try:
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                           min_nodes=0, max_nodes=maxnode,
                                                           vnet_resourcegroup_name=vnet_rg,
                                                           vnet_name=vnet,
                                                           subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)
    ray_cluster.wait_for_completion(show_output=True)

rayEnv = Environment.from_conda_specification(name="rayEnv",
                                              file_path="../examples/conda_env.yml")
# rayEnv = Environment.get(ws, "rayEnv", version=19)

src = ScriptRunConfig(source_directory='../examples/job',
                      script='aml_job.py',
                      environment=rayEnv,
                      compute_target=ray_cluster,
                      distributed_job_config=PyTorchConfiguration(node_count=maxnode),
                      # arguments = ["--master_ip", master_ip]
                      )

run = Experiment(ws, exp).submit(src)
This is the code inside aml_job.py, with details omitted for brevity:
# (imports and helper definitions such as train_mnist, get_data_count and search_space are omitted for brevity)
if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    account_key = ws.get_default_keyvault().get_secret("adls7-account-key")

    ray_on_aml = Ray_On_AML()
    ray = ray_on_aml.getRay()

    if ray:  # in the head node
        print("head node detected")
        datasets.MNIST("~/data", train=True, download=True)
        analysis = tune.run(train_mnist, config=search_space)
        print(ray.cluster_resources())
        print("data count result", get_data_count(account_key))
    else:
        print("in worker node")
See more examples at ray-on-aml/quick_use_cases.ipynb at master · james-tn/ray-on-aml (github.com).