@@ -27,6 +27,7 @@

import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+ import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
@@ -195,17 +196,16 @@ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
if (snapshot == null) {
TermIndex empty =
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
- LOG.info(
- "The snapshot info is null." + "Setting the last applied index to:"
- + empty);
+ LOG.info("{}: The snapshot info is null. Setting the last applied index" +
+ "to:{}", gid, empty);
setLastAppliedTermIndex(empty);
- return RaftLog.INVALID_LOG_INDEX;
+ return empty.getIndex();
}

final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
LOG.info("Setting the last applied index to " + last);
LOG.info("{}: Setting the last applied index to {}", gid, last);
setLastAppliedTermIndex(last);

// initialize the dispatcher with snapshot so that it build the missing
@@ -241,18 +241,20 @@ public void persistContainerSet(OutputStream out) throws IOException {
@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
long startTime = Time.monotonicNow();
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
persistContainerSet(fos);
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + ti);
LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
snapshotFile);
throw ioe;
}
LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@@ -326,7 +328,7 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(

private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
if (isBlockTokenEnabled) {
@@ -344,7 +346,7 @@ private ContainerCommandResponseProto dispatchCommand(
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
LOG.trace("response {}", response);
LOG.trace("{}: response {}", gid, response);
return response;
}

@@ -384,18 +386,18 @@ private CompletableFuture<Message> handleWriteChunk(
.supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);

writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData()
.getChunkName());
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ write.getChunkData().getChunkName());
// Remove the future once it finishes execution from the
// writeChunkFutureMap.
writeChunkFuture.thenApply(r -> {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
writeChunkFutureMap.remove(entryIndex);
LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
.getBlockID() + " logIndex " + entryIndex + " chunkName " + write
.getChunkData().getChunkName());
LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ write.getChunkData().getChunkName());
return r;
});
return writeChunkFuture;
@@ -554,12 +556,12 @@ public CompletableFuture<ByteString> readStateMachineData(
}
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
LOG.error("unable to read stateMachineData:" + e);
LOG.error("{} unable to read stateMachineData:", gid, e);
return completeExceptionally(e);
}
}

- private void updateLastApplied() {
+ private synchronized void updateLastApplied() {
Contributor:
@mukul1987 Thanks for working on this! The patch looks good to me.
I think we can avoid using synchronized by using a concurrent sorted map: each applyTransaction removes its corresponding entry from the map, and we update lastApplied based on the first entry in the map.

Contributor (Author):
HDDS-1792 will fix this by using ConcurrentHashSet.
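
A rough sketch of one way to realize the suggestion above, not the code from this PR or from HDDS-1792: completed applyTransaction entries are recorded in a ConcurrentSkipListMap, and the last-applied term/index is advanced over the contiguous prefix of completed indexes, so no method-level synchronized is needed. The class and method names (LastAppliedTracker, onApplyTransactionCompleted) are made up for illustration and are not part of ContainerStateMachine.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch only -- not the actual Ozone implementation.
public class LastAppliedTracker {

  // log index -> term of the entry whose applyTransaction has finished
  private final ConcurrentSkipListMap<Long, Long> completed =
      new ConcurrentSkipListMap<>();

  // {term, index} of the highest contiguously applied entry
  private final AtomicReference<long[]> lastApplied;

  // Seed from the loaded snapshot, mirroring what loadSnapshot does above.
  public LastAppliedTracker(long snapshotTerm, long snapshotIndex) {
    lastApplied = new AtomicReference<>(new long[] {snapshotTerm, snapshotIndex});
  }

  // Called once applyTransaction for (term, index) has completed.
  public void onApplyTransactionCompleted(long term, long index) {
    completed.put(index, term);
    drain();
  }

  // Advance lastApplied over the contiguous prefix of completed indexes.
  private void drain() {
    for (Map.Entry<Long, Long> e = completed.firstEntry(); e != null;
         e = completed.firstEntry()) {
      long[] current = lastApplied.get();
      if (e.getKey() != current[1] + 1) {
        return; // a lower index is still in flight; stop for now
      }
      if (lastApplied.compareAndSet(current,
          new long[] {e.getValue(), e.getKey()})) {
        completed.remove(e.getKey());
      }
    }
  }

  public long getLastAppliedIndex() {
    return lastApplied.get()[1];
  }
}

In the real state machine the drained term/index would presumably be fed to setLastAppliedTermIndex (visible in the diff above); as the reply notes, the actual rework was deferred to HDDS-1792.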

Long appliedTerm = null;
long appliedIndex = -1;
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {