Source code for pbq.query
from google.cloud import bigquery
import json
import requests
from functools import lru_cache
TERA_IN_BYTES = 1024 * 1024 * 1024 * 1024.0
PRICE_PER_TERA = 5.0
[docs]class Query(object):
"""this class control the query, validate, read sql files and return the price.
Attributes
------
query : str
the query
client : Client
the client object for bigquery
Methods
------
price()
return the query price
validate()
validate that the query can run without errors
Static Methods
------
read_file(query_file, parameters=None)
parse query from file, the query can be with parameters inside the file
Examples
------
generate Query object with simple query
>>> from pbq import Query
>>> query = Query("select * from table")
get query price
>>> from pbq import Query
>>> query = Query("select * from table")
>>> print("the query price:", query.price)
# the query price: 0.312
validate query
>>> from pbq import Query
>>> query = Query("select * from table")
>>> if not query.validate():
>>> raise RuntimeError("table not valid")
query with parameters
>>> from pbq import Query
>>> query = Query("select * from table where user_id={user_id}", parameters={'user_id': 123})
>>> print(query.query)
# select * from table where user_id=123
read query from file with parameters
>>> from pbq import Query
>>> query = Query.read_file('file_path.sql', parameters={'user_id':123})
>>> print(query.query)
# select * from table where user_id=123
"""
def __init__(self, query, parameters=None, project=None):
"""
:param query: str
the query
:param parameters: dict
key value parameters to change values dynamically inside the query
:param project: str
the BQ project
"""
self.query = query.replace('"', '\'')
self._parameters = parameters if parameters else dict()
if project:
self.client = bigquery.Client(project=project)
else:
self.client = bigquery.Client()
self._format()
def _format(self):
from string import Formatter
# logger.info("building your stunning query")
names = [fn for _, fn, _, _ in Formatter().parse(self.query) if fn is not None]
if len(names) == 0:
return
self._validate_params(names)
self.query = self.query.format_map(self._parameters)
# logger.info("finish building your query")
def _validate_params(self, names):
missing_keys = set(names) - set(self._parameters.keys())
if len(missing_keys) != 0:
# logger.error(
# "not all the params in the query are set in the config file. missing params: {}".format(missing_keys))
raise ValueError("not all the params in the query are set in the config file. \n missing params: {}".format(
missing_keys))
@staticmethod
def _js_r(filename):
with open(filename) as f_in:
return json.load(f_in)
@property
@lru_cache(1)
def price(self):
"""check the cost of the query
:return: float
the price of the query
"""
res = self._init_query_command()
if res is None:
raise RuntimeError("Query is not valid, please fix your query first")
price = int(res.total_bytes_billed) / TERA_IN_BYTES * PRICE_PER_TERA
return round(price, 4)
[docs] @lru_cache(1)
def validate(self):
"""validate the query
:return: Boolean
True if the query is runnable
:raise: RuntimeError
on query error
"""
res = self._init_query_command()
if res is None:
raise RuntimeError("Query is not valid, please fix your query first")
return True
[docs] @staticmethod
def read_file(query_file, parameters=None, project=None):
"""parse query from file
:param query_file: str
path to the query file
:param parameters: dict
key value parameters to change values dynamically inside the query
:return: Query object
"""
q_file = open(query_file, 'r')
_query = q_file.read()
return Query(_query, parameters, project=project)
@lru_cache(1)
def _init_query_command(self):
_query = self.query
job_config = bigquery.QueryJobConfig()
job_config.dry_run = True
job_config.use_query_cache = False
try:
query_job = self.client.query(query=_query, job_config=job_config, ) # API request
except requests.exceptions.HTTPError:
query_job = None
return query_job