Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ public class FileUploadTest {
public static final String COKTAILS_NT = "cocktails.nt";
private static final Logger logger = LoggerFactory.getLogger(FileUploadTest.class);

private static boolean initialised = false;

@Parameterized.Parameters(name = "{0}")
public static Collection<Object> params() {
ArrayList<Object> list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys());
// ArrayList<Object> list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys());
ArrayList<Object> list = new ArrayList<>();
list.add(RDFFormat.HDT);
return list;
}
Expand All @@ -79,25 +82,35 @@ public static Collection<Object> params() {

public FileUploadTest(RDFFormat format) throws IOException, ParserException {
this.format = format;
RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow();

RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow();
RDFParser parser = Rio.createParser(originalFormat);
Path testDir = Paths.get("tests", "testrdf");
Files.createDirectories(testDir);
Path RDFFile = testDir.resolve(COKTAILS_NT + "." + format.getDefaultFileExtension());
if (!Files.exists(RDFFile)) {
try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) {
if (format == RDFFormat.HDT) {
try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE,
HDTOptions.empty(), ProgressListener.ignore())) {
hdt.saveToHDT(os);

if (!initialised || !Files.exists(RDFFile)) {

if (Files.exists(RDFFile)) {
Files.delete(RDFFile);
}

if (!Files.exists(RDFFile)) {
try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) {
if (format == RDFFormat.HDT) {
try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.NTRIPLES,
HDTOptions.empty(), ProgressListener.ignore())) {
hdt.saveToHDT(os);
}
} else {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
}
} else {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
}
}

initialised = true;
}

fileName = RDFFile.toFile().getAbsolutePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
Expand Down Expand Up @@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator<TripleString> iterator)
"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with "
+ ways + "ways and " + workers + " worker(s)");

AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
AsyncIteratorFetcherUnordered<TripleString> source = new AsyncIteratorFetcherUnordered<>(iterator);

profiler.pushSection("section compression");
CompressionResult compressionResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public TripleAppender(TempDictionary dict, TempTriples triples, ProgressListener
}

@Override
public void processTriple(TripleString triple, long pos) {
public synchronized void processTriple(TripleString triple, long pos) {
long s = dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT);
long p = dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE);
long o = dict.insert(triple.getObject(), TripleComponentRole.OBJECT);
Expand Down Expand Up @@ -92,7 +92,7 @@ public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RD

// Load RDF in the dictionary and generate triples
dictionary.startProcessing();
parser.doParse(filename, baseUri, notation, true, appender);
parser.doParse(filename, baseUri, notation, true, appender, false);
dictionary.endProcessing();

// Reorganize both the dictionary and the triples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static class DictionaryAppender implements RDFCallback {
}

@Override
public void processTriple(TripleString triple, long pos) {
public synchronized void processTriple(TripleString triple, long pos) {
dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT);
dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE);
dict.insert(triple.getObject(), TripleComponentRole.OBJECT);
Expand Down Expand Up @@ -80,7 +80,7 @@ public TripleAppender2(TempDictionary dict, TempTriples triples, ProgressListene
}

@Override
public void processTriple(TripleString triple, long pos) {
public synchronized void processTriple(TripleString triple, long pos) {
triples.insert(dict.stringToId(triple.getSubject(), TripleComponentRole.SUBJECT),
dict.stringToId(triple.getPredicate(), TripleComponentRole.PREDICATE),
dict.stringToId(triple.getObject(), TripleComponentRole.OBJECT));
Expand Down Expand Up @@ -108,14 +108,14 @@ public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RD

// Load RDF in the dictionary
dictionary.startProcessing();
parser.doParse(filename, baseUri, notation, true, new DictionaryAppender(dictionary, listener));
parser.doParse(filename, baseUri, notation, true, new DictionaryAppender(dictionary, listener), false);
dictionary.endProcessing();

// Reorganize IDs before loading triples
modHDT.reorganizeDictionary(listener);

// Load triples (second pass)
parser.doParse(filename, baseUri, notation, true, new TripleAppender2(dictionary, triples, listener));
parser.doParse(filename, baseUri, notation, true, new TripleAppender2(dictionary, triples, listener), false);

// reorganize HDT
modHDT.reorganizeTriples(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl<TripleString
private final boolean debugSleepKwayDict;
private final boolean quads;
private final CompressionType compressionType;
private final long start = System.currentTimeMillis();

public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<TripleString> source,
MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
Expand Down Expand Up @@ -253,7 +254,10 @@ public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath out
}

if (tripleID % 100_000 == 0) {
listener.notifyProgress(10, "reading triples " + tripleID);
// use start to measure how many triples are read per second
int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0));

listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
}
// too much ram allowed?
if (subjects.size() == Integer.MAX_VALUE - 6) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void load(InputStream input, ControlInfo ci, ProgressListener listener) t
try {
RDFParserSimple parser = new RDFParserSimple();
parser.doParse(new ByteArrayInputStream(headerData), "http://www.rdfhdt.org", RDFNotation.NTRIPLES, true,
this);
this, false);
} catch (ParserException e) {
log.error("Unexpected exception.", e);
throw new IllegalFormatException("Error parsing header");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package com.the_qa_company.qendpoint.core.iterator.utils;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

/**
* Synchronise an iterator
*
* @param <E> iterator type
* @author Håvard M. Ottestad
* @author Antoine Willerval
*/
public class AsyncIteratorFetcherUnordered<E> extends AsyncIteratorFetcher<E> {

private static final int CORES = Runtime.getRuntime().availableProcessors();

public static final int BUFFER;

static {
long maxMemory = Runtime.getRuntime().maxMemory() / 1024 / 1024; // in
// MB
if (maxMemory >= 32 * 1024) {
BUFFER = 1024 * 32;
} else if (maxMemory >= 16 * 1024) {
BUFFER = 1024 * 16;
} else if (maxMemory >= 8 * 1024) {
BUFFER = 1024 * 8;
} else if (maxMemory >= 4 * 1024) {
BUFFER = 1024 * 4;
} else if (maxMemory >= 2 * 1024) {
BUFFER = 1024 * 2;
} else {
BUFFER = 1024;
}
}

private final Iterator<E> iterator;
private boolean end;
volatile Queue<E>[] queue = new Queue[CORES * 2];

{
for (int i = 0; i < queue.length; i++) {
queue[i] = new ArrayDeque<>(BUFFER);
}
}

public AsyncIteratorFetcherUnordered(Iterator<E> iterator) {
super(iterator);
this.iterator = iterator;
}

/**
* @return an element from the iterator, this method is thread safe
*/
@Override
public E get() {

int index = (int) (Thread.currentThread().getId() % queue.length);

Queue<E> es = queue[index];
if (es == null) {
for (Queue<E> eQueue : queue) {
if (eQueue != null) {
synchronized (eQueue) {
E poll = eQueue.poll();

if (poll != null) {
return poll;
}
}
}
}
}

if (es != null) {
// With this approach there is some risk that a queue is filled but
// never emptied. Maybe we should look for another queue to read
// from
// before filling our own queue?
synchronized (es) {
E poll = es.poll();

if (poll != null) {
return poll;
}

synchronized (this) {
es = queue[index];
if (es != null) {

poll = es.poll();
if (poll == null) {
if (iterator.hasNext()) {
poll = iterator.next();
for (int i = 0; i < BUFFER && iterator.hasNext(); i++) {
es.add(iterator.next());
}
}

}

if (poll == null) {
queue[index] = null;
} else {
return poll;
}
}
}
}
}

for (Queue<E> eQueue : queue) {
if (eQueue != null) {

synchronized (eQueue) {
synchronized (this) {
E poll = eQueue.poll();

if (poll != null) {
return poll;
}
}
}
}
}

synchronized (this) {
if (iterator.hasNext()) {
E poll = iterator.next();
return poll;
}
}

end = true;
return null;

}

/**
* @return is the end
*/
public boolean isEnd() {
return end;
}
}
Loading
Loading