Skip to content

Sessions

Sessions is how you actually talk to a CXAS agent. You can send text, events, DTMF digits, raw audio bytes, or multimodal blobs — and get back the agent's response in a structured object you can inspect or render.

The class supports two modalities via the Modality enum:

  • Modality.TEXT (default) — synchronous request/response over gRPC. Perfect for evals, notebooks, and scripted tests.
  • Modality.AUDIO — asynchronous bidirectional streaming over a WebSocket (BidiRunSession). This is what you use for phone or voice integrations.

Sessions also has a handy parse_result() method that pretty-prints the full conversation trace — tool calls, transfers, custom payloads — in the terminal or as colorful HTML in a Jupyter notebook.

Quick Example

from cxas_scrapi import Sessions

# Full resource name of the app. Session resource names are built from it as
# "<app_name>/sessions/<session_id>".
app_name = "projects/my-project/locations/us/apps/my-app-id"

sessions = Sessions(app_name=app_name)

# Start a conversation: generate a random UUID4 string and pass the same ID
# to every call below so that all turns target the same session resource.
session_id = sessions.create_session_id()

# Text turn (text is the default modality)
response = sessions.run(
    session_id=session_id,
    text="I need help with my bill",
)
# Print the turn-by-turn trace of the response
sessions.parse_result(response)

# Send an event (e.g., a welcome trigger); event_vars are sent as a
# variables input immediately before the event itself
sessions.run(
    session_id=session_id,
    event="WELCOME",
    event_vars={"caller_id": "+14155551234"},
)

# Audio turn — convert text to speech automatically.
# modality accepts the string "audio" as well as Modality.AUDIO.
audio_response = sessions.run(
    session_id=session_id,
    text="Tell me my account balance",
    modality="audio",
)

Reference

Sessions

Sessions(app_name, deployment_id=None, **kwargs)

Bases: Common

Initializes the Sessions client.

Source code in src/cxas_scrapi/core/sessions.py
def __init__(
    self,
    app_name: str,
    deployment_id: Optional[str] = None,
    **kwargs,
) -> None:
    """Initializes the Sessions client.

    Args:
        app_name: Resource name of the app, e.g.
            'projects/<project>/locations/<location>/apps/<app-id>'.
            It is passed to the base-class initializer and also kept on
            the instance as ``self.app_name``.
        deployment_id: Optional default deployment ID, kept on the
            instance as ``self.deployment_id``.
        **kwargs: Forwarded unchanged to the base-class initializer.
    """
    super().__init__(app_name=app_name, **kwargs)

    # Initialize Sessions Client
    # The transport and client_info both come from the base class.
    self.client = SessionServiceClient(
        transport=self.get_grpc_transport(SessionServiceClient),
        client_info=self.client_info,
    )

    # NOTE(review): app_name was already handed to the base initializer
    # above; presumably this re-assignment is a harmless duplicate - confirm.
    self.app_name = app_name
    self.deployment_id = deployment_id

create_session_id

create_session_id()

Create a unique uuid4 string to use as the session ID.

Source code in src/cxas_scrapi/core/sessions.py
def create_session_id(self) -> str:
    """Generate a new session ID.

    Returns:
        The canonical string form of a freshly generated random
        (version 4) UUID.
    """
    new_id = uuid.uuid4()
    return str(new_id)

run

run(session_id, text=None, dtmf=None, event=None, event_vars=None, blob=None, blob_mime_type='application/octet-stream', variables=None, tool_responses=None, audio=None, audio_config=None, input_audio_config=None, output_audio_config=None, deployment_id=None, historical_contexts=None, turn_count=None, modality=TEXT, use_tool_fakes=False)

Sends inputs to a Conversational Agents Session and returns the response.

Parameters:

Name Type Description Default
session_id str

Unique UUID string or identifying string (e.g. 'test1') for the session.

required
text Optional[str | list[str]]

Text input from the user. Can give a single string or list of strings.

None
dtmf Optional[str]

DTMF input from the user.

None
event Optional[str]

Name of a system event to trigger (e.g. 'WELCOME').

None
event_vars Optional[Dict[str, Any]]

Key-value map of variables to inject alongside the event.

None
blob bytes

Raw binary content (image, pdf, etc.) for multimodal inputs.

None
blob_mime_type str

Mime type for the blob (defaults to 'application/octet-stream').

'application/octet-stream'
variables Optional[Dict[str, Any]]

Key-value state maps to inject for the session turn.

None
tool_responses Optional[List[Dict[str, Any]]]

Pre-computed tool run outputs if mocking tool execution.

None
audio bytes

Raw audio bytes to send as user input.

None
audio_config Optional[Dict[str, Any]]

Custom turn-specific audio configurations.

None
input_audio_config Optional[Dict[str, Any]]

Custom gRPC properties for input audio (defaults to 16kHz linear PCM).

None
output_audio_config Optional[Dict[str, Any]]

Custom gRPC properties for output audio (defaults to 16kHz linear PCM).

None
deployment_id Optional[str]

Overrides the default deployment ID setting for this turn run.

None
historical_contexts Optional[List[Dict[str, Any]] | str]

An existing conversation ID (string) or raw list of dictionaries to pre-set past history.

None
turn_count Optional[int]

When historical_contexts is a saved conversation ID, only the first turn_count turns of that conversation are loaded as history. Ignored when None or not greater than zero.

None
modality Modality | str

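Whether the turn runs as text (synchronous request/response) or audio (asynchronous bidirectional streaming). Accepts a Modality member or the case-insensitive string 'text' or 'audio'. Defaults to Modality.TEXT.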

TEXT
use_tool_fakes bool

Use fake tools for the session if available. Defaults to False.

False
Source code in src/cxas_scrapi/core/sessions.py
def run(  # noqa: C901
    self,
    session_id: str,
    text: Optional[str | list[str]] = None,
    dtmf: Optional[str] = None,
    event: Optional[str] = None,
    event_vars: Optional[Dict[str, Any]] = None,
    blob: Optional[bytes] = None,
    blob_mime_type: str = "application/octet-stream",
    variables: Optional[Dict[str, Any]] = None,
    tool_responses: Optional[List[Dict[str, Any]]] = None,
    audio: Optional[bytes] = None,
    audio_config: Optional[Dict[str, Any]] = None,
    input_audio_config: Optional[Dict[str, Any]] = None,
    output_audio_config: Optional[Dict[str, Any]] = None,
    deployment_id: Optional[str] = None,
    historical_contexts: Optional[List[Dict[str, Any]] | str] = None,
    turn_count: Optional[int] = None,
    modality: Modality | str = Modality.TEXT,
    use_tool_fakes: bool = False,
):
    """Sends inputs to a Conversational Agents Session and returns the
    response.

    Args:
        session_id: Unique UUID string or identifying string (e.g. 'test1')
            for the session.
        text: Text input from the user. Can give a single string or list of
            strings. In text modality one request is made per string; in
            audio modality each string is converted to speech first.
        dtmf: DTMF input from the user.
        event: Name of a system event to trigger (e.g. 'WELCOME').
        event_vars: Key-value map of variables to inject alongside the
            event. Only used when ``event`` is given.
        blob: Raw binary content (image, pdf, etc.) for multimodal inputs.
        blob_mime_type: Mime type for the blob (defaults to
            'application/octet-stream').
        variables: Key-value state maps to inject for the session turn.
            Sent as a separate input in text modality and attached to each
            audio input in audio modality.
        tool_responses: Pre-computed tool run outputs if mocking tool
            execution.
        audio: Raw audio bytes to send as user input.
        audio_config: Custom turn-specific audio configurations.
        input_audio_config: Custom gRPC properties for input audio
            (defaults to 16kHz linear PCM).
        output_audio_config: Custom gRPC properties for output audio
            (defaults to 16kHz linear PCM).
        deployment_id: Overrides the default deployment ID setting for this
            turn run.
        historical_contexts: An existing conversation ID (string) or raw
            list of dictionaries to pre-set past history.
        turn_count: When ``historical_contexts`` is a conversation ID, only
            the first ``turn_count`` turns of that conversation are used.
            Ignored when None or not greater than zero.
        modality: Running via text (synced) or audio (asynchronous
            bidirectional streaming). Accepts a ``Modality`` member or the
            case-insensitive string 'text' / 'audio'. Defaults to
            Modality.TEXT.
        use_tool_fakes: Use fake tools for the session if available.
            Defaults to False.

    Returns:
        Text modality: a ``types.RunSessionResponse`` whose ``outputs`` is
        the concatenation of the outputs of every request that was made,
        or None if no request returned a truthy response.
        Audio modality: whatever ``async_bidi_run_session`` returns.

    Raises:
        ValueError: If ``modality`` is not recognised, if an entry of a
            ``historical_contexts`` list is not a dictionary, or if no
            input of any kind was provided.
    """

    if isinstance(modality, str):
        try:
            modality = Modality(modality.lower())
        except ValueError as e:
            raise ValueError(
                f"Invalid modality: {modality}. Must be 'text' or 'audio'."
            ) from e

    config = {"session": f"{self.app_name}/sessions/{session_id}"}
    if use_tool_fakes:
        config["use_tool_fakes"] = True
    inputs = []

    if modality == Modality.AUDIO:
        self._check_audio_requirements()
        config["input_audio_config"] = (
            input_audio_config
            or types.InputAudioConfig(
                audio_encoding=types.AudioEncoding.LINEAR16,
                sample_rate_hertz=SAMPLE_RATE,
            )
        )
        config["output_audio_config"] = (
            output_audio_config
            or types.OutputAudioConfig(
                audio_encoding=types.AudioEncoding.LINEAR16,
                sample_rate_hertz=SAMPLE_RATE,
            )
        )

    # Determine deployment/version
    if deployment_id or self.deployment_id:
        config["deployment"] = (
            f"{self.app_name}/deployments/"
            f"{deployment_id or self.deployment_id}"
        )
    # app_version is not supported in SessionConfig, only deployment is.

    if historical_contexts:
        parsed_contexts = []
        if isinstance(historical_contexts, str):
            # A string is treated as the ID of a saved conversation whose
            # messages are replayed as history.
            ch = ConversationHistory(
                app_name=self.app_name, creds=self.creds
            )
            conv = ch.get_conversation(historical_contexts)
            d = type(conv).to_dict(conv)
            if "turns" in d and d["turns"]:
                turns_to_process = d["turns"]
                if turn_count is not None and turn_count > 0:
                    turns_to_process = turns_to_process[:turn_count]

                for turn in turns_to_process:
                    for m in turn.get("messages", []):
                        # Messages lacking a role or chunks are dropped.
                        if "role" in m and "chunks" in m:
                            parsed_contexts.append(
                                {"role": m["role"], "chunks": m["chunks"]}
                            )
        else:
            for ctx in historical_contexts:
                if not isinstance(ctx, dict):
                    raise ValueError(
                        f"historical_contexts must be a list of "
                        f"dictionaries. Received: {type(ctx)}"
                    )
                if "role" in ctx and "chunks" in ctx:
                    # Already in the target shape.
                    parsed_contexts.append(ctx)
                elif "user" in ctx:
                    # Shorthand: {"user": "<utterance>"}
                    parsed_contexts.append(
                        {
                            "role": "user",
                            "chunks": [{"text": str(ctx["user"])}],
                        }
                    )
                elif "agent" in ctx or "model" in ctx:
                    # Shorthand: {"agent"|"model": "<reply>"}; an explicit
                    # non-empty "text" key takes precedence over the
                    # shorthand value, and "name" overrides the role.
                    role_name = ctx.get("name", "model")
                    text_val = ctx.get("text", "")

                    if not text_val:
                        val = ctx.get("agent") or ctx.get("model")
                        if isinstance(val, str):
                            text_val = val

                    parsed_contexts.append(
                        {
                            "role": role_name,
                            "chunks": [{"text": str(text_val)}],
                        }
                    )
                else:
                    # Unknown shape: passed through untouched.
                    parsed_contexts.append(ctx)
        config["historical_contexts"] = parsed_contexts

    # The order in which inputs are appended below is the order in which
    # they are sent.
    if variables and modality == Modality.TEXT:
        inputs.append({"variables": variables})

    if dtmf is not None:
        inputs.append({"dtmf": dtmf})

    if event is not None:
        if event_vars:
            inputs.append({"variables": event_vars})
        inputs.append({"event": {"event": event}})

    # Wrap blob input correctly
    if blob is not None:
        inputs.append({"blob": {"mime_type": blob_mime_type, "data": blob}})

    if audio is not None:
        audio_payload = {"audio": audio}
        if audio_config:
            audio_payload["config"] = audio_config
        if variables and modality == Modality.AUDIO:
            audio_payload["variables"] = variables
        inputs.append({"audio": audio_payload})

    # Wrap tool responses correctly
    if tool_responses is not None:
        inputs.append(
            {"tool_responses": {"tool_responses": tool_responses}}
        )

    if modality == Modality.AUDIO:
        if text is not None:
            if isinstance(text, str):
                logger.warning(
                    "Single string input for audio modality introduces "
                    "minor latency before user utterances."
                )
                text = [text]
            audio_transformer = AudioTransformer()
            # Synthesize every utterance first, then build the payloads.
            synthesized = [
                audio_transformer.text_to_speech_bytes(
                    text=utterance,
                    credentials=self.creds,
                    project_id=self.project_id,
                )
                for utterance in text
            ]
            for speech in synthesized:
                # Construct input payload matching sessions.py expectation
                audio_payload = {
                    "audio": speech["audio_bytes"],
                    "text": speech["text"],
                }
                if variables:
                    audio_payload["variables"] = variables
                inputs.append({"audio": audio_payload})

        # Also covers text=[] with no other inputs, which would otherwise
        # open a streaming session with nothing to send.
        if not inputs:
            raise ValueError(
                "Input payloads (text, audio, event, etc.) must be "
                "provided for audio modality."
            )
        return self.async_bidi_run_session(config=config, inputs=inputs)
    elif modality == Modality.TEXT:
        if isinstance(text, str):
            text = [text]

        all_outputs = []
        final_response = None

        if text:
            for utterance in text:
                # One request per utterance. The non-text inputs collected
                # above are included in every one of these requests.
                response = self.make_text_request(
                    config, [*inputs, {"text": utterance}]
                )

                if response:
                    if hasattr(response, "outputs"):
                        all_outputs.extend(response.outputs)
                    final_response = response
        elif inputs:
            # Handle case where only event/blob/variables are provided
            # without text
            response = self.make_text_request(config, inputs)
            if response:
                if hasattr(response, "outputs"):
                    all_outputs.extend(response.outputs)
                final_response = response
        else:
            raise ValueError(
                "Text or valid inputs (e.g. event) must be provided."
            )

        # final_response is only ever set to a truthy response, so it is
        # None exactly when no request produced one.
        if final_response is None:
            return None
        return types.RunSessionResponse(outputs=all_outputs)
    else:
        # Reached only when modality is neither a string nor a Modality
        # member equal to TEXT or AUDIO.
        if text is None and not inputs:
            raise ValueError("Text or inputs must be provided.")
        raise ValueError("Modality must be either 'text' or 'audio'.")

parse_result

parse_result(res)

Parses the CX Agent Studio session response to extract and print turn-by-turn interactions including User Queries, Agent Responses, Tool Calls, Tool Results, and Agent Transfers. Requires Jupyter Notebook or IPython environment for HTML rendering.

Source code in src/cxas_scrapi/core/sessions.py
def parse_result(self, res: Any):  # noqa: C901
    """
    Parses the CX Agent Studio session response to extract and print
    turn-by-turn interactions including User Queries, Agent Responses,
    Tool Calls, Tool Results, Agent Transfers and Custom Payloads.

    Rendering depends on the environment:
      * 'ipykernel' not loaded: lines are printed with ANSI colours.
      * 'ipykernel' loaded and HAS_IPYTHON is true: lines are displayed as
        coloured HTML.
      * otherwise: lines are printed as plain text with anything that looks
        like an HTML tag stripped.

    Every line is also logged at DEBUG level (without colour markup).

    Args:
        res: Response object. Its ``outputs`` attribute is read; for each
            output only ``diagnostic_info.messages`` is used. Nothing is
            rendered when ``outputs`` is missing or empty, and outputs
            without diagnostic info are skipped.

    Returns:
        None.
    """
    # kind -> (plain label, ANSI SGR code, HTML colour)
    styles = {
        "tool_call": ("TOOL CALL:", "1;31", "darkred"),
        "tool_res": ("TOOL RESULT:", "1;33", "goldenrod"),
        "query": ("USER QUERY:", "1;32", "darkgreen"),
        "response": ("AGENT RESPONSE:", "1;35", "purple"),
        "transfer": ("AGENT TRANSFER:", "1;36", "darkorange"),
        "payload": ("CUSTOM PAYLOAD:", "1;94", "brown"),
    }

    if "ipykernel" not in sys.modules:
        # ANSI escape codes for terminal
        fonts = {
            kind: f"\033[{ansi}m{label}\033[0m"
            for kind, (label, ansi, _) in styles.items()
        }
        render = print

        def render_html(text):
            return text  # Pass-through for terminal

    elif HAS_IPYTHON:
        # NOTE(review): chunk text is interpolated into this HTML without
        # escaping - consider whether agent/user text should be escaped.
        fonts = {
            kind: f"<font color='{colour}'><b>{label}</b></font>"
            for kind, (label, _, colour) in styles.items()
        }
        render = display
        render_html = HTML
    else:
        fonts = {kind: label for kind, (label, _, _) in styles.items()}
        render = print

        def render_html(text):
            return re.sub(r"<[^>]*>", "", text).strip()

    def emit(kind, detail):
        """Log one trace line and render it with the label for `kind`."""
        logging.debug("%s %s", styles[kind][0], detail)
        render(render_html(f"{fonts[kind]} {detail}"))

    outputs = getattr(res, "outputs", [])
    if not outputs:
        return

    for output in outputs:
        diagnostic_info = getattr(output, "diagnostic_info", None)

        # Only diagnostic_info carries the rich turn-by-turn trace;
        # outputs without it produce no lines.
        if not (diagnostic_info and hasattr(diagnostic_info, "messages")):
            continue

        for message in getattr(diagnostic_info, "messages", []):
            role = getattr(message, "role", "")

            for chunk in getattr(message, "chunks", []):
                # Depending on the generated class, WhichOneof is
                # available on the internal _pb message
                chunk_type = (
                    chunk._pb.WhichOneof("data")
                    if hasattr(chunk, "_pb")
                    else None
                )

                if chunk_type in ("text", "transcript"):
                    # Typed text and speech transcripts are shown alike.
                    content = getattr(chunk, chunk_type)
                    if role.lower() == "user":
                        emit("query", f"{content}")
                    else:
                        emit("response", f"[{role}] {content}")

                elif chunk_type == "tool_call":
                    tc = chunk.tool_call
                    tool_name = tc.display_name or tc.tool
                    expanded_args = Sessions._expand_pb_struct(tc.args)
                    emit(
                        "tool_call",
                        f"[{role}] {tool_name} -- Args: {expanded_args}",
                    )

                elif chunk_type == "tool_response":
                    tr = chunk.tool_response
                    tool_name = tr.display_name or tr.tool
                    expanded_response = Sessions._expand_pb_struct(
                        tr.response
                    )
                    emit(
                        "tool_res",
                        f"[{role}] {tool_name} -- "
                        f"Result: {expanded_response}",
                    )

                elif chunk_type == "agent_transfer":
                    at = chunk.agent_transfer
                    emit(
                        "transfer",
                        f"[{role}] Transferred to {at.display_name}",
                    )

                elif chunk_type == "payload":
                    expanded_payload = Sessions._expand_pb_struct(
                        chunk.payload
                    )
                    emit("payload", f"[{role}] {expanded_payload}")
                # Any other chunk type (or a chunk without _pb) is ignored.

send_event

send_event(unique_id, event_name, event_vars)
Source code in src/cxas_scrapi/core/sessions.py
def send_event(
    self,
    unique_id: str,
    event_name: str,
    event_vars: Optional[Dict[str, Any]],
):
    """Triggers a named event on a session in a single request.

    Args:
        unique_id: Session ID; the session resource name is built as
            '<app_name>/sessions/<unique_id>'.
        event_name: Name of the event to trigger.
        event_vars: Variables to send immediately before the event. When
            None or empty, no variables input is sent (the same rule
            ``run`` applies to its ``event_vars``).

    Returns:
        The response returned by the session client's ``run_session``.
    """
    config = {"session": f"{self.app_name}/sessions/{unique_id}"}

    inputs = []
    if event_vars:
        # Variables go first so they precede the event in the request.
        inputs.append({"variables": event_vars})
    inputs.append({"event": {"event": event_name}})

    request = types.RunSessionRequest(config=config, inputs=inputs)

    return self.client.run_session(request=request)

async_bidi_run_session

async_bidi_run_session(config, inputs)
Source code in src/cxas_scrapi/core/sessions.py
def async_bidi_run_session(
    self, config: dict, inputs: list[dict[str, Any]]
):
    """Runs a session through a ``BidiSessionHandler``.

    Despite the name this is a regular (non-coroutine) method: it builds a
    handler from this client's location, token and user agent and returns
    whatever the handler's ``run()`` returns.

    Args:
        config: Session configuration dictionary.
        inputs: Ordered list of input dictionaries to send.
    """
    return BidiSessionHandler(
        self.location,
        self.token,
        config,
        inputs,
        user_agent=self.user_agent,
    ).run()

make_text_request

make_text_request(config, inputs)
Source code in src/cxas_scrapi/core/sessions.py
def make_text_request(self, config: dict, inputs: list[dict[str, Any]]):
    """Sends one ``RunSessionRequest`` and returns the client's response.

    Args:
        config: Session configuration dictionary.
        inputs: Ordered list of input dictionaries for this request.
    """
    return self.client.run_session(
        request=types.RunSessionRequest(config=config, inputs=inputs)
    )

get_file_data staticmethod

get_file_data(file_path)

Reads a local file, returns a blob dict.

Source code in src/cxas_scrapi/core/sessions.py
@staticmethod
def get_file_data(file_path: str) -> Dict[str, Any]:
    """
    Reads a local file, returns a blob dict.
    """
    if not os.path.exists(file_path):
        logger.error(f"File not found at path: {file_path}")
        raise FileNotFoundError(
            f"The file specified at {file_path} was not found."
        )

    mime_type, _ = mimetypes.guess_type(file_path)
    if mime_type is None:
        mime_type = "application/octet-stream"

    with open(file_path, "rb") as f:
        raw_bytes = f.read()

    return {"mime_type": mime_type, "data": raw_bytes}

Modality

Bases: str, Enum