Skip to content

Commit e947734

Browse files
committed
Add bq storage client
1 parent 863651a commit e947734

File tree

5 files changed

+33
-13
lines changed

5 files changed

+33
-13
lines changed

dev_requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
sqlalchemy>=1.1.9
2-
google-cloud-bigquery>=1.6.0
2+
google-cloud-bigquery[bqstorage]>=1.25.0
3+
google-cloud-bigquery-storage[fastavro]>=1.0.0
34
future==0.16.0
45

56
pytest==3.2.2

pybigquery/parse_url.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def parse_url(url):
3434
dataset_id = url.database or None
3535
arraysize = None
3636
credentials_path = None
37+
use_bqstorage_api = None
3738

3839
# location
3940
if 'location' in query:
@@ -51,14 +52,22 @@ def parse_url(url):
5152
except ValueError:
5253
raise ValueError("invalid int in url query arraysize: " + str_arraysize)
5354

55+
# use_bqstorage_api
56+
if 'use_bqstorage_api' in query:
57+
str_use_bqstorage_api = query.pop('use_bqstorage_api')
58+
try:
59+
use_bqstorage_api = parse_boolean(str_use_bqstorage_api)
60+
except ValueError:
61+
raise ValueError("invalid boolean in url query for use_bqstorage_api: " + str_use_bqstorage_api)
62+
5463
# if only these "non-config" values were present, the dict will now be empty
5564
if not query:
5665
# if a dataset_id exists, we need to return a job_config that isn't None
5766
# so it can be updated with a dataset reference from the client
5867
if dataset_id:
59-
return project_id, location, dataset_id, arraysize, credentials_path, QueryJobConfig()
68+
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, QueryJobConfig()
6069
else:
61-
return project_id, location, dataset_id, arraysize, credentials_path, None
70+
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, None
6271

6372
job_config = QueryJobConfig()
6473

@@ -170,4 +179,4 @@ def parse_url(url):
170179
except AttributeError:
171180
raise ValueError("invalid write_disposition in url query: " + query['write_disposition'])
172181

173-
return project_id, location, dataset_id, arraysize, credentials_path, job_config
182+
return project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config

pybigquery/sqlalchemy_bigquery.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from __future__ import unicode_literals
55

66
from google import auth
7-
from google.cloud import bigquery
7+
from google.cloud import bigquery, bigquery_storage_v1beta1
88
from google.cloud.bigquery import dbapi, QueryJobConfig
99
from google.cloud.bigquery.schema import SchemaField
1010
from google.cloud.bigquery.table import EncryptionConfiguration
@@ -320,7 +320,7 @@ def create_credentials(credentials_path, credentials_info):
320320
return credentials
321321

322322
def create_connect_args(self, url):
323-
project_id, location, dataset_id, arraysize, credentials_path, default_query_job_config = parse_url(url)
323+
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, default_query_job_config = parse_url(url)
324324

325325
self.arraysize = self.arraysize or arraysize
326326
self.location = location or self.location
@@ -341,8 +341,12 @@ def create_connect_args(self, url):
341341
location=self.location,
342342
default_query_job_config=default_query_job_config
343343
)
344+
345+
storage_client = None
346+
if use_bqstorage_api:
347+
storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
344348

345-
return ([client], {})
349+
return ([client, storage_client], {})
346350

347351
def _json_deserializer(self, row):
348352
"""JSON deserializer for RECORD types.

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ def readme():
2525
],
2626
install_requires=[
2727
'sqlalchemy>=1.1.9',
28-
'google-cloud-bigquery>=1.6.0',
28+
'google-cloud-bigquery[bqstorage]>=1.25.0',
29+
'google-cloud-bigquery-storage[fastavro]>=1.0.0',
2930
'future',
3031
],
3132
tests_require=[

test/test_parse_url.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def url_with_everything():
3131
'?credentials_path=/some/path/to.json'
3232
'&location=some-location'
3333
'&arraysize=1000'
34+
'&use_bqstorage_api=True'
3435
'&clustering_fields=a,b,c'
3536
'&create_disposition=CREATE_IF_NEEDED'
3637
'&destination=different-project.different-dataset.table'
@@ -46,12 +47,13 @@ def url_with_everything():
4647

4748

4849
def test_basic(url_with_everything):
49-
project_id, location, dataset_id, arraysize, credentials_path, job_config = parse_url(url_with_everything)
50+
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = parse_url(url_with_everything)
5051

5152
assert project_id == 'some-project'
5253
assert location == 'some-location'
5354
assert dataset_id == 'some-dataset'
5455
assert arraysize == 1000
56+
assert use_bqstorage_api is True
5557
assert credentials_path == '/some/path/to.json'
5658
assert isinstance(job_config, QueryJobConfig)
5759

@@ -69,7 +71,7 @@ def test_basic(url_with_everything):
6971
('write_disposition', 'WRITE_APPEND'),
7072
])
7173
def test_all_values(url_with_everything, param, value):
72-
job_config = parse_url(url_with_everything)[5]
74+
job_config = parse_url(url_with_everything)[6]
7375

7476
config_value = getattr(job_config, param)
7577
if callable(value):
@@ -85,6 +87,7 @@ def test_all_values(url_with_everything, param, value):
8587

8688
@pytest.mark.parametrize("param, value", [
8789
('arraysize', 'not-int'),
90+
('use_bqstorage_api', 'not-bool'),
8891
('create_disposition', 'not-attribute'),
8992
('destination', 'not.fully-qualified'),
9093
('dry_run', 'not-bool'),
@@ -108,24 +111,26 @@ def test_empty_url():
108111
assert value is None
109112

110113
def test_empty_with_non_config():
111-
url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&credentials_path=/some/path/to.json'))
112-
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
114+
url = parse_url(make_url('bigquery:///?location=some-location&arraysize=1000&use_bqstorage_api=False&credentials_path=/some/path/to.json'))
115+
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url
113116

114117
assert project_id is None
115118
assert location == 'some-location'
116119
assert dataset_id is None
117120
assert arraysize == 1000
121+
assert use_bqstorage_api is False
118122
assert credentials_path == '/some/path/to.json'
119123
assert job_config is None
120124

121125
def test_only_dataset():
122126
url = parse_url(make_url('bigquery:///some-dataset'))
123-
project_id, location, dataset_id, arraysize, credentials_path, job_config = url
127+
project_id, location, dataset_id, arraysize, credentials_path, use_bqstorage_api, job_config = url
124128

125129
assert project_id is None
126130
assert location is None
127131
assert dataset_id == 'some-dataset'
128132
assert arraysize is None
133+
assert use_bqstorage_api is None
129134
assert credentials_path is None
130135
assert isinstance(job_config, QueryJobConfig)
131136
# we can't actually test that the dataset is on the job_config,

0 commit comments

Comments (0)