BigQuery#

Authentication / Configuration#

  • Use Client objects to configure your applications.

  • Client objects hold both a project and an authenticated connection to the BigQuery service.

  • Authentication credentials can be determined implicitly from the environment or provided explicitly via from_service_account_json and from_service_account_p12.

  • After setting the GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create an instance of Client.

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client()
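
A client can also be constructed directly from a service account key file; a minimal sketch (the key-file path below is a placeholder):

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client.from_service_account_json(
    ...     '/path/to/service_account.json')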
    

Projects#

A project is the top-level container in the BigQuery API: it is tied closely to billing, and can provide default access control across all its datasets. If no project is passed to the client constructor, the library attempts to infer a project from the environment (including explicit environment variables, GAE, and GCE).

To override the project inferred from the environment, pass an explicit project to the constructor, or to either of the alternative classmethod factories:

>>> from google.cloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')
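
The project override can also be passed to either classmethod factory; a brief sketch (the key-file path is a placeholder):

    >>> from google.cloud import bigquery
    >>> client = bigquery.Client.from_service_account_json(
    ...     '/path/to/service_account.json', project='PROJECT_ID')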

Project ACLs#

Each project has an access control list granting reader / writer / owner permission to one or more entities. This list cannot be queried or set via the API; it must be managed using the Google Developer Console.

Datasets#

A dataset represents a collection of tables, and applies several default policies to tables as they are created:

  • An access control list (ACL). When created, a dataset has an ACL which maps to the ACL inherited from its project.
  • A default table expiration period. If set, tables created within the dataset will use that value as their expiration period.

See BigQuery documentation for more information on Datasets.
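
For example, a default expiration can be set on a dataset before it is created, so that new tables pick it up automatically; a minimal sketch (DATASET_ID is a placeholder, and dataset creation is covered in more detail below):

    ONE_DAY_MS = 24 * 60 * 60 * 1000  # in milliseconds
    dataset = bigquery.Dataset(client.dataset(DATASET_ID))
    dataset.default_table_expiration_ms = ONE_DAY_MS  # applied to newly created tables
    dataset = client.create_dataset(dataset)  # API request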

Dataset operations#

List datasets for the client’s project:

    for dataset in client.list_datasets():  # API request(s)
        do_something_with(dataset)

Create a new dataset for the client’s project:

    # DATASET_ID = 'dataset_ids_are_strings'
    dataset_ref = client.dataset(DATASET_ID)
    dataset = bigquery.Dataset(dataset_ref)
    dataset.description = 'my dataset'
    dataset = client.create_dataset(dataset)  # API request

Refresh metadata for a dataset (to pick up changes made by another client):

    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.description = LOCALLY_CHANGED_DESCRIPTION
    assert dataset.description == LOCALLY_CHANGED_DESCRIPTION
    dataset = client.get_dataset(dataset)  # API request
    assert dataset.description == ORIGINAL_DESCRIPTION

Update a property in a dataset’s metadata:

    assert dataset.description == ORIGINAL_DESCRIPTION
    dataset.description = UPDATED_DESCRIPTION

    dataset = client.update_dataset(dataset, ['description'])  # API request

    assert dataset.description == UPDATED_DESCRIPTION

Update multiple properties in a dataset’s metadata:

    assert dataset.description == ORIGINAL_DESCRIPTION
    assert dataset.default_table_expiration_ms is None
    entry = bigquery.AccessEntry(
        role='READER', entity_type='domain', entity_id='example.com')
    assert entry not in dataset.access_entries
    ONE_DAY_MS = 24 * 60 * 60 * 1000  # in milliseconds
    dataset.description = UPDATED_DESCRIPTION
    dataset.default_table_expiration_ms = ONE_DAY_MS
    entries = list(dataset.access_entries)
    entries.append(entry)
    dataset.access_entries = entries

    dataset = client.update_dataset(
        dataset,
        ['description', 'default_table_expiration_ms', 'access_entries']
    )  # API request

    assert dataset.description == UPDATED_DESCRIPTION
    assert dataset.default_table_expiration_ms == ONE_DAY_MS
    assert entry in dataset.access_entries

Delete a dataset:

    import pytest
    from google.cloud.exceptions import NotFound

    client.delete_dataset(dataset)  # API request

    with pytest.raises(NotFound):
        client.get_dataset(dataset)  # API request

Tables#

Tables exist within datasets. See BigQuery documentation for more information on Tables.

Table operations#

List tables for the dataset:

    tables = list(client.list_dataset_tables(dataset))  # API request(s)
    assert len(tables) == 0

    table_ref = dataset.table('my_table')
    table = bigquery.Table(table_ref)
    table.view_query = QUERY  # QUERY is a SQL string defining the view
    client.create_table(table)                          # API request
    tables = list(client.list_dataset_tables(dataset))  # API request(s)

    assert len(tables) == 1
    assert tables[0].table_id == 'my_table'

Create a table:

    SCHEMA = [
        bigquery.SchemaField('full_name', 'STRING', mode='required'),
        bigquery.SchemaField('age', 'INTEGER', mode='required'),
    ]
    table_ref = dataset.table('my_table')
    table = bigquery.Table(table_ref, schema=SCHEMA)
    table = client.create_table(table)      # API request

    assert table.table_id == 'my_table'

Get a table (refreshing its metadata to pick up changes made by another client):

    assert table.description == ORIGINAL_DESCRIPTION
    table.description = LOCALLY_CHANGED_DESCRIPTION
    table = client.get_table(table)  # API request
    assert table.description == ORIGINAL_DESCRIPTION

Update a property in a table’s metadata:

    assert table.description == ORIGINAL_DESCRIPTION
    table.description = UPDATED_DESCRIPTION

    table = client.update_table(table, ['description'])  # API request

    assert table.description == UPDATED_DESCRIPTION

Update multiple properties in a table’s metadata:

    assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
    assert table.description == ORIGINAL_DESCRIPTION

    NEW_SCHEMA = list(table.schema)
    NEW_SCHEMA.append(bigquery.SchemaField('phone', 'STRING'))
    table.friendly_name = UPDATED_FRIENDLY_NAME
    table.description = UPDATED_DESCRIPTION
    table.schema = NEW_SCHEMA
    table = client.update_table(
        table,
        ['schema', 'friendly_name', 'description']
    )  # API request

    assert table.friendly_name == UPDATED_FRIENDLY_NAME
    assert table.description == UPDATED_DESCRIPTION
    assert table.schema == NEW_SCHEMA

Get rows from a table’s data:

    for row in client.list_rows(table):  # API request
        do_something(row)

Utilize iterator properties returned with row data:

    import six

    iterator = client.list_rows(table)  # API request
    page = six.next(iterator.pages)
    rows = list(page)
    total = iterator.total_rows
    token = iterator.next_page_token

Insert rows into a table’s data:

    ROWS_TO_INSERT = [
        (u'Phred Phlyntstone', 32),
        (u'Wylma Phlyntstone', 29),
    ]

    errors = client.create_rows(table, ROWS_TO_INSERT)  # API request

    assert errors == []

Upload table data from a file:

    import six

    csv_file = six.BytesIO(b"""full_name,age
Phred Phlyntstone,32
Wylma Phlyntstone,29
""")

    table_ref = dataset.table(TABLE_ID)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1
    job = client.load_table_from_file(
        csv_file, table_ref, job_config=job_config)  # API request
    job.result()  # Waits for table load to complete.

Load table data from Google Cloud Storage:

    table_ref = dataset.table('person_ages')
    table = bigquery.Table(table_ref)
    table.schema = [
        bigquery.SchemaField('full_name', 'STRING', mode='required'),
        bigquery.SchemaField('age', 'INTEGER', mode='required')
    ]
    client.create_table(table)  # API request
    GS_URL = 'gs://{}/{}'.format(bucket_name, blob_name)
    job_id_prefix = "my_job"
    job_config = bigquery.LoadJobConfig()
    job_config.create_disposition = 'CREATE_NEVER'  # the table was created above
    job_config.skip_leading_rows = 1
    job_config.source_format = 'CSV'
    job_config.write_disposition = 'WRITE_EMPTY'
    load_job = client.load_table_from_uri(
        GS_URL, table_ref, job_config=job_config,
        job_id_prefix=job_id_prefix)  # API request

    assert load_job.state == 'RUNNING'
    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'
    assert load_job.job_id.startswith(job_id_prefix)

Copy a table:

    source_dataset = bigquery.DatasetReference(
        'bigquery-public-data', 'samples')
    source_table_ref = source_dataset.table('shakespeare')

    dest_dataset = bigquery.Dataset(client.dataset(DATASET_ID))
    dest_dataset = client.create_dataset(dest_dataset)  # API request
    dest_table_ref = dest_dataset.table('destination_table')

    job_config = bigquery.CopyJobConfig()
    job = client.copy_table(
        source_table_ref, dest_table_ref, job_config=job_config)  # API request
    job.result()  # Waits for job to complete.

    assert job.state == 'DONE'
    dest_table = client.get_table(dest_table_ref)  # API request
    assert dest_table.table_id == 'destination_table'

Extract a table to Google Cloud Storage:

    from google.cloud.storage import Client as StorageClient

    storage_client = StorageClient()
    bucket = storage_client.create_bucket(bucket_name)  # API request
    destination_blob_name = 'person_ages_out.csv'
    destination = bucket.blob(destination_blob_name)

    destination_uri = 'gs://{}/{}'.format(bucket_name, destination_blob_name)
    extract_job = client.extract_table(
        table_ref, destination_uri)  # API request
    extract_job.result(timeout=100)  # Waits for job to complete.

    got = destination.download_as_string().decode('utf-8')  # API request
    assert 'Bharney Rhubble' in got

Delete a table:

    import pytest
    from google.cloud.exceptions import NotFound

    client.delete_table(table)  # API request

    with pytest.raises(NotFound):
        client.get_table(table)  # API request

Queries#

Querying data#

    QUERY = (
        'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = "TX" '
        'LIMIT 100')
    TIMEOUT = 30  # in seconds
    query_job = client.query(QUERY)  # API request - starts the query
    assert query_job.state == 'RUNNING'

    # Waits for the query to finish
    iterator = query_job.result(timeout=TIMEOUT)
    rows = list(iterator)

    assert query_job.state == 'DONE'
    assert len(rows) == 100
    row = rows[0]
    assert row[0] == row.name == row['name']

Note

  • Use of the timeout parameter is optional. The query will continue to run in the background even if it takes longer than the allotted timeout.

Run a query using a named query parameter#

See BigQuery documentation for more information on parameterized queries.

    QUERY_W_PARAM = (
        'SELECT name, state '
        'FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = @state '
        'LIMIT 100')
    TIMEOUT = 30  # in seconds
    param = bigquery.ScalarQueryParameter('state', 'STRING', 'TX')
    job_config = bigquery.QueryJobConfig()
    job_config.query_parameters = [param]
    query_job = client.query(
        QUERY_W_PARAM, job_config=job_config)  # API request - starts the query
    assert query_job.state == 'RUNNING'

    # Waits for the query to finish
    iterator = query_job.result(timeout=TIMEOUT)
    rows = list(iterator)

    assert query_job.state == 'DONE'
    assert len(rows) == 100
    row = rows[0]
    assert row[0] == row.name == row['name']
    assert row.state == 'TX'

Querying Table Rows#

Run a query and wait for it to finish:

    QUERY = (
        'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
        'WHERE state = "TX" '
        'LIMIT 100')
    TIMEOUT = 30  # in seconds
    rows = list(client.query_rows(QUERY, timeout=TIMEOUT))  # API request

    assert len(rows) == 100
    row = rows[0]
    assert row[0] == row.name == row['name']

Note

  • Use of the timeout parameter is optional. The query will continue to run in the background even if it takes longer than the allotted timeout. The job may be retrieved later using its job ID via get_job(), as sketched below.
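
A minimal sketch of reattaching to a query job by ID, assuming the original job object (or its ID) was saved:

    job_id = query_job.job_id
    # ... later, possibly from a different process:
    job = client.get_job(job_id)  # API request
    rows = list(job.result())     # waits only if the job is still running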

List jobs for a project#

Jobs describe actions performed on data in BigQuery tables:

  • Load data into a table
  • Run a query against data in one or more tables
  • Extract data from a table
  • Copy a table

    job_iterator = client.list_jobs()  # API request(s)
    for job in job_iterator:
        do_something_with(job)
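
list_jobs also accepts a state_filter argument for listing only jobs in a given state ('done', 'pending', or 'running'); a brief sketch:

    for job in client.list_jobs(state_filter='done'):  # API request(s)
        do_something_with(job)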