diff --git a/fdbclient/include/fdbclient/StorageCheckpoint.h b/fdbclient/include/fdbclient/StorageCheckpoint.h
index 8ed71aab797..6c50b01da8b 100644
--- a/fdbclient/include/fdbclient/StorageCheckpoint.h
+++ b/fdbclient/include/fdbclient/StorageCheckpoint.h
@@ -179,6 +179,7 @@ struct DataMoveMetaData {
     int16_t phase; // DataMoveMetaData::Phase.
     int8_t mode;
     Optional<BulkLoadTaskState> bulkLoadTaskState; // set if the data move is a bulk load data move
+    Optional<std::unordered_map<std::string, std::string>> dcTeamIds; // map of dcId to teamId

     DataMoveMetaData() = default;
     DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) {
@@ -206,7 +207,8 @@ struct DataMoveMetaData {

     template <class Ar>
     void serialize(Ar& ar) {
-        serializer(ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState);
+        serializer(
+            ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState, dcTeamIds);
     }
 };

diff --git a/fdbserver/DDTxnProcessor.actor.cpp b/fdbserver/DDTxnProcessor.actor.cpp
index 07ad8b121e1..83c302b0f1c 100644
--- a/fdbserver/DDTxnProcessor.actor.cpp
+++ b/fdbserver/DDTxnProcessor.actor.cpp
@@ -340,6 +340,17 @@ class DDTxnProcessorImpl {
         for (int i = 0; i < dms.size(); ++i) {
             auto dataMove = std::make_shared<DataMove>(decodeDataMoveValue(dms[i].value), true);
             const DataMoveMetaData& meta = dataMove->meta;
+            bool assigned, emptyRange;
+            DataMoveType type;
+            DataMovementReason reason;
+            decodeDataMoveId(meta.id, assigned, emptyRange, type, reason);
+            if (reason == DataMovementReason::SEED_SHARD_SERVER) {
+                TraceEvent("FoundSeedServerDataMove")
+                    .detail("DataMoveId", meta.id)
+                    .detail("Metadata", meta.toString());
+                ASSERT_WE_THINK(meta.ranges.front() == allKeys);
+                continue;
+            }
             if (meta.ranges.empty()) {
                 // Any persisted datamove with an empty range must be an tombstone persisted by
                 // a background cleanup (with retry_clean_up_datamove_tombstone_added),
diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 7bb3462f0db..cd558cbc493 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -62,6 +62,25 @@ bool shouldCreateCheckpoint(const UID& dataMoveId) {
     return (type == DataMoveType::PHYSICAL || type == DataMoveType::PHYSICAL_EXP);
 }

+std::unordered_map<std::string, std::string> generateTeamIds(
+    std::unordered_map<std::string, std::vector<std::string>>& dcServerIds) {
+    std::unordered_map<std::string, std::string> dcTeamIds;
+    for (auto& [dc, serverIds] : dcServerIds) {
+        std::sort(serverIds.begin(), serverIds.end());
+        std::string teamId;
+        for (const auto& serverId : serverIds) {
+            if (teamId.size() == 0) {
+                teamId = serverId;
+            } else {
+                teamId += "," + serverId;
+            }
+        }
+        // Use the concatenated server ids as the team id to avoid conflicts.
+        dcTeamIds[dc] = teamId;
+    }
+    return dcTeamIds;
+}
+
 // Unassigns keyrange `range` from server `ssId`, except ranges in `shards`.
 // Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID.
 ACTOR Future<Void> unassignServerKeys(Transaction* tr, UID ssId, KeyRange range, std::vector<Shard> shards, UID logId) {
@@ -1712,7 +1731,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                 serverListEntries.push_back(tr.get(serverListKeyFor(servers[s])));
             }
             std::vector<Optional<Value>> serverListValues = wait(getAll(serverListEntries));
-
+            state std::unordered_map<std::string, std::vector<std::string>> dcServers;
             for (int s = 0; s < serverListValues.size(); s++) {
                 if (!serverListValues[s].present()) {
                     // Attempt to move onto a server that isn't in serverList (removed or never added to the
                     // TODO(psm): Mark the data move as 'deleting'.
                     throw move_to_removed_server();
                 }
+                auto si = decodeServerListValue(serverListValues[s].get());
+                ASSERT(si.id() == servers[s]);
+                auto it = dcServers.find(si.locality.describeDcId());
+                if (it == dcServers.end()) {
+                    dcServers[si.locality.describeDcId()] = std::vector<std::string>();
+                }
+                dcServers[si.locality.describeDcId()].push_back(si.id().shortString());
             }

             currentKeys = KeyRangeRef(begin, keys.end);
@@ -1733,6 +1759,15 @@ ACTOR static Future<Void> startMoveShards(Database occ,
             state Key endKey = old.back().key;
             currentKeys = KeyRangeRef(currentKeys.begin, endKey);

+            if (ranges.front() != currentKeys) {
+                TraceEvent("MoveShardsPartialRange")
+                    .detail("ExpectedRange", ranges.front())
+                    .detail("ActualRange", currentKeys)
+                    .detail("DataMoveId", dataMoveId)
+                    .detail("RowLimit", SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT)
+                    .detail("ByteLimit", SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT);
+            }
+
             // Check that enough servers for each shard are in the correct state
             state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
             ASSERT(!UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY);
@@ -1806,6 +1841,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                         TraceEvent(
                             SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId)
                             .detail("Range", rangeIntersectKeys)
+                            .detail("CurrentDataMoveRange", ranges[0])
                             .detail("DataMoveID", dataMoveId.toString())
                             .detail("ExistingDataMoveID", destId.toString());
                         wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState));
@@ -1868,6 +1904,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
                     dataMove.ranges.clear();
                     dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end));
                     dataMove.dest.insert(servers.begin(), servers.end());
+                    dataMove.dcTeamIds = generateTeamIds(dcServers);
                 }

                 if (currentKeys.end == keys.end) {
@@ -3348,6 +3385,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<StorageServerInterface> servers) {
     std::map<Optional<Value>, Tag> dcId_locality;
     std::map<UID, Tag> server_tag;
     int8_t nextLocality = 0;
+    std::unordered_map<std::string, std::vector<std::string>> dcServerIds;
+
     for (auto& s : servers) {
         if (!dcId_locality.contains(s.locality.dcId())) {
             tr.set(arena, tagLocalityListKeyFor(s.locality.dcId()), tagLocalityListValue(nextLocality));
@@ -3357,6 +3396,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<StorageServerInterface> servers) {
         ksValue = keyServersValue(std::vector<Tag>(), shardId, UID());
         krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
-
         for (auto& s : servers) {
             krmSetPreviouslyEmptyRange(
                 tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(shardId), serverKeysFalse);
         }
+
+        DataMoveMetaData metadata{ shardId, allKeys };
+        metadata.dcTeamIds = generateTeamIds(dcServerIds);
+
+        // Data move metadata will be cleaned up on DD restarts.
+        tr.set(arena, dataMoveKeyFor(shardId), dataMoveValue(metadata));
     } else {
         krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
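
For reference, the team-id derivation introduced by generateTeamIds() above can be exercised in isolation: per DC, the storage-server ids are sorted and joined with commas, so the same destination set always yields the same team id. The sketch below restates that logic as a standalone program; the name deriveDcTeamIds, the main() driver, and the sample dc/server ids are hypothetical and not part of the patch.

// Standalone sketch of the per-DC team-id derivation (same logic as
// generateTeamIds() above); deriveDcTeamIds, main(), and the sample ids
// are made up for illustration, not FDB code.
#include <algorithm>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

std::unordered_map<std::string, std::string> deriveDcTeamIds(
    std::unordered_map<std::string, std::vector<std::string>> dcServerIds) {
    std::unordered_map<std::string, std::string> dcTeamIds;
    for (auto& [dc, serverIds] : dcServerIds) {
        // Sorting makes the result independent of the order the servers were seen in.
        std::sort(serverIds.begin(), serverIds.end());
        std::string teamId;
        for (const auto& serverId : serverIds) {
            if (teamId.empty()) {
                teamId = serverId;
            } else {
                teamId += "," + serverId;
            }
        }
        // The concatenated, sorted server ids become the team id for this DC.
        dcTeamIds[dc] = teamId;
    }
    return dcTeamIds;
}

int main() {
    // Hypothetical dcId -> shortened storage-server ids, as collected in startMoveShards().
    std::unordered_map<std::string, std::vector<std::string>> dcServerIds = {
        { "dc0", { "b2c3d4e5", "a1b2c3d4" } },
        { "dc1", { "f6a7b8c9" } },
    };
    for (const auto& [dc, teamId] : deriveDcTeamIds(dcServerIds)) {
        std::cout << dc << " -> " << teamId << "\n"; // e.g. dc0 -> a1b2c3d4,b2c3d4e5
    }
    return 0;
}

Because the id is derived purely from the membership of the destination team within a DC, two data moves targeting the same servers map to the same team id, which is what the "avoid conflicts" comment in generateTeamIds() relies on.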