Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions tests/unit/test_grpc_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,128 @@ def test_wrap_unary_errors():
assert exc_info.value.response == grpc_error


class Test_StreamingResponseIterator:
@staticmethod
def _make_wrapped(*items):
return iter(items)

@staticmethod
def _make_one(wrapped, **kw):
return grpc_helpers._StreamingResponseIterator(wrapped, **kw)

def test_ctor_defaults(self):
wrapped = self._make_wrapped("a", "b", "c")
iterator = self._make_one(wrapped)
assert iterator._stored_first_result == "a"
assert list(wrapped) == ["b", "c"]

def test_ctor_explicit(self):
wrapped = self._make_wrapped("a", "b", "c")
iterator = self._make_one(wrapped, prefetch_first_result=False)
assert getattr(iterator, "_stored_first_result", self) is self
assert list(wrapped) == ["a", "b", "c"]

def test_ctor_w_rpc_error_on_prefetch(self):
wrapped = mock.MagicMock()
wrapped.__next__.side_effect = grpc.RpcError()

with pytest.raises(grpc.RpcError):
self._make_one(wrapped)

def test___iter__(self):
wrapped = self._make_wrapped("a", "b", "c")
iterator = self._make_one(wrapped)
assert iter(iterator) is iterator

def test___next___w_cached_first_result(self):
wrapped = self._make_wrapped("a", "b", "c")
iterator = self._make_one(wrapped)
assert next(iterator) == "a"
iterator = self._make_one(wrapped, prefetch_first_result=False)
assert next(iterator) == "b"
assert next(iterator) == "c"

def test___next___wo_cached_first_result(self):
wrapped = self._make_wrapped("a", "b", "c")
iterator = self._make_one(wrapped, prefetch_first_result=False)
assert next(iterator) == "a"
assert next(iterator) == "b"
assert next(iterator) == "c"

def test___next___w_rpc_error(self):
wrapped = mock.MagicMock()
wrapped.__next__.side_effect = grpc.RpcError()
iterator = self._make_one(wrapped, prefetch_first_result=False)

with pytest.raises(exceptions.GoogleAPICallError):
next(iterator)

def test_add_callback(self):
wrapped = mock.MagicMock()
callback = mock.Mock(spec={})
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.add_callback(callback) is wrapped.add_callback.return_value

wrapped.add_callback.assert_called_once_with(callback)

def test_cancel(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.cancel() is wrapped.cancel.return_value

wrapped.cancel.assert_called_once_with()

def test_code(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.code() is wrapped.code.return_value

wrapped.code.assert_called_once_with()

def test_details(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.details() is wrapped.details.return_value

wrapped.details.assert_called_once_with()

def test_initial_metadata(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.initial_metadata() is wrapped.initial_metadata.return_value

wrapped.initial_metadata.assert_called_once_with()

def test_is_active(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.is_active() is wrapped.is_active.return_value

wrapped.is_active.assert_called_once_with()

def test_time_remaining(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.time_remaining() is wrapped.time_remaining.return_value

wrapped.time_remaining.assert_called_once_with()

def test_trailing_metadata(self):
wrapped = mock.MagicMock()
iterator = self._make_one(wrapped, prefetch_first_result=False)

assert iterator.trailing_metadata() is wrapped.trailing_metadata.return_value

wrapped.trailing_metadata.assert_called_once_with()


def test_wrap_stream_okay():
expected_responses = [1, 2, 3]
callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses))
Expand Down