Skip to content

Commit 759b976

Browse files
committed
Resubscribe on Token Expiration
Before the recent rebase onto modern versions of Reactor Netty ([#984]), when the Client received a 401 Unauthorized, it would invalidated any existing token and then resubscribe with the same request causing a new token to be negotiated the request to be retried, transparently to the user. During the rebase, this functionality was lost (an easy mistake to make given that there are no tests for it...) resulting in invalidation of the token, but no re-subscription and therefore the failure was exposed to the user. This change re-introduced the previous re-subscribe behavior. Signed-off-by: Ben Hale <[email protected]>
1 parent ee77cf5 commit 759b976

File tree

2 files changed

+21
-15
lines changed

2 files changed

+21
-15
lines changed

.idea/compiler.xml

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publishe
172172
return parseBodyToFlux(responseTransformer).singleOrEmpty();
173173
}
174174

175-
private static boolean isUnauthorized(HttpClientResponse response) {
176-
return response.status() == HttpResponseStatus.UNAUTHORIZED;
175+
private static boolean isUnauthorized(HttpClientResponseWithBody response) {
176+
return response.getResponse().status() == HttpResponseStatus.UNAUTHORIZED;
177177
}
178178

179179
private void attachChannelHandlers(HttpClientResponse response, Connection connection) {
@@ -186,18 +186,24 @@ private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
186186
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
187187
}
188188

189-
private HttpClientResponseWithBody invalidateToken(HttpClientResponseWithBody response) {
190-
if (isUnauthorized(response.getResponse())) {
191-
this.context.getTokenProvider().ifPresent(tokenProvider -> tokenProvider.invalidate(this.context.getConnectionContext()));
192-
}
193-
return response;
189+
private Mono<HttpClientResponseWithBody> invalidateToken(Mono<HttpClientResponseWithBody> inbound) {
190+
return inbound
191+
.flatMap(response -> {
192+
if (isUnauthorized(response)) {
193+
this.context.getTokenProvider().ifPresent(tokenProvider -> tokenProvider.invalidate(this.context.getConnectionContext()));
194+
return inbound
195+
.transform(this::invalidateToken);
196+
} else {
197+
return Mono.just(response);
198+
}
199+
});
194200
}
195201

196202
private Mono<HttpClientResponseWithBody> processResponse(HttpClientResponse response, ByteBufFlux body) {
197203
HttpClientResponseWithBody responseWithBody = HttpClientResponseWithBody.of(body, response);
198204

199205
return Mono.just(responseWithBody)
200-
.map(this::invalidateToken)
206+
.transform(this::invalidateToken)
201207
.transform(this.context.getErrorPayloadMapper()
202208
.orElse(ErrorPayloadMappers.fallback()));
203209
}

0 commit comments

Comments
 (0)