# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Ingestion of Unstructured Documents with Metadata in Vertex AI Search¶
Author(s) | Hossein Mansour |
Reviewer(s) | Meltem Subasioglu, Rajesh Thallam |
Last updated | 2024-07-23: The first draft |
Overview¶
In this notebook, we will show you how to prepare and ingest unstructured documents with metadata into Vertex AI Search. Metadata can be used for different purposes such as improving recall and precision, influencing results via boosting and filtering, and including additional context to be retrieved together with the documents. You can find more information about different types of metadata here.
We will perform the following steps:
- Creating a Vertex AI Search Datastore
- Creating a Vertex AI Search App
- [Optional] Updating the Schema for the Datastore
- Reading Documents and their Metadata from a GCS bucket and combining them into a JSONL file
- Uploading the documents with their metadata to the Datastore
- Searching the Datastore
Please refer to the official documentation of Vertex AI Search for the definition of Datastores and Apps and their relationships to one another.
The REST API is used throughout this notebook. Please consult the official documentation for alternative ways to achieve the same goal, namely Client libraries and RPC.
Vertex AI Search¶
Vertex AI Search (VAIS) is a fully-managed platform, powered by large language models, that lets you build AI-enabled search and recommendation experiences for your public or private websites or mobile applications.
VAIS can handle a diverse set of data sources including structured, unstructured, and website data, as well as data from third-party applications such as Jira, Salesforce, and Confluence.
VAIS also has built-in integration with LLMs, which enables you to provide answers to complex questions, grounded in your data.
Using this Notebook¶
If you're running outside of Colab, depending on your environment you may need to install pip packages that are included in the Colab environment by default but are not part of the Python Standard Library. Outside of Colab you'll also notice comments in code cells that look like #@something; these trigger special Colab functionality but don't change the behavior of the notebook.
This tutorial uses the following Google Cloud services and resources:
- Service Usage API
- Discovery Engine
- Google Cloud Storage Client
This notebook has been tested in the following environment:
- Python version = 3.10.12
- google.cloud.storage = 2.8.0
- google.auth = 2.27.0
Getting Started¶
The following steps are necessary to run this notebook, no matter what notebook environment you're using.
If you're entirely new to Google Cloud, get started here
Google Cloud Project Setup¶
- Select or create a Google Cloud project. When you first create an account, you get a $300 free credit towards your compute/storage costs
- Make sure that billing is enabled for your project
- Enable the Service Usage API
- Enable the Cloud Storage API
- Enable the Discovery Engine API for your project
Google Cloud Permissions¶
Ideally you should have the Owner role for your project to run this notebook. If that is not an option, you need at least the following roles:
- roles/serviceusage.serviceUsageAdmin to enable APIs
- roles/iam.serviceAccountAdmin to modify service agent permissions
- roles/discoveryengine.admin to modify discoveryengine assets
- roles/storage.objectAdmin to modify and delete GCS buckets
Setup Environment¶
Authentication¶
If you're using Colab, run the code in the next cell. Follow the popups and authenticate with an account that has access to your Google Cloud project.
If you're running this notebook somewhere besides Colab, make sure your environment has the right Google Cloud access. If that's a new concept to you, consider looking into Application Default Credentials for your local environment and initializing the Google Cloud CLI. In many cases, running gcloud auth application-default login in a shell on the machine running the notebook kernel is sufficient.
More authentication options are discussed here.
# Colab authentication.
import sys
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()
    print("Authenticated")
from google.auth import default
from google.auth.transport.requests import AuthorizedSession
creds, _ = default()
authed_session = AuthorizedSession(creds)
Import Libraries¶
import time
import os
import json
import glob
import re
import shutil
from typing import Dict, Any
import pandas as pd
import requests
from google.cloud import storage
from urllib.parse import urlparse
Configure environment¶
You can enter the IDs of an existing App and Datastore to be used in this notebook. Alternatively, you can enter the desired IDs for an App and Datastore that do not exist yet, and they will be created later in this notebook.
The same applies to the GCS directories of Documents and Metadata. The Documents and Metadata can be in separate buckets, but it is advised to keep them (together with the JSONL created later in this notebook) in the same temporary bucket for ease of cleanup.
You can find more information regarding the "Location" of datastores and associated limitations here. The Location of a Datastore is set at the time of creation, and every later request to that Datastore must target the same Location.
PROJECT_ID = "" # @param {type:"string"}
# Vertex AI Search Parameters
DATASTORE_ID = "" # @param {type:"string"}
APP_ID = "" # @param {type:"string"}
LOCATION = "global" # @param ["global", "us", "eu"] Global is preferred
# GCS Parameters, e.g. 'gs://my_bucket/folder1/docs/'
GCS_DIRECTORY_DOCS = '' # @param {type:"string"}
GCS_DIRECTORY_METADATA = '' # @param {type:"string"}
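The note on Location above can be made concrete with a small helper. This is a minimal sketch, assuming that multi-region Datastores ("us", "eu") are served from a location-prefixed hostname while "global" uses the unprefixed one; the function names `discovery_engine_host` and `datastore_url` are hypothetical and are not used elsewhere in this notebook, which hardcodes the global hostname.

```python
def discovery_engine_host(location: str) -> str:
    """Return the API hostname for a given Datastore location.

    Assumption: non-global locations use a "<location>-" hostname prefix.
    """
    if location == "global":
        return "discoveryengine.googleapis.com"
    return f"{location}-discoveryengine.googleapis.com"


def datastore_url(project_id: str, location: str, datastore_id: str) -> str:
    """Build the base REST URL of a Datastore in the default collection."""
    host = discovery_engine_host(location)
    return (
        f"https://{host}/v1/projects/{project_id}/locations/{location}"
        f"/collections/default_collection/dataStores/{datastore_id}"
    )
```

If you pick "us" or "eu" for LOCATION, the request URLs in the cells below would likely need the same hostname adjustment.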
Create VAIS App and Datastore¶
[Prerequisite] Create a GCS bucket with sample documents¶
This step is only needed for the purpose of this demo. For the real use case you will need to upload your actual documents to a GCS bucket.
Here, we download Alphabet's 2022 Q1-Q4 earnings call transcripts as sample documents.
def create_gcs_bucket_and_download_files(project_id, new_bucket_path, file_urls):
    """
    Creates a new GCS bucket (if it doesn't exist) and downloads files from specified URLs.
    Handles paths with subdirectories correctly using `urlparse`.
    """
    if not new_bucket_path.startswith("gs://") or not new_bucket_path.endswith("/"):
        raise ValueError(
            "Invalid GCS path format. Must start with 'gs://' and end with '/'. "
            f"Received: '{new_bucket_path}'"
        )
    storage_client = storage.Client(project=project_id)
    # Extract bucket name and prefix from path
    parsed_path = urlparse(new_bucket_path)
    new_bucket_name = parsed_path.netloc
    blob_prefix = parsed_path.path.strip('/')  # Remove leading and trailing slashes
    new_bucket = storage_client.bucket(new_bucket_name)
    if not new_bucket.exists():
        new_bucket = storage_client.create_bucket(new_bucket_name)
        print(f"Bucket {new_bucket_name} created.")
    for url in file_urls:
        file_name = url.split("/")[-1]
        print(f"Downloading: {file_name}")
        try:
            response = requests.get(url)
            response.raise_for_status()
            # Construct the full blob path (including prefix)
            blob_name = f"{blob_prefix}/{file_name}" if blob_prefix else file_name
            blob = new_bucket.blob(blob_name)
            blob.upload_from_string(response.content)
            print(f"Uploaded: {blob_name}")  # Print the uploaded blob path
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {file_name}: {e}")

file_urls = [
    "https://abc.xyz/assets/investor/static/pdf/2022_Q1_Earnings_Transcript.pdf",
    "https://abc.xyz/assets/investor/static/pdf/2022_Q2_Earnings_Transcript.pdf",
    "https://abc.xyz/assets/investor/static/pdf/2022_Q3_Earnings_Transcript.pdf",
    "https://abc.xyz/assets/investor/static/pdf/2022_Q4_Earnings_Transcript.pdf"
]
create_gcs_bucket_and_download_files(PROJECT_ID, GCS_DIRECTORY_DOCS, file_urls)
[Prerequisite] Create a GCS bucket with sample Metadata¶
Similar to the code block above, this step is only needed for the purpose of this demo.
Here we extract some trivial metadata from the file name. Each metadata file will have content similar to the following:
{
    "doc_name": "2022_Q1_Earnings_Transcript",
    "year": "2022",
    "quarter": "Q1",
    "doc_type": "earnings transcript",
    "stock_tickers": ["GOOG", "GOOGL"],
    "company_name": "alphabet"
}
def create_metadata_files(source_folder_path, metadata_folder_path):
    """Creates metadata JSON files for documents in a GCS folder."""
    if not metadata_folder_path.startswith("gs://") or not metadata_folder_path.endswith("/"):
        raise ValueError(
            "Invalid GCS path format. Must start with 'gs://' and end with '/'. "
            f"Received: '{metadata_folder_path}'"
        )
    # Note: this helper assumes the metadata folder is in the same bucket as the documents.
    bucket_name = source_folder_path.split("/")[2]
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    source_folder = source_folder_path.replace(f"gs://{bucket_name}/", "")
    metadata_folder = metadata_folder_path.replace(f"gs://{bucket_name}/", "")
    blobs = bucket.list_blobs(prefix=source_folder)
    for blob in blobs:
        # Explicitly check if the blob is a folder/directory
        if blob.name.endswith("/"):
            print(f"Skipping folder: {blob.name}")
            continue
        # Get the filename by splitting on the last "/"
        filename = blob.name.split("/")[-1]
        # Match names of the form <year>_Q<n>_<type>_Transcript.pdf
        doc_name_match = re.match(r"(\d{4})_Q(\d)_\w+_Transcript\.pdf", filename)
        if not doc_name_match:
            print(f"Skipping file with unexpected name: {filename}")
            continue
        year, quarter = doc_name_match.groups()
        # Build doc_type from the name parts after the quarter, e.g. "earnings transcript"
        doc_type = " ".join(filename.replace(".pdf", "").split("_")[2:]).lower()
        metadata = {
            "doc_name": filename.replace(".pdf", ""),
            "year": year,
            "quarter": f"Q{quarter}",
            "doc_type": doc_type,
            "stock_tickers": ["GOOG", "GOOGL"],
            "company_name": "alphabet"
        }
        metadata_file_name = f"{metadata['doc_name']}.txt"
        metadata_blob = bucket.blob(metadata_folder + metadata_file_name)
        metadata_blob.upload_from_string(json.dumps(metadata, indent=4))
        print(f"Created metadata file: {metadata_blob.name}")

create_metadata_files(GCS_DIRECTORY_DOCS, GCS_DIRECTORY_METADATA)
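The file-name parsing can also be isolated from GCS so that it is easy to check on its own. The following is a minimal sketch, not the notebook's own helper: `metadata_from_filename` is a hypothetical name, and it assumes file names of the form `<year>_Q<n>_<type>_Transcript.pdf` and the sample metadata layout shown above.

```python
import re
from typing import Any, Dict, Optional

# Assumed naming convention: <year>_Q<n>_<type>_Transcript.pdf
FILENAME_PATTERN = re.compile(r"(\d{4})_Q(\d)_\w+_Transcript\.pdf")


def metadata_from_filename(filename: str) -> Optional[Dict[str, Any]]:
    """Derive the sample metadata from a transcript file name, or None if it does not match."""
    match = FILENAME_PATTERN.fullmatch(filename)
    if match is None:
        return None
    year, quarter = match.groups()
    doc_name = filename[: -len(".pdf")]
    # Everything after "<year>_Q<n>_" becomes a lowercase, space-separated doc_type.
    doc_type = " ".join(doc_name.split("_")[2:]).lower()
    return {
        "doc_name": doc_name,
        "year": year,
        "quarter": f"Q{quarter}",
        "doc_type": doc_type,
        "stock_tickers": ["GOOG", "GOOGL"],
        "company_name": "alphabet",
    }
```

Keeping the parsing pure means that unexpected file names can be detected before anything is written to the bucket.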
Helper functions to issue basic search on a Datastore or an App¶
def search_by_datastore(project_id: str, location: str, datastore_id: str, query: str) -> requests.Response:
    """Searches a datastore using the provided query and returns the raw HTTP response."""
    response = authed_session.post(
        f'https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_id}/servingConfigs/default_search:search',
        headers={
            'Content-Type': 'application/json',
        },
        json={
            "query": query,
            "pageSize": 1
        },
    )
    return response

def search_by_app(project_id: str, location: str, app_id: str, query: str) -> requests.Response:
    """Searches an app using the provided query and returns the raw HTTP response."""
    response = authed_session.post(
        f'https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/engines/{app_id}/servingConfigs/default_config:search',
        headers={
            'Content-Type': 'application/json',
        },
        json={
            "query": query,
            "pageSize": 1
        },
    )
    return response
Helper functions to check whether or not a Datastore or an App already exist¶
def datastore_exists(project_id: str, location: str, datastore_id: str) -> bool:
    """Check if a datastore exists."""
    response = search_by_datastore(project_id, location, datastore_id, "test")
    status_code = response.status_code
    if status_code == 200:
        return True
    if status_code == 404:
        return False
    raise Exception(f"Error: {status_code}")

def app_exists(project_id: str, location: str, app_id: str) -> bool:
    """Check if an App exists."""
    response = search_by_app(project_id, location, app_id, "test")
    status_code = response.status_code
    if status_code == 200:
        return True
    if status_code == 404:
        return False
    raise Exception(f"Error: {status_code}")
Helper functions to create a Datastore or an App¶
The datastore is created in chunk mode with a chunk size of 500 tokens.
The documents will be processed with the Layout parser (higher quality for complex documents containing elements like tables and lists), and ancestor information (i.e. headings) is included with each chunk. Please see the official documentation for more details.
These settings are chosen to optimize accuracy; they can be adjusted in the create_datastore function below.
def create_datastore(project_id: str, location: str, datastore_id: str) -> int:
    """Create a datastore."""
    payload = {
        "displayName": datastore_id,
        "industryVertical": "GENERIC",
        "solutionTypes": ["SOLUTION_TYPE_SEARCH"],
        "contentConfig": "CONTENT_REQUIRED",
        "documentProcessingConfig": {
            "chunkingConfig": {
                "layoutBasedChunkingConfig": {
                    "chunkSize": 500,
                    "includeAncestorHeadings": True,
                }
            },
            "defaultParsingConfig": {
                "layoutParsingConfig": {}
            }
        }
    }
    header = {"X-Goog-User-Project": project_id, "Content-Type": "application/json"}
    es_endpoint = f"https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores?dataStoreId={datastore_id}"
    response = authed_session.post(es_endpoint, data=json.dumps(payload), headers=header)
    if response.status_code == 200:
        print(f"The creation of Datastore {datastore_id} is initiated.")
        print("It may take a few minutes for the Datastore to become available")
    else:
        print(f"Failed to create Datastore {datastore_id}")
        print(response.json())
    return response.status_code

def create_app(project_id: str, location: str, datastore_id: str, app_id: str) -> int:
    """Create a search app."""
    payload = {
        "displayName": app_id,
        "dataStoreIds": [datastore_id],
        "solutionType": "SOLUTION_TYPE_SEARCH",
        "searchEngineConfig": {
            "searchTier": "SEARCH_TIER_ENTERPRISE",
            "searchAddOns": ["SEARCH_ADD_ON_LLM"],
        }
    }
    header = {"X-Goog-User-Project": project_id, "Content-Type": "application/json"}
    es_endpoint = f"https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/engines?engineId={app_id}"
    response = authed_session.post(es_endpoint, data=json.dumps(payload), headers=header)
    if response.status_code == 200:
        print(f"The creation of App {app_id} is initiated.")
        print("It may take a few minutes for the App to become available")
    else:
        print(f"Failed to create App {app_id}")
        print(response.json())
    return response.status_code
Create a Datastore with the provided ID if it doesn't exist¶
if datastore_exists(PROJECT_ID, LOCATION, DATASTORE_ID):
    print(f"Datastore {DATASTORE_ID} already exists.")
else:
    create_datastore(PROJECT_ID, LOCATION, DATASTORE_ID)
[Optional] Check if the Datastore is created successfully¶
The Datastore is polled to track when it becomes available.
This may take a few minutes
while not datastore_exists(PROJECT_ID, LOCATION, DATASTORE_ID):
    print(f"Datastore {DATASTORE_ID} is still being created.")
    time.sleep(30)
print(f"Datastore {DATASTORE_ID} is created successfully.")
Create an App with the provided ID if it doesn't exist¶
The App will be connected to a Datastore with the provided ID earlier in this notebook
if app_exists(PROJECT_ID, LOCATION, APP_ID):
    print(f"App {APP_ID} already exists.")
else:
    create_app(PROJECT_ID, LOCATION, DATASTORE_ID, APP_ID)
[Optional] Check if the App is created successfully¶
The App is polled to track when it becomes available.
This may take a few minutes
while not app_exists(PROJECT_ID, LOCATION, APP_ID):
    print(f"App {APP_ID} is still being created.")
    time.sleep(30)
print(f"App {APP_ID} is created successfully.")
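The polling loops above run until the resource appears, so they never stop if creation fails. As an illustration only, the sketch below adds a timeout around any boolean check; `wait_until` and its default values are hypothetical and not part of the original notebook.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float = 600.0, interval_s: float = 30.0) -> bool:
    """Poll `predicate` until it returns True or the timeout would be exceeded.

    Returns True if the predicate succeeded, False if polling gave up.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        # Give up when the next sleep would run past the deadline.
        if time.monotonic() + interval_s > deadline:
            return False
        time.sleep(interval_s)
```

With the helpers defined earlier, a call could look like `wait_until(lambda: datastore_exists(PROJECT_ID, LOCATION, DATASTORE_ID))`.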
Providing your own schema for the Metadata¶
[Optional] Provide your own Schema¶
The schema is detected automatically but it can be optionally adjusted to decide which fields should be:
- Retrievable (returned in the response),
- Searchable (searched through term-based and semantically),
- Indexable (filtered, boosted, etc.)
We can also specify keyProperties which gives special retrieval treatment to certain fields.
Note that the Schema applies only to the Metadata, not to the documents themselves or their hierarchical structure.
See this documentation on auto-detecting versus providing your own Schema
schema: Dict[str, Any] = {
    "structSchema": {
        "type": "object",
        "properties": {
            "doc_name": {
                "keyPropertyMapping": "title",
                "retrievable": True,
                "dynamicFacetable": False,
                "type": "string"
            },
            "year": {
                "retrievable": True,
                "indexable": True,
                "dynamicFacetable": False,
                "searchable": False,
                "type": "string"
            },
            "quarter": {
                "retrievable": True,
                "indexable": True,
                "dynamicFacetable": False,
                "searchable": False,
                "type": "string"
            },
            "doc_type": {
                "retrievable": True,
                "indexable": True,
                "dynamicFacetable": False,
                "searchable": False,
                "type": "string"
            },
            "stock_tickers": {
                "type": "array",
                "items": {
                    "type": "string",
                    "keyPropertyMapping": "category"
                }
            },
            "company_name": {
                "retrievable": True,
                "indexable": True,
                "dynamicFacetable": False,
                "searchable": False,
                "type": "string"
            },
        },
        "$schema": "https://json-schema.org/draft/2020-12/schema",
    }
}
response = authed_session.patch(
    f'https://discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/schemas/default_schema',
    headers={
        'Content-Type': 'application/json',
    },
    json = schema,
)
print(response.json())
schema_update_lro = response.json()["name"]
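Before ingesting, it can help to confirm that a metadata record only uses fields declared in the schema and that their basic types agree. The following is a rough local sketch, not a Vertex AI Search feature: `schema_mismatches` is a hypothetical helper that only understands the "string", "array", and "object" types and ignores every other schema annotation.

```python
from typing import Any, Dict, List

# Only the JSON types used in this notebook's schema are checked here.
PYTHON_TYPES = {"string": str, "array": list, "object": dict}


def schema_mismatches(record: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """List the fields of `record` that are undeclared or have an unexpected type."""
    properties = schema["structSchema"]["properties"]
    problems = []
    for key, value in record.items():
        spec = properties.get(key)
        if spec is None:
            problems.append(f"{key}: not declared in schema")
            continue
        expected = PYTHON_TYPES.get(spec.get("type"))
        if expected is not None and not isinstance(value, expected):
            problems.append(f"{key}: expected {spec['type']}, got {type(value).__name__}")
    return problems
```

An empty list means the record is consistent with the declared fields under these simplified rules.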
Check the status of Schema update¶
For an empty Datastore the Schema update should be almost instantaneous.
A request to update the schema creates a Long-Running Operation which can be polled.
while True:
    response = authed_session.get(
        f"https://discoveryengine.googleapis.com/v1/{schema_update_lro}",
    )
    # The "done" field is absent from the operation until it finishes.
    if response.json().get("done"):
        print("Schema update completed!")
        break
    print("Schema update in progress.")
    time.sleep(10)
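A finished operation is not necessarily a successful one. As a hedged sketch, assuming the operation body follows the usual long-running operation shape (a `done` flag plus either an `error` or a `response` field), the hypothetical helper `lro_state` below separates the three cases.

```python
from typing import Any, Dict


def lro_state(operation: Dict[str, Any]) -> str:
    """Classify a long-running operation body as running, failed, or succeeded."""
    # "done" is typically omitted while the operation is still in progress.
    if not operation.get("done", False):
        return "running"
    # Assumption: a finished operation carries "error" on failure.
    if "error" in operation:
        return "failed"
    return "succeeded"
```

Inside a polling loop, `lro_state(response.json())` could replace the bare check on `done` so that failures are reported instead of being printed as completed.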
[Optional] Get the current Schema¶
This block can be used to check whether or not the schema is in the desired state (particularly useful for an auto-detected schema).
resp = authed_session.get(
    f'https://discoveryengine.googleapis.com/v1/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/schemas/default_schema',
)
resp.json()
Prepare documents with metadata for ingestion¶
Define the path to documents and Metadata (both in GCS and Local)¶
The JSONL GCS Directory will be used to store the JSONL file to be created. If such a directory does not exist, it will be created.
For the purpose of this demo, the documents and their corresponding metadata are joined based on the FIELD_FOR_FILE_NAME within the metadata (doc_name in this example).
Based on that convention, the metadata for "2022_Q1_Earnings_Transcript.pdf" will have the following content:
{
    "doc_name": "2022_Q1_Earnings_Transcript",
    "year": "2022",
    "quarter": "Q1",
    "doc_type": "earnings transcript",
    "stock_tickers": ["GOOG", "GOOGL"],
    "company_name": "alphabet"
}
The logic is applied for illustration purposes and you can apply any other joining logic that fits your data (e.g. common name between metadata and document files)
DOCUMENT_FORMAT = 'pdf' # @param ["docx", "pdf"]
GCS_DIRECTORY_JSONL = '' # @param {type:"string"}
FIELD_FOR_FILE_NAME = "doc_name" # @param {type:"string"}
JSONL_FILENAME = "alphabet_earnings.json"
LOCAL_DOCS_PATH = "data"
LOCAL_METADATA_PATH = "metadata"
LOCAL_JSONL_PATH = "jsonl"
def prepare_jsonl(row: pd.Series) -> Dict[str, Any]:
    """Prepares metadata for a given row in the DataFrame."""
    mimetype = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' if DOCUMENT_FORMAT == 'docx' else 'application/pdf'
    struct_data = row.to_dict()
    return {
        "id": row[FIELD_FOR_FILE_NAME],
        "structData": struct_data,
        "content": {"mimeType": mimetype, "uri": f'{GCS_DIRECTORY_DOCS}{row[FIELD_FOR_FILE_NAME]}.{DOCUMENT_FORMAT}'}
    }
Prepare JSONL file and save to GCS¶
Documents and their metadata are copied to the local path, loaded into a DataFrame, and processed into a JSONL file with the expected format. The JSONL file is then uploaded to the provided GCS path.
# Copy files from GCS to local
os.makedirs(LOCAL_DOCS_PATH, exist_ok=True)
os.makedirs(LOCAL_METADATA_PATH, exist_ok=True)
os.makedirs(LOCAL_JSONL_PATH, exist_ok=True)
!gsutil -m cp -r {GCS_DIRECTORY_DOCS}* {LOCAL_DOCS_PATH}
!gsutil -m cp -r {GCS_DIRECTORY_METADATA}* {LOCAL_METADATA_PATH}
# Load and process metadata
metadata_files = glob.glob(f"{os.getcwd()}/{LOCAL_METADATA_PATH}/*.txt")
df_json = pd.concat([pd.read_json(file, typ="series") for file in metadata_files], axis=1).T # Load all JSON into one DataFrame
# Apply metadata preparation and save as JSONL
df_json['metadata'] = df_json.apply(prepare_jsonl, axis=1)
df_json['metadata'].to_json(f'{LOCAL_JSONL_PATH}/{JSONL_FILENAME}', orient='records', lines=True)
# Upload the local JSONL file to GCS
!gsutil -m cp {LOCAL_JSONL_PATH}/* {GCS_DIRECTORY_JSONL}
# Optional print of the jsonL content
print("\nJSONL Content:")
for metadata_entry in df_json['metadata']:
    print(json.dumps(metadata_entry, indent=2))
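The join between a metadata record and its document can also be written without pandas, which makes the shape of each JSONL line easier to see. This is a minimal sketch under the same convention as above (the join field holds the document's file name without extension); `to_import_record`, `to_jsonl`, and the bucket path in the usage note are hypothetical.

```python
import json
from typing import Any, Dict, Iterable

MIME_TYPES = {
    "pdf": "application/pdf",
    "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}


def to_import_record(
    metadata: Dict[str, Any],
    docs_dir: str,
    join_field: str = "doc_name",
    document_format: str = "pdf",
) -> Dict[str, Any]:
    """Wrap one metadata dict into a record with id, structData, and content."""
    doc_id = metadata[join_field]
    return {
        "id": doc_id,
        "structData": metadata,
        "content": {
            "mimeType": MIME_TYPES[document_format],
            # docs_dir is expected to end with "/", as elsewhere in this notebook.
            "uri": f"{docs_dir}{doc_id}.{document_format}",
        },
    }


def to_jsonl(records: Iterable[Dict[str, Any]]) -> str:
    """Serialize records as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)
```

For example, `to_import_record({"doc_name": "2022_Q1_Earnings_Transcript", "year": "2022"}, "gs://my_bucket/docs/")` yields a record whose `uri` points at the matching PDF in that (made-up) folder.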
Ingest documents to Datastore¶
Import documents with metadata from JSONL on GCS¶
This is where the actual import to the Datastore happens. The import runs asynchronously, and the request returns a long-running operation that can be polled.
This may take xx minutes. Feel free to grab a coffee.
def import_documents_from_gcs_jsonl(project_id: str, location: str, datastore_id: str, gcs_uri: str) -> str:
    """Imports documents from a JSONL file in GCS."""
    payload = {
        "reconciliationMode": "INCREMENTAL",
        "gcsSource": {"inputUris": [gcs_uri]},
    }
    header = {"Content-Type": "application/json"}
    es_endpoint = f"https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_id}/branches/default_branch/documents:import"
    response = authed_session.post(es_endpoint, data=json.dumps(payload), headers=header)
    print(f"--{response.json()}")
    return response.json()["name"]

import_lro = import_documents_from_gcs_jsonl(
    project_id=PROJECT_ID,
    location=LOCATION,
    datastore_id=DATASTORE_ID,
    gcs_uri=f'{GCS_DIRECTORY_JSONL}{JSONL_FILENAME}',
)
[Optional] Check the status of document import via polling¶
Optionally check the status of the long-running operation for the import job. You can also check this in the UI by looking at the "activity" tab of the corresponding Datastore.
while True:
    response = authed_session.get(
        f"https://discoveryengine.googleapis.com/v1/{import_lro}",
    )
    try:
        status = response.json()["done"]
        if status:
            print("Import completed!")
            break
    except KeyError:
        print("Import in progress.")
    time.sleep(60)
Run queries with and without Metadata filter¶
Sample search without filter¶
A basic search request is issued to the Datastore.
We get relevant results from all four documents in the datastore.
test_query = "Google revenue"
response = authed_session.post(
    f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/servingConfigs/default_search:search',
    headers={
        'Content-Type': 'application/json',
    },
    json = {
        "query": test_query,
    }
)
response.json()
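The raw search response is verbose, so a compact view of which documents came back, together with their metadata, can be handy. The sketch below assumes the response body contains a `results` list whose entries hold a `document` with `id` and `structData` fields; `summarize_results` is a hypothetical helper, and the exact response shape may differ by API version and search mode.

```python
from typing import Any, Dict, List


def summarize_results(search_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the document id and metadata from each search result."""
    summaries = []
    for result in search_response.get("results", []):
        document = result.get("document", {})
        summaries.append(
            {
                "id": document.get("id"),
                # Metadata ingested alongside the document, if it was returned.
                "structData": document.get("structData", {}),
            }
        )
    return summaries
```

A call such as `summarize_results(response.json())` would then list one small dict per returned document.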
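Sample search with filter¶
The same request, restricted to documents whose quarter metadata field is Q2.
test_query = "Google revenue"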
response = authed_session.post(
    f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/servingConfigs/default_search:search',
    headers={
        'Content-Type': 'application/json',
    },
    json = {
        "query": test_query,
        "filter": 'quarter: ANY("Q2")',
    }
)
response.json()
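Filter strings like the one above are easy to mistype when several values or fields are involved. As a small sketch, assuming text fields are filtered with the `field: ANY("v1", "v2")` form used in this notebook and that clauses can be joined with `AND`, the hypothetical helpers below assemble such expressions from Python values.

```python
import json
from typing import Sequence


def any_filter(field: str, values: Sequence[str]) -> str:
    """Build a `field: ANY("a", "b")` clause for an indexable text field."""
    if not values:
        raise ValueError("At least one value is required")
    # json.dumps adds the double quotes and escapes embedded quotes.
    quoted = ", ".join(json.dumps(value) for value in values)
    return f"{field}: ANY({quoted})"


def all_of(*clauses: str) -> str:
    """Combine clauses so that every one of them must match."""
    return " AND ".join(clauses)
```

For instance, `all_of(any_filter("year", ["2022"]), any_filter("quarter", ["Q1", "Q2"]))` produces a filter limited to the first two quarters of 2022; only fields marked as indexable in the schema can be filtered this way.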
Cleanup¶
Clean up resources created in this notebook.
Clean up GCS bucket¶
❗❗❗ Only run the below cells if you created a new bucket just for this notebook ❗❗❗
Technically you could have used different buckets for the documents, their Metadata, and the JSONL. If you happened to use the same TEST bucket for all of them, the following cells help you do the cleanup.
To confirm the assumption above, you're asked to explicitly enter the Bucket name.
def empty_bucket(bucket_name):
    """Deletes all objects in the specified GCS bucket."""
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()  # List all blobs (objects)
    for blob in blobs:
        blob.delete()  # Delete each blob
    print(f"Bucket {bucket_name} emptied.")

# Name of the bucket to be deleted. e.g. "my_bucket"
BUCKET_TO_DELETE = '' # @param {type:"string"}
## Empty the bucket by deleting all files in it
empty_bucket(BUCKET_TO_DELETE)
## Create a client object
client = storage.Client(project=PROJECT_ID)
## Get the bucket object
bucket = client.get_bucket(BUCKET_TO_DELETE)
## Delete the bucket
bucket.delete()
print(f"Bucket {BUCKET_TO_DELETE} deleted successfully.")
Delete local files¶
This will delete local folders for Documents, Metadata, and JSONL according to paths specified earlier in this notebook.
shutil.rmtree(LOCAL_DOCS_PATH)
shutil.rmtree(LOCAL_METADATA_PATH)
shutil.rmtree(LOCAL_JSONL_PATH)
print("Local files deleted successfully.")
Delete the Search App¶
Delete the App if you no longer need it
Alternatively you can follow these instructions to delete an App from the UI
response = authed_session.delete(
    f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/engines/{APP_ID}',
    headers={
        "X-Goog-User-Project": PROJECT_ID
    }
)
print(response.text)
Delete the Datastores¶
Delete the Datastore if you no longer need it
Alternatively you can follow these instructions to delete a Datastore from the UI
response = authed_session.delete(
    f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}',
    headers={
        "X-Goog-User-Project": PROJECT_ID
    }
)
print(response.text)