# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Inline Ingestion of Documents into Vertex AI Search¶
Author(s) | Jaival Desai, Hossein Mansour |
Reviewer(s) | Lei Chen, Abhishek Bhagwat |
Last updated | 2024-09-11: The first draft |
Overview¶
In this notebook, we will demonstrate how to perform inline ingestion of documents into Vertex AI Search (VAIS) datastores.
VAIS supports a variety of sources and data types. For structured or unstructured documents, with or without metadata, it is advised to first stage them in a GCS bucket or a BQ table and perform a subsequent import by referring to those documents by their URI. This approach creates a source of truth which can be investigated in detail, and it allows for an Incremental or Full import depending on the choice of ReconciliationMode. The Full option is particularly useful to resolve possible conflicts and duplicates.
However, in some cases customers may prefer inline ingestion of documents for its simplicity, or to stay compliant with restrictions defined at the organization level. Note that inline ingestion comes with some limitations, including stricter limits on file size and lower visibility in the UI, since the content must be encoded into rawBytes.
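For reference, the only difference between a staged import and the inline import built later in this notebook is the source field of the request body. A minimal sketch of the two payloads (the GCS path is a hypothetical placeholder):
# Sketch only, not executed here. The GCS bucket path is a placeholder.
staged_import_payload = {
    "reconciliationMode": "FULL",  # reconcile against the staged source of truth
    "gcsSource": {
        "inputUris": ["gs://your-bucket/docs/*.pdf"],
        "dataSchema": "content",  # unstructured documents
    },
}
inline_import_payload = {
    "reconciliationMode": "INCREMENTAL",
    "inlineSource": {
        # Each document carries its content as base64-encoded rawBytes,
        # as shown in STEP 3 below.
        "documents": [],
    },
}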
We will perform the following steps:
- Create a VAIS Datastore
- Prepare sample documents
- Import sample documents (and other operations)
- Query the datastore
- Cleanup
The REST API is used throughout this notebook. Please consult the official documentation for alternative ways to achieve the same goal, namely client libraries and RPC.
Vertex AI Search¶
Vertex AI Search (VAIS) is a fully-managed platform, powered by large language models, that lets you build AI-enabled search and recommendation experiences for your public or private websites or mobile applications.
VAIS can handle a diverse set of data sources including structured, unstructured, and website data, as well as data from third-party applications such as Jira, Salesforce, and Confluence.
VAIS also has built-in integration with LLMs, which enables you to provide answers to complex questions, grounded in your data.
Using this Notebook¶
If you're running outside of Colab, depending on your environment you may need to install pip packages that are included in the Colab environment by default but are not part of the Python Standard Library. Outside of Colab you'll also notice comments in code cells that look like #@something; these trigger special Colab functionality but don't change the behavior of the notebook.
This tutorial uses the following Google Cloud services and resources:
- Service Usage API
- Discovery Engine
- Google Cloud Storage Client
This notebook has been tested in the following environment:
- Python version = 3.10.12
- google.cloud.storage = 2.8.0
- google.auth = 2.27.0
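If any of these packages are missing from your environment, a sketch of installing the tested versions from a notebook cell:
%pip install --quiet google-cloud-storage==2.8.0 google-auth==2.27.0 requests pandas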
Getting Started¶
The following steps are necessary to run this notebook, no matter what notebook environment you're using.
If you're entirely new to Google Cloud, get started here
Google Cloud Project Setup¶
- Select or create a Google Cloud project. When you first create an account, you get a $300 free credit towards your compute/storage costs
- Make sure that billing is enabled for your project
- Enable the Service Usage API
- Enable the Cloud Storage API
- Enable the Discovery Engine API for your project
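If you prefer the command line, the same APIs can be enabled with a single gcloud call (a sketch; replace YOUR_PROJECT_ID with your own project):
!gcloud services enable serviceusage.googleapis.com storage.googleapis.com discoveryengine.googleapis.com --project=YOUR_PROJECT_ID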
Google Cloud Permissions¶
Ideally, you should have the Owner role on your project to run this notebook. If that is not an option, you need at least the following roles:
- roles/serviceusage.serviceUsageAdmin to enable APIs
- roles/iam.serviceAccountAdmin to modify service agent permissions
- roles/discoveryengine.admin to modify Discovery Engine assets
- roles/storage.objectAdmin to modify and delete GCS buckets
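As a sketch, a role can be granted to a user with gcloud (the project ID and member email are placeholders):
!gcloud projects add-iam-policy-binding YOUR_PROJECT_ID --member="user:you@example.com" --role="roles/discoveryengine.admin"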
Setup Environment¶
Authentication¶
If you're using Colab, run the code in the next cell. Follow the popups and authenticate with an account that has access to your Google Cloud project.
If you're running this notebook somewhere besides Colab, make sure your environment has the right Google Cloud access. If that's a new concept to you, consider looking into Application Default Credentials for your local environment and initializing the Google Cloud CLI. In many cases, running gcloud auth application-default login
in a shell on the machine running the notebook kernel is sufficient.
More authentication options are discussed here.
# Colab authentication.
import sys
if "google.colab" in sys.modules:
from google.colab import auth
auth.authenticate_user()
print("Authenticated")
from google.auth import default
from google.auth.transport.requests import AuthorizedSession
creds, _ = default()
authed_session = AuthorizedSession(creds)
Import Libraries¶
import os
import time
import base64
import json
from typing import Dict, Any, List, Tuple
from io import BytesIO
import shutil
import requests
import pandas as pd
from google.cloud import storage
Configure environment¶
You can enter the ID for an existing Vertex AI Search Datastore to be used in this notebook.
You can find more information regarding the location of datastores and associated limitations here. global is preferred unless you have a specific data residency requirement to comply with.
The location of a Datastore is set at creation time, and the same location must be used when querying the Datastore.
LOCAL_DIRECTORY_DOCS
is used to store the sample files locally.
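Note that for non-global locations the API endpoint itself is regional. A small helper, mirroring what the document listing/getting functions later in this notebook do, can keep that consistent (a sketch):
def discoveryengine_host(location: str) -> str:
    """Returns the Discovery Engine API host for a given Datastore location."""
    if location == "global":
        return "discoveryengine.googleapis.com"
    return f"{location}-discoveryengine.googleapis.com"

# Example: discoveryengine_host("eu") -> "eu-discoveryengine.googleapis.com"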
PROJECT_ID = "" # @param {type:"string"}
# Vertex AI Search Parameters
DATASTORE_ID = "" # @param {type:"string"}
LOCATION = "global" # @param ["global", "us", "eu"]
LOCAL_DIRECTORY_DOCS = "./sample_docs" # @param {type:"string"}
STEP 1. Create VAIS Datastore¶
You can skip this section if you already have a datastore set up.
Helper functions to create a Datastore¶
def create_datastore(project_id: str, location: str, datastore_id: str) -> int:
"""Create a datastore with doc mode and the basic digital parser"""
payload = {
"displayName": datastore_id,
"industryVertical": "GENERIC",
"solutionTypes": ["SOLUTION_TYPE_SEARCH"],
"contentConfig": "CONTENT_REQUIRED",
"documentProcessingConfig": {
"defaultParsingConfig": {
"digitalParsingConfig": {}
}
}
}
header = {"X-Goog-User-Project": project_id, "Content-Type": "application/json"}
es_endpoint = f"https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores?dataStoreId={datastore_id}"
response = authed_session.post(es_endpoint, data=json.dumps(payload), headers=header)
if response.status_code == 200:
print(f"The creation of Datastore {datastore_id} is initiated.")
print("It may take a few minutes for the Datastore to become available")
else:
print(f"Failed to create Datastore {datastore_id}")
print(response.json())
return response.status_code
Helper function to issue a basic search on a Datastore¶
def search_by_datastore(project_id: str, location: str, datastore_id: str, query: str) -> requests.Response:
"""Searches a datastore using the provided query."""
response = authed_session.post(
f'https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_id}/servingConfigs/default_search:search',
headers={
'Content-Type': 'application/json',
},
json={
"query": query,
"pageSize": 1
},
)
return response
Helper functions to check whether or not a Datastore already exists¶
def datastore_exists(project_id: str, location: str, datastore_id: str) -> bool:
"""Check if a datastore exists."""
response = search_by_datastore(project_id, location, datastore_id, "test")
status_code = response.status_code
if status_code == 200:
return True
if status_code == 404:
return False
raise Exception(f"Error: {status_code}")
Create a Datastore with the provided ID if it doesn't exist¶
# Create the Datastore if it doesn't exist
if datastore_exists(PROJECT_ID, LOCATION, DATASTORE_ID):
print(f"Datastore {DATASTORE_ID} already exists.")
else:
create_datastore(PROJECT_ID, LOCATION, DATASTORE_ID)
[Optional] Check if the Datastore is created successfully¶
The Datastore is polled to track when it becomes available.
This may take a few minutes after the datastore creation is initiated
while not datastore_exists(PROJECT_ID, LOCATION, DATASTORE_ID):
print(f"Datastore {DATASTORE_ID} is still being created.")
time.sleep(30)
print(f"Datastore {DATASTORE_ID} is created successfully.")
STEP 2. Prepare sample documents¶
Create a folder to store the files locally¶
# Check if the folder already exists
if not os.path.exists(LOCAL_DIRECTORY_DOCS):
# Create the folder
os.makedirs(LOCAL_DIRECTORY_DOCS)
print(f"Folder '{LOCAL_DIRECTORY_DOCS}' created successfully!")
else:
print(f"Folder '{LOCAL_DIRECTORY_DOCS}' already exists.")
Helper function to download pdf files and store them locally¶
def download_pdfs(url_list: List[str], save_directory: str = LOCAL_DIRECTORY_DOCS) -> List[str]:
"""Downloads PDFs from a list of URLs and saves them to a specified directory.
Args:
url_list: A list of URLs pointing to PDF files.
save_directory: The directory where the PDFs will be saved. Defaults to LOCAL_DIRECTORY_DOCS.
Returns:
A list of file paths where the PDFs were saved.
"""
pdf_file_paths = []
# Create the save directory if it doesn't exist
if not os.path.exists(save_directory):
os.makedirs(save_directory)
for i, url in enumerate(url_list):
try:
response = requests.get(url)
response.raise_for_status()
# Construct the full file path within the save directory
file_name = f"downloaded_pdf_{i+1}.pdf"
file_path = os.path.join(save_directory, file_name)
with open(file_path, "wb") as f:
f.write(response.content)
pdf_file_paths.append(file_path)
print(f"Downloaded PDF from {url} and saved to {file_path}")
except requests.exceptions.RequestException as e:
print(f"Error downloading PDF from {url}: {e}")
return pdf_file_paths
Download sample PDF files¶
file_urls = [
"https://abc.xyz/assets/91/b3/3f9213d14ce3ae27e1038e01a0e0/2024q1-alphabet-earnings-release-pdf.pdf",
"https://abc.xyz/assets/19/e4/3dc1d4d6439c81206370167db1bd/2024q2-alphabet-earnings-release.pdf"
]
pdf_variables = download_pdfs(file_urls)
Create a sample text file and store it locally¶
sample_text = """
MOUNTAIN VIEW, Calif. – January 30, 2024 – Alphabet Inc. (NASDAQ: GOOG, GOOGL) today announced
financial results for the quarter and fiscal year ended December 31, 2023.
Sundar Pichai, CEO, said: “We are pleased with the ongoing strength in Search and the growing contribution from
YouTube and Cloud. Each of these is already benefiting from our AI investments and innovation. As we enter the
Gemini era, the best is yet to come.”
Ruth Porat, President and Chief Investment Officer; CFO said: “We ended 2023 with very strong fourth quarter
financial results, with Q4 consolidated revenues of $86 billion, up 13% year over year. We remain committed to our
work to durably re-engineer our cost base as we invest to support our growth opportunities.”
"""
def save_string_to_file(string_to_save, filename="doc_3.txt", save_directory=LOCAL_DIRECTORY_DOCS):
"""Saves a string to a text file within a specified directory.
Args:
string_to_save: The string content to be saved.
filename: The desired name for the output file (default: "doc_3.txt").
save_directory: The directory where the file will be saved (default: LOCAL_DIRECTORY_DOCS).
Returns:
None
"""
# Create the save directory if it doesn't exist
if not os.path.exists(save_directory):
os.makedirs(save_directory)
# Construct the full file path within the save directory
file_path = os.path.join(save_directory, filename)
try:
with open(file_path, "w", encoding="utf-8") as file:
file.write(string_to_save)
print(f"String successfully saved to {file_path}")
except IOError as e:
print(f"An error occurred while saving the file: {e}")
save_string_to_file(sample_text)
Helper function to convert the content of a file to Base64 encoding¶
def file_to_base64(file_path):
"""Converts the content of a file to Base64 encoding.
Args:
file_path: The path to the file.
Returns:
The Base64 encoded string representing the file's content.
"""
with open(file_path, "rb") as file:
file_data = file.read()
base64_encoded_data = base64.b64encode(file_data).decode('utf-8')
return base64_encoded_data
Convert sample files to Base64 encoding¶
content_doc_1 = file_to_base64(LOCAL_DIRECTORY_DOCS + "/downloaded_pdf_1.pdf")
content_doc_2 = file_to_base64(LOCAL_DIRECTORY_DOCS + "/downloaded_pdf_2.pdf")
content_doc_3 = file_to_base64(LOCAL_DIRECTORY_DOCS + "/doc_3.txt")
Create JSON documents from sample contents¶
Here we create Documents, in VAIS terminology, based on the contents of the sample files created earlier.
Note that the content field of each document carries rawBytes, as opposed to the uri used when the file is staged elsewhere.
mimeType should be consistent with the format of the files to be ingested (e.g. application/pdf). See the list of supported mimeTypes here.
We also add some metadata to each document to demonstrate this more advanced functionality. This is optional, and you can ingest the content without metadata as well.
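If you are ingesting files of mixed types, you could infer the mimeType from the file extension instead of hard-coding it; a sketch using the Python standard library (check the guessed value against the supported mimeTypes before using it):
import mimetypes

def guess_mime_type(file_path: str, default: str = "application/octet-stream") -> str:
    """Best-effort mimeType based on the file extension."""
    mime_type, _ = mimetypes.guess_type(file_path)
    return mime_type or default

# Example: guess_mime_type("sample_docs/downloaded_pdf_1.pdf") -> "application/pdf"
# Example: guess_mime_type("sample_docs/doc_3.txt") -> "text/plain"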
my_document_1 = {"id":"doc-1","structData":{"title":"test_doc_1","color_theme":"blue"},"content":{"mimeType":"application/pdf","rawBytes":content_doc_1}}
my_document_2 = {"id":"doc-2","structData":{"title":"test_doc_2","color_theme":"red"},"content":{"mimeType":"application/pdf","rawBytes":content_doc_2}}
my_document_3 = {"id":"doc-3","structData":{"title":"test_doc_3","color_theme":"green"},"content":{"mimeType":"text/plain","rawBytes":content_doc_3}}
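STEP 3. Import sample documents (and other operations)¶
Import documents inline¶
The documents defined above are passed via the inlineSource field of a documents:import request. reconciliationMode is set to INCREMENTAL, so documents already in the Datastore are kept and documents with matching IDs are updated.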
def import_documents_rawbytes(project_id: str, location: str, datastore_id: str) -> str:
"""Imports unstructured documents Inline."""
payload = {
"reconciliationMode": "INCREMENTAL",
"inlineSource": {"documents":[my_document_1,my_document_2,my_document_3]},
}
header = {"Content-Type": "application/json"}
es_endpoint = f"https://discoveryengine.googleapis.com/v1/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_id}/branches/default_branch/documents:import"
response = authed_session.post(es_endpoint, data=json.dumps(payload), headers=header)
print(f"--{response.json()}")
return response.json()
import_documents_rawbytes(PROJECT_ID, LOCATION, DATASTORE_ID)
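documents:import is a long-running operation, so the call above returns an operation rather than waiting for the ingestion to finish. A sketch of polling that operation until it completes (global endpoint shown; use the regional host for non-global locations):
def wait_for_import(operation: Dict[str, Any], poll_seconds: int = 10) -> Dict[str, Any]:
    """Polls a Discovery Engine long-running operation until it reports done."""
    operation_name = operation["name"]
    while True:
        poll = authed_session.get(
            f"https://discoveryengine.googleapis.com/v1/{operation_name}"
        )
        operation = poll.json()
        if operation.get("done"):
            return operation
        print(f"Operation {operation_name} is still running...")
        time.sleep(poll_seconds)

# Example usage:
# operation = import_documents_rawbytes(PROJECT_ID, LOCATION, DATASTORE_ID)
# wait_for_import(operation)
List documents in the Datastore¶
Once the import completes, list the documents in the default branch to verify they were ingested.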
def list_documents_datastore(project_id: str, location: str, data_store_id: str) -> List[Dict[str, str]] | None:
"""Lists documents in a specified data store using the REST API.
Args:
project_id: The ID of your Google Cloud project.
location: The location of your data store.
Values: "global", "us", "eu"
data_store_id: The ID of the datastore.
Returns:
The JSON response containing the list of documents, or None if an error occurs.
"""
base_url = f"{location}-discoveryengine.googleapis.com" if location != "global" else "discoveryengine.googleapis.com"
url = f"https://{base_url}/v1alpha/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{data_store_id}/branches/default_branch/documents"
try:
# Assuming 'authed_session' is available and properly configured for authentication
response = authed_session.get(url)
response.raise_for_status() # Raise an exception for bad status codes
documents = response.json()
print(f"Successfully retrieved {len(documents.get('documents', []))} document(s).\n")
return [document
for document in documents.get('documents', [])]
except requests.exceptions.RequestException as e:
print(f"Error listing documents: {e}")
return None
list_documents_datastore(PROJECT_ID, LOCATION, DATASTORE_ID)
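Get a specific document from the Datastore¶
Retrieve a single document by its ID to inspect its fields, including the structData metadata provided at import time.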
DOCUMENT_ID = "doc-1"
def get_document_datastore(project_id: str, location: str, data_store_id: str, document_id: str) -> Dict[str, str] | None:
"""Gets a specific document from a data store using the REST API.
Args:
project_id: The ID of your Google Cloud project.
location: The location of your data store.
Values: "global", "us", "eu"
data_store_id: The ID of the datastore.
document_id: The ID of the document to retrieve.
Returns:
The JSON response containing the document data, or None if an error occurs.
"""
base_url = f"{location}-discoveryengine.googleapis.com" if location != "global" else "discoveryengine.googleapis.com"
url = f"https://{base_url}/v1alpha/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{data_store_id}/branches/default_branch/documents/{document_id}"
try:
# Assuming 'authed_session' is available and properly configured for authentication
response = authed_session.get(url)
response.raise_for_status() # Raise an exception for bad status codes
document = response.json()
print(f"Successfully retrieved document with ID: {document_id}\n")
return document
except requests.exceptions.RequestException as e:
print(f"Error getting document: {e}")
return None
get_document_datastore(PROJECT_ID, LOCATION, DATASTORE_ID, DOCUMENT_ID)
Delete a document¶
Delete a particular document from a datastore by referencing its ID.
The line that actually deletes the document is commented out here as we need all documents in a subsequent section.
Note that if you are using the GCS/BQ staging approach for importing, a Full import from the source will make the deleted document reappear in the datastore. The same applies to a page within an advanced website datastore, which may reappear after subsequent recrawls.
def delete_document_datastore(project_id: str, location: str, data_store_id: str, document_id: str) -> bool:
"""Deletes a specific document from a data store using the REST API.
Args:
project_id: The ID of your Google Cloud project.
location: The location of your data store.
Values: "global", "us", "eu"
data_store_id: The ID of the datastore.
document_id: The ID of the document to delete.
Returns:
True if the document was deleted successfully, False otherwise.
"""
base_url = f"{location}-discoveryengine.googleapis.com" if location != "global" else "discoveryengine.googleapis.com"
url = f"https://{base_url}/v1alpha/projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{data_store_id}/branches/default_branch/documents/{document_id}"
try:
# Assuming 'authed_session' is available and properly configured for authentication
response = authed_session.delete(url)
response.raise_for_status() # Raise an exception for bad status codes
print(f"Successfully deleted document with ID: {document_id}\n")
return True
except requests.exceptions.RequestException as e:
print(f"Error deleting document: {e}")
return False
# delete_document_datastore(PROJECT_ID, LOCATION, DATASTORE_ID, DOCUMENT_ID)
STEP 4. Run queries with and without a metadata filter¶
Sample search without filter¶
A basic search request is issued to the Datastore.
We get relevant results from all three documents in the datastore.
test_query = "Google revenue"
response = authed_session.post(
f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/servingConfigs/default_search:search',
headers={
'Content-Type': 'application/json',
},
json = {
"query": test_query,
}
)
response.json()
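The raw JSON response is verbose. A sketch of extracting just the document IDs and the title metadata from the results (assuming the default response structure, where each result wraps a document and its structData):
def summarize_results(search_response: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Extracts (document id, title) pairs from a VAIS search response."""
    pairs = []
    for result in search_response.get("results", []):
        document = result.get("document", {})
        title = document.get("structData", {}).get("title", "<no title>")
        pairs.append((document.get("id", "<no id>"), title))
    return pairs

print(summarize_results(response.json()))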
Sample search with filter¶
Now let's apply a filter to showcase how metadata can be used to influence the results.
We issue the same query as above, but limit the results to color_theme "red". As expected, we only get one result back.
Note that this block shows a very basic way of querying a Datastore. You can find more information here.
test_query = "Google revenue"
response = authed_session.post(
f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}/servingConfigs/default_search:search',
headers={
'Content-Type': 'application/json',
},
json = {
"query": test_query,
"filter": "color_theme: ANY(\"red\")",
}
)
response.json()
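Beyond filters, the search request also accepts a contentSearchSpec for snippets and generated summaries. A sketch of a richer request body (field names as documented for the VAIS search API; treat it as a starting point rather than a complete reference):
richer_search_payload = {
    "query": test_query,
    "pageSize": 5,
    "filter": 'color_theme: ANY("red")',
    "contentSearchSpec": {
        "snippetSpec": {"returnSnippet": True},
        "summarySpec": {"summaryResultCount": 3, "ignoreAdversarialQuery": True},
    },
}
# POST this to the same .../servingConfigs/default_search:search endpoint used above.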
Cleanup¶
Clean up resources created in this notebook.
Set the DELETE_RESOURCES flag to True to delete resources.
DELETE_RESOURCES = False
Delete local files¶
if DELETE_RESOURCES:
shutil.rmtree(LOCAL_DIRECTORY_DOCS)
print("Local files deleted successfully.")
Delete the Datastore¶
Delete the Datastore if you no longer need it.
Alternatively, you can follow these instructions to delete the Datastore from the UI.
if DELETE_RESOURCES:
response = authed_session.delete(
f'https://discoveryengine.googleapis.com/v1alpha/projects/{PROJECT_ID}/locations/{LOCATION}/collections/default_collection/dataStores/{DATASTORE_ID}',
headers={
"X-Goog-User-Project": PROJECT_ID
}
)
print(response.json())