Batch

In this Get Started guide, you will perform SciPy-based batch processing against your remote Ray endpoint. This guide assumes the following:

  • You have already created a "Ray as Service" tenant using Rafay.
  • You have the HTTPS URL and access credentials for the remote endpoint.
  • You have Python 3 installed on your laptop.

Review Code

Download the source code file "scipy_test.py" and open it in your favorite IDE, such as VS Code, to review it.

This code performs distributed optimization using Ray and SciPy to find the minimum of a complex, high-dimensional objective function. It runs optimization tasks in parallel on multiple Ray workers and gathers the results.

Parallel Optimization

Distributes the optimization problem across multiple workers, allowing each one to start from a different initial guess. This is useful when the objective function has multiple local minima, as starting from different points can help find a better global minimum.

High-dimensional Optimization

The problem is set to 100 dimensions, making it computationally intensive and a good candidate for parallelization.

Scalable and Efficient

By leveraging Ray, the code can scale to multiple machines if needed, potentially reducing the time to find the optimal solution compared to running all optimizations sequentially.

Performance Benchmarking

Measures the time taken for each optimization run as well as the overall time, helping to assess the performance and efficiency of the distributed setup.


Step-by-Step Explanation

Here’s a detailed explanation of the code and its logic.

Import Required Libraries

import numpy as np
import ray
import time
from scipy.optimize import minimize
  • numpy: Used for generating random initial guesses and manipulating arrays.

  • ray: Provides tools for distributed computing, enabling parallel execution of tasks across multiple workers.

  • time: Used to measure execution time for benchmarking.

  • scipy.optimize.minimize: Performs the optimization using a specified method (in this case, BFGS, a gradient-based optimization algorithm).


Define the Objective Function

def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

This is a non-linear objective function; the np.sin(x) ** 2 term combined with the quadratic term (x - 3) ** 2 makes it more complex than a plain quadratic.

It takes an input array x, computes the sum of squared sine values and squared differences from 3, and returns the result. The goal is to find the values of x that minimize this function.
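
To get a feel for what the function computes, here is a small standalone check (not part of scipy_test.py). Because the sum is taken coordinate-wise, every one of the 100 coordinates is pulled toward the same trade-off between the sine term and the quadratic term.

import numpy as np

def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

# Far from 3, the quadratic term dominates: 100 coordinates * (0 - 3)**2 = 900
print(complex_objective_function(np.zeros(100)))      # 900.0
# At x = 3 in every coordinate, only the sine term remains: 100 * sin(3)**2
print(complex_objective_function(np.full(100, 3.0)))  # ~1.99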


Define a Remote Optimization Function

@ray.remote
def optimize_remote(initial_guess):
    start_time = time.time()
    result = minimize(complex_objective_function, initial_guess, method='BFGS', options={'maxiter': 500})
    end_time = time.time()
    time_taken = end_time - start_time
    return result.x, result.fun, time_taken

The @ray.remote decorator turns the optimize_remote function into a Ray remote function, which can be executed on different Ray workers.

initial_guess: This is the starting point for the optimization process.

minimize: This function from SciPy attempts to find the minimum of complex_objective_function using the BFGS algorithm.

  • method='BFGS': Specifies the optimization algorithm.
  • options={'maxiter': 500}: Limits the optimization process to a maximum of 500 iterations to prevent excessive computation.

Timing: The time module records the time taken for each optimization run, which is returned along with the optimized parameters and the objective function value at the minimum.
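
Before distributing the work, it can help to run the same minimize call locally on a small instance to see the fields of the result object that optimize_remote returns. This is a standalone sketch, not part of scipy_test.py; the objective function is repeated here so the snippet runs on its own.

import numpy as np
from scipy.optimize import minimize

def complex_objective_function(x):  # same definition as above
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

# Local, non-Ray check on a small 5-dimensional instance
result = minimize(complex_objective_function, np.random.randn(5),
                  method='BFGS', options={'maxiter': 500})
print(result.x)    # optimized parameters (returned first by optimize_remote)
print(result.fun)  # objective value at the minimum (returned second)
print(result.nit)  # iterations actually used, capped at maxiter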


Main Function Setup

if __name__ == "__main__":
    num_workers = 4
    dimension = 100
    initial_guesses = [np.random.randn(dimension) for _ in range(num_workers)]

num_workers: Specifies the number of parallel tasks or workers, in this case, 4.

dimension: Sets the dimensionality of the optimization problem to 100, making the problem more computationally expensive.

initial_guesses: Creates 4 random initial guesses, each being a vector of length 100. These guesses are the starting points for each optimization task.
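
np.random.randn does not use a fixed seed, so every submission starts from different guesses. If you want repeatable runs, one option (a variation, not in scipy_test.py) is to draw the initial guesses from a seeded NumPy generator:

import numpy as np

num_workers = 4
dimension = 100
rng = np.random.default_rng(42)  # fixed seed makes the initial guesses reproducible
initial_guesses = [rng.standard_normal(dimension) for _ in range(num_workers)]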


Distributed Execution with Ray

ray.init()
print("Running optimization remotely (with Ray)...")
overall_start_time_remote = time.time()
futures = [optimize_remote.remote(initial_guess) for initial_guess in initial_guesses]
remote_results = ray.get(futures)
overall_end_time_remote = time.time()
overall_time_remote = overall_end_time_remote - overall_start_time_remote

ray.init(): Initializes Ray, enabling the script to distribute tasks to different workers.

Start time: Records the time when the distributed execution starts for later benchmarking.

futures: A list of object references (futures), one per optimization task, each started from a different random initial guess. Calling optimize_remote.remote() submits the task to Ray and returns immediately.

ray.get(futures): Collects the results of all the remote tasks. This blocks until all the optimizations are complete, returning the results.

End time: Calculates the total time taken for the distributed optimization.
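
Note that ray.get(futures) only returns once every task has finished; the results loop below then prints everything in one go. If you would rather report each result as soon as its worker completes, a common variation (not in scipy_test.py) is to poll with ray.wait, reusing the futures list created above:

# Sketch: handle results one at a time as tasks complete
remaining = futures
while remaining:
    done, remaining = ray.wait(remaining, num_returns=1)  # blocks until one task finishes
    params, value, elapsed = ray.get(done[0])
    print(f"A worker finished: objective={value:.6f}, took {elapsed:.4f} seconds")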


for i, (optimized_params, final_value, time_taken) in enumerate(remote_results):
    print(f"Ray Worker {i + 1} optimized parameters: {optimized_params[:5]} ... (truncated)")
    print(f"Ray Worker {i + 1} final objective value: {final_value}")
    print(f"Ray Worker {i + 1} time taken: {time_taken:.4f} seconds\n")
print(f"Overall distributed optimization time with Ray: {overall_time_remote:.4f} seconds\n")

Loops through the results from each Ray worker and prints:

  • The first few optimized parameters (truncated for brevity).
  • The final objective value, which indicates how close the optimization got to the true minimum.
  • The time taken for each optimization process.

Overall Time: Prints the total time taken to perform all the distributed optimizations.
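
Because each worker starts from a different random guess, the run you usually care about is the one with the lowest final objective value. A small addition (not in scipy_test.py) that picks it out of remote_results:

# Each entry of remote_results is (optimized_params, final_value, time_taken)
best_params, best_value, _ = min(remote_results, key=lambda r: r[1])
print(f"Best objective value across workers: {best_value}")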


Shut Down Ray

ray.shutdown()

Cleans up the Ray processes, closing any resources associated with the Ray runtime.


Job Submission Code

Download the source code file "batch_job.py" and open it in your favorite IDE, such as VS Code, to review it. As the code snippet below shows, we use Ray's Job Submission Client to submit a job to the remote Ray endpoint.

from ray.job_submission import JobSubmissionClient
import ray
import urllib3

# Suppress the warning about unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Ray client
client = JobSubmissionClient(
    "https://<URL> for Ray Endpoint>", 
    headers={"Authorization": "Basic <Base64 Encoded Credentials>"}, 
    verify=False  # Disable SSL verification
)

# Submit job
client.submit_job(entrypoint="python scipy_test.py", runtime_env={"working_dir": "./"})

Now, update the Authorization header in batch_job.py with the base64-encoded credentials for your Ray endpoint. You can use the following command to perform the encoding.

echo -n 'admin:PASSWORD' | base64
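
If the base64 command is not available on your machine, the same value can be produced with a few lines of Python (replace PASSWORD with the actual password for your endpoint):

import base64

credentials = base64.b64encode(b"admin:PASSWORD").decode()
print(f"Basic {credentials}")  # the full value for the Authorization header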

Submit Job

To submit the job to your remote Ray endpoint:

  • First, in your web browser, open the Ray Dashboard URL and keep it open. You will monitor the status and progress of the submitted job there.
  • Then, open a terminal and enter the following command:
python3 ./batch_job.py 

This will submit the job to the configured Ray endpoint, and you can review progress and results on the Ray Dashboard. Once the Ray endpoint receives the job, it will remain pending for a few seconds before it starts running.
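
If you prefer to follow the job from the terminal instead of the dashboard, submit_job returns a job ID that you can pass back to the same client. The sketch below reuses the client object configured in batch_job.py and polls until the job reaches a terminal state:

import time
from ray.job_submission import JobStatus

# Capture the job ID returned by submit_job and poll its status
job_id = client.submit_job(entrypoint="python scipy_test.py", runtime_env={"working_dir": "./"})
while True:
    status = client.get_job_status(job_id)  # PENDING -> RUNNING -> SUCCEEDED / FAILED / STOPPED
    print(f"Job {job_id}: {status}")
    if status in {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}:
        break
    time.sleep(5)
print(client.get_job_logs(job_id))          # stdout from scipy_test.py, like the sample below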

Depending on the resources available on your Ray Service, you should see output similar to the following:

Ray Worker 1 optimized parameters: [2.9998 3.0001 2.9999 2.9997 3.0000] ... (truncated)
Ray Worker 1 final objective value: 0.00001234
Ray Worker 1 time taken: 3.4567 seconds

Ray Worker 2 optimized parameters: [3.0000 3.0000 3.0000 3.0000 3.0000] ... (truncated)
Ray Worker 2 final objective value: 0.00000000
Ray Worker 2 time taken: 3.5678 seconds

Ray Worker 3 optimized parameters: [2.9999 3.0001 2.9998 2.9997 3.0002] ... (truncated)
Ray Worker 3 final objective value: 0.00000890
Ray Worker 3 time taken: 3.6789 seconds

Ray Worker 4 optimized parameters: [2.9997 3.0001 2.9996 3.0000 3.0002] ... (truncated)
Ray Worker 4 final objective value: 0.00001023
Ray Worker 4 time taken: 3.7890 seconds

Overall distributed optimization time with Ray: 3.9876 seconds