diff --git a/contributing/samples/gepa/experiment.py b/contributing/samples/gepa/experiment.py
index 2f5d03a772..f68b349d9c 100644
--- a/contributing/samples/gepa/experiment.py
+++ b/contributing/samples/gepa/experiment.py
@@ -43,7 +43,6 @@
 from tau_bench.types import EnvRunResult
 from tau_bench.types import RunConfig
 import tau_bench_agent as tau_bench_agent_lib
-
 import utils
 
 
diff --git a/contributing/samples/gepa/run_experiment.py b/contributing/samples/gepa/run_experiment.py
index cfd850b3a3..1bc4ee58c8 100644
--- a/contributing/samples/gepa/run_experiment.py
+++ b/contributing/samples/gepa/run_experiment.py
@@ -25,7 +25,6 @@
 from absl import flags
 import experiment
 from google.genai import types
-
 import utils
 
 _OUTPUT_DIR = flags.DEFINE_string(
diff --git a/demo_parallel_performance.py b/demo_parallel_performance.py
new file mode 100644
index 0000000000..7700a53386
--- /dev/null
+++ b/demo_parallel_performance.py
@@ -0,0 +1,114 @@
+"""
+Performance demonstration for parallel LLM-as-judge evaluation.
+
+This script demonstrates the performance improvement from parallelizing
+LLM evaluation calls using asyncio.gather().
+"""
+
+import asyncio
+import time
+from typing import Optional
+
+from google.genai import types as genai_types
+
+
+# Simulated LLM call with artificial delay
+async def mock_llm_call(delay: float = 0.5):
+    """Simulates an LLM API call with specified delay."""
+    await asyncio.sleep(delay)
+    return genai_types.Content(
+        parts=[genai_types.Part(text="Mock LLM response")],
+        role="model",
+    )
+
+
+async def serial_evaluation(num_invocations: int, num_samples: int, delay: float):
+    """Simulates the OLD serial evaluation approach."""
+    results = []
+    for i in range(num_invocations):
+        invocation_samples = []
+        for j in range(num_samples):
+            response = await mock_llm_call(delay)
+            invocation_samples.append(response)
+        results.append(invocation_samples)
+    return results
+
+
+async def parallel_evaluation(num_invocations: int, num_samples: int, delay: float):
+    """Simulates the NEW parallel evaluation approach."""
+    tasks = []
+    invocation_indices = []
+
+    # Create all N×M tasks
+    for i in range(num_invocations):
+        for j in range(num_samples):
+            tasks.append(mock_llm_call(delay))
+            invocation_indices.append(i)
+
+    # Execute in parallel
+    all_results = await asyncio.gather(*tasks)
+
+    # Group by invocation
+    results_by_invocation = {}
+    for idx, result in zip(invocation_indices, all_results):
+        if idx not in results_by_invocation:
+            results_by_invocation[idx] = []
+        results_by_invocation[idx].append(result)
+
+    return [results_by_invocation[i] for i in sorted(results_by_invocation.keys())]
+
+
+async def main():
+    """Run performance comparison."""
+    num_invocations = 5
+    num_samples = 2
+    delay = 0.5  # 500ms per call
+
+    print("=" * 60)
+    print("LLM-as-Judge Parallel Evaluation Performance Test")
+    print("=" * 60)
+    print(f"Configuration:")
+    print(f"  - Invocations: {num_invocations}")
+    print(f"  - Samples per invocation: {num_samples}")
+    print(f"  - Total LLM calls: {num_invocations * num_samples}")
+    print(f"  - Simulated delay per call: {delay}s")
+    print()
+
+    # Test serial approach
+    print("Testing SERIAL approach (old)...")
+    start_time = time.time()
+    serial_results = await serial_evaluation(num_invocations, num_samples, delay)
+    serial_time = time.time() - start_time
+    print(f"✓ Completed in {serial_time:.2f}s")
+    print()
+
+    # Test parallel approach
+    print("Testing PARALLEL approach (new)...")
+    start_time = time.time()
+    parallel_results = await parallel_evaluation(num_invocations, num_samples, delay)
+    parallel_time = time.time() - start_time
+    print(f"✓ Completed in {parallel_time:.2f}s")
+    print()
+
+    # Calculate speedup
+    speedup = serial_time / parallel_time
+    time_saved = serial_time - parallel_time
+
+    print("=" * 60)
+    print("RESULTS")
+    print("=" * 60)
+    print(f"Serial time: {serial_time:.2f}s")
+    print(f"Parallel time: {parallel_time:.2f}s")
+    print(f"Speedup: {speedup:.2f}x faster")
+    print(f"Time saved: {time_saved:.2f}s ({time_saved/serial_time*100:.1f}%)")
+    print("=" * 60)
+
+    # Verify results are the same
+    assert len(serial_results) == len(parallel_results)
+    for i in range(len(serial_results)):
+        assert len(serial_results[i]) == len(parallel_results[i])
+    print("✓ Results verified: both approaches produce same output structure")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/google/adk/evaluation/llm_as_judge.py b/src/google/adk/evaluation/llm_as_judge.py
index 0f2d890139..d9005cca3d 100644
--- a/src/google/adk/evaluation/llm_as_judge.py
+++ b/src/google/adk/evaluation/llm_as_judge.py
@@ -13,8 +13,8 @@
 # limitations under the License.
 
 from __future__ import annotations
-
 from abc import abstractmethod
+import asyncio
 from typing import Optional
 
 from google.genai import types as genai_types
@@ -33,7 +33,7 @@
 from .eval_case import Invocation
 from .eval_metrics import BaseCriterion
 from .eval_metrics import EvalMetric
-from .eval_metrics import RubricScore
+from .eval_rubrics import RubricScore
 from .evaluator import EvaluationResult
 from .evaluator import Evaluator
 from .evaluator import PerInvocationResult
@@ -114,6 +114,46 @@ def aggregate_invocation_results(
   ) -> EvaluationResult:
     """Aggregates the per invocation results to get the overall score."""
 
+  async def _evaluate_single_sample(
+      self,
+      llm_request: LlmRequest,
+      actual: Invocation,
+      expected: Optional[Invocation],
+  ) -> PerInvocationResult:
+    """Evaluates a single sample for an invocation.
+
+    Args:
+      llm_request: The LLM request to execute.
+      actual: The actual invocation to evaluate.
+      expected: The expected invocation (optional).
+
+    Returns:
+      A PerInvocationResult containing the evaluation score and status.
+    """
+    async with Aclosing(
+        self._judge_model.generate_content_async(llm_request)
+    ) as agen:
+      async for llm_response in agen:
+        # Non-streaming call, so there is only one response content.
+        auto_rater_score = self.convert_auto_rater_response_to_score(
+            llm_response
+        )
+        return PerInvocationResult(
+            actual_invocation=actual,
+            expected_invocation=expected,
+            score=auto_rater_score.score,
+            eval_status=get_eval_status(
+                auto_rater_score.score, self._eval_metric.threshold
+            ),
+            rubric_scores=auto_rater_score.rubric_scores,
+        )
+    # This should not be reached for non-streaming calls, but added for safety
+    return PerInvocationResult(
+        actual_invocation=actual,
+        expected_invocation=expected,
+        eval_status=get_eval_status(None, self._eval_metric.threshold),
+    )
+
   @override
   async def evaluate_invocations(
       self,
@@ -132,8 +172,13 @@
         else expected_invocations
     )
 
-    per_invocation_results = []
-    for actual, expected in zip(actual_invocations, expected_invocations):
+    # Build all LLM evaluation tasks for parallel execution
+    tasks = []
+    invocation_indices = []  # Track which invocation each task belongs to
+
+    for invocation_idx, (actual, expected) in enumerate(
+        zip(actual_invocations, expected_invocations)
+    ):
       auto_rater_prompt = self.format_auto_rater_prompt(actual, expected)
       llm_request = LlmRequest(
           model=self._judge_model_options.judge_model,
@@ -148,32 +193,32 @@
       )
       add_default_retry_options_if_not_present(llm_request)
       num_samples = self._judge_model_options.num_samples
-      invocation_result_samples = []
+
+      # Create tasks for all samples of this invocation
       for _ in range(num_samples):
-        async with Aclosing(
-            self._judge_model.generate_content_async(llm_request)
-        ) as agen:
-          async for llm_response in agen:
-            # Non-streaming call, so there is only one response content.
-            auto_rater_score = self.convert_auto_rater_response_to_score(
-                llm_response
-            )
-            invocation_result_samples.append(
-                PerInvocationResult(
-                    actual_invocation=actual,
-                    expected_invocation=expected,
-                    score=auto_rater_score.score,
-                    eval_status=get_eval_status(
-                        auto_rater_score.score, self._eval_metric.threshold
-                    ),
-                    rubric_scores=auto_rater_score.rubric_scores,
-                )
-            )
-      if not invocation_result_samples:
-        continue
-      per_invocation_results.append(
-          self.aggregate_per_invocation_samples(invocation_result_samples)
-      )
+        tasks.append(
+            self._evaluate_single_sample(llm_request, actual, expected)
+        )
+        invocation_indices.append(invocation_idx)
+
+    # Execute all tasks in parallel
+    all_results = await asyncio.gather(*tasks)
+
+    # Group results by invocation
+    results_by_invocation = {}
+    for invocation_idx, result in zip(invocation_indices, all_results):
+      if invocation_idx not in results_by_invocation:
+        results_by_invocation[invocation_idx] = []
+      results_by_invocation[invocation_idx].append(result)
+
+    # Aggregate samples for each invocation
+    per_invocation_results = []
+    for invocation_idx in sorted(results_by_invocation.keys()):
+      invocation_result_samples = results_by_invocation[invocation_idx]
+      if invocation_result_samples:
+        per_invocation_results.append(
+            self.aggregate_per_invocation_samples(invocation_result_samples)
+        )
 
     if per_invocation_results:
       return self.aggregate_invocation_results(per_invocation_results)