Thursday, December 30, 2021

Library to turn Azure ML Compute into Ray and Dask cluster

Ray and Dask are two of the most popular frameworks for parallelizing and scaling Python computation. They help speed up data processing, hyperparameter tuning, reinforcement learning, model serving, and many other scenarios.

On an Azure ML compute instance, we can easily install Ray and Dask to take advantage of parallel computing across all cores within the node. However, there is not yet an easy way in Azure Machine Learning to extend this to a multi-node cluster when the computing or ML problem requires the power of more than one node. One would need to set up a separate environment using VMs or K8s outside Azure ML to run multi-node Ray/Dask, which would mean losing the capabilities of Azure ML.

To address this gap, we have developed a library that can easily turn an Azure ML compute instance and compute cluster into a Ray and Dask cluster. The library does all the complex wiring and setup of a Ray cluster with Dask behind the scenes while exposing a simple Ray context object for users to perform parallel Python computing tasks. In addition, it ships with high-performance APIs based on PyArrow to access Azure storage and a simple interface for users to install additional libraries.

The library also supports both interactive mode and job mode. Data scientists can do fast interactive work with the cluster during the exploratory phase, then easily turn the code into job mode with minimal changes.

Check out the library repo, james-tn/ray-on-aml (Turning AML compute into Ray cluster), for details.

In this post, we'll walk through the steps to set up and use the library.




Installation of the library

  1. Prepare compute environment 

For interactive use from your compute instance, create a compute cluster in the same virtual network (VNet) as your compute instance.

Checklist

[ ] Azure Machine Learning Workspace

[ ] Virtual network/Subnet

[ ] Create Compute Instance in the Virtual Network

[ ] Create Compute Cluster in the same Virtual Network

Use the azureml_py38 conda environment from a (Jupyter) notebook in Azure Machine Learning Studio.


    2.  Install library


pip install --upgrade ray-on-aml


Installing this library also installs ray[default]==1.9.1, pyarrow>=5.0.0, dask[complete]==2021.12.0, adlfs==2021.10.0 and fsspec==2021.10.1.
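Since the library pins specific versions, it can be useful to confirm what actually got installed in the notebook environment. Below is a minimal sketch using only the standard library. The helper names (satisfies, check_pins) are our own for illustration and are not part of ray-on-aml, and the version comparison is deliberately simple (numeric dotted versions only).

```python
from importlib import metadata

# Pins as listed above; "==" means exact match, ">=" means minimum version.
PINS = {
    "ray": "==1.9.1",
    "pyarrow": ">=5.0.0",
    "dask": "==2021.12.0",
    "adlfs": "==2021.10.0",
    "fsspec": "==2021.10.1",
}


def _as_tuple(version):
    """Turn '2021.12.0' into (2021, 12, 0); non-numeric parts are ignored."""
    return tuple(int(p) for p in version.split(".") if p.isdigit())


def satisfies(installed, pin):
    """Check an installed version string against a '==x' or '>=x' pin."""
    if pin.startswith("=="):
        return _as_tuple(installed) == _as_tuple(pin[2:])
    if pin.startswith(">="):
        return _as_tuple(installed) >= _as_tuple(pin[2:])
    raise ValueError(f"unsupported pin: {pin}")


def check_pins(pins=PINS):
    """Return {package: 'ok' | 'mismatch (found X)' | 'missing'}."""
    report = {}
    for name, pin in pins.items():
        try:
            found = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = "missing"
            continue
        report[name] = "ok" if satisfies(found, pin) else f"mismatch (found {found})"
    return report


if __name__ == "__main__":
    for package, status in check_pins().items():
        print(f"{package}: {status}")
```

A "mismatch" entry usually means another package in the environment pulled in a different version after ray-on-aml was installed.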


    3. Use cluster in interactive mode

Run in interactive mode from a notebook on your compute instance. Note the ci_is_head option, which makes your current compute instance (CI) the head node.


from azureml.core import Workspace
from ray_on_aml.core import Ray_On_AML

ws = Workspace.from_config()
ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="Name_of_Compute_Cluster", maxnode=5)
ray = ray_on_aml.getRay()  # may take around 7 minutes or more
# By default, one of the nodes in the remote AML compute cluster is used as the head node and the remaining nodes are workers.
# To use your current compute instance as the head node and all nodes in the remote compute cluster as workers,
# call ray = ray_on_aml.getRay(ci_is_head=True) instead.


At this point, you have the Ray client object, which you can use to perform various parallel computing tasks with the Ray API.
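As a quick smoke test of the cluster, here is a minimal sketch of running Ray tasks in parallel. It assumes the ray object is the one returned by getRay() above; the function name parallel_squares is our own and only for illustration.

```python
def parallel_squares(ray, values):
    """Square each value in its own Ray task and collect the results in order."""

    @ray.remote
    def square(x):
        return x * x

    # .remote() schedules the task and returns an object reference immediately.
    refs = [square.remote(v) for v in values]
    # ray.get() blocks until every task has finished.
    return ray.get(refs)
```

In the notebook, this would be called as parallel_squares(ray, range(10)). Passing ray in as an argument keeps the helper usable in both interactive and job mode.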

The Ray_On_AML() initializer takes two arguments that specify the base configuration for the library, with the following default values:


base_conda_dep =['adlfs==2021.10.0','pip'], 
base_pip_dep = ['ray[tune]==1.9.1', 'xgboost_ray==0.1.5', 'dask==2021.12.0','pyarrow >= 5.0.0','fsspec==2021.10.1']


If you need to add more libraries to the cluster, pass them as a list through additional_pip_packages, for example:


ray_on_aml = Ray_On_AML(ws=ws, compute_cluster="dask-vnet-ct", additional_pip_packages=['torch', 'torchvision'])


Although it's possible, you should not change the default values of base_conda_dep and base_pip_dep, as doing so may break the package. Only do so when you need to customize the cluster's default configuration, such as the Ray version.
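One way to stay clear of the base configuration is to check, before creating the cluster, whether any of your additional packages collide with a package already pinned in base_pip_dep. The sketch below is our own illustration and not part of ray-on-aml; the helper names package_name and conflicts_with_base are hypothetical.

```python
import re

# Default base pip dependencies as listed above.
BASE_PIP_DEP = [
    "ray[tune]==1.9.1",
    "xgboost_ray==0.1.5",
    "dask==2021.12.0",
    "pyarrow >= 5.0.0",
    "fsspec==2021.10.1",
]


def package_name(requirement):
    """Extract the bare, lower-cased package name from a requirement string."""
    match = re.match(r"\s*([A-Za-z0-9_.\-]+)", requirement)
    if match is None:
        raise ValueError(f"cannot parse requirement: {requirement!r}")
    # Treat '_' and '-' as equivalent, as pip does.
    return match.group(1).lower().replace("_", "-")


def conflicts_with_base(additional, base=BASE_PIP_DEP):
    """Return the additional requirements whose package is already pinned in base."""
    base_names = {package_name(r) for r in base}
    return [r for r in additional if package_name(r) in base_names]
```

For the example above, conflicts_with_base(['torch', 'torchvision']) returns an empty list, so those packages can be added without touching the pinned defaults.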

    4. Use the cluster in job mode

To use the library in an AML job, include ray-on-aml as a pip dependency, then get Ray inside your script like this:


from ray_on_aml.core import Ray_On_AML

ray_on_aml = Ray_On_AML()
ray = ray_on_aml.getRay()

if ray:  # in the head node
    # logic to use Ray for distributed ML training, tuning or distributed data transformation with Dask
    pass
else:
    print("in worker node")
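To keep the change between interactive mode and job mode small, the mode-specific parts can be isolated in two small helpers. This is a sketch under the assumptions visible in this post: interactive mode passes ws, compute_cluster and maxnode to Ray_On_AML, job mode passes nothing, and getRay() returns a usable Ray handle only where work should run. The helper names are hypothetical.

```python
def ray_on_aml_kwargs(interactive, ws=None, compute_cluster=None, maxnode=5):
    """Build the constructor arguments for Ray_On_AML for either mode."""
    if not interactive:
        # Job mode: Ray_On_AML() is called with no arguments.
        return {}
    if ws is None or compute_cluster is None:
        raise ValueError("interactive mode needs both ws and compute_cluster")
    return {"ws": ws, "compute_cluster": compute_cluster, "maxnode": maxnode}


def run_on_head(ray, work):
    """Run work(ray) only where getRay() returned a Ray handle."""
    if ray:
        return work(ray)
    print("in worker node")
    return None
```

With these in place, the same work function can be passed to run_on_head in a notebook and in a job script, and only the interactive flag changes.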


Example scenarios

  1. Perform big data analysis with Dask on Ray


from ray.util.dask import ray_dask_get
import dask
import dask.dataframe as dd
from adlfs import AzureBlobFileSystem

# Tell Dask to use Ray as its scheduler. This is a one-time call.
dask.config.set(scheduler=ray_dask_get)

account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
abfs = AzureBlobFileSystem(account_name="azureopendatastorage", container_name="isdweatherdatacontainer")
# Read the partitioned Parquet files into a Ray Dataset (ray is the object returned by getRay()).
data = ray.data.read_parquet(["az://isdweatherdatacontainer/ISDWeather/year=2012/"], filesystem=abfs)
data_dask = data.to_dask().describe().compute()
print(data_dask)


  2. Distributed hyperparameter tuning with ray.tune


import sklearn.datasets
import sklearn.metrics
from sklearn.model_selection import train_test_split
import xgboost as xgb

from ray import tune

def train_breast_cancer(config):
    # Load dataset
    data, labels = sklearn.datasets.load_breast_cancer(return_X_y=True)
    # Split into train and test set
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=0.25)
    # Build input matrices for XGBoost
    train_set = xgb.DMatrix(train_x, label=train_y)
    test_set = xgb.DMatrix(test_x, label=test_y)
    # Train the classifier
    results = {}
    xgb.train(
        config,
        train_set,
        evals=[(test_set, "eval")],
        evals_result=results,
        verbose_eval=False)
    # Report prediction accuracy back to Tune
    accuracy = 1. - results["eval"]["error"][-1]
    tune.report(mean_accuracy=accuracy, done=True)

config = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
    "max_depth": tune.randint(1, 9),
    "min_child_weight": tune.choice([1, 2, 3]),
    "subsample": tune.uniform(0.5, 1.0),
    "eta": tune.loguniform(1e-4, 1e-1)
}
analysis = tune.run(
    train_breast_cancer,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=10)  # assumed trial count; adjust as needed
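To make the search space above more concrete, here is a plain-Python sketch of what one sampled configuration looks like. It is not how Ray Tune samples internally; it only mirrors the ranges, to the best of our understanding of the Tune search space API (in particular, that tune.randint excludes the upper bound and that tune.loguniform is uniform in log space). The function name sample_config is hypothetical.

```python
import math
import random


def sample_config(rng):
    """Draw one configuration with the same ranges as the search space above."""
    return {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        # Integer in [1, 9): the upper bound is excluded.
        "max_depth": rng.randrange(1, 9),
        # One of the listed values, each equally likely.
        "min_child_weight": rng.choice([1, 2, 3]),
        # Float drawn uniformly between 0.5 and 1.0.
        "subsample": rng.uniform(0.5, 1.0),
        # Log-uniform: uniform in log space, so each decade is equally likely.
        "eta": math.exp(rng.uniform(math.log(1e-4), math.log(1e-1))),
    }


if __name__ == "__main__":
    rng = random.Random(0)
    for _ in range(3):
        print(sample_config(rng))
```

The log-uniform choice for eta matters because learning rates of 0.0001 and 0.001 differ as much in effect as 0.01 and 0.1; a plain uniform draw would almost never try the small values.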


  3. Distributed XGBoost


from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

seed = 42

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=0.25, random_state=seed
)

clf = RayXGBClassifier(
    n_jobs=10,  # In XGBoost-Ray, n_jobs sets the number of actors
    random_state=seed
)

# The scikit-learn API will automatically convert the data
# to RayDMatrix format as needed.
# You can also pass X as a RayDMatrix, in which case
# y will be ignored.
clf.fit(X_train, y_train)

pred_ray = clf.predict(X_test)

pred_proba_ray = clf.predict_proba(X_test)

# It is also possible to pass a RayParams object
# to the fit/predict/predict_proba methods. It will override
# the n_jobs value set during initialization.
clf.fit(X_train, y_train, ray_params=RayParams(num_actors=10))

pred_ray = clf.predict(X_test, ray_params=RayParams(num_actors=10))


  4. Use in job mode with an AML job


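from azureml.core import Workspace, Experiment, Environment, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import PyTorchConfiguration

ws = Workspace.from_config()

compute_cluster = 'worker-cpu-v3'
maxnode = 5
vm_size = 'STANDARD_DS3_V2'  # placeholder VM size; choose one that fits your workload
vnet = '<your-vnet-name>'  # placeholder
subnet = '<your-subnet-name>'  # placeholder
vnet_rg = None  # resource group of the VNet; defaults to the workspace resource group
exp = 'ray_on_aml_job'
ws_detail = ws.get_details()
ws_rg = ws_detail['id'].split("/")[4]

try:
    ray_cluster = ComputeTarget(workspace=ws, name=compute_cluster)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    if vnet_rg is None:
        vnet_rg = ws_rg
    compute_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                        min_nodes=0, max_nodes=maxnode,
                                                        vnet_resourcegroup_name=vnet_rg,
                                                        vnet_name=vnet,
                                                        subnet_name=subnet)
    ray_cluster = ComputeTarget.create(ws, compute_cluster, compute_config)
    ray_cluster.wait_for_completion(show_output=True)

rayEnv = Environment.from_conda_specification(name="rayEnv",
                                             file_path="../examples/conda_env.yml")

# rayEnv = Environment.get(ws, "rayEnv", version=19)

# The source directory and script name are placeholders for your own job script.
# The distributed job configuration is assumed here as the way to start the script on every node.
src = ScriptRunConfig(source_directory='<your-source-directory>',
                      script='<your-job-script>.py',
                      environment=rayEnv,
                      compute_target=ray_cluster,
                      distributed_job_config=PyTorchConfiguration(node_count=maxnode),
                      # arguments = ["--master_ip",master_ip]
                      )
run = Experiment(ws, exp).submit(src)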
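The expression ws_detail['id'].split("/")[4] in the listing above relies on the layout of an Azure resource ID, where the resource group name is the fifth slash-separated segment. Here is a small sketch that makes this explicit and fails loudly if the ID has an unexpected shape. The function name is our own and the example ID is made up.

```python
def resource_group_from_id(resource_id):
    """Return the resource group segment of an Azure resource ID."""
    parts = resource_id.split("/")
    # parts[0] is '' because the ID starts with '/'.
    if (
        len(parts) < 5
        or parts[1].lower() != "subscriptions"
        or parts[3].lower() != "resourcegroups"
    ):
        raise ValueError(f"unexpected resource ID layout: {resource_id!r}")
    return parts[4]


# Made-up ID for illustration only.
example_id = (
    "/subscriptions/00000000-0000-0000-0000-000000000000"
    "/resourceGroups/my-rg"
    "/providers/Microsoft.MachineLearningServices/workspaces/my-ws"
)
print(resource_group_from_id(example_id))  # my-rg
```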


This is the code inside the job script, with details omitted for brevity:


if __name__ == "__main__":
    run = Run.get_context()
    ws = run.experiment.workspace
    account_key = ws.get_default_keyvault().get_secret("adls7-account-key")
    ray_on_aml = Ray_On_AML()
    ray = ray_on_aml.getRay()

    if ray:  # in the head node
        print("head node detected")

        datasets.MNIST("~/data", train=True, download=True)

        # train_mnist is an assumed name for the training function; it and search_space
        # are defined in the part of the script omitted here.
        analysis = tune.run(train_mnist, config=search_space)
        print("data count result", get_data_count(account_key))
    else:
        print("in worker node")


See more examples in quick_use_cases.ipynb in the james-tn/ray-on-aml repo (master branch).

