feat: parallelize LLM-as-judge evaluation using asyncio.gather() #3960
Changes from all commits
**New file** — performance demo script (`@@ -0,0 +1,114 @@`):

```python
"""
Performance demonstration for parallel LLM-as-judge evaluation.
This script demonstrates the performance improvement from parallelizing
LLM evaluation calls using asyncio.gather().
"""

import asyncio
import time
from typing import Optional

from google.genai import types as genai_types


# Simulated LLM call with artificial delay
async def mock_llm_call(delay: float = 0.5):
  """Simulates an LLM API call with specified delay."""
  await asyncio.sleep(delay)
  return genai_types.Content(
      parts=[genai_types.Part(text="Mock LLM response")],
      role="model",
  )


async def serial_evaluation(num_invocations: int, num_samples: int, delay: float):
  """Simulates the OLD serial evaluation approach."""
  results = []
  for i in range(num_invocations):
    invocation_samples = []
    for j in range(num_samples):
      response = await mock_llm_call(delay)
      invocation_samples.append(response)
    results.append(invocation_samples)
  return results


async def parallel_evaluation(num_invocations: int, num_samples: int, delay: float):
  """Simulates the NEW parallel evaluation approach."""
  tasks = []
  invocation_indices = []

  # Create all N×M tasks
  for i in range(num_invocations):
    for j in range(num_samples):
      tasks.append(mock_llm_call(delay))
      invocation_indices.append(i)

  # Execute in parallel
  all_results = await asyncio.gather(*tasks)

  # Group by invocation
  results_by_invocation = {}
  for idx, result in zip(invocation_indices, all_results):
    if idx not in results_by_invocation:
      results_by_invocation[idx] = []
    results_by_invocation[idx].append(result)
```
**Comment on lines +52 to +56** (Contributor):

The logic for grouping results by invocation index can be made more concise by using `setdefault`:

```python
results_by_invocation = {}
for idx, result in zip(invocation_indices, all_results):
  results_by_invocation.setdefault(idx, []).append(result)
```
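A `collections.defaultdict` would be another idiomatic way to express the same grouping. A minimal sketch under the same assumptions (it reuses the `invocation_indices` and `all_results` lists built above):

```python
from collections import defaultdict

# Equivalent grouping with defaultdict: missing keys are created lazily,
# so neither the membership check nor setdefault is needed.
results_by_invocation = defaultdict(list)
for idx, result in zip(invocation_indices, all_results):
  results_by_invocation[idx].append(result)
```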
The demo script continues:

```python
  return [results_by_invocation[i] for i in sorted(results_by_invocation.keys())]


async def main():
  """Run performance comparison."""
  num_invocations = 5
  num_samples = 2
  delay = 0.5  # 500ms per call

  print("=" * 60)
  print("LLM-as-Judge Parallel Evaluation Performance Test")
  print("=" * 60)
  print(f"Configuration:")
  print(f" - Invocations: {num_invocations}")
  print(f" - Samples per invocation: {num_samples}")
  print(f" - Total LLM calls: {num_invocations * num_samples}")
  print(f" - Simulated delay per call: {delay}s")
  print()

  # Test serial approach
  print("Testing SERIAL approach (old)...")
  start_time = time.time()
  serial_results = await serial_evaluation(num_invocations, num_samples, delay)
  serial_time = time.time() - start_time
  print(f"✓ Completed in {serial_time:.2f}s")
  print()

  # Test parallel approach
  print("Testing PARALLEL approach (new)...")
  start_time = time.time()
  parallel_results = await parallel_evaluation(num_invocations, num_samples, delay)
  parallel_time = time.time() - start_time
  print(f"✓ Completed in {parallel_time:.2f}s")
  print()

  # Calculate speedup
  speedup = serial_time / parallel_time
  time_saved = serial_time - parallel_time

  print("=" * 60)
  print("RESULTS")
  print("=" * 60)
  print(f"Serial time: {serial_time:.2f}s")
  print(f"Parallel time: {parallel_time:.2f}s")
  print(f"Speedup: {speedup:.2f}x faster")
  print(f"Time saved: {time_saved:.2f}s ({time_saved/serial_time*100:.1f}%)")
  print("=" * 60)

  # Verify results are the same
  assert len(serial_results) == len(parallel_results)
  for i in range(len(serial_results)):
    assert len(serial_results[i]) == len(parallel_results[i])
  print("✓ Results verified: both approaches produce same output structure")


if __name__ == "__main__":
  asyncio.run(main())
```
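With the default settings (5 invocations × 2 samples × 0.5 s simulated delay), the expected numbers are easy to reason about: the serial path awaits each of the 10 calls in turn, so it should take roughly 10 × 0.5 s ≈ 5 s, while the parallel path overlaps all ten `asyncio.sleep` calls and should finish in roughly 0.5 s plus scheduling overhead, i.e. about a 10× speedup. Exact timings will vary by machine.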
**Changes to the LLM-as-judge evaluator module:**

```diff
@@ -13,8 +13,8 @@
 # limitations under the License.
 
 from __future__ import annotations
 
 from abc import abstractmethod
+import asyncio
 from typing import Optional
 
 from google.genai import types as genai_types
@@ -33,7 +33,7 @@
 from .eval_case import Invocation
 from .eval_metrics import BaseCriterion
 from .eval_metrics import EvalMetric
-from .eval_metrics import RubricScore
+from .eval_rubrics import RubricScore
 from .evaluator import EvaluationResult
 from .evaluator import Evaluator
 from .evaluator import PerInvocationResult
@@ -114,6 +114,46 @@ def aggregate_invocation_results(
   ) -> EvaluationResult:
     """Aggregates the per invocation results to get the overall score."""
 
+  async def _evaluate_single_sample(
+      self,
+      llm_request: LlmRequest,
+      actual: Invocation,
+      expected: Optional[Invocation],
+  ) -> PerInvocationResult:
+    """Evaluates a single sample for an invocation.
+
+    Args:
+      llm_request: The LLM request to execute.
+      actual: The actual invocation to evaluate.
+      expected: The expected invocation (optional).
+
+    Returns:
+      A PerInvocationResult containing the evaluation score and status.
+    """
+    async with Aclosing(
+        self._judge_model.generate_content_async(llm_request)
+    ) as agen:
+      async for llm_response in agen:
+        # Non-streaming call, so there is only one response content.
+        auto_rater_score = self.convert_auto_rater_response_to_score(
+            llm_response
+        )
+        return PerInvocationResult(
+            actual_invocation=actual,
+            expected_invocation=expected,
+            score=auto_rater_score.score,
+            eval_status=get_eval_status(
+                auto_rater_score.score, self._eval_metric.threshold
+            ),
+            rubric_scores=auto_rater_score.rubric_scores,
+        )
+    # This should not be reached for non-streaming calls, but added for safety
+    return PerInvocationResult(
+        actual_invocation=actual,
+        expected_invocation=expected,
+        eval_status=get_eval_status(None, self._eval_metric.threshold),
+    )
```
**Comment on lines +150 to +155** (Contributor):

While it's good to have a safety net for cases where the LLM judge doesn't return a response, this is an unexpected event for non-streaming calls. It would be beneficial to log a warning when this code path is executed. This will help in debugging potential issues with the LLM or the request setup, which might otherwise go unnoticed. Note: you will need to add the corresponding `logging` import.

```python
    # This should not be reached for non-streaming calls, but added for safety
    logging.warning(
        "LLM judge did not yield a response for a sample evaluation."
    )
    return PerInvocationResult(
        actual_invocation=actual,
        expected_invocation=expected,
        eval_status=get_eval_status(None, self._eval_metric.threshold),
    )
```
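For reference, a minimal sketch of how that warning could be wired up with a module-level logger, assuming the module does not already define one (the names here are illustrative, not taken from the PR):

```python
import logging

# Hypothetical module-level logger; adjust if the module already has one.
logger = logging.getLogger(__name__)

# ...inside _evaluate_single_sample, when the async generator yields no response:
logger.warning("LLM judge did not yield a response for a sample evaluation.")
```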
The diff continues:

```diff
 
   @override
   async def evaluate_invocations(
       self,
@@ -132,8 +172,13 @@ async def evaluate_invocations(
         else expected_invocations
     )
 
-    per_invocation_results = []
-    for actual, expected in zip(actual_invocations, expected_invocations):
+    # Build all LLM evaluation tasks for parallel execution
+    tasks = []
+    invocation_indices = []  # Track which invocation each task belongs to
+
+    for invocation_idx, (actual, expected) in enumerate(
+        zip(actual_invocations, expected_invocations)
+    ):
       auto_rater_prompt = self.format_auto_rater_prompt(actual, expected)
       llm_request = LlmRequest(
           model=self._judge_model_options.judge_model,
@@ -148,32 +193,32 @@ async def evaluate_invocations(
       )
       add_default_retry_options_if_not_present(llm_request)
       num_samples = self._judge_model_options.num_samples
-      invocation_result_samples = []
 
+      # Create tasks for all samples of this invocation
       for _ in range(num_samples):
-        async with Aclosing(
-            self._judge_model.generate_content_async(llm_request)
-        ) as agen:
-          async for llm_response in agen:
-            # Non-streaming call, so there is only one response content.
-            auto_rater_score = self.convert_auto_rater_response_to_score(
-                llm_response
-            )
-            invocation_result_samples.append(
-                PerInvocationResult(
-                    actual_invocation=actual,
-                    expected_invocation=expected,
-                    score=auto_rater_score.score,
-                    eval_status=get_eval_status(
-                        auto_rater_score.score, self._eval_metric.threshold
-                    ),
-                    rubric_scores=auto_rater_score.rubric_scores,
-                )
-            )
-      if not invocation_result_samples:
-        continue
-      per_invocation_results.append(
-          self.aggregate_per_invocation_samples(invocation_result_samples)
-      )
+        tasks.append(
+            self._evaluate_single_sample(llm_request, actual, expected)
+        )
+        invocation_indices.append(invocation_idx)
+
+    # Execute all tasks in parallel
+    all_results = await asyncio.gather(*tasks)
+
+    # Group results by invocation
+    results_by_invocation = {}
+    for invocation_idx, result in zip(invocation_indices, all_results):
+      if invocation_idx not in results_by_invocation:
+        results_by_invocation[invocation_idx] = []
+      results_by_invocation[invocation_idx].append(result)
```
**Comment on lines +208 to +212** (Contributor):

This logic for grouping results by invocation can be simplified using `setdefault`:

```python
results_by_invocation = {}
for invocation_idx, result in zip(invocation_indices, all_results):
  results_by_invocation.setdefault(invocation_idx, []).append(result)
```
The diff continues:

```diff
+
+    # Aggregate samples for each invocation
+    per_invocation_results = []
+    for invocation_idx in sorted(results_by_invocation.keys()):
+      invocation_result_samples = results_by_invocation[invocation_idx]
+      if invocation_result_samples:
+        per_invocation_results.append(
+            self.aggregate_per_invocation_samples(invocation_result_samples)
+        )
 
     if per_invocation_results:
       return self.aggregate_invocation_results(per_invocation_results)
```
**Contributor** commented:

The `Optional` type is imported from `typing` but is not used within this file. It's a good practice to remove unused imports to maintain code cleanliness.