Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,26 @@ The function `getLocatedFileStatus(FS, d)` is as defined in
The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.


### `ContentSummary getContentSummary(Path path)`

Given a path, return its content summary.

`getContentSummary()` first checks whether the given path is a file; if it is, it returns a directory count of 0
and a file count of 1.

#### Preconditions

exists(FS, path) else raise FileNotFoundException

#### Postconditions

Returns a `ContentSummary` object with information such as directory count
and file count for a given path.

The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.

### `BlockLocation[] getFileBlockLocations(FileStatus f, int s, int l)`

#### Preconditions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract;

import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.io.FileNotFoundException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Contract tests for {@link FileSystem#getContentSummary(Path)}.
 * Concrete subclasses bind the tests to a specific filesystem by
 * supplying the contract.
 */
public abstract class AbstractContractContentSummaryTest extends AbstractFSContractTestBase {

  /**
   * Create the tree {@code parent/a/b/c/file.txt} and verify the summary of
   * {@code parent} reports four directories and one file.
   *
   * @throws Throwable on any failure
   */
  @Test
  public void testGetContentSummary() throws Throwable {
    FileSystem fs = getFileSystem();

    Path parent = path("parent");
    // Build children with the (parent, child) constructor so that a path
    // separator is always inserted; plain string concatenation produced
    // ".../a/b/cfile.txt", a sibling of "c" rather than a file inside it.
    Path nested = new Path(parent, "a/b/c");
    Path filePath = new Path(nested, "file.txt");

    fs.mkdirs(parent);
    fs.mkdirs(nested);
    touch(fs, filePath);

    ContentSummary summary = fs.getContentSummary(parent);

    // The count includes the queried directory itself: parent, a, b and c.
    Assertions.assertThat(summary.getDirectoryCount()).as("Summary " + summary).isEqualTo(4);

    Assertions.assertThat(summary.getFileCount()).as("Summary " + summary).isEqualTo(1);
  }

  /**
   * Verify that asking for the summary of a path which was never created
   * raises {@link FileNotFoundException}.
   *
   * @throws Throwable on any failure
   */
  @Test
  public void testGetContentSummaryIncorrectPath() throws Throwable {
    FileSystem fs = getFileSystem();

    Path parent = path("parent");
    // Only the parent is created; "parent/a" does not exist.
    Path nested = new Path(parent, "a");

    fs.mkdirs(parent);

    intercept(FileNotFoundException.class, () -> fs.getContentSummary(nested));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract.localfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractContentSummaryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;

/**
 * Runs the content summary contract tests against the local filesystem.
 */
public class TestLocalFSContractContentSummary extends AbstractContractContentSummaryTest {

  /**
   * Bind the inherited tests to the local filesystem contract.
   *
   * @param conf configuration used to build the contract
   * @return a new {@link LocalFSContract}
   */
  @Override
  protected AbstractFSContract createContract(final Configuration conf) {
    final AbstractFSContract localContract = new LocalFSContract(conf);
    return localContract;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3284,9 +3284,9 @@ public S3AFileStatus probePathStatus(final Path path,
}

@Override
public RemoteIterator<S3AFileStatus> listStatusIterator(final Path path)
throws IOException {
return S3AFileSystem.this.innerListStatus(path);
public RemoteIterator<S3ALocatedFileStatus> listFilesIterator(final Path path,
final boolean recursive) throws IOException {
return S3AFileSystem.this.innerListFiles(path, recursive, Listing.ACCEPT_ALL_BUT_S3N, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public S3ObjectAttributes(
/**
* Construct from the result of a copy and those parameters
* which aren't included in an AWS SDK response.
* @param path
* @param path path
* @param copyResult copy result.
* @param serverSideEncryptionAlgorithm current encryption algorithm
* @param serverSideEncryptionKey any server side encryption key?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.slf4j.Logger;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

Expand Down Expand Up @@ -122,9 +124,7 @@ public ContentSummary execute() throws IOException {

/**
* Return the {@link ContentSummary} of a given directory.
* This is a recursive operation (as the original is);
* it'd be more efficient of stack and heap if it managed its
* own stack.
*
* @param dir dir to scan
* @throws FileNotFoundException if the path does not resolve
* @throws IOException IO failure
Expand All @@ -133,34 +133,65 @@ public ContentSummary execute() throws IOException {
* @throws IOException failure
*/
public ContentSummary getDirSummary(Path dir) throws IOException {

long totalLength = 0;
long fileCount = 0;
long dirCount = 1;
final RemoteIterator<S3AFileStatus> it
= callbacks.listStatusIterator(dir);

RemoteIterator<S3ALocatedFileStatus> it = callbacks.listFilesIterator(dir, true);

Set<Path> dirSet = new HashSet<>();
Set<Path> pathsTraversed = new HashSet<>();

while (it.hasNext()) {
final S3AFileStatus s = it.next();
if (s.isDirectory()) {
try {
ContentSummary c = getDirSummary(s.getPath());
totalLength += c.getLength();
fileCount += c.getFileCount();
dirCount += c.getDirectoryCount();
} catch (FileNotFoundException ignored) {
// path was deleted during the scan; exclude from
// summary.
}
} else {
totalLength += s.getLen();
S3ALocatedFileStatus fileStatus = it.next();
Path filePath = fileStatus.getPath();

if (fileStatus.isDirectory() && !filePath.equals(dir)) {
dirSet.add(filePath);
buildDirectorySet(dirSet, pathsTraversed, dir, filePath.getParent());
} else if (!fileStatus.isDirectory()) {
fileCount += 1;
totalLength += fileStatus.getLen();
buildDirectorySet(dirSet, pathsTraversed, dir, filePath.getParent());
}

}

// Add the list's IOStatistics
iostatistics.aggregate(retrieveIOStatistics(it));

return new ContentSummary.Builder().length(totalLength).
fileCount(fileCount).directoryCount(dirCount).
spaceConsumed(totalLength).build();
fileCount(fileCount).directoryCount(dirCount + dirSet.size()).
spaceConsumed(totalLength).build();
}

/**
 * Add {@code parentPath} and each of its ancestors below {@code basePath}
 * to the set of known directories.
 *
 * This is needed because a directory structure such as /a/b/c created with a
 * single mkdirs() call is stored as one object in S3, so the list files
 * iterator returns only a single entry for /a/b/c; the intermediate
 * directories have to be derived from the paths that are returned.
 *
 * Paths already walked are remembered so that the work is not repeated: for
 * /a/b/c/file-1.txt and /a/b/c/file-2.txt the ancestor chain is only walked
 * for the first file, and the walk for the second stops immediately.
 *
 * @param dirSet Set of all directories found in the path
 * @param pathsTraversed Set of all paths traversed so far
 * @param basePath Path of directory to scan
 * @param parentPath Parent path of the current file/directory in the iterator
 */
private void buildDirectorySet(Set<Path> dirSet, Set<Path> pathsTraversed, Path basePath,
    Path parentPath) {

  // Walk upwards until we run out of parents, reach a path that has
  // already been processed, or arrive at the directory being scanned.
  Path current = parentPath;
  while (current != null
      && !pathsTraversed.contains(current)
      && !current.equals(basePath)) {
    dirSet.add(current);
    pathsTraversed.add(current);
    current = current.getParent();
  }
}

/**
Expand All @@ -186,23 +217,24 @@ public interface GetContentSummaryCallbacks {

/**
* Get the status of a path.
* @param path path to probe.
*
* @param path path to probe.
* @param probes probes to exec
* @return the status
* @throws IOException failure
*/
@Retries.RetryTranslated
S3AFileStatus probePathStatus(Path path,
Set<StatusProbeEnum> probes) throws IOException;

/**
* Incremental list of all entries in a directory.
* @param path path of dir
* @return an iterator
S3AFileStatus probePathStatus(Path path, Set<StatusProbeEnum> probes) throws IOException;

/**
* List all entries under a path.
*
* @param path path to list under
* @param recursive if the subdirectories need to be traversed recursively
* @return an iterator over the listing.
* @throws IOException failure
*/
RemoteIterator<S3AFileStatus> listStatusIterator(Path path)
RemoteIterator<S3ALocatedFileStatus> listFilesIterator(Path path, boolean recursive)
throws IOException;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.contract.s3a;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractContentSummaryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;

/**
 * Runs the content summary contract tests against S3A, and adds an
 * S3A-specific test covering directories created in different ways.
 */
public class ITestS3AContractContentSummary extends AbstractContractContentSummaryTest {

  /**
   * Bind the inherited tests to the S3A contract.
   *
   * @param conf configuration used to build the contract
   * @return a new {@link S3AContract}
   */
  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }

  /**
   * Narrow the filesystem under test to its S3A type.
   *
   * @return the filesystem cast to {@link S3AFileSystem}
   */
  @Override
  public S3AFileSystem getFileSystem() {
    return (S3AFileSystem) super.getFileSystem();
  }

  /**
   * Build a tree mixing directories created one level at a time with a
   * chain created by a single mkdirs() call, add one file, and verify the
   * directory and file counts reported for the base directory.
   *
   * @throws Throwable on any failure
   */
  @Test
  public void testGetContentSummaryDir() throws Throwable {
    describe("getContentSummary on test dir with children");
    final S3AFileSystem s3a = getFileSystem();
    final Path root = methodPath();

    // Each level created on its own comes back from listFiles() as a
    // separate object.
    final Path levelOne = new Path(root, "a");
    final Path levelTwo = new Path(root, "a/b");
    final Path levelThree = new Path(root, "a/b/a");
    s3a.mkdirs(levelOne);
    s3a.mkdirs(levelTwo);
    s3a.mkdirs(levelThree);

    // A chain created in one call comes back as a single object.
    final Path chained = new Path(root, "d/e/f");
    s3a.mkdirs(chained);

    touch(s3a, new Path(root, "a/b/file"));

    final ContentSummary contentSummary = s3a.getContentSummary(root);
    final String description = "Summary " + contentSummary;

    // Expected directories: the base dir, a, a/b, a/b/a, d, d/e and d/e/f.
    Assertions.assertThat(contentSummary.getDirectoryCount()).as(description).isEqualTo(7);
    Assertions.assertThat(contentSummary.getFileCount()).as(description).isEqualTo(1);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ public void testGetContentSummaryDir() throws Throwable {
with(INVOCATION_GET_CONTENT_SUMMARY, 1),
withAuditCount(1),
always(FILE_STATUS_FILE_PROBE // look at path to see if it is a file
.plus(LIST_OPERATION) // it is not: so LIST
.plus(LIST_OPERATION))); // and a LIST on the child dir
.plus(LIST_OPERATION))); // it is not: so LIST

Assertions.assertThat(summary.getDirectoryCount())
.as("Summary " + summary)
.isEqualTo(2);
Expand Down
Loading