@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// This file contains protocol buffers that are used by the store file tracker.
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "StoreFileTrackerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message StoreFileEntry {
required string name = 1;
required uint64 size = 2;
}

message StoreFileList {
required uint64 timestamp = 1;
repeated StoreFileEntry store_file = 2;
}
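
As a quick illustration of the two messages above, here is a minimal round-trip sketch (not part of this change; the class name, file name and size are made up) using the generated shaded classes, whose accessors (addStoreFile, getStoreFileList, parseFrom) also appear in the tracker code below:

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

public class StoreFileListRoundTrip {
  public static void main(String[] args) throws Exception {
    // build a list with one entry; both fields of each message are required
    StoreFileList list = StoreFileList.newBuilder()
      .setTimestamp(System.currentTimeMillis())
      .addStoreFile(StoreFileEntry.newBuilder().setName("0f5d2c3e").setSize(1024).build())
      .build();
    // serialize and parse back, as the tracker does when persisting and loading
    StoreFileList parsed = StoreFileList.parseFrom(list.toByteArray());
    System.out.println(parsed.getStoreFileCount() + " file(s), ts=" + parsed.getTimestamp());
  }
}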
@@ -22,6 +22,7 @@
 import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -109,6 +110,10 @@ public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }
 
+  public TableName getTableName() {
+    return getRegionInfo().getTable();
+  }
+
   public RegionInfo getRegionInfo() {
     return regionFileSystem.getRegionInfo();
   }
@@ -172,9 +172,9 @@ public StoreFlusher getStoreFlusher() {
     return this.storeFlusher;
   }
 
-  private StoreFileTracker createStoreFileTracker(HStore store) {
-    return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
-      store.isPrimaryReplicaStore(), store.getStoreContext());
+  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
+    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
+      store.getStoreContext());
   }
 
   /**
@@ -205,7 +205,7 @@ protected final void createComponentsOnce(Configuration conf, HStore store,
     this.ctx = store.getStoreContext();
     this.coprocessorHost = store.getHRegion().getCoprocessorHost();
     this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
-    this.storeFileTracker = createStoreFileTracker(store);
+    this.storeFileTracker = createStoreFileTracker(conf, store);
     assert compactor != null && compactionPolicy != null && storeFileManager != null &&
       storeFlusher != null && storeFileTracker != null;
   }
@@ -21,7 +21,6 @@
 import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.StoreContext;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,9 +32,9 @@
 @InterfaceAudience.Private
 class DefaultStoreFileTracker extends StoreFileTrackerBase {
 
-  public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+  public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
     StoreContext ctx) {
-    super(conf, tableName, isPrimaryReplica, ctx);
+    super(conf, isPrimaryReplica, ctx);
   }
 
   @Override
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
* A file based store file tracker.
* <p/>
* With this tracker, the store file list is persisted into a file, so we can write new store files
* directly to the final data directory, as we will never load broken files. This greatly reduces
* the time for flush and compaction on some object storages, where a rename is actually a copy. It
* also avoids listing when loading the store file list, which speeds things up further, as listing
* is not a fast operation on most object storages either.
*/
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {

private final StoreFileListFile backedFile;

private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
backedFile = new StoreFileListFile(ctx);
}

@Override
public List<StoreFileInfo> load() throws IOException {
StoreFileList list = backedFile.load();
if (list == null) {
return Collections.emptyList();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
List<StoreFileInfo> infos = new ArrayList<>();
for (StoreFileEntry entry : list.getStoreFileList()) {
infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
}
// In general, for the primary replica, the load method should only be called once during
// initialization, so we do not need synchronization here. And for secondary replicas, though the
// load method could be called multiple times, we will never call the other methods, so no
// synchronization is needed either.
// But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
// and we have a RefreshHFilesEndpoint example that exposes the refreshStoreFiles method as an
// RPC, so for safety, let's still keep the synchronization here.
synchronized (storefiles) {
for (StoreFileInfo info : infos) {
storefiles.put(info.getPath().getName(), info);
}
}
return infos;
}

@Override
protected boolean requireWritingToTmpDirFirst() {
return false;
}

private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
.build();
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
for (StoreFileInfo info : storefiles.values()) {
builder.addStoreFile(toStoreFileEntry(info));
}
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}

@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
Set<String> compactedFileNames =
compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storefiles.forEach((name, info) -> {
if (compactedFileNames.contains(name)) {
return;
}
builder.addStoreFile(toStoreFileEntry(info));
});
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (String name : compactedFileNames) {
storefiles.remove(name);
}
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}
}
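
To see the bookkeeping of doAddCompactionResults in isolation, here is a simplified, self-contained sketch (hypothetical names; a plain name-to-size map stands in for the StoreFileInfo map). Every update rewrites the whole list, dropping the compaction inputs and appending the compaction outputs:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class CompactionBookkeepingSketch {

  static Map<String, Long> afterCompaction(Map<String, Long> current, Set<String> compactedNames,
      Map<String, Long> newFiles) {
    Map<String, Long> next = new LinkedHashMap<>();
    // keep every file that was not an input of the compaction
    current.forEach((name, size) -> {
      if (!compactedNames.contains(name)) {
        next.put(name, size);
      }
    });
    // append the compaction outputs
    next.putAll(newFiles);
    return next;
  }

  public static void main(String[] args) {
    Map<String, Long> current = new LinkedHashMap<>();
    current.put("a", 10L);
    current.put("b", 20L);
    current.put("c", 30L);
    // files a and b are compacted into ab
    System.out.println(afterCompaction(current, Set.of("a", "b"), Map.of("ab", 25L)));
    // prints {c=30, ab=25}
  }
}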
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
* To fully avoid listing, we use two files for tracking. When loading, we try to read both files:
* if only one exists, we trust it; if both exist, we compare the timestamps and trust the newer
* one. We record in memory which file is currently trusted, and when we need to update the store
* file list, we write to the other file.
* <p/>
* In this way, we can avoid listing when we want to load the store file list.
*/
@InterfaceAudience.Private
class StoreFileListFile {

private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

private static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";

private static final String TRACK_FILE_ROTATE = "f2";

private final StoreContext ctx;

private final Path trackFileDir;

private final Path[] trackFiles = new Path[2];

// this is used to make sure that we do not go backwards
private long prevTimestamp = -1;

private int nextTrackFile = -1;

StoreFileListFile(StoreContext ctx) {
this.ctx = ctx;
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
}

private StoreFileList load(Path path) throws IOException {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
byte[] bytes;
try (FSDataInputStream in = fs.open(path)) {
bytes = ByteStreams.toByteArray(in);
}
// Read all the bytes first and then parse, so the only exception we can throw here is
// InvalidProtocolBufferException. This is very important for the upper layer to determine
// whether this is the normal case, where the file does not exist or is incomplete. If there is
// any other type of exception, the upper layer should propagate it instead of just ignoring it,
// otherwise it will lead to data loss.
return StoreFileList.parseFrom(bytes);
}

private int select(StoreFileList[] lists) {
if (lists[0] == null) {
return 1;
}
if (lists[1] == null) {
return 0;
}
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
}

StoreFileList load() throws IOException {
StoreFileList[] lists = new StoreFileList[2];
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | InvalidProtocolBufferException e) {
// this is the normal case, so log at info level and do not log the stack trace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
}
}
int winnerIndex = select(lists);
if (lists[winnerIndex] != null) {
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
} else {
nextTrackFile = 0;
}
return lists[winnerIndex];
}

/**
* We will set the timestamp in this method, so just pass the builder in.
*/
void update(StoreFileList.Builder builder) throws IOException {
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
builder.setTimestamp(timestamp).build().writeTo(out);
}
// record timestamp
prevTimestamp = timestamp;
// rotate the file
nextTrackFile = 1 - nextTrackFile;
try {
fs.delete(trackFiles[nextTrackFile], false);
} catch (IOException e) {
// we will create the new file with overwrite = true, so this is not a big deal; the delete is
// only to speed up loading, as then we do not need to read this file when loading (we will just
// hit a FileNotFoundException)
LOG.debug("Failed to delete old track file {}, not a big deal, just ignore",
trackFiles[nextTrackFile], e);
}
}
}
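
Abstracting away the filesystem, the rotation protocol of this class can be sketched as follows (hypothetical and simplified: a long per slot, with -1 meaning missing or unparseable, stands in for reading and parsing a track file):

public class RotationSketch {

  private final long[] slots = {-1, -1}; // timestamps of f1/f2; -1 = missing or corrupt
  private long prevTimestamp = -1;
  private int nextSlot = -1;

  /** Returns the trusted slot, or -1 if neither track file is readable. */
  int load() {
    if (slots[0] < 0 && slots[1] < 0) {
      nextSlot = 0; // nothing usable on disk yet, start writing at the first slot
      return -1;
    }
    int winner = slots[0] >= slots[1] ? 0 : 1; // trust the newer file
    prevTimestamp = slots[winner];
    nextSlot = 1 - winner; // updates always go to the file we do not trust
    return winner;
  }

  void update(long now) {
    // keep timestamps strictly increasing, even if the wall clock goes backwards
    long timestamp = Math.max(prevTimestamp + 1, now);
    slots[nextSlot] = timestamp;
    prevTimestamp = timestamp;
    nextSlot = 1 - nextSlot; // rotate: the slot we just wrote becomes the trusted one
  }
}

Because every update writes the full list with a strictly newer timestamp, a reader can always pick the trusted file by comparing timestamps alone, with no directory listing required.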