HBASE-25988 Store the store file list by a file #3578
36 changes: 36 additions & 0 deletions
hbase-protocol-shaded/src/main/protobuf/server/region/StoreFileTracker.proto
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| syntax = "proto2"; | ||
| // This file contains protocol buffers that are used for store file tracker. | ||
| package hbase.pb; | ||
|
|
||
| option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; | ||
| option java_outer_classname = "StoreFileTrackerProtos"; | ||
| option java_generic_services = true; | ||
| option java_generate_equals_and_hash = true; | ||
| option optimize_for = SPEED; | ||
|
|
||
| message StoreFileEntry { | ||
| required string name = 1; | ||
| required uint64 size = 2; | ||
| } | ||
|
|
||
| message StoreFileList { | ||
| required uint64 timestamp = 1; | ||
| repeated StoreFileEntry store_file = 2; | ||
| } |
142 changes: 142 additions & 0 deletions
...java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hbase.regionserver.storefiletracker; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.hbase.regionserver.StoreContext; | ||
| import org.apache.hadoop.hbase.regionserver.StoreFileInfo; | ||
| import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; | ||
| import org.apache.yetus.audience.InterfaceAudience; | ||
|
|
||
| import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry; | ||
| import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; | ||
|
|
||
| /** | ||
| * A file based store file tracker. | ||
| * <p/> | ||
| * With this tracker, the store file list is persisted in a file, so new store files can be | ||
| * written directly to the final data directory, because broken files that are not in the list | ||
| * will never be loaded. This greatly reduces the time for flush and compaction on some object | ||
| * storages, where a rename is actually a copy. It also avoids listing when loading the store | ||
| * file list, which speeds up loading as well, since listing is not a fast operation on most | ||
| * object storages. | ||
| */ | ||
| @InterfaceAudience.Private | ||
| public class FileBasedStoreFileTracker extends StoreFileTrackerBase { | ||
|
|
||
| private final StoreFileListFile backedFile; | ||
|
|
||
| private final Map<String, StoreFileInfo> storefiles = new HashMap<>(); | ||
|
|
||
| public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { | ||
| super(conf, isPrimaryReplica, ctx); | ||
| backedFile = new StoreFileListFile(ctx); | ||
| } | ||
|
|
||
| @Override | ||
| public List<StoreFileInfo> load() throws IOException { | ||
| StoreFileList list = backedFile.load(); | ||
| if (list == null) { | ||
| return Collections.emptyList(); | ||
| } | ||
| FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
| List<StoreFileInfo> infos = new ArrayList<>(); | ||
| for (StoreFileEntry entry : list.getStoreFileList()) { | ||
| infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(), | ||
| ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(), | ||
| new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()))); | ||
| } | ||
| // In general, for the primary replica, the load method should only be called once during | ||
| // initialization, so we would not need to synchronize here. And for secondary replicas, though | ||
| // the load method could be called multiple times, we never call the other methods, so skipping | ||
| // synchronization would also be fine. | ||
| // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs, | ||
| // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so | ||
| // for safety, let's still keep the synchronized block here. | ||
| synchronized (storefiles) { | ||
| for (StoreFileInfo info : infos) { | ||
| storefiles.put(info.getPath().getName(), info); | ||
| } | ||
| } | ||
| return infos; | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean requireWritingToTmpDirFirst() { | ||
| return false; | ||
| } | ||
|
|
||
| private StoreFileEntry toStoreFileEntry(StoreFileInfo info) { | ||
| return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize()) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException { | ||
| synchronized (storefiles) { | ||
| StoreFileList.Builder builder = StoreFileList.newBuilder(); | ||
| for (StoreFileInfo info : storefiles.values()) { | ||
| builder.addStoreFile(toStoreFileEntry(info)); | ||
| } | ||
| for (StoreFileInfo info : newFiles) { | ||
| builder.addStoreFile(toStoreFileEntry(info)); | ||
| } | ||
| backedFile.update(builder); | ||
| for (StoreFileInfo info : newFiles) { | ||
| storefiles.put(info.getPath().getName(), info); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles, | ||
| Collection<StoreFileInfo> newFiles) throws IOException { | ||
| Set<String> compactedFileNames = | ||
| compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet()); | ||
| synchronized (storefiles) { | ||
| StoreFileList.Builder builder = StoreFileList.newBuilder(); | ||
| storefiles.forEach((name, info) -> { | ||
| if (compactedFileNames.contains(name)) { | ||
| return; | ||
| } | ||
| builder.addStoreFile(toStoreFileEntry(info)); | ||
| }); | ||
| for (StoreFileInfo info : newFiles) { | ||
| builder.addStoreFile(toStoreFileEntry(info)); | ||
| } | ||
| backedFile.update(builder); | ||
| for (String name : compactedFileNames) { | ||
| storefiles.remove(name); | ||
| } | ||
| for (StoreFileInfo info : newFiles) { | ||
| storefiles.put(info.getPath().getName(), info); | ||
| } | ||
| } | ||
| } | ||
| } | ||
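The ordering used by `doAddNewStoreFiles` and `doAddCompactionResults` above (build the full new list, persist it, and only then mutate the in-memory map) can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in: `CompactionUpdateSketch`, its `persist` method, and the name-to-size map replace `StoreFileInfo`, `StoreFileListFile` and the protobuf builder, so this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the tracker: file names map to sizes instead of StoreFileInfo.
final class CompactionUpdateSketch {
  // In-memory view of the tracked store files (file name -> size in bytes).
  private final Map<String, Long> storefiles = new LinkedHashMap<>();
  // Stand-in for the persisted list; the PR writes a StoreFileList protobuf instead.
  private List<String> persisted = Collections.emptyList();

  void addNewStoreFiles(Map<String, Long> newFiles) {
    synchronized (storefiles) {
      List<String> next = new ArrayList<>(storefiles.keySet());
      next.addAll(newFiles.keySet());
      persist(next); // persist first, so a failure leaves the in-memory view untouched
      storefiles.putAll(newFiles);
    }
  }

  void addCompactionResults(Set<String> compacted, Map<String, Long> newFiles) {
    synchronized (storefiles) {
      List<String> next = new ArrayList<>();
      for (String name : storefiles.keySet()) {
        if (!compacted.contains(name)) {
          next.add(name); // keep every file that was not compacted away
        }
      }
      next.addAll(newFiles.keySet());
      persist(next);
      storefiles.keySet().removeAll(compacted);
      storefiles.putAll(newFiles);
    }
  }

  // Assumption: persisting is modelled as copying the list into a field.
  private void persist(List<String> names) {
    persisted = new ArrayList<>(names);
  }

  List<String> persisted() {
    return persisted;
  }

  public static void main(String[] args) {
    CompactionUpdateSketch tracker = new CompactionUpdateSketch();
    Map<String, Long> flushed = new LinkedHashMap<>();
    flushed.put("a", 10L);
    flushed.put("b", 20L);
    tracker.addNewStoreFiles(flushed);
    tracker.addCompactionResults(new HashSet<>(Arrays.asList("a", "b")),
      Collections.singletonMap("c", 25L));
    System.out.println(tracker.persisted()); // prints [c]
  }
}
```

Persisting before touching the map means that if the write throws, the in-memory view still matches the last list that was written successfully.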
142 changes: 142 additions & 0 deletions
...rc/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /** | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hbase.regionserver.storefiletracker; | ||
|
|
||
| import java.io.FileNotFoundException; | ||
| import java.io.IOException; | ||
| import org.apache.hadoop.fs.FSDataInputStream; | ||
| import org.apache.hadoop.fs.FSDataOutputStream; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.hbase.regionserver.StoreContext; | ||
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; | ||
| import org.apache.yetus.audience.InterfaceAudience; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; | ||
| import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; | ||
| import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; | ||
|
|
||
| import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; | ||
|
|
||
| /** | ||
| * To fully avoid listing, we use two files for tracking. When loading, we try to read both | ||
| * files. If only one exists, we trust that one; if both exist, we compare their timestamps and | ||
| * trust the newer one. We record in memory which file is trusted, and when we need to update | ||
| * the store file list, we write to the other file. | ||
| * <p/> | ||
| * In this way, we avoid listing when loading the store file list file. | ||
| */ | ||
| @InterfaceAudience.Private | ||
| class StoreFileListFile { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class); | ||
|
|
||
| private static final String TRACK_FILE_DIR = ".filelist"; | ||
|
|
||
| private static final String TRACK_FILE = "f1"; | ||
|
|
||
| private static final String TRACK_FILE_ROTATE = "f2"; | ||
|
|
||
| private final StoreContext ctx; | ||
|
|
||
| private final Path trackFileDir; | ||
|
|
||
| private final Path[] trackFiles = new Path[2]; | ||
|
|
||
| // this is used to make sure that we do not go backwards | ||
| private long prevTimestamp = -1; | ||
|
|
||
| private int nextTrackFile = -1; | ||
|
|
||
| StoreFileListFile(StoreContext ctx) { | ||
| this.ctx = ctx; | ||
| trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR); | ||
| trackFiles[0] = new Path(trackFileDir, TRACK_FILE); | ||
| trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE); | ||
| } | ||
|
|
||
| private StoreFileList load(Path path) throws IOException { | ||
| FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
| byte[] bytes; | ||
| try (FSDataInputStream in = fs.open(path)) { | ||
| bytes = ByteStreams.toByteArray(in); | ||
| } | ||
| // Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException | ||
| // here. This is very important for upper layer to determine whether this is the normal case, | ||
| // where the file does not exist or is incomplete. If there is another type of exception, the | ||
| // upper layer should throw it out instead of just ignoring it, otherwise it will lead to data | ||
| // loss. | ||
| return StoreFileList.parseFrom(bytes); | ||
| } | ||
|
|
||
| private int select(StoreFileList[] lists) { | ||
| if (lists[0] == null) { | ||
| return 1; | ||
| } | ||
| if (lists[1] == null) { | ||
| return 0; | ||
| } | ||
| return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1; | ||
| } | ||
|
|
||
| StoreFileList load() throws IOException { | ||
| StoreFileList[] lists = new StoreFileList[2]; | ||
| for (int i = 0; i < 2; i++) { | ||
| try { | ||
| lists[i] = load(trackFiles[i]); | ||
| } catch (FileNotFoundException | InvalidProtocolBufferException e) { | ||
| // this is the normal case, so log at info level without the stack trace | ||
| LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString()); | ||
| } | ||
| } | ||
| int winnerIndex = select(lists); | ||
| if (lists[winnerIndex] != null) { | ||
| nextTrackFile = 1 - winnerIndex; | ||
| prevTimestamp = lists[winnerIndex].getTimestamp(); | ||
| } else { | ||
| nextTrackFile = 0; | ||
| } | ||
| return lists[winnerIndex]; | ||
| } | ||
|
|
||
| /** | ||
| * We will set the timestamp in this method so just pass the builder in | ||
| */ | ||
| void update(StoreFileList.Builder builder) throws IOException { | ||
| Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update"); | ||
| FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
| long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); | ||
| try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) { | ||
| builder.setTimestamp(timestamp).build().writeTo(out); | ||
| } | ||
| // record timestamp | ||
| prevTimestamp = timestamp; | ||
| // rotate the file | ||
| nextTrackFile = 1 - nextTrackFile; | ||
| try { | ||
| fs.delete(trackFiles[nextTrackFile], false); | ||
| } catch (IOException e) { | ||
| // we will create the new file with overwrite = true, so this is not a big deal; deleting only | ||
| // speeds up loading, as we do not need to read this file (we will hit FileNotFoundException) | ||
| LOG.debug("failed to delete old track file {}, not a big deal, just ignore", | ||
| trackFiles[nextTrackFile], e); | ||
| } | ||
| } | ||
| } |
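The two-file rotation in `StoreFileListFile` can be sketched without Hadoop or protobuf. The example below is a simplified model under stated assumptions: `TwoFileRotationSketch`, `Snapshot`, and the two-element `disk` array are hypothetical names standing in for the `f1`/`f2` track files, and the clock value is passed in explicitly instead of coming from `EnvironmentEdgeManager`.

```java
// Hypothetical model of the f1/f2 rotation; the array slots stand in for the two track files.
final class TwoFileRotationSketch {
  static final class Snapshot {
    final long timestamp;
    final String payload;

    Snapshot(long timestamp, String payload) {
      this.timestamp = timestamp;
      this.payload = payload;
    }
  }

  private final Snapshot[] disk; // shared "storage", survives a simulated restart
  private long prevTimestamp = -1; // makes sure timestamps never go backwards
  private int nextSlot = -1; // slot to write on the next update, unknown before load()

  TwoFileRotationSketch(Snapshot[] disk) {
    this.disk = disk;
  }

  Snapshot load() {
    int winner;
    if (disk[0] == null) {
      winner = 1;
    } else if (disk[1] == null) {
      winner = 0;
    } else {
      winner = disk[0].timestamp >= disk[1].timestamp ? 0 : 1;
    }
    if (disk[winner] != null) {
      nextSlot = 1 - winner; // always write to the slot we do not trust
      prevTimestamp = disk[winner].timestamp;
    } else {
      nextSlot = 0; // neither file exists yet
    }
    return disk[winner];
  }

  void update(String payload, long now) {
    if (nextSlot < 0) {
      throw new IllegalStateException("should call load first before calling update");
    }
    long timestamp = Math.max(prevTimestamp + 1, now);
    disk[nextSlot] = new Snapshot(timestamp, payload);
    prevTimestamp = timestamp;
    nextSlot = 1 - nextSlot;
    disk[nextSlot] = null; // best-effort delete of the now stale file
  }

  public static void main(String[] args) {
    Snapshot[] disk = new Snapshot[2];
    TwoFileRotationSketch tracker = new TwoFileRotationSketch(disk);
    tracker.load();
    tracker.update("v1", 100L);
    tracker.update("v2", 100L); // same clock reading, timestamp is bumped to 101
    disk[0] = new Snapshot(100L, "v1"); // simulate a delete that failed
    Snapshot loaded = new TwoFileRotationSketch(disk).load();
    System.out.println(loaded.payload + " @ " + loaded.timestamp); // prints v2 @ 101
  }
}
```

Because each update takes `max(prevTimestamp + 1, now)`, the newer file wins on reload even when the clock does not advance between two updates and the stale file was never deleted.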