Ray - Distributed Computing Framework

Ray is an AI compute engine for scaling AI and Python applications. It consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.

  1. Step 1

    Overview

    Ray is an open source distributed computing framework designed for scaling AI and Python applications. Created at the RISELab research group at UC Berkeley, Ray provides a unified API for distributed task scheduling, stateful computation, and data parallelism.

    Ray is widely used for:

    • Machine learning model training at scale
    • Hyperparameter tuning
    • Reinforcement learning
    • Batch inference
    • Serving ML models
    • Distributed data processing

    Key components of the Ray ecosystem:

    • Ray Core: Fundamental primitives (tasks, actors, objects) for distributed applications
    • Ray Data: Distributed data processing for ETL and preprocessing
    • Ray Train: Distributed training for machine learning models
    • Ray Tune: Hyperparameter tuning and experiment management
    • Ray Serve: Scalable model serving with autoscaling
    • Ray RLlib: Production-ready reinforcement learning library
  2. Step 2

    System Requirements

    Ray officially supports:

    • Platforms: Linux (x86_64, aarch64/ARM), macOS (Apple Silicon M1+), Windows (beta)
    • Python: 3.8, 3.9, 3.10, 3.11, 3.12
    • Hardware: Ray runs on single machines and can scale to thousands of nodes across clouds

    For ML workloads:

    • GPU support via NVIDIA CUDA (recommended for deep learning)
    • Optional: TPUs, AMD ROCm
    • Sufficient RAM for model and data storage
    • High-speed network for cluster deployments (10 GigE recommended)
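The platform and Python requirements above can be checked before installing. The following is a minimal sketch using only the standard library; `check_ray_requirements` is a hypothetical helper, and the version list simply mirrors this guide, so newer Ray releases may support a different set.

```python
import platform
import sys

# Values copied from this guide; treat them as assumptions for your Ray version.
SUPPORTED_PYTHON = {(3, 8), (3, 9), (3, 10), (3, 11), (3, 12)}
SUPPORTED_SYSTEMS = {"Linux", "Darwin", "Windows"}


def check_ray_requirements(version=None, system=None):
    """Return a list of notes about the host; an empty list means no concerns."""
    version = tuple(version or sys.version_info[:2])
    system = system or platform.system()
    notes = []
    if version not in SUPPORTED_PYTHON:
        notes.append(f"Python {version[0]}.{version[1]} is not in the supported list")
    if system not in SUPPORTED_SYSTEMS:
        notes.append(f"Platform {system!r} is not in the supported list")
    elif system == "Windows":
        notes.append("Windows support is beta")
    return notes
```

Calling `check_ray_requirements()` with no arguments inspects the current interpreter and operating system.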
  3. Step 3

    Installation (ML Applications)

    Install Ray from PyPI. For machine learning applications, this installs Ray with Data, Train, Tune, and Serve libraries.

    # Full ML stack with dashboard and cluster launcher
    pip install -U "ray[data,train,tune,serve]"
    
    # For reinforcement learning specifically (includes RLlib)
    pip install -U "ray[rllib]"
    
    # Verify installation
    ray --version
  4. Step 4

    Installation (Python Applications)

    For general distributed Python applications without ML dependencies.

    # Ray with dashboard and cluster launcher
    pip install -U "ray[default]"
    
    # Minimal Ray (no dashboard, no cluster launcher)
    pip install -U "ray"
    
    # Verify installation
    ray --version
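Besides `ray --version`, it can help to confirm from Python which Ray libraries actually import, since a missing optional dependency only shows up at import time. This is a small sketch; `can_import` and `ray_library_report` are hypothetical helpers, and the module names are the standard `ray.*` packages named in this guide.

```python
import importlib

# Ray libraries mentioned in this guide and the module each one lives in.
RAY_MODULES = {
    "core": "ray",
    "data": "ray.data",
    "train": "ray.train",
    "tune": "ray.tune",
    "serve": "ray.serve",
    "rllib": "ray.rllib",
}


def can_import(module_name):
    """Return True if the module imports cleanly, False on ImportError."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


def ray_library_report(modules=RAY_MODULES):
    """Map each library label to whether its module could be imported."""
    return {label: can_import(name) for label, name in modules.items()}
```

Running `print(ray_library_report())` after installation shows which libraries are usable in the current environment.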
  5. Step 5

    Quick Start

    Get started with Ray in a few steps. Ray will automatically initialize when you first use a Ray API.

    import ray
    
    # Initialize Ray (optional - happens automatically on first use)
    ray.init()
    
    # Simple distributed task
    @ray.remote
    def add(x, y):
        return x + y
    
    # Submit tasks - returns ObjectRef futures
    futures = [add.remote(i, i + 1) for i in range(10)]
    
    # Get results back
    results = ray.get(futures)
    print(results)
    # Output: [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
    
    # Shutdown Ray
    ray.shutdown()
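For readers who know the standard library, the submit-then-collect pattern above has a close analogue in `concurrent.futures`. The sketch below is not Ray code; it only illustrates the same idea of futures on a single machine, with `pool.submit` playing the role of `add.remote` and `future.result()` the role of `ray.get`. The helper name `run_locally` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_locally(n=10):
    """Submit n small tasks to a local thread pool and collect results in order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(add, i, i + 1) for i in range(n)]
        return [future.result() for future in futures]


print(run_locally())  # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
```

The difference is where the work runs: the thread pool stays inside one process, while Ray schedules tasks on worker processes that may live on other machines.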
  6. Step 6

    Ray Actors (Stateful Workers)

    Actors are stateful workers that maintain internal state between method calls.

    import ray
    
    ray.init()
    
    # Define an actor class
    @ray.remote
    class Counter:
        def __init__(self):
            self.value = 0
        
        def increment(self):
            self.value += 1
            return self.value
        
        def get_value(self):
            return self.value
    
    # Create actor instance
    counter = Counter.remote()
    
    # Call actor methods
    result = counter.get_value.remote()
    print(ray.get(result))  # Output: 0
    
    result = counter.increment.remote()
    print(ray.get(result))  # Output: 1
    
    result = counter.increment.remote()
    print(ray.get(result))  # Output: 2
    
    ray.shutdown()
  7. Step 7

    Ray Data (Distributed Dataset Processing)

    Ray Data provides a scalable Dataset API for distributed data processing. It's ideal for ETL, data preprocessing, and large-scale transformations.

    import numpy as np
    import ray
    from ray import data

    ray.init()

    # Create a dataset from a list of records
    ds = data.from_items([{"text": "hello"}, {"text": "world"}, {"text": "ray"}])

    # Other sources (uncomment and point at real files or directories):
    # ds = data.read_json("/path/to/json/")
    # ds = data.read_csv("/path/to/csv/")
    # ds = data.read_parquet("/path/to/parquet/")

    # Row-wise transformation; keep "text" so later steps can still use it
    ds = ds.map(lambda row: {**row, "text_upper": row["text"].upper()})

    # Batch transformations (more efficient); a batch is a dict of column arrays
    def add_prefix(batch):
        batch["prefixed"] = np.array(["item_" + t for t in batch["text"]])
        return batch

    ds = ds.map_batches(add_prefix)

    # Filter (drops "ray", which has only 3 characters)
    ds = ds.filter(lambda row: len(row["text"]) > 3)

    # Sort
    ds = ds.sort("text")

    # Count rows (triggers execution)
    print(ds.count())

    # Show sample
    ds.show(5)

    # Materialize as a pandas DataFrame
    results = ds.to_pandas()

    ray.shutdown()
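Because the functions passed to `map()` and `map_batches()` are plain Python callables, they can be exercised without a cluster. The sketch below assumes a batch arrives as a dict of column name to a sequence of values (Ray Data's default batch format uses NumPy arrays; plain lists are used here for simplicity), and `to_upper` is a hypothetical row-wise counterpart.

```python
def add_prefix(batch):
    """Batch-style transform: receives a dict of column name -> sequence of values."""
    batch["prefixed"] = ["item_" + t for t in batch["text"]]
    return batch


def to_upper(row):
    """Row-style transform: receives one record as a dict and returns a new one."""
    return {**row, "text_upper": row["text"].upper()}


batch = add_prefix({"text": ["hello", "world"]})
print(batch["prefixed"])             # ['item_hello', 'item_world']
print(to_upper({"text": "hello"}))   # {'text': 'hello', 'text_upper': 'HELLO'}
```

Testing transforms this way catches column-name mistakes before any distributed execution starts.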
  8. Step 8

    Ray Train (Distributed Model Training)

    Ray Train enables distributed training for deep learning models. It integrates with PyTorch, TensorFlow, and other frameworks.

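    import os
    import tempfile

    import ray
    import torch
    from ray import train
    from ray.train import Checkpoint, ScalingConfig
    from ray.train.torch import TorchTrainer, prepare_data_loader, prepare_model
    from torch.utils.data import DataLoader, TensorDataset
    from torchvision.models import resnet18

    ray.init()

    # Define the training function; it runs once on every worker
    def train_func(config):
        # Wrap the model for distributed training and move it to the worker's device
        model = prepare_model(resnet18(num_classes=10))
        optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
        loss_fn = torch.nn.CrossEntropyLoss()

        # Placeholder random data; replace with a real dataset
        dataset = TensorDataset(torch.randn(64, 3, 224, 224), torch.randint(0, 10, (64,)))
        dataloader = prepare_data_loader(DataLoader(dataset, batch_size=16))

        # Training loop
        for epoch in range(config["epochs"]):
            for inputs, labels in dataloader:
                loss = loss_fn(model(inputs), labels)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Save a checkpoint and report metrics once per epoch
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
                train.report(
                    {"loss": loss.item(), "epoch": epoch},
                    checkpoint=Checkpoint.from_directory(tmpdir),
                )

    # Run distributed training
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={"lr": 0.01, "epochs": 10},
        scaling_config=ScalingConfig(
            num_workers=4,      # Number of training workers
            use_gpu=True,       # Needs one GPU per worker; set False for CPU-only
        ),
    )
    result = trainer.fit()

    print(f"Final metrics: {result.metrics}")
    print(f"Latest checkpoint: {result.checkpoint}")

    ray.shutdown()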
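With `num_workers=4`, each worker runs the training function on a different slice of the data. The sketch below shows one common way to think about that split, a strided partition by worker rank. `shard_indices` is a hypothetical helper for illustration only; real distributed samplers also shuffle and may pad the last shards.

```python
def shard_indices(num_samples, num_workers, rank):
    """Indices of the samples a given worker would process (strided split)."""
    if not 0 <= rank < num_workers:
        raise ValueError("rank must be in [0, num_workers)")
    return list(range(rank, num_samples, num_workers))


shards = [shard_indices(10, 4, rank) for rank in range(4)]
for rank, shard in enumerate(shards):
    print(rank, shard)
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6]
# 3 [3, 7]
```

Every sample lands on exactly one worker, which is why the effective global batch size is the per-worker batch size multiplied by the number of workers.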
  9. Step 9

    Ray Tune (Hyperparameter Tuning)

    Ray Tune provides scalable hyperparameter optimization with support for multiple optimizers and early stopping.

    import ray
    from ray import tune
    from ray.tune.schedulers import ASHAScheduler

    ray.init()

    # Define training function with config
    def train_model(config):
        for i in range(10):
            # Simulated training
            current_metric = config["lr"] * (i + 1)
            # Recent Ray 2.x releases take a metrics dict here;
            # older 2.x releases used ray.train.report({...}) instead
            tune.report({"mean_loss": current_metric})

    # Define search space
    search_space = {
        "lr": tune.uniform(0.001, 0.01),
        "batch_size": tune.choice([32, 64, 128]),
        "num_layers": tune.randint(2, 10)
    }

    # Run tuning experiment with the Tuner API
    tuner = tune.Tuner(
        train_model,
        param_space=search_space,
        tune_config=tune.TuneConfig(
            num_samples=5,
            metric="mean_loss",
            mode="min",
            # metric and mode are set once on TuneConfig; repeating them on the
            # scheduler raises an error
            scheduler=ASHAScheduler(
                max_t=10,
                grace_period=2,
                reduction_factor=3
            )
        ),
        # On older Ray 2.x releases, use ray.train.RunConfig instead
        run_config=tune.RunConfig(
            name="hyperparameter_tuning",
            storage_path="/tmp/ray_results"
        )
    )

    results = tuner.fit()

    # Get best result
    best_result = results.get_best_result(
        metric="mean_loss",
        mode="min"
    )

    print(f"Best config: {best_result.config}")
    print(f"Best loss: {best_result.metrics['mean_loss']}")

    ray.shutdown()
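The `grace_period`, `reduction_factor`, and `max_t` settings determine when ASHA considers stopping a trial. The following is a simplified, synchronous sketch of that successive-halving schedule, not Ray's implementation (the real scheduler is asynchronous and compares each trial against results recorded so far). Both function names are hypothetical.

```python
def asha_milestones(grace_period, reduction_factor, max_t):
    """Iterations at which trials are compared: grace_period * reduction_factor**k."""
    milestones = []
    t = grace_period
    while t <= max_t:
        milestones.append(t)
        t *= reduction_factor
    return milestones


def survivors(losses, reduction_factor):
    """Trials kept at a milestone: the best 1/reduction_factor by loss, at least one."""
    keep = max(1, len(losses) // reduction_factor)
    return sorted(losses, key=losses.get)[:keep]


print(asha_milestones(2, 3, 10))                                   # [2, 6]
print(survivors({"a": 0.3, "b": 0.1, "c": 0.2}, reduction_factor=3))  # ['b']
```

With the values used in this step, every trial runs at least 2 iterations, and roughly one in three continues past each milestone.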
  10. Step 10

    Ray Serve (Model Serving)

    Ray Serve provides scalable, low-latency model serving with autoscaling, request batching, and model composition.

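    import ray
    from ray import serve
    from starlette.requests import Request
    from transformers import pipeline

    ray.init()

    # Define the service; replica settings go in the decorator
    # (max_ongoing_requests replaced max_concurrent_queries in recent Ray releases)
    @serve.deployment(num_replicas=2, max_ongoing_requests=100)
    class TextGenerator:
        def __init__(self):
            self.pipeline = pipeline("text-generation", model="gpt2")

        def generate(self, text: str) -> dict:
            result = self.pipeline(text, max_length=50)
            return {"generated": result[0]["generated_text"]}

        # HTTP entry point: expects a JSON body such as {"text": "..."}
        async def __call__(self, request: Request) -> dict:
            payload = await request.json()
            return self.generate(payload["text"])

    # HTTP host and port are set when Serve starts, not in serve.run()
    serve.start(http_options={"host": "0.0.0.0", "port": 8000})

    # Deploy the service; serve.run() returns a handle to the application
    handle = serve.run(TextGenerator.bind())

    # Run inference through the handle (no HTTP involved)
    response = handle.generate.remote("The weather today")
    print(response.result())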
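Once the deployment is running, it can also be called over HTTP. The sketch below builds such a request with the standard library. It assumes the service listens on port 8000 at the root route and that the handler reads a JSON body with a `text` field; both are assumptions to adapt to your deployment, and `build_request` and `generate` are hypothetical helper names.

```python
import json
import urllib.request


def build_request(text, url="http://localhost:8000/"):
    """Build a JSON POST request carrying the prompt (assumed payload shape)."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate(text, url="http://localhost:8000/", timeout=30):
    """Send the request and decode the JSON response; needs a running service."""
    with urllib.request.urlopen(build_request(text, url), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Splitting request construction from sending keeps the payload format easy to inspect without a live server.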
  11. Step 11

    Ray RLlib (Reinforcement Learning)

    Ray RLlib is a scalable reinforcement learning library supporting RL algorithms such as PPO, DQN, SAC, and more.

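    import ray
    from ray.rllib.algorithms.ppo import PPOConfig

    ray.init()

    # Build algorithm config (method names follow recent Ray 2.x releases)
    alg_config = PPOConfig()
    alg_config = alg_config.environment(
        env="CartPole-v1"  # Gymnasium environment ID
    )
    alg_config = alg_config.training(
        gamma=0.99,
        lr=0.001,
        vf_loss_coeff=1.0,
        entropy_coeff=0.01
    )
    # Older releases used .rollouts(num_rollout_workers=..., num_envs_per_worker=...)
    alg_config = alg_config.env_runners(
        num_env_runners=4,
        num_envs_per_env_runner=4
    )
    alg_config = alg_config.debugging(log_level="WARNING")

    # Build algorithm with config
    algo = alg_config.build()

    # Train for 10 iterations
    for _ in range(10):
        result = algo.train()
        # Result layout varies by version; older releases expose
        # result["episode_reward_mean"] at the top level
        print(f"Reward: {result['env_runners']['episode_return_mean']}")

    # Save the model
    checkpoint = algo.save("/tmp/my-checkpoint")
    print(f"Saved checkpoint at: {checkpoint}")

    # Release the algorithm's workers before shutting Ray down
    algo.stop()
    ray.shutdown()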
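The `gamma=0.99` setting is the discount factor: it controls how much future rewards count relative to immediate ones. The sketch below computes a discounted return by hand to show its effect; `discounted_return` is a hypothetical helper, not part of RLlib.

```python
def discounted_return(rewards, gamma=0.99):
    """Sum of rewards where the reward at step t is weighted by gamma**t."""
    total = 0.0
    # Work backwards so each step is: reward + gamma * (return of the remainder)
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total


print(discounted_return([1.0, 1.0, 1.0], gamma=0.5))  # 1.75
print(discounted_return([1.0, 1.0, 1.0], gamma=1.0))  # 3.0
```

A value close to 1, as in the config above, makes the agent weigh long-term reward almost as heavily as the next step, which suits CartPole's one-point-per-step reward.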
  12. Step 12

    Local Cluster Setup

    Start a Ray cluster on a single machine for development, and optionally join more machines to it:

    # Start the head node with the dashboard
    ray start --head --port=6379 --dashboard-host=0.0.0.0

    # On another machine, join the cluster as a worker node
    ray start --address=<head-node-address>:6379

    # Check cluster status
    ray status

    # View logs (while the cluster is running)
    ray logs

    # Stop Ray on this machine
    ray stop
  13. Step 13

    Cluster Configuration Files

    For production deployments, use a YAML configuration file to define your Ray cluster:

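    # cluster.yaml
    cluster_name: my-ray-cluster

    # Upper bound on worker nodes across all worker types.
    max_workers: 10

    # Cloud provider (this example targets AWS; the region is an example value).
    provider:
        type: aws
        region: us-west-2

    # SSH user for the chosen image (depends on the AMI).
    auth:
        ssh_user: ubuntu

    # Node types; minimum and maximum worker counts are set per worker type.
    available_node_types:
        ray.head.default:
            resources: {}
            node_config:
                InstanceType: g4dn.xlarge
                ImageId: ami-0abcdef123456
                BlockDeviceMappings:
                    - DeviceName: /dev/sda1
                      Ebs:
                          VolumeSize: 100
        ray.worker.default:
            min_workers: 2
            max_workers: 8
            resources: {"CPU": 4, "GPU": 1}
            node_config:
                InstanceType: g4dn.xlarge
                ImageId: ami-0abcdef123456
                BlockDeviceMappings:
                    - DeviceName: /dev/sda1
                      Ebs:
                          VolumeSize: 100

    # Which node type the head node uses
    head_node_type: ray.head.default

    # Setup commands for all nodes
    setup_commands:
        - pip install -U "ray[default,data,train,tune,serve]"

    # Head node setup commands
    head_setup_commands:
        - pip install torch torchvision

    # Worker setup commands
    worker_setup_commands:
        - pip install torch torchvision
        - echo "Worker node setup complete"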
  14. Step 14

    Cloud Deployment

    Deploy Ray clusters on AWS, GCP, or Azure using the Ray Cluster Launcher. The cloud is selected by the provider type in cluster.yaml (aws, gcp, or azure), not by a command-line flag:

    # Launch or update the cluster described in cluster.yaml
    ray up cluster.yaml --yes

    # SSH into the head node
    ray attach cluster.yaml

    # Check cluster status from your machine
    ray exec cluster.yaml "ray status"

    # Copy files to and from the head node
    ray rsync-up cluster.yaml /local/path /remote/path
    ray rsync-down cluster.yaml /remote/path /local/path

    # Tear cluster down
    ray down cluster.yaml --yes
  15. Step 15

    Monitoring and Debugging

    Ray provides a Web Dashboard for monitoring cluster health. The dashboard is available at http://localhost:8265 by default after running ray start --head.

    The dashboard shows:

    • Nodes overview
    • Job details
    • Task execution
    • GPU utilization
    • Ray Data pipelines
    • Serve deployments

    # Start Ray with dashboard
    ray start --head --dashboard-host=0.0.0.0 --dashboard-port=8265

    # Open browser to http://localhost:8265

    # Check cluster info
    ray status

    # Attach to cluster for debugging
    ray attach cluster.yaml

    # Copy files to the head node and run commands on it
    ray rsync-up cluster.yaml /local/path /remote/path
    ray exec cluster.yaml "pip install my-package"
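Before opening a browser, a script can check whether anything is listening on the dashboard port. This is a minimal sketch using a plain TCP connection; `dashboard_reachable` is a hypothetical helper, and a successful connection only shows that the port is open, not that the service behind it is the Ray dashboard.

```python
import socket


def dashboard_reachable(host="localhost", port=8265, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `dashboard_reachable()` can gate a health check in a deployment script, with the host and port adjusted to match the `--dashboard-host` and `--dashboard-port` values in use.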
  16. Step 16

    Advanced Configuration

    Fine-tune Ray behavior with init options:

    import ray
    
    # Initialize with custom options
    # This starts a local Ray instance. To connect to an existing cluster
    # instead, pass address="auto" (or "<head-node-address>:6379") and omit
    # the resource arguments, which cannot be combined with an address.
    ray.init(
        # Logical resource limits
        num_cpus=4,
        num_gpus=2,

        # Object store size in bytes (must be an integer)
        object_store_memory=2 * 10**9,  # 2 GB

        # Dashboard
        include_dashboard=True,
        dashboard_host="0.0.0.0",
        dashboard_port=8265,

        # Logging
        log_to_driver=True,
        ignore_reinit_error=True,

        # Namespace
        namespace="default"
    )

    # Access cluster resources
    resources = ray.available_resources()
    print(resources)  # e.g. {'CPU': 4.0, 'GPU': 2.0, 'memory': ..., 'object_store_memory': ...}
  17. Step 17

    Best Practices and Pitfalls

    Best Practices:

    1. Use batch processing: map_batches() is more efficient than map() for large datasets
    2. Leverage actors for state: Reuse actor instances rather than recreating them
    3. Configure resources properly: Don't overallocate GPU memory
    4. Use checkpointing: Save checkpoints during long training runs
    5. Monitor your cluster: Use the Ray Dashboard to identify bottlenecks
    6. Batch requests: For inference, batch similar requests to improve throughput
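The batching advice above can be sketched in a few lines. `chunked` groups inputs into fixed-size batches, and `predict_batch` is a hypothetical stand-in for a model call that handles many inputs at once; neither is part of Ray.

```python
def chunked(items, batch_size):
    """Yield consecutive slices of items with at most batch_size elements each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def predict_batch(texts):
    """Stand-in for a model call; here it just returns the length of each input."""
    return [len(text) for text in texts]


def predict_all(texts, batch_size=32):
    """Run the model once per batch instead of once per item."""
    results = []
    for batch in chunked(texts, batch_size):
        results.extend(predict_batch(batch))
    return results


print(list(chunked([1, 2, 3, 4, 5], 2)))         # [[1, 2], [3, 4], [5]]
print(predict_all(["a", "bb", "ccc"], batch_size=2))  # [1, 2, 3]
```

The same grouping applies when submitting Ray tasks: one task per batch usually amortizes scheduling and serialization overhead better than one task per item.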

    Common Pitfalls:

    1. ObjectRef confusion: ObjectRefs are not the actual data - use ray.get() to retrieve
    2. Memory pressure: Large datasets can cause OOM - process in batches
    3. Actor deadlocks: Avoid holding locks across async calls
    4. GIL limitations: Ray tasks run in separate worker processes, so they do not contend for one another's GIL, but threads inside a single task or actor still share it - use more tasks or native extensions for compute-intensive work
    5. Serialization overhead: Serializing large objects can be slow - prefer NumPy arrays, which Ray can read from the object store without copying

    ⚠ Heads up: Test your distributed code locally before scaling to a cluster. Use `ray.init()` without arguments for local development.
