EvalUtils

EvalUtils extends Evaluations with convenience methods for working with YAML-based evaluation files — the format used by the CXAS Scrapi eval runner. It can load golden conversations from a YAML file, validate them against Pydantic schemas, create and run evaluations from them, and convert evaluation results into pandas DataFrames for analysis or reporting.

Think of EvalUtils as the bridge between your local eval files and the CXAS API: load from disk, inspect or transform, then push or run via the inherited Evaluations methods.

Key Pydantic models you'll encounter:

  • Conversation — one test conversation with turns, expectations, tags, and session parameters.
  • Turn — a single round-trip with user text, agent response, and optional tool_calls.
  • Conversations — the top-level container, supporting common_session_parameters shared across all conversations.

Quick Example

from cxas_scrapi import EvalUtils

app_name = "projects/my-project/locations/us/apps/my-app-id"
eu = EvalUtils(app_name=app_name)

# Load a YAML eval file (returns a list of Golden eval dicts)
evals = eu.load_golden_evals_from_yaml("evals/billing_evals.yaml")
print(f"Loaded {len(evals)} conversations")

# Convert evaluation results to DataFrames (looked up by eval display name)
dfs = eu.evals_to_dataframe(eval_names=["my-billing-eval"])
print(dfs["summary"].head())

# Export the failures view to a spreadsheet-ready format
dfs["failures"].to_csv("eval_results.csv", index=False)

Reference

EvalUtils

EvalUtils(app_name, env='PROD')

Bases: Evaluations

Utility class for processing and exporting CXAS Evaluation Results.

Initializes the EvalUtils class for processing Evaluation Results.

Parameters:

  • app_name (str, required): CXAS App ID (projects/{project}/locations/{location}/apps/{app}).
  • env (str, default 'PROD'): Environment override.

Source code in src/cxas_scrapi/utils/eval_utils.py
def __init__(self, app_name: str, env: str = "PROD"):
    """Initializes the EvalUtils class for processing Evaluation Results.

    Args:
        app_name: CXAS App ID
            (projects/{project}/locations/{location}/apps/{app}).
        env: Environment override (default: PROD).
    """
    super().__init__(app_name=app_name, env=env)
    self.app_name = app_name
    self.tools_client = Tools(app_name=self.app_name, creds=self.creds)
    self.var_client = Variables(app_name=self.app_name, creds=self.creds)
    try:
        self.tool_map = self.tools_client.get_tools_map(reverse=True)
    except (AttributeError, KeyError, RuntimeError, ValueError) as e:
        logger.warning(
            "Failed to fetch tool map for %s: %s", self.app_name, e
        )
        self.tool_map = {}
    self.agents_client = Agents(app_name=self.app_name, creds=self.creds)
    try:
        self.agent_map = self.agents_client.get_agents_map(reverse=True)
    except (AttributeError, KeyError, RuntimeError, ValueError) as e:
        logger.warning(
            "Failed to fetch agent map for %s: %s", self.app_name, e
        )
        self.agent_map = {}
    # Defer import to break circular dependency:
    # conversation_history -> latency_parser -> eval_utils
    # -> conversation_history
    from cxas_scrapi.core.conversation_history import (  # noqa: PLC0415
        ConversationHistory,
    )

    self.ch_client = ConversationHistory(
        app_name=self.app_name, creds=self.creds
    )
    self.eval_client = Evaluations(app_name=self.app_name, env=env)

parse_variables_input staticmethod

parse_variables_input(v)

Allows the YAML variables field to be a list of strings, a JSON string, or a custom dictionary.

Source code in src/cxas_scrapi/utils/eval_utils.py
@staticmethod
def parse_variables_input(v: Any) -> Dict[str, Any]:
    """Allows YAML to accept a list of strings OR a custom dictionary."""
    if v is None:
        return {}
    if isinstance(v, str):
        try:
            return json.loads(v)
        except json.JSONDecodeError:
            return {}
    if isinstance(v, list):
        # Convert list of names to a dict flagged for fetching (None)
        return {str(item): None for item in v}
    if isinstance(v, dict):
        return v
    return {}
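
All accepted input shapes normalize to a plain dictionary; for example:

# A list of variable names becomes a dict flagged for fetching (values of None)
EvalUtils.parse_variables_input(["customer_id", "plan_type"])
# -> {'customer_id': None, 'plan_type': None}

# A JSON string is parsed; invalid JSON falls back to an empty dict
EvalUtils.parse_variables_input('{"customer_id": "123"}')
# -> {'customer_id': '123'}

# Dicts pass through unchanged; None and unsupported types become {}
EvalUtils.parse_variables_input(None)
# -> {}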

score_result_audio staticmethod

score_result_audio(result)

Score a single result using the audio-correct method. In audio mode, taskCompleted is broken (always False), so goalScore AND allExpectationsSatisfied are used instead.

Source code in src/cxas_scrapi/utils/eval_utils.py
@staticmethod
def score_result_audio(result) -> bool:
    """Score a single result using audio-correct method.
    In audio mode, taskCompleted is broken (always False).
    Use goalScore AND allExpectationsSatisfied instead.
    """
    res_dict = (
        type(result).to_dict(result)
        if not isinstance(result, dict)
        else result
    )
    sr = res_dict.get("scenario_result", {})
    goal = sr.get("user_goal_satisfaction_result", {}).get("score", 0)
    all_exp = sr.get("all_expectations_satisfied", False)
    return (goal == 1) and all_exp
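
A minimal sketch of the pass criterion, using a hand-built result dictionary with the field names the method reads:

result = {
    "scenario_result": {
        "user_goal_satisfaction_result": {"score": 1},
        "all_expectations_satisfied": True,
    }
}
EvalUtils.score_result_audio(result)  # True: goal score of 1 and all expectations met

result["scenario_result"]["all_expectations_satisfied"] = False
EvalUtils.score_result_audio(result)  # False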

evals_to_dataframe

evals_to_dataframe(results=None, eval_names=None)

Provides simplified views of the evaluation data.

Returns:

  • Dict[str, Any]: A dict with 'summary', 'failures', 'trace', and 'metadata' DataFrames.

Source code in src/cxas_scrapi/utils/eval_utils.py
def evals_to_dataframe(
    self,
    results: Optional[Union[List[Any], str]] = None,
    eval_names: Optional[Union[List[str], str]] = None,
) -> Dict[str, Any]:
    """Provides three simplified views of the evaluation data.

    Returns:
        A dict with 'summary', 'failures', and 'trace' DataFrames.
    """
    run_summaries, expectations, turns = self._parse_eval_results(
        results, eval_names
    )

    # Summary
    df_summary = pd.DataFrame(run_summaries)
    if not df_summary.empty:
        if "create_time" in df_summary.columns:
            df_summary["create_time"] = pd.to_datetime(
                df_summary["create_time"]
            )
        if "update_time" in df_summary.columns:
            df_summary["update_time"] = pd.to_datetime(
                df_summary["update_time"]
            )
        if "update_time" in df_summary.columns:
            df_summary = df_summary.sort_values(
                by="update_time", ascending=False
            ).reset_index(drop=True)

    # Failures
    failures = []
    for run_sum in run_summaries:
        if run_sum.get("execution_state") in (
            "ERROR",
            "ERRORED",
        ) or run_sum.get("evaluation_status") in ("ERROR", "ERRORED"):
            raw_err = run_sum.get(
                "error_message", "Unknown Agent Exception"
            )
            failures.append(
                {
                    "display_name": run_sum.get("display_name", "Unknown"),
                    "eval_result_id": run_sum.get("eval_result_id", ""),
                    "turn_index": None,
                    "failure_type": "System Engine Error",
                    "expected": "Run evaluation to completion",
                    "actual": f"Error: {raw_err}",
                    "score": None,
                }
            )

    # Metadata
    all_metadata = []

    for exp in expectations:
        if exp.get("not_met_count", 0) > 0:
            explanation = exp.get("explanation", "")
            actual_text = (
                f"(Not Met) {explanation}" if explanation else "(Not Met)"
            )
            failures.append(
                {
                    "display_name": exp["display_name"],
                    "eval_result_id": exp["eval_result_id"],
                    "turn_index": None,
                    "failure_type": "Expectation",
                    "expected": str(exp.get("expectation", "")),
                    "actual": actual_text,
                    "score": None,
                }
            )
        if exp.get("record_type") == "summary_expectation":
            not_met = exp.get("not_met_count", 0)
            met = exp.get("met_count", 0)
            outcome_str = "PASS" if not_met == 0 else "FAIL"

            all_metadata.append(
                {
                    "display_name": exp["display_name"],
                    "evaluation_run": exp.get("evaluation_run"),
                    "eval_result_id": exp["eval_result_id"],
                    "evaluation_status": exp.get("evaluation_status"),
                    "turn_index": None,
                    "type": "Custom Expectation",
                    "expected": exp["expectation"],
                    "actual": exp["explanation"],
                    "outcome": outcome_str,
                    "score": f"{met} / {met + not_met}",
                }
            )

    for turn in turns:
        outcomes_str = turn.get("expectation_outcomes", "[]")
        outcomes = []
        try:
            outcomes = json.loads(outcomes_str)
        except (json.JSONDecodeError, TypeError):
            pass

        def _get_exp_act(outcome_obj):
            e_text = ""
            a_text = "(None / Missed)"
            f_type = "Turn Expectation"
            e_dict = outcome_obj.get("expectation", {})

            if "agent_response" in e_dict:
                chunks = e_dict["agent_response"].get("chunks", [])
                e_text = "agent_response"
                if chunks:
                    e_text = chunks[0].get("text", "agent_response")
                f_type = "Semantic Similarity"
            elif "tool_call" in e_dict:
                e_text = e_dict["tool_call"].get(
                    "display_name",
                    e_dict["tool_call"].get("id", "tool_call"),
                )
                f_type = "Tool Call"
            elif "tool_response" in e_dict:
                e_text = e_dict["tool_response"].get(
                    "display_name", "tool_response"
                )
                f_type = "Tool Response"
            elif "agent_transfer" in e_dict:
                e_text = e_dict["agent_transfer"].get(
                    "display_name",
                    e_dict["agent_transfer"].get(
                        "target_agent", "agent_transfer"
                    ),
                )
                f_type = "Routing / Agent"

            if "observed_agent_response" in outcome_obj:
                chunks = outcome_obj["observed_agent_response"].get(
                    "chunks", []
                )
                a_text = chunks[0].get("text", "") if chunks else ""
            elif "observed_tool_call" in outcome_obj:
                a_text = outcome_obj["observed_tool_call"].get(
                    "display_name",
                    outcome_obj["observed_tool_call"].get("id", ""),
                )
            elif "observed_tool_response" in outcome_obj:
                a_text = outcome_obj["observed_tool_response"].get(
                    "display_name",
                    outcome_obj["observed_tool_response"].get("id", ""),
                )
            elif "observed_agent_transfer" in outcome_obj:
                a_text = outcome_obj["observed_agent_transfer"].get(
                    "display_name",
                    outcome_obj["observed_agent_transfer"].get(
                        "target_agent", ""
                    ),
                )

            return e_text, a_text, f_type

        sem_handled = False
        tool_handled = False

        # Process individual explicit expectation failures
        for outcome_obj in outcomes:
            e, a, f = _get_exp_act(outcome_obj)
            if f == "Tool Response":
                continue
            raw_outcome = outcome_obj.get("outcome")
            outcome_str = EvalUtils._map_outcome(raw_outcome)

            score_val = None
            if f == "Semantic Similarity":
                raw_score = turn.get("semantic_score")
                if isinstance(raw_score, (int, float)):
                    s_val = (
                        int(raw_score)
                        if raw_score == int(raw_score)
                        else raw_score
                    )
                    score_val = f"{s_val} / 4.0"
                else:
                    score_val = (
                        str(raw_score) if raw_score is not None else None
                    )
            elif f == "Tool Call":
                raw_score = turn.get("tool_invocation_score")
                if isinstance(raw_score, (int, float)):
                    score_val = f"{int(raw_score * 100)}%"
                else:
                    score_val = EvalUtils._map_outcome(raw_score)

            all_metadata.append(
                {
                    "display_name": turn["display_name"],
                    "eval_result_id": turn["eval_result_id"],
                    "evaluation_run": turn.get("evaluation_run"),
                    "evaluation_status": turn.get("evaluation_status"),
                    "turn_index": turn["turn_index"],
                    "type": f,
                    "expected": e,
                    "actual": a,
                    "outcome": outcome_str,
                    "score": score_val,
                }
            )

            if raw_outcome == 2 or outcome_str == "FAIL":
                if f == "Semantic Similarity":
                    sem_handled = True
                elif f == "Tool Call":
                    raw_score = turn.get("tool_invocation_score")
                    if EvalUtils._map_outcome(raw_score) == "FAIL":
                        tool_handled = True

                failures.append(
                    {
                        "display_name": turn["display_name"],
                        "eval_result_id": turn["eval_result_id"],
                        "turn_index": turn["turn_index"],
                        "failure_type": f,
                        "expected": e,
                        "actual": a,
                        "score": score_val,
                    }
                )

        # Process overall semantic failure if not caught
        sem_outcome = turn.get("semantic_outcome")
        if sem_outcome == "FAIL" and not sem_handled:
            e_text, a_text = "", ""
            for outcome_obj in outcomes:
                if "agent_response" in outcome_obj.get("expectation", {}):
                    e_text, a_text, _ = _get_exp_act(outcome_obj)
                    break
            raw_score = turn.get("semantic_score")
            if isinstance(raw_score, (int, float)):
                s_val = (
                    int(raw_score)
                    if raw_score == int(raw_score)
                    else raw_score
                )
                score_val = f"{s_val} / 4.0"
            else:
                score_val = (
                    str(raw_score) if raw_score is not None else None
                )

            failures.append(
                {
                    "display_name": turn["display_name"],
                    "eval_result_id": turn["eval_result_id"],
                    "turn_index": turn["turn_index"],
                    "failure_type": "Semantic Similarity",
                    "expected": e_text,
                    "actual": a_text,
                    "score": score_val,
                }
            )

        # Process overall tool failure if not caught
        tool_outcome = turn.get("tool_invocation_score")
        if tool_outcome == "FAIL" and not tool_handled:
            e_text, a_text = "", ""
            for outcome_obj in outcomes:
                if "tool_call" in outcome_obj.get("expectation", {}):
                    e_text, a_text, _ = _get_exp_act(outcome_obj)
                    break
            failures.append(
                {
                    "display_name": turn["display_name"],
                    "eval_result_id": turn["eval_result_id"],
                    "turn_index": turn["turn_index"],
                    "failure_type": "Tool Call",
                    "expected": e_text,
                    "actual": a_text,
                    "score": "FAIL",
                }
            )

    df_failures = pd.DataFrame(failures)
    if not df_failures.empty:
        df_failures = df_failures.sort_values(
            by=["eval_result_id", "turn_index"]
        ).reset_index(drop=True)

    df_traces = pd.DataFrame(turns)

    df_metadata = pd.DataFrame(all_metadata)

    return {
        "summary": df_summary,
        "failures": df_failures,
        "trace": df_traces,
        "metadata": df_metadata,
    }
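
A typical call looks up results by display name and unpacks the four views (a sketch; "my-billing-eval" is a placeholder display name):

dfs = eu.evals_to_dataframe(eval_names=["my-billing-eval"])

summary = dfs["summary"]      # run-level summary, newest first
failures = dfs["failures"]    # engine errors, unmet expectations, failed turn checks
trace = dfs["trace"]          # raw per-turn records
metadata = dfs["metadata"]    # per-expectation outcomes with PASS/FAIL and scores

print(failures[["display_name", "turn_index", "failure_type", "score"]])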

get_latency_metrics_dfs

get_latency_metrics_dfs(results=None, eval_names=None, app_name=None)

Generates latency metrics DataFrames from results and traces.

Parameters:

  • results (Optional[List[Any]], default None): An optional list of Eval Result List payload chunks.
  • eval_names (Optional[List[str]], default None): Alternatively, an optional list of string Display Names / Names of Evals.
  • app_name (Optional[str], default None): Optional override if retrieving Conversation traces dynamically.

Source code in src/cxas_scrapi/utils/eval_utils.py
def get_latency_metrics_dfs(
    self,
    results: Optional[List[Any]] = None,
    eval_names: Optional[List[str]] = None,
    app_name: Optional[str] = None,
) -> Dict[str, pd.DataFrame]:
    """Generates latency metrics DataFrames from results and traces.

    Args:
        results: An optional list of Eval Result List payload chunks.
        eval_names: Alternatively, an optional list of string Display
            Names / Names of Evals.
        app_name: Optional override if retrieving Conversation traces
            dynamically.
    """
    if not results:
        if not getattr(self, "app_name", None) and not app_name:
            raise ValueError(
                "app_name must be set to look up evaluations by name."
            )
        results = []
        for name in eval_names or []:
            # Retrieve all results for the provided display name
            results.extend(self.list_evaluation_results(name))

    if not results:
        return {
            "eval_summary": pd.DataFrame(),
            "eval_details": pd.DataFrame(),
            "tool_summary": pd.DataFrame(),
            "tool_details": pd.DataFrame(),
            "callback_summary": pd.DataFrame(),
            "callback_details": pd.DataFrame(),
            "guardrail_summary": pd.DataFrame(),
            "guardrail_details": pd.DataFrame(),
        }

    conv_ids = set()
    for res_obj in results:
        res_dict = (
            type(res_obj).to_dict(res_obj)
            if not isinstance(res_obj, dict)
            else res_obj
        )
        turns = res_dict.get("golden_result", {}).get(
            "turn_replay_results", []
        )
        for t in turns:
            if t.get("conversation"):
                conv_ids.add(t.get("conversation"))

    target_app = app_name or getattr(self, "app_name", None)
    traces = {}
    if target_app and conv_ids:
        if target_app == getattr(self, "app_name", None):
            ch_getter = self.ch_client.get_conversation
        else:
            # Defer import to break circular dependency:
            # conversation_history -> latency_parser -> eval_utils
            # -> conversation_history
            from cxas_scrapi.core.conversation_history import (  # noqa: PLC0415
                ConversationHistory,
            )

            ch_client = ConversationHistory(
                app_name=target_app, creds=self.creds
            )
            ch_getter = ch_client.get_conversation
        traces = LatencyParser.fetch_conversation_traces(
            list(conv_ids), ch_getter
        )

    eval_details_rows = []
    eval_summary_agg = []
    tool_details_rows = []
    callback_details_rows = []
    guardrail_details_rows = []
    llm_details_rows = []

    for res_obj in results:
        res_dict = (
            type(res_obj).to_dict(res_obj)
            if not isinstance(res_obj, dict)
            else res_obj
        )
        result_name = res_dict.get("name", "")
        tokens = result_name.split("/")
        eval_result_id = tokens[-1] if tokens else result_name
        eval_name = "/".join(tokens[:-2]) if len(tokens) >= 2 else ""

        # Get display name
        display_name = eval_name
        evals_map = getattr(self, "_get_or_load_evals_map", lambda x: {})(
            getattr(self, "app_name", None)
        )
        if evals_map:
            for lookup_name, full_path in evals_map.get(
                "goldens", {}
            ).items():
                if full_path == eval_name:
                    display_name = lookup_name
                    break
            for lookup_name, full_path in evals_map.get(
                "scenarios", {}
            ).items():
                if full_path == eval_name:
                    display_name = lookup_name
                    break

        golden = res_dict.get("golden_result", {})
        turns = golden.get("turn_replay_results", [])

        run_total_turn_latencies = []
        run_tool_latencies = []
        run_llm_latencies = []
        run_guardrail_latencies = []
        run_callback_latencies = []

        for turn_idx, t in enumerate(turns):
            total_turn_ms = LatencyParser._parse_duration_ms(
                t.get("turn_latency", "0s")
            )

            # Turn specific items
            tool_calls = t.get("tool_call_latencies", [])
            turn_tool_ms = sum(
                LatencyParser._parse_duration_ms(
                    tc.get("execution_latency", "0s")
                )
                for tc in tool_calls
            )
            tool_names = ", ".join(
                [
                    tc.get("display_name", tc.get("tool", ""))
                    for tc in tool_calls
                ]
            )

            turn_llm_ms = 0.0
            turn_guardrail_ms = 0.0
            turn_callback_ms = 0.0

            cid = t.get("conversation")
            if cid and cid in traces:
                conv = traces[cid]
                conv_turns = conv.get("turns", [])
                # Match by index assuming evaluating linearly
                if turn_idx < len(conv_turns):
                    trace_turn = conv_turns[turn_idx]
                    root = trace_turn.get("root_span", {})
                    if root:
                        sums = LatencyParser._process_spans(
                            [root],
                            eval_result_id,
                            turn_idx + 1,
                            tool_details_rows,
                            callback_details_rows,
                            guardrail_details_rows,
                            llm_details_rows,
                            context_key="eval_result_id",
                        )
                        turn_llm_ms = sums["LLM"]
                        turn_guardrail_ms = sums["Guardrail"]
                        turn_callback_ms = sums["Callback"]

            run_total_turn_latencies.append(total_turn_ms)
            if turn_tool_ms > 0:
                run_tool_latencies.append(turn_tool_ms)
            if turn_llm_ms > 0:
                run_llm_latencies.append(turn_llm_ms)
            if turn_guardrail_ms > 0:
                run_guardrail_latencies.append(turn_guardrail_ms)
            if turn_callback_ms > 0:
                run_callback_latencies.append(turn_callback_ms)

            eval_details_rows.append(
                {
                    "display_name": display_name,
                    "eval_result_id": eval_result_id,
                    "turn_index": turn_idx + 1,
                    "Total Turn Latency (ms)": int(total_turn_ms),
                    "Tool Call Latencies (ms)": int(turn_tool_ms),
                    "LLM Latencies (ms)": int(turn_llm_ms),
                    "Guardrail Latencies (ms)": int(turn_guardrail_ms),
                    "Callback Latencies (ms)": int(turn_callback_ms),
                    "tool_names": tool_names,
                }
            )

        # Compute run summary aggregations
        def _aggregate(arr):
            if not arr:
                return {"Average": 0, "p50": 0, "p90": 0, "p99": 0}
            ser = pd.Series(arr)
            return {
                "Average": int(ser.mean()),
                "p50": int(ser.quantile(0.50)),
                "p90": int(ser.quantile(0.90)),
                "p99": int(ser.quantile(0.99)),
            }

        t_agg = _aggregate(run_total_turn_latencies)
        tc_agg = _aggregate(run_tool_latencies)
        llm_agg = _aggregate(run_llm_latencies)
        gr_agg = _aggregate(run_guardrail_latencies)
        cb_agg = _aggregate(run_callback_latencies)

        t_p50, t_p90, t_p99 = t_agg["p50"], t_agg["p90"], t_agg["p99"]
        llm_p50, llm_p90, llm_p99 = (
            llm_agg["p50"],
            llm_agg["p90"],
            llm_agg["p99"],
        )
        tc_p50, tc_p90, tc_p99 = tc_agg["p50"], tc_agg["p90"], tc_agg["p99"]
        gr_p50, gr_p90, gr_p99 = gr_agg["p50"], gr_agg["p90"], gr_agg["p99"]
        cb_p50, cb_p90, cb_p99 = cb_agg["p50"], cb_agg["p90"], cb_agg["p99"]

        p50_90_99_turn = f"{t_p50} ms | {t_p90} ms | {t_p99} ms"
        p50_90_99_llm = f"{llm_p50} ms | {llm_p90} ms | {llm_p99} ms"
        p50_90_99_tc = f"{tc_p50} ms | {tc_p90} ms | {tc_p99} ms"
        p50_90_99_gr = f"{gr_p50} ms | {gr_p90} ms | {gr_p99} ms"
        p50_90_99_cb = f"{cb_p50} ms | {cb_p90} ms | {cb_p99} ms"

        eval_summary_agg.append(
            {
                "display_name": display_name,
                "eval_result_id": eval_result_id,
                "evaluation_type": "Golden" if golden else "Scenario",
                "Average (Turn)": f"""{t_agg["Average"]} ms""",
                "p50 | p90 | p99 (Turn)": p50_90_99_turn,
                "Average (LLM)": f"""{llm_agg["Average"]} ms""",
                "p50 | p90 | p99 (LLM)": p50_90_99_llm,
                "Average (Tool Call)": f"""{tc_agg["Average"]} ms""",
                "p50 | p90 | p99 (Tool Call)": p50_90_99_tc,
                "Average (Guardrail)": f"""{gr_agg["Average"]} ms""",
                "p50 | p90 | p99 (Guardrail)": p50_90_99_gr,
                "Average (Callback)": f"""{cb_agg["Average"]} ms""",
                "p50 | p90 | p99 (Callback)": p50_90_99_cb,
            }
        )

    eval_details = pd.DataFrame(eval_details_rows)
    eval_summary = pd.DataFrame(eval_summary_agg)

    tool_details = pd.DataFrame(tool_details_rows)
    callback_details = pd.DataFrame(callback_details_rows)
    guardrail_details = pd.DataFrame(guardrail_details_rows)

    tool_summary = LatencyParser.build_summary_df(
        tool_details, ["tool_name"]
    )
    callback_summary = LatencyParser.build_summary_df(
        callback_details, ["agent", "stage", "description"]
    )
    guardrail_summary = LatencyParser.build_summary_df(
        guardrail_details, ["agent", "name"]
    )

    return {
        "eval_summary": eval_summary,
        "eval_details": eval_details,
        "tool_summary": tool_summary,
        "tool_details": tool_details,
        "callback_summary": callback_summary,
        "callback_details": callback_details,
        "guardrail_summary": guardrail_summary,
        "guardrail_details": guardrail_details,
    }
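
A sketch of typical usage (the eval name is a placeholder); the summary views can be written straight to CSV for reporting:

latency = eu.get_latency_metrics_dfs(eval_names=["my-billing-eval"])

latency["eval_summary"].to_csv("latency_summary.csv", index=False)
latency["eval_details"].to_csv("latency_details.csv", index=False)

# Component-level breakdowns are also returned:
# tool_summary / tool_details, callback_summary / callback_details,
# guardrail_summary / guardrail_details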

to_bigquery

to_bigquery(df, dataset_table, project_id=None, if_exists='append')

Exports a pandas DataFrame to a Google BigQuery table.

Source code in src/cxas_scrapi/utils/eval_utils.py
def to_bigquery(
    self,
    df: Any,
    dataset_table: str,
    project_id: Optional[str] = None,
    if_exists: str = "append",
):
    """Exports a pandas DataFrame to a Google BigQuery table."""
    target_project = project_id or self._get_project_id(self.app_name)
    df.to_gbq(
        destination_table=dataset_table,
        project_id=target_project,
        if_exists=if_exists,
        credentials=self.creds,
    )
    print(
        f"Successfully uploaded {len(df)} rows to "
        f"{target_project}.{dataset_table}"
    )
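
This is a thin wrapper over DataFrame.to_gbq, so the pandas-gbq package must be available. A sketch, with placeholder dataset and table names:

dfs = eu.evals_to_dataframe(eval_names=["my-billing-eval"])

# Append run summaries to an existing table (default if_exists='append')
eu.to_bigquery(dfs["summary"], dataset_table="eval_reports.run_summary")

# Replace the failures table on each export instead of appending
eu.to_bigquery(
    dfs["failures"],
    dataset_table="eval_reports.failures",
    if_exists="replace",
)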

load_golden_eval_from_yaml

load_golden_eval_from_yaml(yaml_file_path, auto_sideload=False)

Parses a YAML file and creates a Golden eval input from it.

Supports two formats:

  1. A compressed YAML format matching tests/testdata/compressed_example.yaml.
  2. A YAML format matching the YAML from export_app, e.g. tests/testdata/exported_eval_example.yaml.

Parameters:

  • yaml_file_path (str, required): Path to the YAML file to be parsed.

Returns:

  • Optional[Dict[str, Any]]: A dictionary matching the Golden Evaluation proto structure, or None if the file could not be parsed.

Source code in src/cxas_scrapi/utils/eval_utils.py
def load_golden_eval_from_yaml(
    self, yaml_file_path: str, auto_sideload: bool = False
) -> Optional[Dict[str, Any]]:
    """Parses a YAML file and creates a Golden eval input from it.

    Supports two formats:
    1. A compressed YAML format matching
    tests/testdata/compressed_example.yaml
    2. A YAML format matching the YAML from export_app, e.g.
    tests/testdata/exported_eval_example.yaml

    Args:
        yaml_file_path: Path to the YAML file to be parsed.

    Returns:
        A dictionary matching the Golden Evaluation proto structure.
    """
    evals = self.load_golden_evals_from_yaml(yaml_file_path)
    return evals[0] if evals else None

load_golden_evals_from_yaml

load_golden_evals_from_yaml(yaml_file_path, auto_sideload=False)

Parses a YAML file and returns a list of Golden eval inputs.

Similar to load_golden_eval_from_yaml, but returns all conversations found in a dataset format instead of just the first one.

Parameters:

  • yaml_file_path (str, required): Path to the YAML file to be parsed.

Returns:

  • List[Dict[str, Any]]: A list of dictionaries matching the Golden Evaluation proto structure.

Source code in src/cxas_scrapi/utils/eval_utils.py
def load_golden_evals_from_yaml(
    self, yaml_file_path: str, auto_sideload: bool = False
) -> List[Dict[str, Any]]:
    """Parses a YAML file and returns a list of Golden eval inputs.

    Similar to load_golden_eval_from_yaml, but returns all conversations
    found in a dataset format instead of just the first one.

    Args:
        yaml_file_path: Path to the YAML file to be parsed.

    Returns:
        A list of dictionaries matching the Golden Evaluation proto
            structure.
    """
    try:
        with open(yaml_file_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except (IOError, yaml.YAMLError) as e:
        logger.error("Failed to load YAML from %s: %s", yaml_file_path, e)
        return []

    if not data:
        return []

    base_dir = os.path.dirname(yaml_file_path)
    all_evals = []

    # Handle Dataset format (list of conversations)
    if "conversations" in data and isinstance(data["conversations"], list):
        dataset = Conversations.model_validate(data)
        for conversation in dataset.conversations:
            # Merge session parameters
            session_params = dataset.common_session_parameters.copy()
            session_params.update(conversation.session_parameters)

            # Extract basic info
            display_name = conversation.conversation

            # Process turns
            json_turns = []
            params_injected = False
            for turn in conversation.turns:
                result = self._process_dataset_turn(
                    turn, session_params, params_injected
                )
                json_turns.append({"steps": result["steps"]})
                params_injected = result["params_injected"]

            # Combine common and conversation-specific expectations
            expectations = (
                dataset.common_expectations + conversation.expectations
            )
            tags = conversation.tags

            # Final processing of expectations (handles side-loading)
            eval_expectations = self._process_conversation_expectations(
                expectations, base_dir=base_dir, auto_sideload=auto_sideload
            )

            all_evals.append(
                {
                    "displayName": display_name,
                    "tags": tags,
                    "golden": {
                        "turns": json_turns,
                        "evaluationExpectations": eval_expectations,
                    },
                }
            )

    # Handle Evaluation Resource or Direct Export format
    else:
        display_name = (
            data.get("displayName") or data.get("name") or "Imported_Eval"
        )
        tags = data.get("tags") or []
        if isinstance(tags, str):
            tags = [t.strip() for t in tags.split(",")]

        golden = data.get("golden", data)
        json_turns = []

        # If turns are already in JSON proto format (Case 1)
        if (
            "turns" in golden
            and golden["turns"]
            and "steps" in golden["turns"][0]
        ):
            json_turns = golden["turns"]
        # Otherwise process raw YAML turns (Case 1b)
        elif "turns" in golden:
            for t in golden["turns"]:
                turn = Turn.model_validate(t)
                result = self._process_dataset_turn(
                    turn, session_params={}, params_injected=True
                )
                json_turns.append({"steps": result["steps"]})

        expectations = (
            golden.get("evaluationExpectations")
            or data.get("expectations")
            or []
        )

        # Final processing of expectations (handles side-loading)
        eval_expectations = self._process_conversation_expectations(
            expectations, base_dir=base_dir, auto_sideload=auto_sideload
        )

        all_evals.append(
            {
                "displayName": display_name,
                "tags": tags,
                "golden": {
                    "turns": json_turns,
                    "evaluationExpectations": eval_expectations,
                },
            }
        )

    # Inject the file name as a tag
    file_tag = os.path.splitext(os.path.basename(yaml_file_path))[0]
    for eval_dict in all_evals:
        tags = eval_dict.get("tags", [])
        if not isinstance(tags, list):
            if isinstance(tags, str):
                tags = [t.strip() for t in tags.split(",") if t.strip()]
            else:
                tags = []
        if file_tag not in tags:
            tags.append(file_tag)
        eval_dict["tags"] = tags

    return all_evals
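
Each returned dictionary follows the structure built above; for example (the file path is a placeholder):

evals = eu.load_golden_evals_from_yaml("evals/billing_evals.yaml")
for ev in evals:
    # The YAML file name is always appended as a tag
    print(ev["displayName"], ev["tags"])
    golden = ev["golden"]
    print(
        len(golden["turns"]), "turns,",
        len(golden["evaluationExpectations"]), "expectations",
    )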

wait_for_run_and_get_results

wait_for_run_and_get_results(run_name, timeout_seconds=300)

Polls for completion of an evaluation run and returns results.

Parameters:

  • run_name (str, required): Name of the evaluation run.
  • timeout_seconds (int, default 300): Maximum time to wait, in seconds.

Returns:

  • List[Dict[str, Any]]: A list of evaluation results.

Source code in src/cxas_scrapi/utils/eval_utils.py
def wait_for_run_and_get_results(
    self,
    run_name: str,
    timeout_seconds: int = 300,
) -> List[Dict[str, Any]]:
    """Polls for completion of an evaluation run and returns results.

    Args:
        run_name: Name of the evaluation run.
        timeout_seconds: Max time to wait.

    Returns:
        A list of evaluation results.
    """
    logger.info("Waiting for evaluation run %s to complete...", run_name)
    start_time = time.time()
    while True:
        run_status = self.eval_client.get_evaluation_run(run_name)
        if run_status.state.name in ["COMPLETED", "ERROR"]:
            break
        if time.time() - start_time > timeout_seconds:
            raise TimeoutError(f"Evaluation run {run_name} timed out.")
        time.sleep(10)

    results = self.eval_client.list_evaluation_results_by_run(run_name)
    return results
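
For example, after starting a run with the inherited run_evaluation (or with create_and_run_evaluation_from_yaml below), pass the operation name from the run response. A sketch, where evaluation_resource_name stands in for a full Evaluation resource name:

run_response = eu.run_evaluation(
    evaluations=[evaluation_resource_name],
    app_name=app_name,
)

# Polls every 10 seconds until COMPLETED or ERROR, or raises TimeoutError
results = eu.wait_for_run_and_get_results(
    run_response.operation.name, timeout_seconds=600
)
print(f"Run produced {len(results)} result(s)")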

create_and_run_evaluation_from_yaml

create_and_run_evaluation_from_yaml(yaml_file_path, app_name=None, modality='text', run_count=None)

Loads, creates, and runs an evaluation from a YAML file.

Parameters:

  • yaml_file_path (str, required): Path to the YAML file.
  • app_name (Optional[str], default None): Optional parent App ID. Defaults to self.app_name.
  • modality (str, default 'text'): "text" (default) or "audio".
  • run_count (Optional[int], default None): Number of times to run the evaluation. Default is 1 per golden, 5 per scenario.

Returns:

  • Dict[str, Any]: A dictionary containing the evaluation and the run response.

Source code in src/cxas_scrapi/utils/eval_utils.py
def create_and_run_evaluation_from_yaml(
    self,
    yaml_file_path: str,
    app_name: Optional[str] = None,
    modality: str = "text",
    run_count: Optional[int] = None,
) -> Dict[str, Any]:
    """Loads, creates, and runs an evaluation from a YAML file.

    Args:
        yaml_file_path: Path to the YAML file.
        app_name: Optional parent App ID. Defaults to self.app_name.
        modality: "text" (default) or "audio".
        run_count: Number of times to run the evaluation. Default is 1
            per golden, 5 per scenario.

    Returns:
        A dictionary containing the evaluation and the run response.
    """
    app_name = app_name or self.app_name
    if not app_name:
        raise ValueError("app_name is required.")

    # 1. Load the evaluation from YAML
    evaluation_dict = self.load_golden_eval_from_yaml(yaml_file_path)
    if not evaluation_dict:
        raise ValueError(f"Failed to load evaluation from {yaml_file_path}")

    display_name = evaluation_dict.get("displayName")
    if not display_name:
        raise ValueError("YAML evaluation missing displayName")

    # 2. Check if it already exists
    evals_map = self._get_or_load_evals_map(app_name)
    existing_resource_name = evals_map.get("goldens", {}).get(
        display_name
    ) or evals_map.get("scenarios", {}).get(display_name)

    if existing_resource_name:
        logger.info(
            "Found existing evaluation '%s' (%s), reusing it.",
            display_name,
            existing_resource_name,
        )
        # Update evaluation
        evaluation_dict["name"] = existing_resource_name
        evaluation_obj = self.update_evaluation(evaluation_dict)
    else:
        logger.info("Evaluation '%s' not found, creating it.", display_name)
        evaluation_obj = self.create_evaluation(
            evaluation=evaluation_dict, app_name=app_name
        )
        logger.info("Created evaluation: %s", evaluation_obj.name)

    # Run the evaluation using the resource name
    run_response = self.run_evaluation(
        evaluations=[evaluation_obj.name],
        app_name=app_name,
        modality=modality,
        run_count=run_count,
    )

    logger.info("Started evaluation run: %s", run_response.operation.name)

    return {
        "evaluation": evaluation_obj,
        "run": run_response,
    }
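
Putting it together, an end-to-end flow from a YAML file to analysis DataFrames might look like this sketch (assuming the result list returned by wait_for_run_and_get_results can be passed directly as results):

response = eu.create_and_run_evaluation_from_yaml(
    "evals/billing_evals.yaml", modality="text", run_count=2
)
results = eu.wait_for_run_and_get_results(response["run"].operation.name)

dfs = eu.evals_to_dataframe(results=results)
print(dfs["summary"].head())
print(dfs["failures"])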

Conversation

Bases: BaseModel

A single test conversation with turns, expectations, tags, and session parameters.

Conversations

Bases: BaseModel

The top-level container for a YAML eval dataset, with common_session_parameters and common_expectations shared across all conversations.

Turn

Bases: BaseModel

A single round-trip with user text, agent response, and optional tool_calls.