BigQuery#

Authentication / Configuration#

  • Use Client objects to configure your applications.

  • Client objects hold both a project and an authenticated connection to the BigQuery service.

  • The authentication credentials can be implicitly determined from the environment or directly via from_service_account_json and from_service_account_p12.

  • After setting GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create an instance of Client.

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client()
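
To authenticate with an explicit service account key file instead, use the
from_service_account_json factory (a minimal sketch; the path shown is a
placeholder for your own key file):

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client.from_service_account_json(
    ...     '/path/to/service-account-key.json')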
    

Projects#

A project is the top-level container in the BigQuery API: it is tied closely to billing, and can provide default access control across all its datasets. If no project is passed to the client constructor, the library attempts to infer a project from the environment (including explicit environment variables, GAE, and GCE).

To override the project inferred from the environment, pass an explicit project to the constructor, or to either of the alternative classmethod factories:

>>> from google.cloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')
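
The project can also be passed straight through either classmethod factory (a
sketch; the key file path is a placeholder):

>>> client = bigquery.Client.from_service_account_json(
...     '/path/to/key.json', project='PROJECT_ID')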

Project ACLs#

Each project has an access control list granting reader / writer / owner permission to one or more entities. This list cannot be queried or set via the API; it must be managed using the Google Developer Console.

Datasets#

A dataset represents a collection of tables, and applies several default policies to tables as they are created:

  • An access control list (ACL). When created, a dataset has an ACL which maps to the ACL inherited from its project.
  • A default table expiration period. If set, tables created within the dataset will use this value as their expiration period.

See BigQuery documentation for more information on Datasets.

Dataset operations#

List datasets for the client’s project:

    for dataset in client.list_datasets():  # API request(s)
        do_something_with(dataset)

Create a new dataset for the client’s project:

    # DATASET_ID = 'dataset_ids_are_strings'
    dataset_ref = client.dataset(DATASET_ID)
    dataset = bigquery.Dataset(dataset_ref)
    dataset.description = 'my dataset'
    dataset = client.create_dataset(dataset)  # API request

Refresh metadata for a dataset (to pick up changes made by another client):

    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.description = LOCALLY_CHANGED_DESCRIPTION
    assert dataset.description == LOCALLY_CHANGED_DESCRIPTION
    dataset = client.get_dataset(dataset)  # API request
    assert dataset.description == ORIGINAL_DESCRIPTION

Update a property in a dataset’s metadata:

    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.description = UPDATED_DESCRIPTION

    dataset = client.update_dataset(dataset, ['description'])  # API request

    assert dataset.description == UPDATED_DESCRIPTION

Update multiple properties in a dataset’s metadata:

    assert dataset.description == ORIGINAL_DESCRIPTION
    assert dataset.default_table_expiration_ms is None
    ONE_DAY_MS = 24 * 60 * 60 * 1000  # in milliseconds
    dataset.description = UPDATED_DESCRIPTION
    dataset.default_table_expiration_ms = ONE_DAY_MS

    dataset = client.update_dataset(
        dataset,
        ['description', 'default_table_expiration_ms']
    )  # API request

    assert dataset.description == UPDATED_DESCRIPTION
    assert dataset.default_table_expiration_ms == ONE_DAY_MS

Modify user permissions on a dataset:

    entry = bigquery.AccessEntry(
        role='READER',
        entity_type='userByEmail',
        entity_id='sample.bigquery.dev@gmail.com')
    assert entry not in dataset.access_entries
    entries = list(dataset.access_entries)
    entries.append(entry)
    dataset.access_entries = entries

    dataset = client.update_dataset(dataset, ['access_entries'])  # API request

    assert entry in dataset.access_entries

Delete a dataset:

    import pytest
    from google.cloud.exceptions import NotFound

    client.delete_dataset(dataset)  # API request

    with pytest.raises(NotFound):
        client.get_dataset(dataset)  # API request

Tables#

Tables exist within datasets. See BigQuery documentation for more information on Tables.

Table operations#

List tables for the dataset:

    tables = list(client.list_tables(dataset))  # API request(s)
    assert len(tables) == 0

    # QUERY = 'SELECT ...'  # SQL statement defining the view
    table_ref = dataset.table('my_table')
    table = bigquery.Table(table_ref)
    table.view_query = QUERY
    client.create_table(table)                          # API request
    tables = list(client.list_tables(dataset))  # API request(s)

    assert len(tables) == 1
    assert tables[0].table_id == 'my_table'

Create a table:

    SCHEMA = [
        bigquery.SchemaField('full_name', 'STRING', mode='required'),
        bigquery.SchemaField('age', 'INTEGER', mode='required'),
    ]
    table_ref = dataset.table('my_table')
    table = bigquery.Table(table_ref, schema=SCHEMA)
    table = client.create_table(table)      # API request

    assert table.table_id == 'my_table'

Refresh metadata for a table (to pick up changes made by another client):

    assert table.description == ORIGINAL_DESCRIPTION
    table.description = LOCALLY_CHANGED_DESCRIPTION
    table = client.get_table(table)  # API request
    assert table.description == ORIGINAL_DESCRIPTION

Update a property in a table’s metadata:

    assert table.description == ORIGINAL_DESCRIPTION
    table.description = UPDATED_DESCRIPTION

    table = client.update_table(table, ['description'])  # API request

    assert table.description == UPDATED_DESCRIPTION

Update multiple properties in a table’s metadata:

    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION

    NEW_SCHEMA = list(table.schema)
    NEW_SCHEMA.append(bigquery.SchemaField('phone', 'STRING'))
    table.friendly_name = UPDATED_FRIENDLY_NAME
    table.description = UPDATED_DESCRIPTION
    table.schema = NEW_SCHEMA
    table = client.update_table(
        table,
        ['schema', 'friendly_name', 'description']
    )  # API request

    assert table.friendly_name == UPDATED_FRIENDLY_NAME
    assert table.description == UPDATED_DESCRIPTION
    assert table.schema == NEW_SCHEMA

Get rows from a table’s data:

    for row in client.list_rows(table):  # API request
        do_something(row)

Utilize iterator properties returned with row data:

    import six

    iterator = client.list_rows(table)  # API request
    page = six.next(iterator.pages)
    rows = list(page)
    total = iterator.total_rows
    token = iterator.next_page_token
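
If iteration is interrupted, the saved page token can be handed back to
list_rows to resume from the next page (a sketch, assuming the token is still
valid):

    if token is not None:
        iterator = client.list_rows(table, page_token=token)  # API request
        next_page_rows = list(next(iterator.pages))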

Insert rows into a table’s data:

    rows_to_insert = [
        (u'Phred Phlyntstone', 32),
        (u'Wylma Phlyntstone', 29),
    ]

    errors = client.insert_rows(table, rows_to_insert)  # API request

    assert errors == []
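
insert_rows also accepts rows as mappings of field name to value (a sketch
using the same two-column schema):

    rows_to_insert = [
        {'full_name': u'Phred Phlyntstone', 'age': 32},
        {'full_name': u'Wylma Phlyntstone', 'age': 29},
    ]

    errors = client.insert_rows(table, rows_to_insert)  # API request

    assert errors == []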

Copy a table:

    source_dataset = bigquery.DatasetReference(
        'bigquery-public-data', 'samples')
    source_table_ref = source_dataset.table('shakespeare')

    # dataset_id = 'my_dataset'
    dest_dataset = client.dataset(dataset_id)
    dest_table_ref = dest_dataset.table('destination_table')

    job = client.copy_table(source_table_ref, dest_table_ref)  # API request
    job.result()  # Waits for job to complete.

    assert job.state == 'DONE'
    dest_table = client.get_table(dest_table_ref)  # API request
    assert dest_table.table_id == 'destination_table'

Extract a table to Google Cloud Storage:

    from google.cloud.storage import Client as StorageClient

    # bucket_name = 'my-bucket-name'
    # table_ref = dataset.table('my_table')
    storage_client = StorageClient()
    bucket = storage_client.create_bucket(bucket_name)  # API request
    destination_blob_name = 'person_ages_out.csv'
    destination = bucket.blob(destination_blob_name)

    destination_uri = 'gs://{}/{}'.format(bucket_name, destination_blob_name)
    extract_job = client.extract_table(
        table_ref, destination_uri)  # API request
    extract_job.result(timeout=100)  # Waits for job to complete.

    got = destination.download_as_string().decode('utf-8')  # API request
    assert 'Bharney Rhubble' in got
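
To control the exported format, pass an ExtractJobConfig (a sketch; the
GZIP-compressed newline-delimited JSON output shown here is just one possible
choice):

    job_config = bigquery.ExtractJobConfig()
    job_config.destination_format = 'NEWLINE_DELIMITED_JSON'
    job_config.compression = 'GZIP'

    destination_uri = 'gs://{}/{}'.format(bucket_name, 'person_ages_out.json.gz')
    extract_job = client.extract_table(
        table_ref, destination_uri, job_config=job_config)  # API request
    extract_job.result(timeout=100)  # Waits for job to complete.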

Delete a table:

    import pytest
    from google.cloud.exceptions import NotFound

    client.delete_table(table)  # API request

    with pytest.raises(NotFound):
        client.get_table(table)  # API request

Upload table data from a file:

    import six

    # TABLE_ID = 'my_table'
    csv_file = six.BytesIO(b"""full_name,age
Phred Phlyntstone,32
Wylma Phlyntstone,29
""")

    table_ref = dataset.table(TABLE_ID)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1
    job_config.autodetect = True
    job = client.load_table_from_file(
        csv_file, table_ref, job_config=job_config)  # API request
    job.result()  # Waits for table load to complete.

Load table data from Google Cloud Storage#

See also: Loading JSON data from Cloud Storage.

Load a JSON file from Cloud Storage:

    # dataset_id = 'my_dataset'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('name', 'STRING'),
        bigquery.SchemaField('post_abbr', 'STRING')
    ]
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'

    load_job = client.load_table_from_uri(
        'gs://cloud-samples-data/bigquery/us-states/us-states.json',
        dataset_ref.table('us_states'),
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(dataset_ref.table('us_states')).num_rows > 0

Load a JSON file from Cloud Storage, using an autodetected schema:

    # dataset_id = 'my_dataset'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'

    load_job = client.load_table_from_uri(
        'gs://cloud-samples-data/bigquery/us-states/us-states.json',
        dataset_ref.table('us_states'),
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(dataset_ref.table('us_states')).num_rows > 0

Append a JSON file from Cloud Storage to an existing table:

    # table_ref = client.dataset('my_dataset').table('existing_table')
    previous_rows = client.get_table(table_ref).num_rows
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'
    job_config.write_disposition = 'WRITE_APPEND'

    load_job = client.load_table_from_uri(
        'gs://cloud-samples-data/bigquery/us-states/us-states.json',
        table_ref,
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(table_ref).num_rows == previous_rows + 50

Overwrite / replace an existing table with a JSON file from Cloud Storage:

    # table_ref = client.dataset('my_dataset').table('existing_table')
    previous_rows = client.get_table(table_ref).num_rows
    assert previous_rows > 0

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'
    job_config.write_disposition = 'WRITE_TRUNCATE'

    load_job = client.load_table_from_uri(
        'gs://cloud-samples-data/bigquery/us-states/us-states.json',
        table_ref,
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert client.get_table(table_ref).num_rows == 50

Customer Managed Encryption Keys#

Table data is always encrypted at rest, but BigQuery also provides a way for you to control the keys it uses to encrypt the data. See Protecting data with Cloud KMS keys in the BigQuery documentation for more details.

Create a new table, using a customer-managed encryption key from Cloud KMS to encrypt it.

    table_ref = dataset.table('my_table')
    table = bigquery.Table(table_ref)

    # Set the encryption key to use for the table.
    # TODO: Replace this key with a key you have created in Cloud KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    table.encryption_configuration = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)

    table = client.create_table(table)  # API request

    assert table.encryption_configuration.kms_key_name == kms_key_name

Change the key used to encrypt a table.

    assert table.encryption_configuration.kms_key_name == original_kms_key_name

    # Set a new encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    updated_kms_key_name = (
        'projects/cloud-samples-tests/locations/us-central1/'
        'keyRings/test/cryptoKeys/otherkey')
    table.encryption_configuration = bigquery.EncryptionConfiguration(
        kms_key_name=updated_kms_key_name)

    table = client.update_table(
        table, ['encryption_configuration'])  # API request

    assert table.encryption_configuration.kms_key_name == updated_kms_key_name
    assert original_kms_key_name != updated_kms_key_name

Load a file from Cloud Storage, using a customer-managed encryption key from Cloud KMS for the destination table.

    # dataset_id = 'my_dataset'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config.destination_encryption_configuration = encryption_config

    load_job = client.load_table_from_uri(
        'gs://cloud-samples-data/bigquery/us-states/us-states.json',
        dataset_ref.table('us_states'),
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    table = client.get_table(dataset_ref.table('us_states'))
    assert table.encryption_configuration.kms_key_name == kms_key_name

Copy a table, using a customer-managed encryption key from Cloud KMS for the destination table.

    source_dataset = bigquery.DatasetReference(
        'bigquery-public-data', 'samples')
    source_table_ref = source_dataset.table('shakespeare')

    # dataset_id = 'my_dataset'
    dest_dataset_ref = client.dataset(dataset_id)
    dest_table_ref = dest_dataset_ref.table('destination_table')

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config = bigquery.CopyJobConfig()
    job_config.destination_encryption_configuration = encryption_config

    job = client.copy_table(
        source_table_ref, dest_table_ref, job_config=job_config)  # API request
    job.result()  # Waits for job to complete.

    assert job.state == 'DONE'
    dest_table = client.get_table(dest_table_ref)
    assert dest_table.encryption_configuration.kms_key_name == kms_key_name

Write query results to a table, using a customer-managed encryption key from Cloud KMS for the destination table.

    job_config = bigquery.QueryJobConfig()

    # Set the destination table. Here, dataset_id is a string, such as:
    # dataset_id = 'your_dataset_id'
    table_ref = client.dataset(dataset_id).table('your_table_id')
    job_config.destination = table_ref

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
        'cloud-samples-tests', 'us-central1', 'test', 'test')
    encryption_config = bigquery.EncryptionConfiguration(
        kms_key_name=kms_key_name)
    job_config.destination_encryption_configuration = encryption_config

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        'SELECT 17 AS my_col;', job_config=job_config)
    query_job.result()

    # The destination table is written using the encryption configuration.
    table = client.get_table(table_ref)
    assert table.encryption_configuration.kms_key_name == kms_key_name

Queries#

Run a simple query#

Run a query and wait for it to finish:

    QUERY = (
        'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = "TX" '
        'LIMIT 100')
    query_job = client.query(QUERY)

    for row in query_job:  # API request
        # Row values can be accessed by field name or index
        assert row[0] == row.name == row['name']

Querying data#

    QUERY = (
        'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = "TX" '
        'LIMIT 100')
    TIMEOUT = 30  # in seconds
    query_job = client.query(QUERY)  # API request - starts the query

    # Waits for the query to finish
    iterator = query_job.result(timeout=TIMEOUT)
    rows = list(iterator)

    assert query_job.state == 'DONE'
    assert len(rows) == 100
    row = rows[0]
    assert row[0] == row.name == row['name']

Note

  • Use of the timeout parameter is optional. The query will continue to run in the background even if it takes longer than the allowed timeout; see the sketch below for polling or cancelling a long-running job.
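
To avoid blocking at all, you can poll the job yourself and cancel it if it
runs too long (a minimal sketch; the one-second sleep is arbitrary):

    import time

    query_job = client.query(QUERY)  # API request - starts the query
    while not query_job.done():  # API request on each call
        time.sleep(1)
    # Or give up on the job instead of waiting:
    # query_job.cancel()  # API request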

Writing query results to a destination table#

See BigQuery documentation for more information on writing query results.

    job_config = bigquery.QueryJobConfig()

    # Set the destination table. Here, dataset_id is a string, such as:
    # dataset_id = 'your_dataset_id'
    table_ref = client.dataset(dataset_id).table('your_table_id')
    job_config.destination = table_ref

    # The write_disposition specifies the behavior when writing query results
    # to a table that already exists. With WRITE_TRUNCATE, any existing rows
    # in the table are overwritten by the query results.
    job_config.write_disposition = 'WRITE_TRUNCATE'

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        'SELECT 17 AS my_col;', job_config=job_config)

    rows = list(query_job)  # Waits for the query to finish
    assert len(rows) == 1
    row = rows[0]
    assert row[0] == row.my_col == 17

    # In addition to using the results from the query, you can read the rows
    # from the destination table directly.
    iterator = client.list_rows(
        table_ref, selected_fields=[bigquery.SchemaField('my_col', 'INT64')])

    rows = list(iterator)
    assert len(rows) == 1
    row = rows[0]
    assert row[0] == row.my_col == 17

Run a query using a named query parameter#

See BigQuery documentation for more information on parameterized queries.

    QUERY_W_PARAM = (
        'SELECT name, state '
        'FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = @state '
        'LIMIT 100')
    TIMEOUT = 30  # in seconds
    param = bigquery.ScalarQueryParameter('state', 'STRING', 'TX')
    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = [param]
    query_job = client.query(
        QUERY_W_PARAM, job_config=job_config)  # API request - starts the query

    # Waits for the query to finish
    iterator = query_job.result(timeout=TIMEOUT)
    rows = list(iterator)

    assert query_job.state == 'DONE'
    assert len(rows) == 100
    row = rows[0]
    assert row[0] == row.name == row['name']
    assert row.state == 'TX'
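
Array-valued parameters follow the same pattern (a sketch; the IN UNNEST
filter and the two-state list are illustrative):

    QUERY_W_ARRAY_PARAM = (
        'SELECT name, state '
        'FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state IN UNNEST(@states) '
        'LIMIT 100')
    array_param = bigquery.ArrayQueryParameter('states', 'STRING', ['TX', 'CA'])
    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = [array_param]
    query_job = client.query(
        QUERY_W_ARRAY_PARAM, job_config=job_config)  # API request

    for row in query_job.result():  # Waits for the query to finish
        assert row.state in ('TX', 'CA')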

List jobs for a project#

Jobs describe actions performed on data in BigQuery tables:

  • Load data into a table
  • Run a query against data in one or more tables
  • Extract data from a table
  • Copy a table

    job_iterator = client.list_jobs()  # API request(s)
    for job in job_iterator:
        do_something_with(job)
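
list_jobs also accepts optional filters (a sketch; the values shown are
illustrative):

    # Retrieve at most ten completed jobs; state_filter accepts
    # 'done', 'pending', or 'running'.
    for job in client.list_jobs(max_results=10, state_filter='done'):  # API request(s)
        do_something_with(job)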