Skip to content

Commit ea223c3

Browse files
committed
Fully encapsulate LocalCheckpointTracker inside of the engine (#31213)
* Fully encapsulate LocalCheckpointTracker inside of the engine. This makes the Engine interface not expose the `LocalCheckpointTracker`, instead exposing the pieces needed (like retrieving the local checkpoint) as individual methods.
1 parent 62bf333 commit ea223c3

File tree

10 files changed

+129
-81
lines changed

10 files changed

+129
-81
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
import org.elasticsearch.index.mapper.ParseContext.Document;
6363
import org.elasticsearch.index.mapper.ParsedDocument;
6464
import org.elasticsearch.index.merge.MergeStats;
65-
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
65+
import org.elasticsearch.index.seqno.SeqNoStats;
6666
import org.elasticsearch.index.seqno.SequenceNumbers;
6767
import org.elasticsearch.index.shard.ShardId;
6868
import org.elasticsearch.index.store.Store;
@@ -635,11 +635,28 @@ public CommitStats commitStats() {
635635
}
636636

637637
/**
638-
* The sequence number service for this engine.
638+
* @return the local checkpoint for this Engine
639+
*/
640+
public abstract long getLocalCheckpoint();
641+
642+
/**
643+
* Waits for all operations up to the provided sequence number to complete.
639644
*
640-
* @return the sequence number service
645+
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
646+
* @throws InterruptedException if the thread was interrupted while blocking on the condition
647+
*/
648+
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
649+
650+
/**
651+
* Reset the local checkpoint in the tracker to the given local checkpoint
652+
* @param localCheckpoint the new checkpoint to be set
653+
*/
654+
public abstract void resetLocalCheckpoint(long localCheckpoint);
655+
656+
/**
657+
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
641658
*/
642-
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
659+
public abstract SeqNoStats getSeqNoStats(long globalCheckpoint);
643660

644661
/**
645662
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.merge.MergeStats;
6868
import org.elasticsearch.index.merge.OnGoingMerge;
6969
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
70+
import org.elasticsearch.index.seqno.SeqNoStats;
7071
import org.elasticsearch.index.seqno.SequenceNumbers;
7172
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
7273
import org.elasticsearch.index.shard.ShardId;
@@ -2234,10 +2235,31 @@ public MergeStats getMergeStats() {
22342235
return mergeScheduler.stats();
22352236
}
22362237

2237-
public final LocalCheckpointTracker getLocalCheckpointTracker() {
2238+
// Used only for testing! Package private to prevent anyone else from using it
2239+
LocalCheckpointTracker getLocalCheckpointTracker() {
22382240
return localCheckpointTracker;
22392241
}
22402242

2243+
@Override
2244+
public long getLocalCheckpoint() {
2245+
return localCheckpointTracker.getCheckpoint();
2246+
}
2247+
2248+
@Override
2249+
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
2250+
localCheckpointTracker.waitForOpsToComplete(seqNo);
2251+
}
2252+
2253+
@Override
2254+
public void resetLocalCheckpoint(long localCheckpoint) {
2255+
localCheckpointTracker.resetCheckpoint(localCheckpoint);
2256+
}
2257+
2258+
@Override
2259+
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
2260+
return localCheckpointTracker.getStats(globalCheckpoint);
2261+
}
2262+
22412263
/**
22422264
* Returns the number of times a version was looked up either from the index.
22432265
* Note this is only available if assertions are enabled

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ public void updateShardState(final ShardRouting newRouting,
407407
final Engine engine = getEngine();
408408
if (currentRouting.isRelocationTarget() == false || recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
409409
// there was no primary context hand-off in < 6.0.0, need to manually activate the shard
410-
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
410+
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpoint());
411411
}
412412
if (currentRouting.isRelocationTarget() == true && recoverySourceNode.getVersion().before(Version.V_6_0_0_alpha1)) {
413413
// Flush the translog as it may contain operations with no sequence numbers. We want to make sure those
@@ -497,8 +497,7 @@ public void updateShardState(final ShardRouting newRouting,
497497
*/
498498
engine.rollTranslogGeneration();
499499
engine.fillSeqNoGaps(newPrimaryTerm);
500-
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
501-
getEngine().getLocalCheckpointTracker().getCheckpoint());
500+
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
502501
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
503502
@Override
504503
public void onResponse(ResyncTask resyncTask) {
@@ -524,7 +523,7 @@ public void onFailure(Exception e) {
524523
}
525524
},
526525
e -> failShard("exception during primary term transition", e));
527-
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
526+
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
528527
primaryTerm = newPrimaryTerm;
529528
}
530529
}
@@ -905,7 +904,7 @@ public CommitStats commitStats() {
905904
@Nullable
906905
public SeqNoStats seqNoStats() {
907906
Engine engine = getEngineOrNull();
908-
return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
907+
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
909908
}
910909

911910
public IndexingStats indexingStats(String... types) {
@@ -1742,7 +1741,7 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
17421741
* @throws InterruptedException if the thread was interrupted while blocking on the condition
17431742
*/
17441743
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
1745-
getEngine().getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
1744+
getEngine().waitForOpsToComplete(seqNo);
17461745
}
17471746

17481747
/**
@@ -1775,7 +1774,7 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
17751774
* @return the local checkpoint
17761775
*/
17771776
public long getLocalCheckpoint() {
1778-
return getEngine().getLocalCheckpointTracker().getCheckpoint();
1777+
return getEngine().getLocalCheckpoint();
17791778
}
17801779

17811780
/**
@@ -1816,7 +1815,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
18161815
return;
18171816
}
18181817
// only sync if there are not operations in flight
1819-
final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
1818+
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
18201819
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
18211820
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
18221821
final String allocationId = routingEntry().allocationId().getId();
@@ -1853,7 +1852,7 @@ public ReplicationGroup getReplicationGroup() {
18531852
*/
18541853
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
18551854
verifyReplicationTarget();
1856-
final long localCheckpoint = getEngine().getLocalCheckpointTracker().getCheckpoint();
1855+
final long localCheckpoint = getLocalCheckpoint();
18571856
if (globalCheckpoint > localCheckpoint) {
18581857
/*
18591858
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@@ -1882,8 +1881,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
18821881
verifyPrimary();
18831882
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
18841883
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
1885-
getEngine().getLocalCheckpointTracker().getCheckpoint() ==
1886-
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
1884+
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
18871885
synchronized (mutex) {
18881886
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
18891887
}
@@ -2269,7 +2267,7 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
22692267
operationPrimaryTerm,
22702268
getLocalCheckpoint(),
22712269
localCheckpoint);
2272-
getEngine().getLocalCheckpointTracker().resetCheckpoint(localCheckpoint);
2270+
getEngine().resetLocalCheckpoint(localCheckpoint);
22732271
getEngine().rollTranslogGeneration();
22742272
});
22752273
globalCheckpointUpdated = true;

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ Index getIndex() {
6262
}
6363

6464
long maxSeqNo() {
65-
return shard.getEngine().getLocalCheckpointTracker().getMaxSeqNo();
65+
return shard.getEngine().getSeqNoStats(-1).getMaxSeqNo();
6666
}
6767

6868
long maxUnsafeAutoIdTimestamp() {

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.util.set.Sets;
3434
import org.elasticsearch.gateway.GatewayAllocator;
35+
import org.elasticsearch.index.engine.EngineTestCase;
3536
import org.elasticsearch.index.shard.IndexShard;
3637
import org.elasticsearch.index.shard.IndexShardTestCase;
3738
import org.elasticsearch.index.shard.ShardId;
@@ -350,7 +351,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
350351
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
351352
}
352353
final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
353-
IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
354+
EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(oldPrimaryShard)); // Make gap in seqno.
354355
long moreDocs = scaledRandomIntBetween(1, 10);
355356
for (int i = 0; i < moreDocs; i++) {
356357
IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));

0 commit comments

Comments
 (0)