# -*- coding: utf-8 -*-
"""Main module."""
import datetime
import os
import subprocess
import tempfile

import pandas as pd
from google.api_core.exceptions import BadRequest
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.exceptions import NotFound

from pbq.query import Query
class PBQ(object):
    """
    bigquery driver using the google official API

    Attributes
    ------
    query : str
        the query
    query_obj : Query
        pbq.Query object
    project : str
        the BQ project given to the constructor (None when it was not given)
    client : Client
        the client object for bigquery
    bqstorage_client : BigQueryStorageClient
        the google storage client object

    Methods
    ------
    to_dataframe(save_query=False, **params)
        return the query results as data frame
    to_csv(filename, sep=',', save_query=False, **params)
        save the query results to a csv file
    save_to_table(table, dataset, project=None, replace=True, partition=None)
        save query to table
    run_query()
        simply execute your query

    Static Methods
    ------
    save_file_to_table(filename, table, dataset, project, file_format=bigquery.SourceFormat.CSV, max_bad_records=0,
                       replace=True, partition=None)
        save file to table, it can be partitioned and it can append to existing table.
        the supported formats are CSV or PARQUET
    save_dataframe_to_table(df: pd.DataFrame, table, dataset, project, max_bad_records=0, replace=True,
                            partition=None, validate_params=False)
        same as save file just with pandas dataframe
    table_details(table, dataset, project)
        get the information about the table
    table_exists(client: bigquery.Client, table_ref: bigquery.table.TableReference)
        check if table exists - if True - table exists else not exists

    Examples
    ------
    getting query to dataframe

    >>> from pbq import Query, PBQ
    >>> query = Query("select * from table")
    >>> print("the query price:", query.price)
    >>> if not query.validate():
    ...     raise RuntimeError("table not valid")
    >>> pbq = PBQ(query)
    >>> pbq.to_dataframe()

    saving query to csv

    >>> from pbq import Query, PBQ
    >>> query = Query("select * from table")
    >>> pbq = PBQ(query)
    >>> pbq.to_csv('results.csv')

    saving dataframe to table

    >>> import pandas as pd
    >>> from pbq import Query, PBQ
    >>> df = pd.DataFrame()
    >>> PBQ.save_dataframe_to_table(df, 'table', 'dataset', 'project_id', partition='20191013', replace=False)
    """

    def __init__(self, query: Query, project=None):
        """
        bigquery driver using the google official API

        :param query: Query object
        :param project: str
            the BQ project
        """
        self.query = query.query
        self.query_obj = query
        self.project = project
        # A falsy project (None or '') means "let the client pick its default project".
        self.client = bigquery.Client(project=project or None)
        self.bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    def to_dataframe(self, save_query=False, **params):
        """
        return the query results as data frame

        in order to save the query to a table as well as getting the dataframe, send a dict as params with:
            - table
            - dataset

        it will save to the same project

        :param save_query: boolean
            if to save the query to a table also
        :param params: dict
            when `save_query` flag is on you need to give the relevant params
        :return: pd.DataFrame
            the query results
        :raises KeyError: when `save_query` is on and `dataset` or `table` is missing from params
        """
        job_config = bigquery.QueryJobConfig()
        if save_query:
            job_config.destination = self.client.dataset(params['dataset']).table(params['table'])
        query_job = self.client.query(query=self.query, job_config=job_config)
        rows = query_job.result()
        return rows.to_dataframe(bqstorage_client=self.bqstorage_client)

    def to_csv(self, filename, sep=',', save_query=False, **params):
        """
        save the query results to a csv file

        in order to save the query to a table as well as writing the file, send a dict as params with:
            - table
            - dataset

        it will save to the same project

        :param filename: str
            with the path to save the file
        :param sep: str
            separator to the csv file
        :param save_query: boolean
            if to save the query to a table also
        :param params: dict
            when `save_query` flag is on you need to give the relevant params
        """
        df = self.to_dataframe(save_query, **params)
        df.to_csv(filename, sep=sep, index=False)

    def run_query(self):
        """
        execute your query and wait for it to finish; the results are not returned
        """
        query_job = self.client.query(self.query)
        query_job.result()
        print('Done running your amazing query')

    def save_to_table(self, table, dataset, project=None, replace=True, partition=None):
        """
        save query to table

        :param table: str
            table name
        :param dataset: str
            data set name
        :param project: str
            project name (default: None, the dataset is resolved by the client without an explicit project)
        :param replace: boolean
            if set as true - it will replace the table, else append to table (default: True)
        :param partition: str
            partition decorator value, appended to the table name as `table$partition`,
            e.g. '20191013' (default: None)
        """
        client = self.client
        job_config = bigquery.QueryJobConfig()
        exists_ok = PBQ._writing_disposition(job_config, replace)

        if project:
            dataset_ref = client.dataset(dataset, project=project)
        else:
            dataset_ref = client.dataset(dataset)

        # The table is looked up / created under its plain name: a `$partition` decorator addresses a
        # partition inside a table and is not part of the table's own id.
        base_ref = dataset_ref.table(table.split('$')[0])
        PBQ._create_table(client, exists_ok, partition, replace, base_ref)

        # The query destination carries the decorator so that only the requested partition is written,
        # with or without an explicit project.
        destination = '{0}${1}'.format(table, partition) if partition else table
        table_ref = dataset_ref.table(destination)
        job_config.destination = table_ref

        query_job = client.query(self.query, job_config=job_config)
        query_job.result()
        print('Query results loaded to table {}'.format(table_ref.path))

    @staticmethod
    def _writing_disposition(job_config: bigquery.QueryJobConfig, replace):
        """
        set the write disposition on a job config (this class also passes a LoadJobConfig here)

        :param job_config: job config object that gets its `write_disposition` set
        :param replace: boolean
            truthy - WRITE_TRUNCATE, falsy - WRITE_APPEND
        :return: boolean
            the `exists_ok` value to use when creating the table (True only when replacing)
        """
        if replace:
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
            return True
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        return False

    @staticmethod
    def _create_table(client: bigquery.Client, exists_ok, partition, replace, table_ref):
        """
        create the destination table when needed

        with a partition the table is created only when it does not exist yet (and gets a default
        TimePartitioning); without a partition it is created only when replacing.

        :param client: bigquery.Client object
        :param exists_ok: boolean
            forwarded to `client.create_table`
        :param partition: str
            partition value, only its truthiness is used here
        :param replace: boolean
            if the table is being replaced
        :param table_ref: bigquery.table.TableReference object
        """
        if partition:
            should_create = not PBQ.table_exists(client, table_ref)
        else:
            should_create = bool(replace)
        if not should_create:
            return

        bq_table = bigquery.Table(table_ref)
        if partition:
            bq_table.time_partitioning = bigquery.TimePartitioning()
        client.create_table(bq_table, exists_ok=exists_ok)

    @staticmethod
    def save_file_to_table(filename, table, dataset, project, file_format=bigquery.SourceFormat.CSV, max_bad_records=0,
                           replace=True, partition=None):
        """
        save file to table, it can be partitioned and it can append to existing table.
        the supported formats are CSV or PARQUET

        without a partition the file is loaded through the client API; with a partition the `bq load`
        command line tool is invoked instead.

        :param filename: str
            with the path of the file to load
        :param table: str
            table name
        :param dataset: str
            data set name
        :param project: str
            project name
        :param file_format: str
            possible file format (CSV, PARQUET) (default: CSV)
        :param max_bad_records: int
            number of bad records allowed in file (default: 0)
        :param replace: boolean
            if set as true - it will replace the table, else append to table (default: True)
        :param partition: str
            partition decorator value, appended to the table name as `table$partition`,
            e.g. '20191013' (default: None)
        :raises subprocess.CalledProcessError: when the `bq load` fallback exits with a non-zero status
        """
        client = bigquery.Client(project=project)
        table_ref = client.dataset(dataset).table(table)

        job_config = bigquery.LoadJobConfig()
        job_config.max_bad_records = max_bad_records
        job_config.source_format = file_format
        exists_ok = PBQ._writing_disposition(job_config, replace)
        if file_format == bigquery.SourceFormat.CSV:
            job_config.skip_leading_rows = 1
        job_config.autodetect = True

        PBQ._create_table(client, exists_ok, partition, replace, table_ref)

        if not partition:
            with open(filename, "rb") as source_file:
                job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
                job.result()  # Waits for table load to complete.
            print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset, table))
            return

        print('fallback loading by CMD command due to missing api feature for partition')
        # An argument list (no shell) keeps `$` in the decorator and any spaces or quotes in the file
        # name literal, and check_call surfaces a failed load instead of silently ignoring it.
        cmd = ['bq', 'load']
        if replace:
            cmd.append('--replace')
        cmd.append('--source_format={}'.format(file_format))
        cmd.append('--max_bad_records={}'.format(max_bad_records))
        cmd.append('{project}:{dataset}.{tbl_name}${partition}'.format(
            project=project, dataset=dataset, tbl_name=table, partition=partition))
        cmd.append(filename)
        subprocess.check_call(cmd)

    @staticmethod
    def save_dataframe_to_table(df: pd.DataFrame, table, dataset, project, max_bad_records=0, replace=True,
                                partition=None, validate_params=False):
        """
        save pd.DataFrame object to table

        the dataframe is written to a temporary parquet file which is loaded with `save_file_to_table`
        and removed afterwards. note that the dataframe is modified in place (see `_save_df_to_parquet`).

        :param df: pd.DataFrame
            the dataframe you want to save
        :param table: str
            table name
        :param dataset: str
            data set name
        :param project: str
            project name
        :param max_bad_records: int
            number of bad records allowed in file (default: 0)
        :param replace: boolean
            if set as true - it will replace the table, else append to table (default: True)
        :param partition: str
            partition decorator value, appended to the table name as `table$partition`,
            e.g. '20191013' (default: None)
        :param validate_params: boolean
            coerce the dataframe columns according to the schema of the existing table (default: False)
        """
        schema = None
        if validate_params:  # because of the fallback it need to change to be as the schema
            schema = PBQ.table_details(table, dataset, project).get('schema')

        # A unique temp file avoids collisions between calls made within the same second and works
        # on systems without /tmp.
        fd, input_path = tempfile.mkstemp(prefix='tmp-', suffix='.parquet')
        os.close(fd)
        try:
            PBQ._save_df_to_parquet(df, input_path, schema=schema)
            PBQ.save_file_to_table(input_path, table, dataset, project, file_format=bigquery.SourceFormat.PARQUET,
                                   max_bad_records=max_bad_records, replace=replace, partition=partition)
        finally:
            os.remove(input_path)

    @staticmethod
    def _save_df_to_parquet(df, input_path, index=False, schema=None):
        """
        write the dataframe to a parquet file, the dataframe is modified in place

        column labels are converted to strings; when a schema is given, the columns it declares as
        TIMESTAMP or DATE are parsed with `pd.to_datetime(errors='coerce')` (DATE columns are then
        reduced to date objects). schema columns that are not in the dataframe are skipped.

        :param df: pd.DataFrame
            the dataframe to write
        :param input_path: str
            path of the parquet file to write
        :param index: boolean
            forwarded to `DataFrame.to_parquet` (default: False)
        :param schema: list
            dicts with `column` and `field_type` keys, as returned by `table_details` (default: None)
        """
        # Stringify the labels first so the schema's column names can be matched against them.
        df.columns = ["{}".format(col) for col in df.columns]

        for field in schema or ():
            column = field['column']
            field_type = field['field_type']
            if field_type not in ('TIMESTAMP', 'DATE') or column not in df.columns:
                continue
            df[column] = pd.to_datetime(df[column], errors='coerce')
            if field_type == 'DATE':
                df[column] = df[column].dt.date

        df.to_parquet(input_path, index=index)

    @staticmethod
    def table_details(table, dataset, project):
        """
        return a dict object with some details about the table

        :param table: str
            table name
        :param dataset: str
            data set name
        :param project: str
            project name
        :return: dict
            with last_modified_time, num_bytes, num_rows, creation_time and schema (a list of dicts with
            `column` and `field_type`); an empty dict when the table is not found
        """
        client = bigquery.Client(project=project)
        table_ref = client.dataset(dataset, project=project).table(table)
        try:
            bq_table = client.get_table(table_ref)
        except NotFound:
            return {}

        schema = [{'column': field.name, 'field_type': field.field_type} for field in bq_table.schema]
        return {'last_modified_time': bq_table.modified, 'num_bytes': bq_table.num_bytes,
                'num_rows': bq_table.num_rows, 'creation_time': bq_table.created, 'schema': schema}

    @staticmethod
    def table_exists(client: bigquery.Client, table_ref: bigquery.table.TableReference):
        """
        check if table exists - if True - table exists else not exists

        :param client: bigquery.Client object
        :param table_ref: bigquery.table.TableReference object
            with the table name and dataset
        :return: boolean
            True if table exists
            False if table not exists
        """
        try:
            client.get_table(table_ref)
        except NotFound:
            return False
        except BadRequest:
            # NOTE(review): a BadRequest is treated as "exists" - presumably raised for references
            # that cannot be fetched as-is (e.g. decorated names); confirm.
            return True
        return True