# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Author(s) | Renato Leite (renatoleite@), Egon Soares (egon@) |
Last updated | 09/01/2023 |
Complete LLM Model Evaluation Workflow for Classification using KFP Pipelines
In this notebook, we will walk through a complete run of the Vertex AI LLM evaluation pipeline. The workflow covers the following key stages:
Data Preparation: Before we begin the evaluation process, we'll ensure our data is prepared and ready for input into the pipeline.
Model Tuning: We'll optimize the performance of the foundational model through tuning. We'll also monitor the tuning job's progress using a managed Tensorboard instance.
Evaluation with Tuned Model: After tuning, we'll execute the evaluation phase using the tuned model. This step is critical for assessing the model's performance.
Baseline Evaluation with Model text-bison@001: Additionally, we'll perform a baseline evaluation using the foundational model, text-bison@001. This will provide a benchmark for model performance assessment.
Metric Analysis: Following the evaluations, we'll visualize all the metrics within the Vertex AI Model Registry.
Reference Architecture
Install required python packages
# Install Vertex AI LLM SDK (Private Preview)
! pip install -U google-cloud-aiplatform
! pip install -U google-cloud-pipeline-components
! pip install "shapely<2.0.0"
# Install HuggingFace Datasets
! pip install datasets
# OPTIONAL: if you are using Colab, restart the kernel at this point, then uncomment and execute the following code
# from google.colab import auth as google_auth
# google_auth.authenticate_user()
Import python packages and define project variables
import pandas as pd
import vertexai
import uuid
from datasets import load_dataset, DatasetDict
from google.cloud import aiplatform
from google.cloud import storage
from google_cloud_pipeline_components.preview.model_evaluation import evaluation_llm_classification_pipeline
from kfp import compiler
from kfp import dsl
from vertexai.preview.language_models import (
TextGenerationModel,
EvaluationTextClassificationSpec,
TuningEvaluationSpec
)
Replace the values of the variables below according to your project specification.
# Project variables
PROJECT_ID = "rl-llm-dev"
ENDPOINT_LOCATION = "us-central1"
STAGING_BUCKET = "gs://<YOUR BUCKET NAME>" # Same location as ENDPOINT_LOCATION
TUNING_JOB_LOCATION = "us-central1"
DATA_STAGING_GCS_LOCATION = "gs://<YOUR BUCKET NAME>" # Same location as ENDPOINT_LOCATION
storage_client = storage.Client()
vertexai.init(project=PROJECT_ID, location=ENDPOINT_LOCATION, staging_bucket=STAGING_BUCKET)
aiplatform.init(project=PROJECT_ID, location=ENDPOINT_LOCATION, staging_bucket=STAGING_BUCKET)
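Because the staging bucket must be in the same location as the endpoint, you can optionally sanity-check it with the storage client created above. A minimal sketch, assuming the gs://bucket-name URI format used in this notebook:
# Optional check: the bucket location should match ENDPOINT_LOCATION (case-insensitive)
bucket_name = STAGING_BUCKET.removeprefix('gs://').split('/')[0]
bucket = storage_client.get_bucket(bucket_name)
print(bucket.location)  # e.g. 'US-CENTRAL1'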
Create a Vertex AI TensorBoard instance
The Adapter Tuning pipeline can log the training metrics for tracking and retrospective analysis. Create an instance of Vertex AI Tensorboard that will be used by the tuning pipeline runs. If you want to reuse an existing instance, skip the following cell and set the tensorboard_id variable to your instance ID. Note that the instance must be in the same region where the tuning jobs will run.
display_name = 'notebook4-llm-eval-tensorboard'
tensorboard = aiplatform.Tensorboard.create(
display_name=display_name,
project=PROJECT_ID,
location=TUNING_JOB_LOCATION,
)
print(tensorboard.display_name)
print(tensorboard.resource_name)
# Replace with your Tensorboard ID
# Example: tensorboard_id = '6279148178507825152'
tensorboard_id = '<YOUR TENSORBOARD ID>'
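If you just created the instance in the cell above, you can also derive the ID from its resource name instead of pasting it manually; a small convenience sketch:
# Tensorboard resource names follow projects/{project}/locations/{location}/tensorboards/{id}
tensorboard_id = tensorboard.resource_name.split('/')[-1]
print(tensorboard_id)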
Prepare training dataset
In this lab, you are going to tune the text-bison foundation model for a single-label text classification task. You are going to use the dair-ai/emotion dataset from HuggingFace.
dataset = load_dataset('dair-ai/emotion')
print(dataset)
print(dataset['test'][0:2])
splits = {k:v for (k,v) in zip(['train', 'validation', 'test'],
load_dataset('dair-ai/emotion', split=['train[0:7200]', 'validation[0:256]', 'test[0:256]']))}
dataset = DatasetDict(splits)
dataset
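Before converting the data, it is worth confirming that all six emotion classes are represented in the training split; a quick sketch over the label column:
from collections import Counter
# Count examples per class label in the training split
print(Counter(dataset['train']['label']))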
Convert to the format required by the tuning pipeline
Your model tuning dataset must be in JSON Lines (JSONL) format, where each line contains a single tuning example. Each example is composed of an input_text field that contains the prompt to the model and an output_text field that contains an example response that the tuned model is expected to produce. The maximum token length for input_text is 8,192 and the maximum token length for output_text is 1,024. If either field exceeds the maximum token length, the excess tokens are truncated.
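For illustration, a single tuning example in this format would look like the line printed below (the text and label here are hypothetical, not records from the dataset):
import json
# Hypothetical tuning example in the JSONL format described above
example = {
    "input_text": "Classify the emotion of the following text: i cant stop smiling today",
    "output_text": "joy",
}
print(json.dumps(example))  # one such JSON object per line of the .jsonl file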
The examples included in your dataset should match your expected production traffic. If your dataset contains specific formatting, keywords, instructions, or information, the production data should be formatted in the same way and contain the same instructions.
For example, if the examples in your dataset include a "question:" and a "context:", production traffic should also be formatted to include a "question:" and a "context:" in the same order as they appear in the dataset examples. If you exclude the context, the model will not recognize the pattern, even if the exact question was in an example in the dataset.
For tasks such as classification, it is possible to create a dataset of examples that don't contain instructions. However, excluding instructions from the examples in the dataset leads to worse performance after tuning than including instructions, especially for smaller datasets.
For our dataset, we are going to prepend the following instructions to each example:
Classify the following text into one of the following classes:
[sadness, joy, love, anger, fear, surprise]
Text:
class_labels = {
0: 'sadness',
1: 'joy',
2: 'love',
3: 'anger',
4: 'fear',
5: 'surprise'
}
class_labels.values()
instructions = f'''Classify the following text into one of the following classes:
[{', '.join(class_labels.values())}]
Text:
'''
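Printing the assembled prompt prefix makes it easy to confirm it matches the format described above:
print(instructions)
# Classify the following text into one of the following classes:
# [sadness, joy, love, anger, fear, surprise]
# Text: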
def add_instructions(example, instructions):
    example["input_text"] = f'{instructions}{example["text"]}'
    example["output_text"] = class_labels[example["label"]]
    return example
tuning_dataset = dataset.map(lambda x: add_instructions(x, instructions)).remove_columns(['text', 'label'])
print(tuning_dataset)
print(tuning_dataset['train'][:1])
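The input_text limit mentioned earlier is measured in tokens, but a rough word count is enough to flag obviously oversized prompts before tuning; a heuristic sketch, not an exact token count:
# Rough proxy for prompt length; the actual 8,192 limit is in tokens, not words
max_words = max(len(example['input_text'].split()) for example in tuning_dataset['train'])
print(f'Longest training prompt: ~{max_words} words')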
Export the dataset splits to GCS
gcs_uris = {}
filename_prefix = 'emotion'
for split_name, split_data in tuning_dataset.items():
    jsonl_filename = f'{filename_prefix}-{split_name}.jsonl'
    gcs_uri = f'{DATA_STAGING_GCS_LOCATION}/{jsonl_filename}'
    gcs_uris[split_name] = gcs_uri
    split_data.to_json(jsonl_filename)
    !gsutil cp {jsonl_filename} {gcs_uri}
!gsutil ls {DATA_STAGING_GCS_LOCATION}/*.jsonl
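If gsutil is not available in your environment, the same upload can be done with the google-cloud-storage client initialized earlier; a sketch that assumes the gs://bucket/object URIs built above:
# Upload each exported JSONL split with the Python storage client
for split_name, gcs_uri in gcs_uris.items():
    bucket_name, blob_name = gcs_uri.removeprefix('gs://').split('/', 1)
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    blob.upload_from_filename(f'{filename_prefix}-{split_name}.jsonl')
    print(f'Uploaded gs://{bucket_name}/{blob_name}')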
# Export the evaluation dataset split to GCS
jsonl_filename = 'emotions-eval.jsonl'
evaluation_dataset_gcs_uri = f'{STAGING_BUCKET}/{jsonl_filename}'
evaluation_dataset = tuning_dataset['test'].rename_column('input_text', 'prompt').rename_column('output_text', 'ground_truth')
evaluation_dataset.to_json(jsonl_filename)
# Copy file to GCS
!gsutil cp {jsonl_filename} {evaluation_dataset_gcs_uri}
# List GCS bucket to verify the file was copied successfully
!gsutil ls {STAGING_BUCKET}/*.jsonl
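A quick check that the renamed columns match the prompt and ground-truth field names the evaluation pipeline expects:
print(evaluation_dataset.column_names)  # expect ['prompt', 'ground_truth']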
Tuning and Evaluation Vertex AI Pipeline
from google_cloud_pipeline_components.preview.model_evaluation import evaluation_llm_classification_pipeline
from google.cloud.aiplatform import PipelineJob
from google_cloud_pipeline_components.types import artifact_types
from kfp import dsl, components
from kfp.dsl import Input, Output, Markdown, Artifact
tune_large_model = components.load_component_from_url(
'https://us-kfp.pkg.dev/ml-pipeline/large-language-model-pipelines/tune-large-model/v2.0.0')
@dsl.component(
    packages_to_install=[
        'google_cloud_pipeline_components',
        'google-cloud-storage',
        'pandas',
        'tabulate'  # required by pandas.DataFrame.to_markdown()
    ]
)
def record_metrics_component(
    evaluation_class_labels: list,
    evaluation_metrics: Input[artifact_types.ClassificationMetrics],
    confusion_artifact: Output[dsl.ClassificationMetrics],
    classification_artifact: Output[Markdown],
    raw_metrics: Output[dsl.Metrics]
):
    import json
    from google.cloud import storage
    import pandas as pd

    storage_client = storage.Client()

    # Read the metrics JSON produced by the evaluation pipeline from GCS
    def get_metrics_blob(metrics_uri):
        splits = metrics_uri.split("/")
        bucket_name = splits[2]
        blob_name = '/'.join(splits[3:])
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        with blob.open("r") as f:
            return json.loads(f.read())

    # Extract the confusion matrix rows from the overall (unsliced) metrics
    def get_confusion_matrix(overall_metrics):
        confusion_matrix = []
        for slice_metric in overall_metrics['slicedMetrics']:
            if 'value' in slice_metric['singleOutputSlicingSpec']:
                continue
            for row in slice_metric['metrics']['classification']['confusionMatrix']['rows']:
                confusion_matrix.append(row['dataItemCounts'])
        return confusion_matrix

    # Build a table of classification metrics, one row per metric slice
    def get_classification_metrics(overall_metrics):
        all_metrics = overall_metrics['slicedMetrics']
        metric_names = ["Metric Slice", "auPrc", "auRoc", "logLoss"]
        f1_metrics = ["f1Score"]
        aggregated_f1_metrics = ["f1ScoreMicro", "f1ScoreMacro"]
        table = [metric_names + f1_metrics + aggregated_f1_metrics]
        for metrics in all_metrics:
            classification_metric = metrics['metrics']['classification']
            slice_name = "class - " + metrics['singleOutputSlicingSpec']['value'] if 'value' in metrics['singleOutputSlicingSpec'] else "Overall"
            slice_metric_values = [slice_name]
            slice_metric_values.extend(
                [classification_metric.get(metric_name, 0)
                 for metric_name in metric_names[1:]])
            slice_metric_values.extend(
                [classification_metric['confidenceMetrics'][0].get(metric_name, 0)
                 for metric_name in f1_metrics])
            slice_metric_values.extend(
                [classification_metric['confidenceMetrics'][0].get(metric_name, 'n/a')
                 for metric_name in aggregated_f1_metrics])
            table.append(slice_metric_values)
        return table

    # Log the confusion matrix artifact; the evaluation output includes an
    # extra UNKNOWN category for predictions outside the provided classes
    overall_metrics = get_metrics_blob(metrics_uri=evaluation_metrics.uri)
    confusion_matrix = get_confusion_matrix(overall_metrics)
    evaluation_class_labels.append('UNKNOWN')
    confusion_artifact.log_confusion_matrix(
        categories=evaluation_class_labels,
        matrix=confusion_matrix
    )

    # Log the per-slice classification metrics as a Markdown artifact
    metrics_table = get_classification_metrics(overall_metrics)
    markdown_content = pd.DataFrame(metrics_table).to_markdown()
    with open(classification_artifact.path, 'w') as fp:
        fp.write(markdown_content)

    # Log raw metrics from the first data row (typically the overall slice)
    raw_metrics.log_metric(metric='f1Score', value=metrics_table[1][4])
    raw_metrics.log_metric(metric='f1ScoreMicro', value=metrics_table[1][5])
    raw_metrics.log_metric(metric='f1ScoreMacro', value=metrics_table[1][6])
@dsl.pipeline
def complete_evaluation_pipeline(
    project: str,
    training_dataset_uri: str,
    evaluation_data_uri: str,
    tensorboard_id: str,
    evaluation_class_labels: list,
    evaluation_tuned_output_uri: str,
    evaluation_tuned_input_uris: list,
    evaluation_bison_output_uri: str,
    evaluation_bison_input_uris: list,
    bison_model_name: str
):
    # Tune the foundation model (logging to the Tensorboard instance), then evaluate the tuned model
    model_resources = tune_large_model(
        model_display_name='notebook4-tuned-model',
        location='us-central1',
        large_model_reference='text-bison@001',
        project=project,
        train_steps=2,
        dataset_uri=training_dataset_uri,
        evaluation_interval=1,
        evaluation_data_uri=evaluation_data_uri,
        tensorboard_resource_id=tensorboard_id
    ).set_display_name(name='Tune foundational model')

    tuned_model_evaluation = evaluation_llm_classification_pipeline(
        project=project,
        location='us-central1',
        batch_predict_gcs_destination_output_uri=evaluation_tuned_output_uri,
        evaluation_class_labels=evaluation_class_labels,
        batch_predict_gcs_source_uris=evaluation_tuned_input_uris,
        target_field_name='ground_truth',
        model_name=model_resources.outputs['model_resource_name']
    ).set_display_name(name='Evaluate tuned model')

    record_metrics_component(
        evaluation_class_labels=evaluation_class_labels,
        evaluation_metrics=tuned_model_evaluation.outputs[
            'evaluation_metrics']).set_display_name(name="Record tuned model evaluation metrics")

    # Baseline evaluation of the foundational model (text-bison@001)
    eval_pipeline = evaluation_llm_classification_pipeline(
        project=project,
        location='us-central1',
        batch_predict_gcs_destination_output_uri=evaluation_bison_output_uri,
        evaluation_class_labels=evaluation_class_labels,
        batch_predict_gcs_source_uris=evaluation_bison_input_uris,
        target_field_name='ground_truth',
        model_name=bison_model_name
    ).set_display_name(name="Evaluate foundational model")

    record_metrics_component(
        evaluation_class_labels=evaluation_class_labels,
        evaluation_metrics=eval_pipeline.outputs[
            'evaluation_metrics']).set_display_name(name="Record foundational model evaluation metrics")
base_model = TextGenerationModel.from_pretrained('text-bison@001')
model_name = base_model._model_resource_name
job_id = "custom-model-evaluation-{}".format(uuid.uuid4())
experiment_name = 'notebook4-complete-classification-pipeline'
target_field_name = 'ground_truth'
tuned_class_labels = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
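Note that _model_resource_name is a private SDK attribute and may change between releases; if you prefer, the publisher model name can be written out directly (assuming the standard publisher-model naming scheme used by Vertex AI):
# Equivalent to the private attribute above, under the assumed naming scheme
model_name = 'publishers/google/models/text-bison@001'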
aiplatform.init(
project=PROJECT_ID,
location=ENDPOINT_LOCATION,
staging_bucket=STAGING_BUCKET,
experiment=experiment_name,
experiment_tensorboard=tensorboard_id)
complete_classification_pipeline_path = 'complete_classification_pipeline_path.json'
compiler.Compiler().compile(
pipeline_func=complete_evaluation_pipeline,
package_path=complete_classification_pipeline_path
)
parameters = {
"project": PROJECT_ID,
"evaluation_class_labels": tuned_class_labels,
"evaluation_tuned_output_uri": f'{STAGING_BUCKET}/output',
"evaluation_tuned_input_uris": [evaluation_dataset_gcs_uri],
"training_dataset_uri": gcs_uris['train'],
"evaluation_data_uri": gcs_uris['validation'],
"tensorboard_id": tensorboard_id,
"evaluation_bison_output_uri": f'{STAGING_BUCKET}/output',
"evaluation_bison_input_uris": [evaluation_dataset_gcs_uri],
"bison_model_name": model_name,
}
job = aiplatform.PipelineJob(
display_name=job_id,
template_path=complete_classification_pipeline_path,
pipeline_root=STAGING_BUCKET,
parameter_values=parameters,
enable_caching=True,
location='us-central1'
)
job.submit(experiment=experiment_name)
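job.submit() returns immediately; to block until the run finishes before inspecting the logged metrics in the Vertex AI console, you can wait on the job:
job.wait()  # blocks until the pipeline run completes
print(f'Pipeline finished with state: {job.state}')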