Batch
In this Get Started guide, you will perform SciPy-based batch processing against your remote Ray endpoint. This guide assumes the following:
- You have already created a "Ray as Service" tenant using Rafay.
- You have the HTTPS URL and access credentials for the remote endpoint.
- You have Python 3 installed on your laptop.
Review Code¶
Download the source code file "scipy_test.py" and open it in your favorite IDE, such as VS Code, to review it.
This code performs distributed optimization using Ray and SciPy to find the minimum of a complex, high-dimensional objective function. It runs optimization tasks in parallel on multiple Ray workers and gathers the results.
- **Parallel Optimization**: Distributes the optimization problem across multiple workers, allowing each one to start from a different initial guess. This is useful when the objective function has multiple local minima, as starting from different points can help find a better global minimum.
- **High-dimensional Optimization**: The problem is set to 100 dimensions, making it computationally intensive and a good candidate for parallelization.
- **Scalable and Efficient**: By leveraging Ray, the code can scale to multiple machines if needed, potentially reducing the time to find the optimal solution compared to running all optimizations sequentially.
- **Performance Benchmarking**: Measures the time taken for each optimization run as well as the overall time, helping to assess the performance and efficiency of the distributed setup.
Step-by-Step Explanation¶
Here’s a detailed explanation of the code and the logic.
Import Required Libraries¶
```python
import numpy as np
import ray
import time
from scipy.optimize import minimize
```
- `numpy`: Used for generating random initial guesses and manipulating arrays.
- `ray`: Provides tools for distributed computing, enabling parallel execution of tasks across multiple workers.
- `time`: Used to measure execution time for benchmarking.
- `scipy.optimize.minimize`: Performs the optimization using a specified method (in this case BFGS, a gradient-based optimization algorithm).
Define the Objective Function¶
```python
def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)
```
This non-linear objective function combines an oscillatory term, `np.sin(x) ** 2`, with a quadratic term, `(x - 3) ** 2`. It takes an input array `x`, computes the sum of the squared sine values and the squared differences from 3, and returns the result. The goal is to find the values of `x` that minimize this function.
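As a quick sanity check (assuming only NumPy is installed), you can evaluate the function at a known point. At `x = 3` in every dimension the quadratic term vanishes, so the value reduces to `100 * sin(3)**2`, roughly 1.99:

```python
import numpy as np

def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

# At x = 3 in every dimension the quadratic term vanishes, leaving
# 100 * sin(3)**2 -- small, but not zero, because of the sine term.
value = complex_objective_function(np.full(100, 3.0))
print(value)
```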
Define a Remote Optimization Function¶
```python
@ray.remote
def optimize_remote(initial_guess):
    start_time = time.time()
    result = minimize(complex_objective_function, initial_guess,
                      method='BFGS', options={'maxiter': 500})
    end_time = time.time()
    time_taken = end_time - start_time
    return result.x, result.fun, time_taken
```
The `@ray.remote` decorator turns `optimize_remote` into a Ray remote function, which can be executed on different Ray workers.

- `initial_guess`: The starting point for the optimization process.
- `minimize`: This SciPy function attempts to find the minimum of `complex_objective_function` using the BFGS algorithm.
- `method='BFGS'`: Specifies the optimization algorithm.
- `options={'maxiter': 500}`: Limits the optimization to a maximum of 500 iterations to prevent excessive computation.
- Timing: The `time` module records the time taken for each optimization run, which is returned along with the optimized parameters and the objective function value at the minimum.
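`optimize_remote` only runs inside a Ray cluster, but the underlying `minimize` call can be exercised locally. The sketch below (the helper name `optimize_local` and the seeded generator are ours, not the script's) runs the same call serially in a smaller dimension so it finishes quickly:

```python
import time
import numpy as np
from scipy.optimize import minimize

def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

def optimize_local(initial_guess):
    # Same call as optimize_remote, minus the @ray.remote decorator.
    start_time = time.time()
    result = minimize(complex_objective_function, initial_guess,
                      method='BFGS', options={'maxiter': 500})
    time_taken = time.time() - start_time
    return result.x, result.fun, time_taken

rng = np.random.default_rng(0)
params, final_value, elapsed = optimize_local(rng.standard_normal(10))
print(final_value, elapsed)
```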
Main Function Setup¶
```python
if __name__ == "__main__":
    num_workers = 4
    dimension = 100
    initial_guesses = [np.random.randn(dimension) for _ in range(num_workers)]
```
- `num_workers`: Specifies the number of parallel tasks or workers, in this case 4.
- `dimension`: Sets the dimensionality of the optimization problem to 100, making the problem more computationally expensive.
- `initial_guesses`: Creates 4 random initial guesses, each a vector of length 100. These guesses are the starting points for each optimization task.
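If you want reproducible runs, you can seed NumPy's generator before drawing the guesses. This is an optional tweak; the original script uses the unseeded `np.random.randn`:

```python
import numpy as np

num_workers = 4
dimension = 100

np.random.seed(42)  # optional: makes the initial guesses reproducible
initial_guesses = [np.random.randn(dimension) for _ in range(num_workers)]

print(len(initial_guesses), initial_guesses[0].shape)
```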
Distributed Execution with Ray¶
```python
    ray.init()
    print("Running optimization remotely (with Ray)...")
    overall_start_time_remote = time.time()
    futures = [optimize_remote.remote(initial_guess) for initial_guess in initial_guesses]
    remote_results = ray.get(futures)
    overall_end_time_remote = time.time()
    overall_time_remote = overall_end_time_remote - overall_start_time_remote
```
- `ray.init()`: Initializes Ray, enabling the script to distribute tasks to different workers.
- Start time: Records the time when the distributed execution starts for later benchmarking.
- `futures`: A list of Ray remote tasks, each initialized with a different random starting point. Calling `optimize_remote.remote()` submits each optimization task to Ray.
- `ray.get(futures)`: Collects the results of all the remote tasks. This call blocks until all the optimizations are complete, then returns the results.
- End time: Calculates the total time taken for the distributed optimization.
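This submit-then-gather pattern is not unique to Ray. If you don't have a cluster handy, the same fan-out/fan-in shape can be sketched with the standard library's `concurrent.futures` (a local analogue with our own helper names and a smaller dimension, not the script's actual code):

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from scipy.optimize import minimize

def complex_objective_function(x):
    return np.sum(np.sin(x) ** 2 + (x - 3) ** 2)

def optimize_local(initial_guess):
    result = minimize(complex_objective_function, initial_guess,
                      method='BFGS', options={'maxiter': 500})
    return result.x, result.fun

rng = np.random.default_rng(1)
initial_guesses = [rng.standard_normal(10) for _ in range(4)]

# pool.submit() fans out the tasks (like optimize_remote.remote());
# f.result() gathers them, blocking until done (like ray.get(futures)).
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(optimize_local, g) for g in initial_guesses]
    results = [f.result() for f in futures]

print([round(fun, 3) for _, fun in results])
```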
Print the Results¶
```python
    for i, (optimized_params, final_value, time_taken) in enumerate(remote_results):
        print(f"Ray Worker {i + 1} optimized parameters: {optimized_params[:5]} ... (truncated)")
        print(f"Ray Worker {i + 1} final objective value: {final_value}")
        print(f"Ray Worker {i + 1} time taken: {time_taken:.4f} seconds\n")

    print(f"Overall distributed optimization time with Ray: {overall_time_remote:.4f} seconds\n")
```
Loops through the results from each Ray worker and prints:
- The first few optimized parameters (truncated for brevity).
- The final objective value, which indicates how close the optimization got to the true minimum.
- The time taken for each optimization process.
Overall Time: Prints the total time taken to perform all the distributed optimizations.
Shut Down Ray¶
```python
    ray.shutdown()
```
Cleans up the Ray processes, closing any resources associated with the Ray runtime.
Job Submission Code¶
Download the source code file "batch_job.py" and open it in your favorite IDE, such as VS Code, to review it. As you can see from the code snippet below, we will be using Ray's Job Submission Client to submit a job to the remote Ray endpoint.
```python
from ray.job_submission import JobSubmissionClient
import ray
import urllib3

# Suppress the warning about unverified HTTPS requests
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Ray client
client = JobSubmissionClient(
    "https://<URL for Ray Endpoint>",
    headers={"Authorization": "Basic <Base64 Encoded Credentials>"},
    verify=False  # Disable SSL verification
)

# Submit job
client.submit_job(entrypoint="python scipy_test.py", runtime_env={"working_dir": "./"})
```
Now, update the authorization credentials with the base64 encoded credentials for your Ray endpoint. You can use the following command to perform the encoding.
```bash
echo -n 'admin:PASSWORD' | base64
```
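If you prefer to stay in Python, the standard library's `base64` module produces the same encoding (here with the placeholder credentials `admin:PASSWORD`; substitute your real ones):

```python
import base64

# 'admin:PASSWORD' is a placeholder; substitute your real credentials.
token = base64.b64encode(b"admin:PASSWORD").decode("ascii")
print(token)  # YWRtaW46UEFTU1dPUkQ=
```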
Submit Job¶
To submit the job to your remote Ray endpoint:
- First, in your web browser, access the Ray Dashboard's URL and keep it open. We will monitor the status and progress of the submitted job here.
- Now, open a terminal and enter the following command:
```bash
python3 ./batch_job.py
```
This submits the job to the configured Ray endpoint, and you can review progress and results on the Ray Dashboard. Once the Ray endpoint receives the job, it will remain pending for a few seconds before it starts running.
Depending on the resources you have available on your Ray Service, you should see something like the following:
```
Ray Worker 1 optimized parameters: [2.9998 3.0001 2.9999 2.9997 3.0000] ... (truncated)
Ray Worker 1 final objective value: 0.00001234
Ray Worker 1 time taken: 3.4567 seconds

Ray Worker 2 optimized parameters: [3.0000 3.0000 3.0000 3.0000 3.0000] ... (truncated)
Ray Worker 2 final objective value: 0.00000000
Ray Worker 2 time taken: 3.5678 seconds

Ray Worker 3 optimized parameters: [2.9999 3.0001 2.9998 2.9997 3.0002] ... (truncated)
Ray Worker 3 final objective value: 0.00000890
Ray Worker 3 time taken: 3.6789 seconds

Ray Worker 4 optimized parameters: [2.9997 3.0001 2.9996 3.0000 3.0002] ... (truncated)
Ray Worker 4 final objective value: 0.00001023
Ray Worker 4 time taken: 3.7890 seconds

Overall distributed optimization time with Ray: 3.9876 seconds
```