2323import org .apache .hadoop .conf .Configuration ;
2424import org .apache .hadoop .hdds .HddsUtils ;
2525import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
26- import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
2726import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .ContainerCommandRequestProto ;
2827import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .ContainerCommandResponseProto ;
2928import org .apache .hadoop .hdds .protocol .datanode .proto .XceiverClientProtocolServiceGrpc ;
3029import org .apache .hadoop .hdds .protocol .datanode .proto .XceiverClientProtocolServiceGrpc .XceiverClientProtocolServiceStub ;
3130import org .apache .hadoop .hdds .protocol .proto .HddsProtos ;
3231import org .apache .hadoop .hdds .scm .client .HddsClientUtils ;
3332import org .apache .hadoop .hdds .scm .pipeline .Pipeline ;
33+ import org .apache .hadoop .hdds .scm .storage .CheckedBiFunction ;
3434import org .apache .hadoop .hdds .security .exception .SCMSecurityException ;
3535import org .apache .hadoop .hdds .security .x509 .SecurityConfig ;
3636import org .apache .hadoop .hdds .tracing .GrpcClientInterceptor ;
6262import java .util .concurrent .Semaphore ;
6363import java .util .concurrent .TimeUnit ;
6464import java .util .concurrent .TimeoutException ;
65- import java .util .stream .Collectors ;
6665
6766/**
6867 * A Client for the storageContainer protocol.
@@ -83,15 +82,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
8382 * data nodes.
8483 *
8584 * @param pipeline - Pipeline that defines the machines.
86- * @param config -- Ozone Config
85+ * @param config -- Ozone Config
8786 */
8887 public XceiverClientGrpc (Pipeline pipeline , Configuration config ) {
8988 super ();
9089 Preconditions .checkNotNull (pipeline );
9190 Preconditions .checkNotNull (config );
9291 this .pipeline = pipeline ;
9392 this .config = config ;
94- this .secConfig = new SecurityConfig (config );
93+ this .secConfig = new SecurityConfig (config );
9594 this .semaphore =
9695 new Semaphore (HddsClientUtils .getMaxOutstandingRequests (config ));
9796 this .metrics = XceiverClientManager .getXceiverClientMetrics ();
@@ -101,7 +100,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
101100
102101 /**
103102 * To be used when grpc token is not enabled.
104- * * /
103+ */
105104 @ Override
106105 public void connect () throws Exception {
107106 // leader by default is the 1st datanode in the datanode list of pipleline
@@ -112,7 +111,7 @@ public void connect() throws Exception {
112111
113112 /**
114113 * Passed encoded token to GRPC header when security is enabled.
115- * * /
114+ */
116115 @ Override
117116 public void connect (String encodedToken ) throws Exception {
118117 // leader by default is the 1st datanode in the datanode list of pipleline
@@ -132,11 +131,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
132131 }
133132
134133 // Add credential context to the client call
135- String userName = UserGroupInformation .getCurrentUser ()
136- .getShortUserName ();
134+ String userName = UserGroupInformation .getCurrentUser ().getShortUserName ();
137135 LOG .debug ("Connecting to server Port : " + dn .getIpAddress ());
138- NettyChannelBuilder channelBuilder = NettyChannelBuilder . forAddress ( dn
139- .getIpAddress (), port ).usePlaintext ()
136+ NettyChannelBuilder channelBuilder =
137+ NettyChannelBuilder . forAddress ( dn .getIpAddress (), port ).usePlaintext ()
140138 .maxInboundMessageSize (OzoneConsts .OZONE_SCM_CHUNK_MAX_SIZE )
141139 .intercept (new ClientCredentialInterceptor (userName , encodedToken ),
142140 new GrpcClientInterceptor ());
@@ -149,8 +147,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
149147 if (trustCertCollectionFile != null ) {
150148 sslContextBuilder .trustManager (trustCertCollectionFile );
151149 }
152- if (secConfig .isGrpcMutualTlsRequired () && clientCertChainFile != null &&
153- privateKeyFile != null ) {
150+ if (secConfig .isGrpcMutualTlsRequired () && clientCertChainFile != null
151+ && privateKeyFile != null ) {
154152 sslContextBuilder .keyManager (clientCertChainFile , privateKeyFile );
155153 }
156154
@@ -216,77 +214,83 @@ public ContainerCommandResponseProto sendCommand(
216214 }
217215
218216 @ Override
219- public XceiverClientReply sendCommand (
220- ContainerCommandRequestProto request , List <DatanodeDetails > excludeDns )
217+ public ContainerCommandResponseProto sendCommand (
218+ ContainerCommandRequestProto request , List <CheckedBiFunction > validators )
221219 throws IOException {
222- Preconditions .checkState (HddsUtils .isReadOnly (request ));
223- return sendCommandWithTraceIDAndRetry (request , excludeDns );
220+ try {
221+ XceiverClientReply reply ;
222+ reply = sendCommandWithTraceIDAndRetry (request , validators );
223+ ContainerCommandResponseProto responseProto = reply .getResponse ().get ();
224+ return responseProto ;
225+ } catch (ExecutionException | InterruptedException e ) {
226+ throw new IOException ("Failed to execute command " + request , e );
227+ }
224228 }
225229
226230 private XceiverClientReply sendCommandWithTraceIDAndRetry (
227- ContainerCommandRequestProto request , List <DatanodeDetails > excludeDns )
231+ ContainerCommandRequestProto request , List <CheckedBiFunction > validators )
228232 throws IOException {
229233 try (Scope scope = GlobalTracer .get ()
230234 .buildSpan ("XceiverClientGrpc." + request .getCmdType ().name ())
231235 .startActive (true )) {
232236 ContainerCommandRequestProto finalPayload =
233237 ContainerCommandRequestProto .newBuilder (request )
234- .setTraceID (TracingUtil .exportCurrentSpan ())
235- .build ();
236- return sendCommandWithRetry (finalPayload , excludeDns );
238+ .setTraceID (TracingUtil .exportCurrentSpan ()).build ();
239+ return sendCommandWithRetry (finalPayload , validators );
237240 }
238241 }
239242
240243 private XceiverClientReply sendCommandWithRetry (
241- ContainerCommandRequestProto request , List <DatanodeDetails > excludeDns )
244+ ContainerCommandRequestProto request , List <CheckedBiFunction > validators )
242245 throws IOException {
243246 ContainerCommandResponseProto responseProto = null ;
247+ IOException ioException = null ;
244248
245249 // In case of an exception or an error, we will try to read from the
246250 // datanodes in the pipeline in a round robin fashion.
247251
248252 // TODO: cache the correct leader info in here, so that any subsequent calls
249253 // should first go to leader
250- List <DatanodeDetails > dns = pipeline .getNodes ();
251- List <DatanodeDetails > healthyDns =
252- excludeDns != null ? dns .stream ().filter (dnId -> {
253- for (DatanodeDetails excludeId : excludeDns ) {
254- if (dnId .equals (excludeId )) {
255- return false ;
256- }
257- }
258- return true ;
259- }).collect (Collectors .toList ()) : dns ;
260254 XceiverClientReply reply = new XceiverClientReply (null );
261- for (DatanodeDetails dn : healthyDns ) {
255+ for (DatanodeDetails dn : pipeline . getNodes () ) {
262256 try {
263257 LOG .debug ("Executing command " + request + " on datanode " + dn );
264258 // In case the command gets retried on a 2nd datanode,
265259 // sendCommandAsyncCall will create a new channel and async stub
266260 // in case these don't exist for the specific datanode.
267261 reply .addDatanode (dn );
268262 responseProto = sendCommandAsync (request , dn ).getResponse ().get ();
269- if (responseProto .getResult () == ContainerProtos .Result .SUCCESS ) {
270- break ;
263+ if (validators != null && !validators .isEmpty ()) {
264+ for (CheckedBiFunction validator : validators ) {
265+ validator .apply (request , responseProto );
266+ }
271267 }
272- } catch (ExecutionException | InterruptedException e ) {
268+ break ;
269+ } catch (ExecutionException | InterruptedException | IOException e ) {
273270 LOG .debug ("Failed to execute command " + request + " on datanode " + dn
274271 .getUuidString (), e );
275- if (Status .fromThrowable (e .getCause ()).getCode ()
276- == Status .UNAUTHENTICATED .getCode ()) {
277- throw new SCMSecurityException ("Failed to authenticate with "
278- + "GRPC XceiverServer with Ozone block token." );
272+ if (!(e instanceof IOException )) {
273+ if (Status .fromThrowable (e .getCause ()).getCode ()
274+ == Status .UNAUTHENTICATED .getCode ()) {
275+ throw new SCMSecurityException ("Failed to authenticate with "
276+ + "GRPC XceiverServer with Ozone block token." );
277+ }
278+ ioException = new IOException (e );
279+ } else {
280+ ioException = (IOException ) e ;
279281 }
282+ responseProto = null ;
280283 }
281284 }
282285
283286 if (responseProto != null ) {
284287 reply .setResponse (CompletableFuture .completedFuture (responseProto ));
285288 return reply ;
286289 } else {
287- throw new IOException (
288- "Failed to execute command " + request + " on the pipeline "
289- + pipeline .getId ());
290+ Preconditions .checkNotNull (ioException );
291+ LOG .error ("Failed to execute command " + request + " on the pipeline "
292+ + pipeline .getId ());
293+ throw ioException ;
290294 }
291295 }
292296
0 commit comments