hbase-asyncfs/pom.xml (26 additions, 0 deletions)
@@ -99,6 +99,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
@@ -194,6 +199,27 @@
<scope>test</scope>
</dependency>
</dependencies>
+    <build>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <configuration>
+            <!--
+              These files can not compile against hadoop 2.x and we also do not need these
+              hacks when working with hadoop 2.x, so exclude them here.
+              See HBASE-28965 for more details
+            -->
+            <excludes>
+              <exclude>**/org/apache/hadoop/hdfs/**</exclude>
+            </excludes>
+            <testExcludes>
+              <testExclude>**/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal**</testExclude>
@NihalJain (Contributor) commented on Nov 15, 2024:

    Very nice. I could use this approach to add feature HBASE-27693 Support for Hadoop's LDAP Authentication mechanism (Web UI only) i.e. #5680 to branch-2!
+            </testExcludes>
+          </configuration>
+        </plugin>
+      </plugins>
+    </build>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
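The mockito-inline test dependency added above is what enables mocking of static and final members, which the default mock-maker in mockito-core does not support; the excluded TestLeaseRenewal test presumably relies on this. A minimal, self-contained sketch of the capability it unlocks (the Clock helper and test names are illustrative, not code from this PR):

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class StaticMockingSketchTest {

  // Illustrative helper with a static method to stub.
  static class Clock {
    static long now() {
      return System.currentTimeMillis();
    }
  }

  @Test
  public void staticMethodCanBeStubbed() {
    // mockito-inline installs an inline mock-maker that intercepts static
    // calls within this try-with-resources scope.
    try (MockedStatic<Clock> mocked = Mockito.mockStatic(Clock.class)) {
      mocked.when(Clock::now).thenReturn(42L);
      assertEquals(42L, Clock.now());
    }
    // Outside the scope, Clock.now() behaves normally again.
  }
}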
hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -56,6 +58,7 @@
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -68,6 +71,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -106,6 +111,8 @@
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

+  private static final Logger LOG = LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutput.class);

// The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we set
// a smaller limit for data size.
private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
@@ -122,7 +129,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final String src;

-  private HdfsFileStatus stat;
+  private final HdfsFileStatus stat;

private final ExtendedBlock block;

@@ -138,6 +145,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final ByteBufAllocator alloc;

+  // a dummy DFSOutputStream used for lease renewal
+  private final DFSOutputStream dummyStream;

private static final class Callback {

private final CompletableFuture<Long> future;
@@ -356,8 +366,9 @@ private void setupReceiver(int timeoutMs) {

FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
-    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
-    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
+    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
+    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
+    StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
@@ -376,6 +387,8 @@ private void setupReceiver(int timeoutMs) {
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
this.streamSlowMonitor = streamSlowMonitor;
+    this.dummyStream = FanOutOneBlockAsyncDFSOutputHelper.createDummyDFSOutputStream(this, client,
+      src, stat, createFlags, summer);
}

@Override
@@ -593,7 +606,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
buf = null;
}
closeDataNodeChannelsAndAwait();
-    endFileLease(client, stat);
+    endFileLease(this);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
@@ -608,7 +621,7 @@ public void close() throws IOException {
state = State.CLOSED;
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
-    completeFile(client, namenode, src, clientName, block, stat);
+    completeFile(this, client, namenode, src, clientName, block, stat);
}

@Override
@@ -626,4 +639,20 @@ public long getSyncedLength() {
Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
return this.datanodeInfoMap;
}

+  DFSClient getClient() {
+    return client;
+  }
+
+  DFSOutputStream getDummyStream() {
+    return dummyStream;
+  }
+
+  boolean isClosed() {
+    return state == State.CLOSED;
+  }
+
+  HdfsFileStatus getStat() {
+    return stat;
+  }
}
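The dummyStream field above is the heart of this change. DFSClient renews the namenode lease on behalf of every output stream it knows about: beginFileLease registers a stream with a background LeaseRenewer, which periodically sends a renewLease RPC and may dereference the registered stream objects while doing so. FanOutOneBlockAsyncDFSOutput is not a DFSOutputStream, and registering null on its behalf (as the old code effectively did) breaks once the renewer inspects the stream, hence the dummy DFSOutputStream. A self-contained toy of the pattern, with all names simplified for illustration (this is not Hadoop's actual renewer):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy sketch of per-stream lease renewal, assuming a renewer that keys
// streams by a unique id and inspects them on every pass.
class ToyLeaseRenewer {

  interface RenewableStream {
    boolean isClosed();
  }

  private final Map<String, RenewableStream> filesBeingWritten = new ConcurrentHashMap<>();

  // Analogous to beginFileLease: a real stream object is stored per key,
  // which is why a dummy stream must be registered rather than null.
  void begin(String uniqKey, RenewableStream stream) {
    filesBeingWritten.put(uniqKey, stream);
  }

  // Analogous to endFileLease.
  void end(String uniqKey) {
    filesBeingWritten.remove(uniqKey);
  }

  // Called periodically from a background thread in the real renewer.
  void renewalPass() {
    filesBeingWritten.values().removeIf(RenewableStream::isClosed);
    if (!filesBeingWritten.isEmpty()) {
      renewLeaseWithNameNode(); // a namenode.renewLease(clientName) RPC in HDFS
    }
  }

  private void renewLeaseWithNameNode() {
    // RPC stub for illustration.
  }
}

This is also why the PR adds the package-private getClient(), getDummyStream(), isClosed(), and getStat() accessors: the lease-manager shims and the dummy stream need to read this state on the output's behalf.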
hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -32,6 +32,7 @@

import java.io.IOException;
import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -140,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {

private interface LeaseManager {

-    void begin(DFSClient client, HdfsFileStatus stat);
+    void begin(FanOutOneBlockAsyncDFSOutput output);

-    void end(DFSClient client, HdfsFileStatus stat);
+    void end(FanOutOneBlockAsyncDFSOutput output);
}

private static final LeaseManager LEASE_MANAGER;
@@ -178,6 +179,16 @@ Object createObject(ClientProtocol instance, String src, FsPermission masked, St
CryptoProtocolVersion[] supportedVersions) throws Exception;
}

+  // helper class for creating the dummy DFSOutputStream
+  private interface DummyDFSOutputStreamCreator {
+
+    DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+      String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum);
+  }
+
+  private static final DummyDFSOutputStreamCreator DUMMY_DFS_OUTPUT_STREAM_CREATOR =
+    createDummyDFSOutputStreamCreator();

private static final FileCreator FILE_CREATOR;

// CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
@@ -207,44 +218,28 @@ private static LeaseManager createLeaseManager3_4() throws NoSuchMethodExcepti
beginFileLeaseMethod.setAccessible(true);
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
-    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
-    getConfigurationMethod.setAccessible(true);
-    Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

+    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
return new LeaseManager() {

-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
-        "dfs.client.output.stream.uniq.default.key";
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
-
-      private String getUniqId(DFSClient client, HdfsFileStatus stat)
-        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
-        // Copied from DFSClient in Hadoop 3.4.0
-        long fileId = stat.getFileId();
-        String namespace = (String) getNamespaceMehtod.invoke(stat);
-        if (namespace == null) {
-          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
-          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
-            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
-          return defaultKey + "_" + fileId;
-        } else {
-          return namespace + "_" + fileId;
-        }
+      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
+        throws IllegalAccessException, InvocationTargetException {
+        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
}

@Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
+            output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
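For reference, the key returned by getUniqKey() follows the same format the deleted getUniqId() computed by hand. A sketch of that format, reconstructed from the removed lines (signature simplified for illustration):

import org.apache.hadoop.conf.Configuration;

// Key format used by Hadoop 3.4+ per-stream lease tracking, as computed by
// the reflection code deleted above.
static String uniqKey(String namespace, long fileId, Configuration conf) {
  if (namespace == null) {
    String defaultKey = conf.get("dfs.client.output.stream.uniq.default.key", "DEFAULT");
    return defaultKey + "_" + fileId; // e.g. "DEFAULT_16387"
  }
  return namespace + "_" + fileId; // e.g. "ns1_16387"
}

Delegating to the dummy stream's getUniqKey() instead of recomputing this keeps HBase and HDFS agreeing on the key even if Hadoop changes the format.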
@@ -261,18 +256,19 @@ private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
return new LeaseManager() {

@Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
+          beginFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId(),
+            output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
        try {
-          endFileLeaseMethod.invoke(client, stat.getFileId());
+          endFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
@@ -341,6 +337,28 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
return createFileCreator2();
}

+  private static final String DUMMY_DFS_OUTPUT_STREAM_CLASS =
+    "org.apache.hadoop.hdfs.DummyDFSOutputStream";
+
+  @SuppressWarnings("unchecked")
+  private static DummyDFSOutputStreamCreator createDummyDFSOutputStreamCreator() {
+    Constructor<? extends DFSOutputStream> constructor;
+    try {
+      constructor = (Constructor<? extends DFSOutputStream>) Class
+        .forName(DUMMY_DFS_OUTPUT_STREAM_CLASS).getConstructors()[0];
+      return (output, dfsClient, src, stat, flag, checksum) -> {
+        try {
+          return constructor.newInstance(output, dfsClient, src, stat, flag, checksum);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    } catch (Exception e) {
+      LOG.debug("can not find DummyDFSOutputStream, should be hadoop 2.x", e);
+      return (output, dfsClient, src, stat, flag, checksum) -> null;
+    }
+  }
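The probe-once-then-lambda shape above is a reusable pattern for classes that exist only on some Hadoop lines: resolve the constructor a single time during class initialization, and degrade to a no-op factory when the class is absent. A generic, self-contained sketch (all names illustrative):

import java.lang.reflect.Constructor;

final class OptionalClassFactory {

  interface Factory {
    Object create(String arg);
  }

  // Probe for the optional class once; the per-call cost afterwards is just
  // Constructor.newInstance.
  static Factory resolve(String className) {
    try {
      Constructor<?> ctor = Class.forName(className).getConstructor(String.class);
      return arg -> {
        try {
          return ctor.newInstance(arg);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      };
    } catch (ReflectiveOperationException e) {
      // Optional class missing from this classpath: degrade to a no-op.
      return arg -> null;
    }
  }
}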

private static CreateFlag loadShouldReplicateFlag() {
try {
return CreateFlag.valueOf("SHOULD_REPLICATE");
@@ -380,12 +398,12 @@ public boolean progress() {
}
}

-  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.begin(client, stat);
+  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.begin(output);
  }

-  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.end(client, stat);
+  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.end(output);
}

static DataChecksum createChecksum(DFSClient client) {
@@ -599,20 +617,19 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
getDataNodeInfo(toExcludeNodes), retry);
}
+    EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
-        CryptoProtocolVersion.supported());
+        createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
} else {
throw new NameNodeException(e);
}
}
-    beginFileLease(client, stat);
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
Expand All @@ -637,7 +654,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
-          locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+          createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+      beginFileLease(output);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -676,7 +694,6 @@ public void operationComplete(Future<Channel> future) throws Exception {
});
}
}
-          endFileLease(client, stat);
}
}
}
@@ -713,13 +730,14 @@ public static boolean shouldRetryCreate(RemoteException e) {
return e.getClassName().endsWith("RetryStartFileException");
}

-  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-    ExtendedBlock block, HdfsFileStatus stat) throws IOException {
+  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
+    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
+    HdfsFileStatus stat) throws IOException {
int maxRetries = client.getConf().getNumBlockWriteLocateFollowingRetry();
for (int retry = 0; retry < maxRetries; retry++) {
try {
if (namenode.complete(src, clientName, block, stat.getFileId())) {
-        endFileLease(client, stat);
+        endFileLease(output);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);
@@ -749,4 +767,10 @@ public static String getDataNodeInfo(Collection<DatanodeInfo> datanodeInfos) {
.append(datanodeInfo.getInfoPort()).append(")").toString())
.collect(Collectors.joining(",", "[", "]"));
}

+  static DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+    String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+    return DUMMY_DFS_OUTPUT_STREAM_CREATOR.createDummyDFSOutputStream(output, dfsClient, src, stat,
+      flag, checksum);
+  }
}