BigQuery#

Authentication / Configuration#

  • Use Client objects to configure your applications.

  • Client objects hold both a project and an authenticated connection to the BigQuery service.

  • The authentication credentials can be implicitly determined from the environment or directly via from_service_account_json and from_service_account_p12.

  • After setting GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create an instance of Client.

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client()
    

Projects#

A project is the top-level container in the BigQuery API: it is tied closely to billing, and can provide default access control across all its datasets. If no project is passed to the client constructor, the library attempts to infer a project from the environment (including explicit environment variables, GAE, and GCE).

To override the project inferred from the environment, pass an explicit project to the constructor, or to either of the alternative classmethod factories:

>>> from google.cloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')

Project ACLs#

Each project has an access control list granting reader / writer / owner permission to one or more entities. This list cannot be queried or set via the API: it must be managed using the Google Developer Console.

Datasets#

A dataset represents a collection of tables, and applies several default policies to tables as they are created:

  • An access control list (ACL). When created, a dataset has an ACL which maps to the ACL inherited from its project.
  • A default table expiration period. If set, tables created within the dataset will use this value as their expiration period.
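The expiration period is expressed in milliseconds, the unit the API expects. A quick sketch of the conversion (matching the ONE_DAY_MS constant used in the patch example on this page):

```python
# default_table_expiration_ms is expressed in milliseconds.
ONE_DAY_MS = 24 * 60 * 60 * 1000
ONE_WEEK_MS = 7 * ONE_DAY_MS

print(ONE_DAY_MS)   # 86400000
print(ONE_WEEK_MS)  # 604800000
```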

Dataset operations#

List datasets for the client’s project:

    for dataset in client.list_datasets():  # API request(s)
        do_something_with(dataset)

Create a new dataset for the client’s project:

    dataset = client.dataset(DATASET_NAME)
    dataset.create()              # API request

Check for the existence of a dataset:

    assert not dataset.exists()   # API request
    dataset.create()              # API request
    assert dataset.exists()       # API request

Refresh metadata for a dataset (to pick up changes made by another client):

    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.description = LOCALLY_CHANGED_DESCRIPTION
    assert dataset.description == LOCALLY_CHANGED_DESCRIPTION
    dataset.reload()              # API request
    assert dataset.description == ORIGINAL_DESCRIPTION

Patch metadata for a dataset:

    ONE_DAY_MS = 24 * 60 * 60 * 1000
    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.patch(
        description=PATCHED_DESCRIPTION,
        default_table_expiration_ms=ONE_DAY_MS
    )      # API request
    assert dataset.description == PATCHED_DESCRIPTION
    assert dataset.default_table_expiration_ms == ONE_DAY_MS

Replace the ACL for a dataset, and update all writable fields:

>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> dataset = client.dataset('dataset_name')
>>> dataset.get()  # API request
>>> acl = list(dataset.acl)
>>> acl.append(bigquery.Access(role='READER', entity_type='domain', entity='example.com'))
>>> dataset.acl = acl
>>> dataset.update()  # API request

Delete a dataset:

    assert dataset.exists()       # API request
    dataset.delete()              # API request
    assert not dataset.exists()   # API request

Tables#

Tables exist within datasets. List tables for the dataset:

    tables = list(dataset.list_tables())  # API request(s)
    assert len(tables) == 0
    table = dataset.table(TABLE_NAME)
    table.view_query = QUERY
    table.create()                          # API request
    tables = list(dataset.list_tables())  # API request(s)
    assert len(tables) == 1
    assert tables[0].name == TABLE_NAME

Create a table:

    table = dataset.table(TABLE_NAME, SCHEMA)
    table.create()                          # API request

Check for the existence of a table:

    table = dataset.table(TABLE_NAME, SCHEMA)
    assert not table.exists()               # API request
    table.create()                          # API request
    assert table.exists()                   # API request

Refresh metadata for a table (to pick up changes made by another client):

    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION
    table.friendly_name = LOCALLY_CHANGED_FRIENDLY_NAME
    table.description = LOCALLY_CHANGED_DESCRIPTION
    table.reload()                  # API request
    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION

Patch specific properties for a table:

    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION
    table.patch(
        friendly_name=PATCHED_FRIENDLY_NAME,
        description=PATCHED_DESCRIPTION,
    )      # API request
    assert table.friendly_name == PATCHED_FRIENDLY_NAME
    assert table.description == PATCHED_DESCRIPTION

Update all writable metadata for a table:

    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION
    NEW_SCHEMA = table.schema[:]
    NEW_SCHEMA.append(SchemaField('phone', 'string'))
    table.friendly_name = UPDATED_FRIENDLY_NAME
    table.description = UPDATED_DESCRIPTION
    table.schema = NEW_SCHEMA
    table.update()              # API request
    assert table.friendly_name == UPDATED_FRIENDLY_NAME
    assert table.description == UPDATED_DESCRIPTION
    assert table.schema == NEW_SCHEMA

Get rows from a table’s data:

    for row in table.fetch_data():
        do_something(row)

Insert rows into a table’s data:

    ROWS_TO_INSERT = [
        (u'Phred Phlyntstone', 32),
        (u'Wylma Phlyntstone', 29),
    ]

    table.insert_data(ROWS_TO_INSERT)
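Each row tuple must supply one value per schema field, in positional order. A minimal sanity check before inserting (a sketch; the two-field full_name/age schema is assumed from the table-creation examples above):

```python
# Assumed schema for this sketch: two fields, in order.
SCHEMA_FIELD_NAMES = ['full_name', 'age']

ROWS_TO_INSERT = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

# Every tuple must supply exactly one value per schema field.
for row in ROWS_TO_INSERT:
    assert len(row) == len(SCHEMA_FIELD_NAMES)
```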

Upload table data from a file:

    import csv
    from tempfile import NamedTemporaryFile

    csv_file = NamedTemporaryFile(mode='wb', suffix='.csv', delete=False)
    writer = csv.writer(csv_file)
    writer.writerow((b'full_name', b'age'))
    writer.writerow((b'Phred Phlyntstone', b'32'))
    writer.writerow((b'Wylma Phlyntstone', b'29'))
    csv_file.flush()

    with open(csv_file.name, 'rb') as readable:
        table.upload_from_file(
            readable, source_format='CSV', skip_leading_rows=1)

Delete a table:

    assert table.exists()       # API request
    table.delete()              # API request
    assert not table.exists()   # API request

Jobs#

Jobs describe actions performed on data in BigQuery tables:

  • Load data into a table
  • Run a query against data in one or more tables
  • Extract data from a table
  • Copy a table

List jobs for a project:

    job_iterator = client.list_jobs()
    for job in job_iterator:   # API request(s)
        do_something_with(job)

Querying data (synchronous)#

Run a query which can be expected to complete within bounded time:

    query = client.run_sync_query(LIMITED)
    query.timeout_ms = TIMEOUT_MS
    query.run()             # API request

    assert query.complete
    assert len(query.rows) == LIMIT
    assert [field.name for field in query.schema] == ['name']

Run a query using a named query parameter:

    from google.cloud.bigquery import ScalarQueryParameter
    param = ScalarQueryParameter('state', 'STRING', 'TX')
    query = client.run_sync_query(LIMITED, query_parameters=[param])
    query.use_legacy_sql = False
    query.timeout_ms = TIMEOUT_MS
    query.run()             # API request

    assert query.complete
    assert len(query.rows) == LIMIT
    assert [field.name for field in query.schema] == ['name']

If the rows returned by the query do not fit into the initial response, then we need to fetch the remaining rows via fetch_data():

    query = client.run_sync_query(LIMITED)
    query.timeout_ms = TIMEOUT_MS
    query.max_results = PAGE_SIZE
    query.run()                     # API request

    assert query.complete
    assert query.page_token is not None
    assert len(query.rows) == PAGE_SIZE
    assert [field.name for field in query.schema] == ['name']

    iterator = query.fetch_data()   # API request(s) during iteration
    for row in iterator:
        do_something_with(row)

If the query takes longer than the timeout allowed, query.complete will be False. In that case, we need to poll the associated job until it is done, and then fetch the results:

    query = client.run_sync_query(QUERY)
    query.timeout_ms = TIMEOUT_MS
    query.use_query_cache = False
    query.run()                           # API request

    assert not query.complete

    job = query.job
    job.reload()                          # API request
    retry_count = 0

    while retry_count < 10 and job.state != u'DONE':
        time.sleep(1.5**retry_count)      # exponential backoff
        retry_count += 1
        job.reload()                      # API request

    assert job.state == u'DONE'

    iterator = query.fetch_data()         # API request(s) during iteration
    for row in iterator:
        do_something_with(row)
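The 1.5 ** retry_count backoff above grows geometrically, so the ten retries bound the worst-case wait. A rough sketch of the arithmetic:

```python
# Sleep durations for retry_count = 0 .. 9 in the polling loop above.
delays = [1.5 ** n for n in range(10)]

# Geometric series sum: (1.5**10 - 1) / (1.5 - 1) ~= 113.3 seconds.
total_wait = sum(delays)
print(round(total_wait, 1))  # 113.3
```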

Querying data (asynchronous)#

Background a query, loading the results into a table:

>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> query = """\
SELECT first_name + ' ' + last_name AS full_name,
       FLOOR(DATEDIFF(CURRENT_DATE(), birth_date) / 365) AS age
 FROM dataset_name.persons
"""
>>> dataset = client.dataset('dataset_name')
>>> table = dataset.table(name='person_ages')
>>> job = client.run_async_query('fullname-age-query-job', query)
>>> job.destination = table
>>> job.write_disposition = 'WRITE_TRUNCATE'
>>> job.name
'fullname-age-query-job'
>>> job.job_type
'query'
>>> job.created
None
>>> job.state
None

Note

  • The created and state fields are not set until the job is submitted to the BigQuery back-end.

Then, begin executing the job on the server:

>>> job.begin()  # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'

Poll until the job is complete:

>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Retrieve the results:

>>> results = job.results()
>>> rows, total_count, token = results.fetch_data()  # API request
>>> while True:
...     do_something_with(rows)
...     if token is None:
...         break
...     rows, total_count, token = query.fetch_data(
...         page_token=token)       # API request

Inserting data (asynchronous)#

Start a job loading data asynchronously from a set of CSV files, located on Google Cloud Storage, appending rows into an existing table. First, create the job locally:

>>> from google.cloud import bigquery
>>> from google.cloud.bigquery import SchemaField
>>> client = bigquery.Client()
>>> table = dataset.table(name='person_ages')
>>> table.schema = [
...     SchemaField('full_name', 'STRING', mode='required'),
...     SchemaField('age', 'INTEGER', mode='required')]
>>> job = client.load_table_from_storage(
...     'load-from-storage-job', table, 'gs://bucket-name/object-prefix*')
>>> job.source_format = 'CSV'
>>> job.skip_leading_rows = 1  # count of skipped header rows
>>> job.write_disposition = 'WRITE_TRUNCATE'
>>> job.name
'load-from-storage-job'
>>> job.job_type
'load'
>>> job.created
None
>>> job.state
None

Note

  • google.cloud.bigquery generates a UUID for each job.
  • The created and state fields are not set until the job is submitted to the BigQuery back-end.

Then, begin executing the job on the server:

>>> job.begin()  # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'

Poll until the job is complete:

>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Exporting data (asynchronous)#

Start a job exporting a table’s data asynchronously to a set of CSV files, located on Google Cloud Storage. First, create the job locally:

>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> table = dataset.table(name='person_ages')
>>> job = client.extract_table_to_storage(
...     'extract-person-ages-job', table,
...     'gs://bucket-name/export-prefix*.csv')
>>> job.destination_format = 'CSV'
>>> job.print_header = True
>>> job.write_disposition = 'WRITE_TRUNCATE'
>>> job.name
'extract-person-ages-job'
>>> job.job_type
'extract'
>>> job.created
None
>>> job.state
None

Note

  • google.cloud.bigquery generates a UUID for each job.
  • The created and state fields are not set until the job is submitted to the BigQuery back-end.

Then, begin executing the job on the server:

>>> job.begin()  # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'

Poll until the job is complete:

>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Copy tables (asynchronous)#

First, create the job locally:

>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> source_table = dataset.table(name='person_ages')
>>> destination_table = dataset.table(name='person_ages_copy')
>>> job = client.copy_table(
...     'copy-table-job', destination_table, source_table)
>>> job.name
'copy-table-job'
>>> job.job_type
'copy'
>>> job.created
None
>>> job.state
None

Note

  • google.cloud.bigquery generates a UUID for each job.
  • The created and state fields are not set until the job is submitted to the BigQuery back-end.

Then, begin executing the job on the server:

>>> job.begin()  # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'

Poll until the job is complete:

>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)