Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
*.egg
/.env/
/.vscode/
/.idea/
/.mypy_cache/
*.pyc
.cache/
*.iml
Expand Down
3 changes: 2 additions & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
sqlalchemy>=1.1.9
google-cloud-bigquery>=1.6.0
google-cloud-bigquery>=1.12.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The module doesn't work with anything below that version (this is also true in master).

future==0.16.0
packaging>=20.0

pytest==3.2.2
pytz==2017.2
15 changes: 12 additions & 3 deletions pybigquery/parse_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def parse_url(url):
dataset_id = url.database or None
arraysize = None
credentials_path = None
use_bqstorage_api = None

# location
if 'location' in query:
Expand All @@ -51,14 +52,22 @@ def parse_url(url):
except ValueError:
raise ValueError("invalid int in url query arraysize: " + str_arraysize)

# use_bqstorage_api
if 'use_bqstorage_api' in query:
str_use_bqstorage_api = query.pop('use_bqstorage_api')
try:
use_bqstorage_api = parse_boolean(str_use_bqstorage_api)
except ValueError:
raise ValueError("invalid boolean in url query for use_bqstorage_api: " + str_use_bqstorage_api)

# if only these "non-config" values were present, the dict will now be empty
if not query:
# if a dataset_id exists, we need to return a job_config that isn't None
# so it can be updated with a dataset reference from the client
if dataset_id:
return project_id, location, dataset_id, arraysize, credentials_path, QueryJobConfig()
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, QueryJobConfig()
else:
return project_id, location, dataset_id, arraysize, credentials_path, None
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, None

job_config = QueryJobConfig()

Expand Down Expand Up @@ -170,4 +179,4 @@ def parse_url(url):
except AttributeError:
raise ValueError("invalid write_disposition in url query: " + query['write_disposition'])

return project_id, location, dataset_id, arraysize, credentials_path, job_config
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config
86 changes: 54 additions & 32 deletions pybigquery/sqlalchemy_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
from __future__ import absolute_import
from __future__ import unicode_literals

import re
import warnings
from packaging.version import Version, parse as parse_version

from google import auth
from google.cloud import bigquery
from google.cloud.bigquery import dbapi, QueryJobConfig

try:
from google.cloud import bigquery_storage_v1
except ImportError:
pass

from google.cloud.bigquery import dbapi
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from sqlalchemy.exc import NoSuchTableError
Expand All @@ -18,7 +27,6 @@
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql import elements
import re

from .parse_url import parse_url

Expand Down Expand Up @@ -294,53 +302,67 @@ def _add_default_dataset_to_job_config(job_config, project_id, dataset_id):

job_config.default_dataset = '{}.{}'.format(project_id, dataset_id)

@staticmethod
def create_credentials(credentials_path, credentials_info):
"""
Create a service account credentials object if possible.
Using a file is preferred over using info,
return None if both are not available.
"""
credentials = None

def _create_client_from_credentials(self, credentials, default_query_job_config, project_id):
if project_id is None:
project_id = credentials.project_id
if credentials_path:
credentials = service_account.Credentials.from_service_account_file(credentials_path)

scopes = (
elif credentials_info:
credentials = service_account.Credentials.from_service_account_info(credentials_info)

if credentials:
scopes = (
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/drive'
)
credentials = credentials.with_scopes(scopes)

self._add_default_dataset_to_job_config(default_query_job_config, project_id, self.dataset_id)
credentials = credentials.with_scopes(scopes)

return bigquery.Client(
project=project_id,
credentials=credentials,
location=self.location,
default_query_job_config=default_query_job_config,
)
return credentials

def create_connect_args(self, url):
project_id, location, dataset_id, arraysize, credentials_path, default_query_job_config = parse_url(url)
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, default_query_job_config = parse_url(url)

self.arraysize = self.arraysize or arraysize
self.location = location or self.location
self.credentials_path = credentials_path or self.credentials_path
self.dataset_id = dataset_id

if self.credentials_path:
credentials = service_account.Credentials.from_service_account_file(self.credentials_path)
client = self._create_client_from_credentials(credentials, default_query_job_config, project_id)
credentials = self.create_credentials(self.credentials_path, self.credentials_info)

elif self.credentials_info:
credentials = service_account.Credentials.from_service_account_info(self.credentials_info)
client = self._create_client_from_credentials(credentials, default_query_job_config, project_id)
# Use the credentials project as a default if no project was specified
if (credentials is not None) and (project_id is None):
project_id = credentials.project_id

else:
self._add_default_dataset_to_job_config(default_query_job_config, project_id, dataset_id)
self._add_default_dataset_to_job_config(default_query_job_config, project_id, dataset_id)

client = bigquery.Client(
project=project_id,
location=self.location,
default_query_job_config=default_query_job_config
)
client = bigquery.Client(
project=project_id,
credentials=credentials,
location=self.location,
default_query_job_config=default_query_job_config
)

clients = [client]

if use_bqstorage_api:
if parse_version(bigquery.__version__) >= Version("1.26.0"):
try:
storage_client = bigquery_storage_v1.BigQueryReadClient(credentials=credentials)
clients.append(storage_client)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since bqstorage_client is an optional argument to connect(), we should probably be using the dictionary for this rather than appending to the list of positional arguments. (In fact, maybe we should do the same for the BigQuery client object, too)

except NameError:
warnings.warn("It is not possible to use the bqstorage api without installing the bqstorage extra requirement")
else:
warnings.warn('It is not possible to use the bqstorage api with google-cloud-bigquery < 1.26.0')

return ([client], {})
return (clients, {})

def _json_deserializer(self, row):
"""JSON deserializer for RECORD types.
Expand Down
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@ def readme():
"Topic :: Database :: Front-Ends"
],
install_requires=[
'google-cloud-bigquery>=1.12.0',
'packaging>=20.0',
'sqlalchemy>=1.1.9',
'google-cloud-bigquery>=1.6.0',
'future',
],
extras_require = {
"bqstorage": [
"google-cloud-bigquery-storage >= 1.0.0, <2.0.0dev",
"grpcio >= 1.8.2, < 2.0dev",
"pyarrow>=0.16.0, < 2.0dev",
]
},
tests_require=[
'pytz'
],
Expand Down
15 changes: 10 additions & 5 deletions test/test_parse_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def url_with_everything():
'?credentials_path=/some/path/to.json'
'&location=some-location'
'&arraysize=1000'
'&use_bqstorage_api=True'
'&clustering_fields=a,b,c'
'&create_disposition=CREATE_IF_NEEDED'
'&destination=different-project.different-dataset.table'
Expand All @@ -46,12 +47,13 @@ def url_with_everything():


def test_basic(url_with_everything):
project_id, location, dataset_id, arraysize, credentials_path, job_config = parse_url(url_with_everything)
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = parse_url(url_with_everything)

assert project_id == 'some-project'
assert location == 'some-location'
assert dataset_id == 'some-dataset'
assert arraysize == 1000
assert use_bqstorage_api is True
assert credentials_path == '/some/path/to.json'
assert isinstance(job_config, QueryJobConfig)

Expand All @@ -69,7 +71,7 @@ def test_basic(url_with_everything):
('write_disposition', 'WRITE_APPEND'),
])
def test_all_values(url_with_everything, param, value):
job_config = parse_url(url_with_everything)[5]
job_config = parse_url(url_with_everything)[6]

config_value = getattr(job_config, param)
if callable(value):
Expand All @@ -85,6 +87,7 @@ def test_all_values(url_with_everything, param, value):

@pytest.mark.parametrize("param, value", [
('arraysize', 'not-int'),
('use_bqstorage_api', 'not-bool'),
('create_disposition', 'not-attribute'),
('destination', 'not.fully-qualified'),
('dry_run', 'not-bool'),
Expand All @@ -108,24 +111,26 @@ def test_empty_url():
assert value is None

def test_empty_with_non_config():
url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&credentials_path=/some/path/to.json'))
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&use_bqstorage_api=False&credentials_path=/some/path/to.json'))
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url

assert project_id is None
assert location == 'some-location'
assert dataset_id is None
assert arraysize == 1000
assert use_bqstorage_api is False
assert credentials_path == '/some/path/to.json'
assert job_config is None

def test_only_dataset():
url = parse_url(make_url('bigquery:///some-dataset'))
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url

assert project_id is None
assert location is None
assert dataset_id == 'some-dataset'
assert arraysize is None
assert use_bqstorage_api is None
assert credentials_path is None
assert isinstance(job_config, QueryJobConfig)
# we can't actually test that the dataset is on the job_config,
Expand Down