1 change: 0 additions & 1 deletion contributing/samples/gepa/experiment.py
@@ -43,7 +43,6 @@
from tau_bench.types import EnvRunResult
from tau_bench.types import RunConfig
import tau_bench_agent as tau_bench_agent_lib

import utils


1 change: 0 additions & 1 deletion contributing/samples/gepa/run_experiment.py
@@ -25,7 +25,6 @@
from absl import flags
import experiment
from google.genai import types

import utils

_OUTPUT_DIR = flags.DEFINE_string(
114 changes: 114 additions & 0 deletions demo_parallel_performance.py
@@ -0,0 +1,114 @@
"""
Performance demonstration for parallel LLM-as-judge evaluation.
This script demonstrates the performance improvement from parallelizing
LLM evaluation calls using asyncio.gather().
"""

import asyncio
import time
from typing import Optional
Contributor

medium

The Optional type is imported from typing but never used in this file. Removing unused imports keeps the code clean.
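
For reference, a minimal sketch of the trimmed import block for this demo script (assuming nothing else in the file needs typing):

    # Imports actually used by the demo; Optional is dropped.
    import asyncio
    import time

    from google.genai import types as genai_types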


from google.genai import types as genai_types


# Simulated LLM call with artificial delay
async def mock_llm_call(delay: float = 0.5):
"""Simulates an LLM API call with specified delay."""
await asyncio.sleep(delay)
return genai_types.Content(
parts=[genai_types.Part(text="Mock LLM response")],
role="model",
)


async def serial_evaluation(num_invocations: int, num_samples: int, delay: float):
"""Simulates the OLD serial evaluation approach."""
results = []
for i in range(num_invocations):
invocation_samples = []
for j in range(num_samples):
response = await mock_llm_call(delay)
invocation_samples.append(response)
results.append(invocation_samples)
return results


async def parallel_evaluation(num_invocations: int, num_samples: int, delay: float):
"""Simulates the NEW parallel evaluation approach."""
tasks = []
invocation_indices = []

# Create all N×M tasks
for i in range(num_invocations):
for j in range(num_samples):
tasks.append(mock_llm_call(delay))
invocation_indices.append(i)

# Execute in parallel
all_results = await asyncio.gather(*tasks)

# Group by invocation
results_by_invocation = {}
for idx, result in zip(invocation_indices, all_results):
if idx not in results_by_invocation:
results_by_invocation[idx] = []
results_by_invocation[idx].append(result)
Comment on lines +52 to +56
Contributor

medium

The logic for grouping results by invocation index can be made more concise by using dict.setdefault(). This avoids the explicit check for the key's existence and is a common Python idiom for this pattern, improving readability.

    results_by_invocation = {}
    for idx, result in zip(invocation_indices, all_results):
        results_by_invocation.setdefault(idx, []).append(result)
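
Alternatively, collections.defaultdict(list) expresses the same grouping; this is only an illustrative variant, not part of the proposed change:

    from collections import defaultdict

    # defaultdict creates the empty list on first access to a new key.
    results_by_invocation = defaultdict(list)
    for idx, result in zip(invocation_indices, all_results):
        results_by_invocation[idx].append(result)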


return [results_by_invocation[i] for i in sorted(results_by_invocation.keys())]


async def main():
"""Run performance comparison."""
num_invocations = 5
num_samples = 2
delay = 0.5 # 500ms per call

print("=" * 60)
print("LLM-as-Judge Parallel Evaluation Performance Test")
print("=" * 60)
print(f"Configuration:")
print(f" - Invocations: {num_invocations}")
print(f" - Samples per invocation: {num_samples}")
print(f" - Total LLM calls: {num_invocations * num_samples}")
print(f" - Simulated delay per call: {delay}s")
print()

# Test serial approach
print("Testing SERIAL approach (old)...")
start_time = time.time()
serial_results = await serial_evaluation(num_invocations, num_samples, delay)
serial_time = time.time() - start_time
print(f"✓ Completed in {serial_time:.2f}s")
print()

# Test parallel approach
print("Testing PARALLEL approach (new)...")
start_time = time.time()
parallel_results = await parallel_evaluation(num_invocations, num_samples, delay)
parallel_time = time.time() - start_time
print(f"✓ Completed in {parallel_time:.2f}s")
print()

# Calculate speedup
speedup = serial_time / parallel_time
time_saved = serial_time - parallel_time

print("=" * 60)
print("RESULTS")
print("=" * 60)
print(f"Serial time: {serial_time:.2f}s")
print(f"Parallel time: {parallel_time:.2f}s")
print(f"Speedup: {speedup:.2f}x faster")
print(f"Time saved: {time_saved:.2f}s ({time_saved/serial_time*100:.1f}%)")
print("=" * 60)

# Verify results are the same
assert len(serial_results) == len(parallel_results)
for i in range(len(serial_results)):
assert len(serial_results[i]) == len(parallel_results[i])
print("✓ Results verified: both approaches produce same output structure")


if __name__ == "__main__":
asyncio.run(main())
103 changes: 74 additions & 29 deletions src/google/adk/evaluation/llm_as_judge.py
@@ -13,8 +13,8 @@
# limitations under the License.

from __future__ import annotations

from abc import abstractmethod
import asyncio
from typing import Optional

from google.genai import types as genai_types
@@ -33,7 +33,7 @@
from .eval_case import Invocation
from .eval_metrics import BaseCriterion
from .eval_metrics import EvalMetric
from .eval_metrics import RubricScore
from .eval_rubrics import RubricScore
from .evaluator import EvaluationResult
from .evaluator import Evaluator
from .evaluator import PerInvocationResult
@@ -114,6 +114,46 @@ def aggregate_invocation_results(
) -> EvaluationResult:
"""Aggregates the per invocation results to get the overall score."""

async def _evaluate_single_sample(
self,
llm_request: LlmRequest,
actual: Invocation,
expected: Optional[Invocation],
) -> PerInvocationResult:
"""Evaluates a single sample for an invocation.

Args:
llm_request: The LLM request to execute.
actual: The actual invocation to evaluate.
expected: The expected invocation (optional).

Returns:
A PerInvocationResult containing the evaluation score and status.
"""
async with Aclosing(
self._judge_model.generate_content_async(llm_request)
) as agen:
async for llm_response in agen:
# Non-streaming call, so there is only one response content.
auto_rater_score = self.convert_auto_rater_response_to_score(
llm_response
)
return PerInvocationResult(
actual_invocation=actual,
expected_invocation=expected,
score=auto_rater_score.score,
eval_status=get_eval_status(
auto_rater_score.score, self._eval_metric.threshold
),
rubric_scores=auto_rater_score.rubric_scores,
)
# This should not be reached for non-streaming calls, but added for safety
return PerInvocationResult(
actual_invocation=actual,
expected_invocation=expected,
eval_status=get_eval_status(None, self._eval_metric.threshold),
)
Comment on lines +150 to +155
Contributor

medium

While it's good to have a safety net for cases where the LLM judge doesn't return a response, this should never happen for non-streaming calls. Logging a warning on this code path would help surface issues with the LLM or the request setup that might otherwise go unnoticed.

Note: You will need to add import logging at the top of the file.

    # This should not be reached for non-streaming calls, but added for safety
    logging.warning(
        "LLM judge did not yield a response for a sample evaluation."
    )
    return PerInvocationResult(
        actual_invocation=actual,
        expected_invocation=expected,
        eval_status=get_eval_status(None, self._eval_metric.threshold),
    )
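
If the module follows the common pattern of a module-level logger, the same warning could be routed through it instead (illustrative; adapt to whatever convention the file already uses):

    # Near the imports at the top of the file:
    logger = logging.getLogger(__name__)

    # Then on the unreachable-for-non-streaming path:
    logger.warning("LLM judge did not yield a response for a sample evaluation.")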


@override
async def evaluate_invocations(
self,
@@ -132,8 +172,13 @@ async def evaluate_invocations(
else expected_invocations
)

per_invocation_results = []
for actual, expected in zip(actual_invocations, expected_invocations):
# Build all LLM evaluation tasks for parallel execution
tasks = []
invocation_indices = [] # Track which invocation each task belongs to

for invocation_idx, (actual, expected) in enumerate(
zip(actual_invocations, expected_invocations)
):
auto_rater_prompt = self.format_auto_rater_prompt(actual, expected)
llm_request = LlmRequest(
model=self._judge_model_options.judge_model,
@@ -148,32 +193,32 @@
)
add_default_retry_options_if_not_present(llm_request)
num_samples = self._judge_model_options.num_samples
invocation_result_samples = []

# Create tasks for all samples of this invocation
for _ in range(num_samples):
async with Aclosing(
self._judge_model.generate_content_async(llm_request)
) as agen:
async for llm_response in agen:
# Non-streaming call, so there is only one response content.
auto_rater_score = self.convert_auto_rater_response_to_score(
llm_response
)
invocation_result_samples.append(
PerInvocationResult(
actual_invocation=actual,
expected_invocation=expected,
score=auto_rater_score.score,
eval_status=get_eval_status(
auto_rater_score.score, self._eval_metric.threshold
),
rubric_scores=auto_rater_score.rubric_scores,
)
)
if not invocation_result_samples:
continue
per_invocation_results.append(
self.aggregate_per_invocation_samples(invocation_result_samples)
)
tasks.append(
self._evaluate_single_sample(llm_request, actual, expected)
)
invocation_indices.append(invocation_idx)

# Execute all tasks in parallel
all_results = await asyncio.gather(*tasks)

# Group results by invocation
results_by_invocation = {}
for invocation_idx, result in zip(invocation_indices, all_results):
if invocation_idx not in results_by_invocation:
results_by_invocation[invocation_idx] = []
results_by_invocation[invocation_idx].append(result)
Comment on lines +208 to +212
Contributor

medium

This logic for grouping results by invocation can be simplified using dict.setdefault(). This makes the code more concise and is a common Python idiom for this pattern.

    results_by_invocation = {}
    for invocation_idx, result in zip(invocation_indices, all_results):
      results_by_invocation.setdefault(invocation_idx, []).append(result)
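
Since asyncio.gather returns results in the order the awaitables were passed, another option is to skip the index bookkeeping and chunk all_results directly. This is only a sketch and assumes, as in this diff, that every invocation contributes exactly num_samples tasks with num_samples >= 1:

    # Results come back in submission order, so fixed-size chunks map 1:1 to invocations.
    num_samples = self._judge_model_options.num_samples
    per_invocation_results = []
    for start in range(0, len(all_results), num_samples):
      invocation_result_samples = all_results[start : start + num_samples]
      per_invocation_results.append(
          self.aggregate_per_invocation_samples(invocation_result_samples)
      )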


# Aggregate samples for each invocation
per_invocation_results = []
for invocation_idx in sorted(results_by_invocation.keys()):
invocation_result_samples = results_by_invocation[invocation_idx]
if invocation_result_samples:
per_invocation_results.append(
self.aggregate_per_invocation_samples(invocation_result_samples)
)

if per_invocation_results:
return self.aggregate_invocation_results(per_invocation_results)