From 5ecae1b77f22eb615811964df7fa6d2af21c5b7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 08:48:41 +0200 Subject: [PATCH 1/9] GH-610 add PipedCopyIteratorUnordered and AsyncIteratorFetcherUnordered --- .../core/hdt/impl/HDTDiskImporter.java | 3 +- .../utils/AsyncIteratorFetcherUnordered.java | 127 ++++++ .../utils/PipedCopyIteratorUnordered.java | 400 ++++++++++++++++++ .../qendpoint/core/rdf/RDFParserFactory.java | 18 +- 4 files changed, 538 insertions(+), 10 deletions(-) create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java index 22cc46c2..64747b99 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java @@ -14,6 +14,7 @@ import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult; import com.the_qa_company.qendpoint.core.header.HeaderPrivate; import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher; +import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered; import com.the_qa_company.qendpoint.core.listener.MultiThreadListener; import com.the_qa_company.qendpoint.core.listener.ProgressListener; import com.the_qa_company.qendpoint.core.options.HDTOptions; @@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator iterator) "Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with " + ways + "ways and " + workers + " worker(s)"); - 
AsyncIteratorFetcher source = new AsyncIteratorFetcher<>(iterator); + AsyncIteratorFetcherUnordered source = new AsyncIteratorFetcherUnordered<>(iterator); profiler.pushSection("section compression"); CompressionResult compressionResult; diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java new file mode 100644 index 00000000..f9073713 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java @@ -0,0 +1,127 @@ +package com.the_qa_company.qendpoint.core.iterator.utils; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Queue; + +/** + * Synchronise an iterator + * + * @param iterator type + * @author Håvard M. Ottestad + * @author Antoine Willerval + */ +public class AsyncIteratorFetcherUnordered extends AsyncIteratorFetcher { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + public static final int BUFFER = 1024 * 32; + private final Iterator iterator; + private boolean end; + volatile Queue[] queue = new Queue[CORES * 2]; + + { + for (int i = 0; i < queue.length; i++) { + queue[i] = new ArrayDeque<>(BUFFER); + } + } + + public AsyncIteratorFetcherUnordered(Iterator iterator) { + super(iterator); + this.iterator = iterator; + } + + /** + * @return an element from the iterator, this method is thread safe + */ + @Override + public E get() { + + int index = (int) (Thread.currentThread().getId() % queue.length); + + Queue es = queue[index]; + if (es == null) { + for (Queue eQueue : queue) { + if (eQueue != null) { + synchronized (eQueue) { + E poll = eQueue.poll(); + + if (poll != null) { + return poll; + } + } + } + } + } + + if (es != null) { + // With this approach there is some risk that a queue is filled but + // never emptied. 
Maybe we should look for another queue to read + // from + // before filling our own queue? + synchronized (es) { + E poll = es.poll(); + + if (poll != null) { + return poll; + } + + synchronized (this) { + es = queue[index]; + if (es != null) { + + poll = es.poll(); + if (poll == null) { + if (iterator.hasNext()) { + poll = iterator.next(); + for (int i = 0; i < BUFFER && iterator.hasNext(); i++) { + es.add(iterator.next()); + } + } + + } + + if (poll == null) { + queue[index] = null; + } else { + return poll; + } + } + } + } + } + + for (Queue eQueue : queue) { + if (eQueue != null) { + + synchronized (eQueue) { + synchronized (this) { + E poll = eQueue.poll(); + + if (poll != null) { + return poll; + } + } + } + } + } + + synchronized (this) { + if (iterator.hasNext()) { + E poll = iterator.next(); + return poll; + } + } + + end = true; + return null; + + } + + /** + * @return is the end + */ + public boolean isEnd() { + return end; + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java new file mode 100644 index 00000000..72c91101 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java @@ -0,0 +1,400 @@ +package com.the_qa_company.qendpoint.core.iterator.utils; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * a utility class to create an iterator from the value returned by another + * Thread + * + * @param the iterator type + * @author Antoine Willerval + * @author Håvard M. 
Ottestad + */ + +public class PipedCopyIteratorUnordered extends PipedCopyIterator { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + /** + * RuntimeException generated by the PipedCopyIterator + * + * @author Antoine Willerval + */ + public static class PipedIteratorException extends RuntimeException { + public PipedIteratorException(String message, Throwable t) { + super(message, t); + } + } + + /** + * Callback for the + * {@link #createOfCallback(PipedCopyIteratorUnordered.PipeCallBack)} method + * + * @param the iterator type + * @author Antoine Willerval + */ + @FunctionalInterface + public interface PipeCallBack { + /** + * method called from the new thread to generate the new data, at the + * end of the callback, the pipe is closed with or without exception + * + * @param pipe the pipe to fill + * @throws Exception any exception returned by the generator + */ + void createPipe(PipedCopyIterator pipe) throws Exception; + } + + /** + * create a piped iterator from a callback runner, the call to the callback + * should be made in the callbackRunner + * + * @param callbackRunner the callback runner + * @param type of the iterator + * @return the iterator + */ + public static PipedCopyIterator createUnorderedOfCallback(PipeCallBack callbackRunner) { + PipedCopyIteratorUnordered pipe = new PipedCopyIteratorUnordered<>(); + + Thread thread = new Thread(() -> { + try { + callbackRunner.createPipe(pipe); + pipe.closePipe(); + } catch (Throwable e) { + pipe.closePipe(e); + } + }, "PipeIterator"); + thread.start(); + + // close the thread at end + pipe.attachThread(thread); + + return pipe; + } + + private interface QueueObject { + boolean end(); + + T get(); + } + + private class ElementQueueObject implements QueueObject { + private final T obj; + + private ElementQueueObject(T obj) { + this.obj = obj; + } + + @Override + public boolean end() { + return false; + } + + @Override + public T get() { + return obj; + } + } + + private 
class EndQueueObject implements QueueObject { + @Override + public boolean end() { + return true; + } + + @Override + public T get() { + throw new IllegalArgumentException(); + } + } + + private final ArrayBlockingQueue>[] queue = new ArrayBlockingQueue[CORES * 2]; + + { + for (int i = 0; i < queue.length; i++) { + queue[i] = new ArrayBlockingQueue<>(16 * 1024); + } + } + + private final AtomicBoolean[] queueEnd = new AtomicBoolean[queue.length]; + + { + for (int i = 0; i < queueEnd.length; i++) { + queueEnd[i] = new AtomicBoolean(false); + } + } + + private T next; + private boolean end; + private PipedIteratorException exception; + + private Thread thread; + + volatile ArrayBlockingQueue> focusQueue; + + static AtomicInteger index = new AtomicInteger(0); + + static ThreadLocal threadLocalIndexW = ThreadLocal.withInitial(() -> index.getAndIncrement()); + + @Override + public boolean hasNext() { + if (end) { + return false; + } + if (next != null) { + return true; + } + + QueueObject obj; + try { + obj = useFocusQueue(); + + if (obj == null) { + obj = useThreadBasedQueue(); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PipedIteratorException("Can't read pipe", e); + } + + if (obj == null || obj.end()) { + obj = checkAllQueues(obj); + } + + if (obj.end()) { + end = true; + if (exception != null) { + throw exception; + } + return false; + } + next = obj.get(); + return true; + } + + private QueueObject useThreadBasedQueue() throws InterruptedException { + QueueObject obj; + int i = Thread.currentThread().hashCode(); + obj = queue[i % queue.length].poll(); + if (obj == null) { + obj = iterateThroughAllQueues(obj); + } else if (obj.end()) { + setQueueEnd(queue[i % queue.length]); + } else if (focusQueue == null) { + focusQueue = queue[i % queue.length]; + } + return obj; + } + + private QueueObject checkAllQueues(QueueObject originalObj) { + QueueObject obj = null; + boolean done; + + do { + done = true; + for (int i = 0; 
i < queue.length; i++) { + if (queueEnd[i].get()) { + continue; + } + done = false; + ArrayBlockingQueue> queueObjects = queue[i]; + obj = queueObjects.poll(); + if (obj == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else if (!obj.end()) { + return obj; + } else { + queueEnd[i].set(true); + } + } + } while (!done); + + if (obj == null) { + obj = originalObj; + } + + return obj; + } + + private QueueObject iterateThroughAllQueues(QueueObject obj) throws InterruptedException { + while (obj == null) { + for (int i = 0; i < queue.length; i++) { + if (queueEnd[i].get()) { + continue; + } + ArrayBlockingQueue> queueObjects = queue[i]; + obj = queueObjects.poll(); + if (obj != null) { + if (obj.end()) { + queueEnd[i].set(true); + } else if (focusQueue == null) { + focusQueue = queueObjects; + } + return obj; + } + } + Thread.sleep(10); + } + return obj; + } + + private QueueObject useFocusQueue() throws InterruptedException { + QueueObject obj; + var focusQueue = this.focusQueue; + if (focusQueue != null) { + QueueObject poll = focusQueue.poll(); + if (poll != null) { + obj = poll; + if (obj.end()) { + setQueueEnd(focusQueue); + } + } else { + obj = null; + this.focusQueue = null; + } + } else { + obj = null; + } + return obj; + } + + private void setQueueEnd(ArrayBlockingQueue> focusQueue) { + for (int i = 0; i < queue.length; i++) { + if (queue[i] == focusQueue) { + queueEnd[i].set(true); + break; + } + } + } + + @Override + public T next() { + if (!hasNext()) { + return null; + } + T next = this.next; + this.next = null; + return next; + } + + public void closePipe() { + closePipe(null); + } + + public void closePipe(Throwable e) { + if (e != null) { + // clear the queue to force the exception + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.clear(); + } + if (e instanceof PipedIteratorException) { + this.exception = (PipedIteratorException) e; + 
} else { + this.exception = new PipedIteratorException("closing exception", e); + } + } + try { + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.put(new EndQueueObject()); + } + } catch (InterruptedException ee) { + Thread.currentThread().interrupt(); + throw new PipedIteratorException("Can't close pipe", ee); + } + } + + /** + * map this iterator to another type + * + * @param mappingFunction the mapping function + * @param the future type + * @return mapped iterator + */ + public Iterator map(Function mappingFunction) { + return new MapIterator<>(this, mappingFunction); + } + + /** + * map this iterator to another type + * + * @param mappingFunction the mapping function + * @param the future type + * @return mapped iterator + */ + public Iterator mapWithId(MapIterator.MapWithIdFunction mappingFunction) { + return new MapIterator<>(this, mappingFunction); + } + + public void addElement(T node) { + ArrayBlockingQueue> currentQueue = queue[threadLocalIndexW.get() % queue.length]; + try { + + if (currentQueue == focusQueue) { + for (ArrayBlockingQueue> queueObjects : queue) { + if (queueObjects != focusQueue) { + currentQueue = queueObjects; + break; + } + + } + } + + boolean success = currentQueue.offer(new ElementQueueObject(node)); + if (!success) { + focusQueue = currentQueue; + while (!success) { + for (ArrayBlockingQueue> queueObjects : queue) { + success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS); + if (success) { + break; + } + } + } + } + + } catch (InterruptedException ee) { + Thread.currentThread().interrupt(); + throw new PipedIteratorException("Can't add element to pipe", ee); + } + } + + /** + * attach a thread to interrupt with this iterator + * + * @param thread the thread + */ + public void attachThread(Thread thread) { + Objects.requireNonNull(thread, "thread can't be null!"); + if (this.thread != null && this.thread != thread) { + throw new IllegalArgumentException("Thread already attached"); + } + 
this.thread = thread; + } + + /** + * Allow receiving again elements after an end node + */ + public void reset() { + this.end = false; + } + + @Override + public void close() throws IOException { + if (thread != null) { + thread.interrupt(); + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java index f03ca8f6..af808601 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java @@ -21,6 +21,7 @@ import com.the_qa_company.qendpoint.core.enums.RDFNotation; import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; +import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIteratorUnordered; import com.the_qa_company.qendpoint.core.options.HDTOptions; import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser; @@ -34,7 +35,6 @@ import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserZip; import com.the_qa_company.qendpoint.core.triples.TripleString; import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIterator; -import com.the_qa_company.qendpoint.core.util.string.PrefixesStorage; import java.io.InputStream; @@ -97,8 +97,8 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation, + keepBNode, (triple, pos) -> 
pipe.addElement(triple.tripleToString()))); } /** @@ -112,8 +112,8 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation, + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); } /** @@ -127,8 +127,8 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation, + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); } /** @@ -143,8 +143,8 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation, + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); } } From 496fde01343320d78ba4a790a0fdeb94e0635545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 
2025 09:14:46 +0200 Subject: [PATCH 2/9] log triples per second --- .../core/hdt/impl/diskimport/SectionCompressor.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java index 78cc3d01..3ecec875 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java @@ -53,6 +53,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl source, MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict, @@ -253,7 +254,10 @@ public void createChunk(SizeFetcher fetcher, CloseSuppressPath out } if (tripleID % 100_000 == 0) { - listener.notifyProgress(10, "reading triples " + tripleID); + // use start to measure how many triples are read per second + int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0)); + + listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond); } // too much ram allowed? 
if (subjects.size() == Integer.MAX_VALUE - 6) { From 79e0360906c31658b812d83ed561d84e4ebb0680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 10:40:10 +0200 Subject: [PATCH 3/9] initial copy of the code --- .../core/hdt/impl/TempHDTImporterOnePass.java | 2 +- .../core/hdt/impl/TempHDTImporterTwoPass.java | 4 +- .../qendpoint/core/header/PlainHeader.java | 2 +- .../qendpoint/core/rdf/RDFParserCallback.java | 14 +- .../qendpoint/core/rdf/RDFParserFactory.java | 8 +- .../parsers/ChunkedConcurrentInputStream.java | 141 +++++ .../rdf/parsers/ConcurrentInputStream.java | 126 ++++ .../core/rdf/parsers/RDFDeltaFileParser.java | 12 +- .../core/rdf/parsers/RDFParserDir.java | 35 +- .../core/rdf/parsers/RDFParserHDT.java | 10 +- .../core/rdf/parsers/RDFParserList.java | 14 +- .../core/rdf/parsers/RDFParserRAR.java | 8 +- .../core/rdf/parsers/RDFParserRIOT.java | 194 +++++- .../core/rdf/parsers/RDFParserSimple.java | 6 +- .../core/rdf/parsers/RDFParserTar.java | 10 +- .../core/rdf/parsers/RDFParserZip.java | 10 +- .../core/rdf/parsers/TurtleChunker.java | 596 ++++++++++++++++++ .../qendpoint/core/tools/HDTDiffCat.java | 2 +- .../org/apache/jena/iri/impl/LexerFixer.java | 22 + .../core/hdt/impl/TempHDTImporterTest.java | 2 +- .../parsers/AbstractNTriplesParserTest.java | 2 +- .../core/rdf/parsers/RDFParserDirTest.java | 10 +- .../core/rdf/parsers/RDFParserHDTTest.java | 2 +- .../core/rdf/parsers/RDFParserSimpleTest.java | 2 +- .../core/util/UnicodeEscapeTest.java | 7 +- 25 files changed, 1134 insertions(+), 107 deletions(-) create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ChunkedConcurrentInputStream.java create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java create mode 100644 qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java create mode 100644 
qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java index 9991d1d2..78a6b7f0 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java @@ -92,7 +92,7 @@ public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RD // Load RDF in the dictionary and generate triples dictionary.startProcessing(); - parser.doParse(filename, baseUri, notation, true, appender); + parser.doParse(filename, baseUri, notation, true, appender, false); dictionary.endProcessing(); // Reorganize both the dictionary and the triples diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java index e0b8532e..1e6658f8 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java @@ -108,14 +108,14 @@ public TempHDT loadFromRDF(HDTOptions specs, String filename, String baseUri, RD // Load RDF in the dictionary dictionary.startProcessing(); - parser.doParse(filename, baseUri, notation, true, new DictionaryAppender(dictionary, listener)); + parser.doParse(filename, baseUri, notation, true, new DictionaryAppender(dictionary, listener), false); dictionary.endProcessing(); // Reorganize IDs before loading triples modHDT.reorganizeDictionary(listener); // Load triples (second pass) - parser.doParse(filename, baseUri, notation, true, new TripleAppender2(dictionary, triples, listener)); + 
parser.doParse(filename, baseUri, notation, true, new TripleAppender2(dictionary, triples, listener), false); // reorganize HDT modHDT.reorganizeTriples(listener); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java index 0c37539b..f7022154 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java @@ -137,7 +137,7 @@ public void load(InputStream input, ControlInfo ci, ProgressListener listener) t try { RDFParserSimple parser = new RDFParserSimple(); parser.doParse(new ByteArrayInputStream(headerData), "http://www.rdfhdt.org", RDFNotation.NTRIPLES, true, - this); + this, false); } catch (ParserException e) { log.error("Unexpected exception.", e); throw new IllegalFormatException("Error parsing header"); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java index 1cc6caf0..fc2cc3ae 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java @@ -48,14 +48,14 @@ default RDFCallback async() { } } - void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException; + void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException; - default void doParse(Path file, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { - doParse(file.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback); + default void 
doParse(Path file, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { + doParse(file.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback, parallel); } - void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException; + void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException; } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java index af808601..d0f37007 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java @@ -98,7 +98,7 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation) { return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation, - keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), true)); } /** @@ -113,7 +113,7 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation) { return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation, - keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), true)); } /** @@ -128,7 +128,7 @@ public 
static PipedCopyIterator readAsIterator(RDFParserCallback p public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(stream, baseUri, notation, - keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), true)); } /** @@ -144,7 +144,7 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { return PipedCopyIteratorUnordered.createUnorderedOfCallback(pipe -> parser.doParse(file, baseUri, notation, - keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); + keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), true)); } } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ChunkedConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ChunkedConcurrentInputStream.java new file mode 100644 index 00000000..851dc5c1 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ChunkedConcurrentInputStream.java @@ -0,0 +1,141 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; + +public class ChunkedConcurrentInputStream { + + private static final Logger log = LoggerFactory.getLogger(ChunkedConcurrentInputStream.class); + private final InputStream source; + private final int 
numberOfStreams; + + private PipedInputStream[] pipedInputStreams; + private PipedOutputStream[] pipedOutputStreams; + + private PipedInputStream bnodeInputStream; + private PipedOutputStream bnodeOutputStream; + + private Thread readerThread; + + public ChunkedConcurrentInputStream(InputStream stream, int numberOfStreams) { + this.source = stream; + this.numberOfStreams = numberOfStreams; + setupPipes(); + startReadingThread(); + } + + private void setupPipes() { + pipedInputStreams = new PipedInputStream[numberOfStreams]; + pipedOutputStreams = new PipedOutputStream[numberOfStreams]; + + // The size of the pipes needs to be larger than the buffer of the + // buffered reader that Jena uses inside the parser, which is 131072 + // bytes. If our pipeSize is too small it limits the ability for the + // parsers to work concurrently. + int pipeSize = 131072 * 1024 * 2; + + try { + // Set up main fan-out pipes + for (int i = 0; i < numberOfStreams; i++) { + pipedOutputStreams[i] = new PipedOutputStream(); + pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], pipeSize); + } + + // Set up bnode pipe + bnodeOutputStream = new PipedOutputStream(); + bnodeInputStream = new PipedInputStream(bnodeOutputStream, pipeSize); + + } catch (IOException e) { + throw new RuntimeException("Error creating pipes", e); + } + } + + private void startReadingThread() { + readerThread = new Thread(new ReaderThread()); + + readerThread.setName("ConcurrentInputStream reader"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * Returns the stream for blank-node lines only. + */ + public InputStream getBnodeStream() { + return bnodeInputStream; + } + + /** + * Returns the array of InputStreams that share all concurrently read data. 
+ */ + public InputStream[] getStreams() { + return pipedInputStreams; + } + + private class ReaderThread implements Runnable { + @Override + public void run() { + + try { + + TurtleChunker tokenizer = new TurtleChunker(source); + TurtleChunker.BlockIterator it = tokenizer.blockIterator(); + + it.setPrefixConsumer(p -> { + try { + bnodeOutputStream.write(p.getBytes(StandardCharsets.UTF_8)); + for (PipedOutputStream out : pipedOutputStreams) { + out.write(p.getBytes(StandardCharsets.UTF_8)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + int currentStreamIndex = 0; + + while (it.hasNext()) { + + String block = it.next(); + byte[] data = block.getBytes(StandardCharsets.UTF_8); + + if (block.contains("_:")) { + // Write to bnodeOutputStream only + bnodeOutputStream.write(data); + } else { + // Write to a single stream from pipedOutputStreams in a + // round-robin manner + pipedOutputStreams[currentStreamIndex].write(data); + currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length; + } + + } + + } catch (IOException e) { + log.error("Error reading input stream", e); + // If there's a read error, close everything. 
+ } finally { + // Close all output streams to signal EOF + for (PipedOutputStream out : pipedOutputStreams) { + try { + out.close(); + } catch (IOException ignored) { + } + } + + try { + bnodeOutputStream.close(); + } catch (IOException e) { + log.error("Error closing bnodeOutputStream", e); + } + } + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java new file mode 100644 index 00000000..39e4c96b --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java @@ -0,0 +1,126 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; + +public class ConcurrentInputStream { + + private static final Logger log = LoggerFactory.getLogger(ConcurrentInputStream.class); + private final InputStream source; + private final int numberOfStreams; + + private PipedInputStream[] pipedInputStreams; + private PipedOutputStream[] pipedOutputStreams; + + private PipedInputStream bnodeInputStream; + private PipedOutputStream bnodeOutputStream; + + private Thread readerThread; + + public ConcurrentInputStream(InputStream stream, int numberOfStreams) { + this.source = stream; + this.numberOfStreams = numberOfStreams; + setupPipes(); + startReadingThread(); + } + + private void setupPipes() { + pipedInputStreams = new PipedInputStream[numberOfStreams]; + pipedOutputStreams = new PipedOutputStream[numberOfStreams]; + + // The size of the pipes needs to be larger than the buffer of the + // buffered reader that Jena uses inside the parser, which is 131072 + // 
bytes. If our pipeSize is too small it limits the ability for the + // parsers to work concurrently. + int pipeSize = 131072 * 1024; + + try { + // Set up main fan-out pipes + for (int i = 0; i < numberOfStreams; i++) { + pipedOutputStreams[i] = new PipedOutputStream(); + pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], pipeSize); + } + + // Set up bnode pipe + bnodeOutputStream = new PipedOutputStream(); + bnodeInputStream = new PipedInputStream(bnodeOutputStream, pipeSize); + + } catch (IOException e) { + throw new RuntimeException("Error creating pipes", e); + } + } + + private void startReadingThread() { + readerThread = new Thread(new ReaderThread()); + + readerThread.setName("ConcurrentInputStream reader"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * Returns the stream for blank-node lines only. + */ + public InputStream getBnodeStream() { + return bnodeInputStream; + } + + /** + * Returns the array of InputStreams that share all concurrently read data. + */ + public InputStream[] getStreams() { + return pipedInputStreams; + } + + private class ReaderThread implements Runnable { + @Override + public void run() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) { + String line; + int currentStreamIndex = 0; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + continue; // Skip empty lines + } + + byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8); + + if (line.contains("_:")) { + // Write to bnodeOutputStream only + bnodeOutputStream.write(data); + } else { + // Write to a single stream from pipedOutputStreams in a + // round-robin manner + pipedOutputStreams[currentStreamIndex].write(data); + currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length; + } + } + } catch (IOException e) { + log.error("Error reading input stream", e); + // If there's a read error, close everything. 
+ } finally { + // Close all output streams to signal EOF + for (PipedOutputStream out : pipedOutputStreams) { + try { + out.close(); + } catch (IOException ignored) { + } + } + + try { + bnodeOutputStream.close(); + } catch (IOException e) { + log.error("Error closing bnodeOutputStream", e); + } + } + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java index 5fe00558..8ec59584 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java @@ -144,18 +144,18 @@ public RDFDeltaFileParser(HDTOptions spec) { } @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try (InputStream is = IOUtil.getFileInputStream(fileName)) { - doParse(is, baseUri, notation, keepBNode, callback); + doParse(is, baseUri, notation, keepBNode, callback, parallel); } catch (IOException e) { throw new ParserException(e); } } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { // read df file DeltaFileReader reader = new DeltaFileReader(in, spec); @@ -169,7 +169,7 @@ public void doParse(InputStream in, String baseUri, RDFNotation notation, boolea try { // read the next byte information parser.doParse(new GZIPInputStream(new 
ByteArrayInputStream(next.data)), baseUri, not, keepBNode, - callback); + callback, parallel); } catch (IOException e) { throw new ParserException("Error when reading " + next.fileName + " size: " + next.data.length, e); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java index 0e2043ae..fe6930c0 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java @@ -45,14 +45,14 @@ public RDFParserDir(HDTOptions spec) { } @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { - doParse(Path.of(fileName), baseUri, notation, keepBNode, callback); + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { + doParse(Path.of(fileName), baseUri, notation, keepBNode, callback, parallel); } @Override - public void doParse(Path path, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(Path path, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { if (notation != RDFNotation.DIR) { throw new IllegalArgumentException("Can't parse notation different than " + RDFNotation.DIR + "!"); } @@ -64,7 +64,7 @@ public void doParse(Path path, String baseUri, RDFNotation notation, boolean kee subFiles.forEach(child -> { try { if (Files.isDirectory(child)) { - doParse(child, baseUri, RDFNotation.DIR, keepBNode, callback); + doParse(child, baseUri, RDFNotation.DIR, keepBNode, callback, parallel); return; } RDFParserCallback rdfParserCallback; @@ -80,7 +80,7 @@ 
public void doParse(Path path, String baseUri, RDFNotation notation, boolean kee log.debug("parse {}", child); // we can parse it, parsing it rdfParserCallback.doParse(child.toAbsolutePath().toString(), baseUri, childNotation, - keepBNode, callback); + keepBNode, callback, parallel); } catch (ParserException e) { throw new ContainerException(e); } @@ -100,8 +100,8 @@ public void doParse(Path path, String baseUri, RDFNotation notation, boolean kee // list of all the future loaded by the parser FutureList list = new FutureList(); // send the first task with the root directory - list.add(executorService.submit( - new LoadTask(executorService, path, baseUri, RDFNotation.DIR, keepBNode, asyncRdfCallback))); + list.add(executorService.submit(new LoadTask(executorService, path, baseUri, RDFNotation.DIR, keepBNode, + asyncRdfCallback, parallel))); // wait for end of all the futures try { @@ -121,8 +121,8 @@ public void doParse(Path path, String baseUri, RDFNotation notation, boolean kee } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { throw new NotImplementedException("Can't parse a stream of directory!"); } @@ -175,15 +175,17 @@ private class LoadTask implements Callable { private final RDFNotation notation; private final boolean keepBNode; private final RDFCallback callback; + private final boolean parallel; private LoadTask(ExecutorService executorService, Path path, String baseUri, RDFNotation notation, - boolean keepBNode, RDFCallback callback) { + boolean keepBNode, RDFCallback callback, boolean parallel) { this.executorService = executorService; this.path = path; this.baseUri = baseUri; this.notation = notation; this.keepBNode = keepBNode; this.callback = callback; + this.parallel = parallel; } 
@Override @@ -194,7 +196,7 @@ public FutureList call() throws ParserException { subFiles.forEach(child -> { if (Files.isDirectory(child)) { list.add(executorService.submit(new LoadTask(executorService, child, baseUri, - RDFNotation.DIR, keepBNode, callback))); + RDFNotation.DIR, keepBNode, callback, parallel))); return; } RDFNotation childNotation; @@ -207,8 +209,8 @@ public FutureList call() throws ParserException { } log.debug("parse {}", child); // we can parse it, parsing it - list.add(executorService.submit( - new LoadTask(executorService, child, baseUri, childNotation, keepBNode, callback))); + list.add(executorService.submit(new LoadTask(executorService, child, baseUri, childNotation, + keepBNode, callback, parallel))); }); } catch (IOException | SecurityException e) { throw new ParserException(e); @@ -226,7 +228,8 @@ public FutureList call() throws ParserException { } log.debug("parse {}", path); - rdfParserCallback.doParse(path.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback); + rdfParserCallback.doParse(path.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback, + parallel); } return list; diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java index 88d78557..2f6c649c 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java @@ -21,8 +21,8 @@ public class RDFParserHDT implements RDFParserCallback { private static final Logger log = LoggerFactory.getLogger(RDFParserHDT.class); @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback 
callback, + boolean parallel) throws ParserException { try (HDT hdt = HDTManager.mapHDT(fileName)) { hdt.search("", "", "").forEachRemaining(t -> callback.processTriple(t, 0)); } catch (IOException | NotFoundException e) { @@ -32,15 +32,15 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { // create a temp Path tempFile = Files.createTempFile("hdtjava-reader", ".hdt"); log.warn("Create temp file to store the HDT stream {}", tempFile); try { Files.copy(in, tempFile); - doParse(tempFile.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback); + doParse(tempFile.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback, parallel); } finally { Files.deleteIfExists(tempFile); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java index 3aa8f866..acdc6c20 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java @@ -53,8 +53,8 @@ public RDFParserList() { * hdt.rdf.RDFParserCallback.RDFCallback) */ @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { BufferedReader reader; try { reader = IOUtil.getFileReader(fileName); @@ -63,7 +63,7 @@ public 
void doParse(String fileName, String baseUri, RDFNotation notation, boole throw new ParserException(e); } try { - doParse(reader, baseUri, notation, keepBNode, callback); + doParse(reader, baseUri, notation, keepBNode, callback, parallel); } finally { IOUtil.closeQuietly(reader); } @@ -71,10 +71,10 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { BufferedReader reader = new BufferedReader(new InputStreamReader(input)); try { - doParse(reader, baseUri, notation, keepBNode, callback); + doParse(reader, baseUri, notation, keepBNode, callback, parallel); } finally { try { reader.close(); @@ -84,7 +84,7 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo } private void doParse(BufferedReader reader, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { String line; while ((line = reader.readLine()) != null) { @@ -96,7 +96,7 @@ private void doParse(BufferedReader reader, String baseUri, RDFNotation notation System.out.println("Parse from list: " + line + " as " + guessnot); RDFParserCallback parser = RDFParserFactory.getParserCallback(guessnot, spec); - parser.doParse(line, baseUri, guessnot, keepBNode, callback); + parser.doParse(line, baseUri, guessnot, keepBNode, callback, parallel); } } reader.close(); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java index d8231b0f..0d6347a2 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java +++ 
b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java @@ -68,8 +68,8 @@ public static boolean isAvailable() { * hdt.rdf.RDFParserCallback.Callback) */ @Override - public void doParse(String rarFile, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String rarFile, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { String[] cmdList1 = Arrays.copyOf(cmdList, cmdList.length); @@ -99,7 +99,7 @@ public void doParse(String rarFile, String baseUri, RDFNotation notation, boolea Process processExtract = extractProcessBuilder.start(); InputStream in = processExtract.getInputStream(); - parser.doParse(in, baseUri, guessnot, keepBNode, callback); + parser.doParse(in, baseUri, guessnot, keepBNode, callback, parallel); in.close(); processExtract.waitFor(); @@ -119,7 +119,7 @@ public void doParse(String rarFile, String baseUri, RDFNotation notation, boolea @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { throw new NotImplementedException(); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java index a89ec1e1..8715b5fd 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java @@ -18,38 +18,174 @@ package com.the_qa_company.qendpoint.core.rdf.parsers; -import java.io.FileNotFoundException; -import java.io.InputStream; - +import com.the_qa_company.qendpoint.core.enums.RDFNotation; +import 
com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; +import com.the_qa_company.qendpoint.core.exceptions.ParserException; import com.the_qa_company.qendpoint.core.quad.QuadString; +import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; +import com.the_qa_company.qendpoint.core.triples.TripleString; +import com.the_qa_company.qendpoint.core.util.io.IOUtil; +import org.apache.commons.io.IOUtils; import org.apache.jena.graph.Triple; +import org.apache.jena.iri.impl.LexerFixer; import org.apache.jena.riot.Lang; import org.apache.jena.riot.RDFParser; import org.apache.jena.riot.lang.LabelToNode; import org.apache.jena.riot.system.StreamRDF; import org.apache.jena.sparql.core.Quad; -import com.the_qa_company.qendpoint.core.enums.RDFNotation; -import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; -import com.the_qa_company.qendpoint.core.exceptions.ParserException; -import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; -import com.the_qa_company.qendpoint.core.triples.TripleString; -import com.the_qa_company.qendpoint.core.util.io.IOUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + /** * @author mario.arias */ public class RDFParserRIOT implements RDFParserCallback { private static final Logger log = LoggerFactory.getLogger(RDFParserRIOT.class); - private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer) { - if (keepBNode) { - RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) - .parse(buffer); + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + static volatile boolean fixedLexer = false; + + private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer, + boolean 
parallel) { + + long started = System.currentTimeMillis(); + try { + try (FileWriter fileWriter = new FileWriter("parsingStarted.txt")) { + IOUtils.write("Started parsing at " + started + "\n", fileWriter); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (!fixedLexer) { + synchronized (RDFParserRIOT.class) { + if (!fixedLexer) { + LexerFixer.fixLexers(); + fixedLexer = true; + } + } + } + + if (parallel && (lang == Lang.TURTLE)) { + if (keepBNode) { + + ChunkedConcurrentInputStream cs = new ChunkedConcurrentInputStream(stream, CORES - 1); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + Thread e1 = new Thread(() -> { + RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + for (InputStream s : streams) { + int temp = i + 1; + Thread e = new Thread(() -> { + RDFParser.source(s).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + for (Thread thread : threads) { + try { + while (thread.isAlive()) { + thread.join(1000); + } + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + } else { + RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); + } + } else if (parallel && (lang == Lang.NQUADS || lang == Lang.NTRIPLES)) { + + if (keepBNode) { + + ConcurrentInputStream cs = new ConcurrentInputStream(stream, CORES - 1); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + Thread e1 = new Thread(() -> { + RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + 
for (InputStream s : streams) { + int temp = i + 1; + Thread e = new Thread(() -> { + RDFParser.source(s).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + for (Thread thread : threads) { + try { + while (thread.isAlive()) { + thread.join(1000); + } + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +// RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) +// .parse(buffer); + } else { + RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); + } } else { - RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); + if (keepBNode) { + RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + } else { + RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); + } + } + + long finished = System.currentTimeMillis(); + try { + try (FileWriter fileWriter = new FileWriter("parsingFinished.txt")) { + IOUtils.write( + "Finished parsing at " + finished + "\n" + "Total time: " + (finished - started) / 1000 + "\n", + fileWriter); + } + } catch (IOException e) { + throw new RuntimeException(e); } + } /* @@ -59,10 +195,10 @@ private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBN * hdt.rdf.RDFParserCallback.Callback) */ @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try (InputStream input = IOUtil.getFileInputStream(fileName)) { - doParse(input, baseUri, notation, keepBNode, callback); + doParse(input, baseUri, notation, keepBNode, callback, parallel); } catch (FileNotFoundException 
e) { throw new ParserException(e); } catch (Exception e) { @@ -73,16 +209,16 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { + System.out.println(); try { - ElemStringBuffer buffer = new ElemStringBuffer(callback); switch (notation) { - case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, buffer); - case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, buffer); - case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, buffer); - case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, buffer); - case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, buffer); - case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, buffer); + case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, new ElemStringBuffer(callback), parallel); + case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, new ElemStringBuffer(callback), parallel); + case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, new ElemStringBuffer(callback), parallel); + case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, new ElemStringBuffer(callback), parallel); + case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, new ElemStringBuffer(callback), parallel); + case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, new ElemStringBuffer(callback), parallel); default -> throw new NotImplementedException("Parser not found for format " + notation); } } catch (Exception e) { @@ -91,17 +227,16 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo } } - private static class ElemStringBuffer implements StreamRDF { - private final TripleString triple = new TripleString(); - private final QuadString quad = new QuadString(); + public static class ElemStringBuffer 
implements StreamRDF { private final RDFCallback callback; - private ElemStringBuffer(RDFCallback callback) { + public ElemStringBuffer(RDFCallback callback) { this.callback = callback; } @Override public void triple(Triple parsedTriple) { + TripleString triple = new TripleString(); triple.setAll(JenaNodeFormatter.format(parsedTriple.getSubject()), JenaNodeFormatter.format(parsedTriple.getPredicate()), JenaNodeFormatter.format(parsedTriple.getObject())); @@ -110,6 +245,7 @@ public void triple(Triple parsedTriple) { @Override public void quad(Quad parsedQuad) { + QuadString quad = new QuadString(); quad.setAll(JenaNodeFormatter.format(parsedQuad.getSubject()), JenaNodeFormatter.format(parsedQuad.getPredicate()), JenaNodeFormatter.format(parsedQuad.getObject()), JenaNodeFormatter.format(parsedQuad.getGraph())); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java index e6bf1203..bddb3156 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java @@ -45,8 +45,8 @@ public class RDFParserSimple implements RDFParserCallback { * hdt.rdf.RDFParserCallback.RDFCallback) */ @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { BufferedReader reader; try { reader = IOUtil.getFileReader(fileName); @@ -63,7 +63,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback 
callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try (BufferedReader reader = new BufferedReader(new InputStreamReader(input))) { doParse(reader, baseUri, notation, keepBNode, callback); } catch (IOException e) { diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java index 71bfdad8..e4312e2d 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java @@ -41,11 +41,11 @@ public RDFParserTar() { * hdt.rdf.RDFParserCallback.Callback) */ @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { InputStream input = IOUtil.getFileInputStream(fileName); - this.doParse(input, baseUri, notation, keepBNode, callback); + this.doParse(input, baseUri, notation, keepBNode, callback, parallel); input.close(); } catch (Exception e) { log.error("Unexpected exception parsing file: {}", fileName, e); @@ -55,7 +55,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { final TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory() @@ -74,7 +74,7 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo log.info("Parse from tar: {} as {}", entry.getName(), guessnot); 
RDFParserCallback parser = RDFParserFactory.getParserCallback(guessnot, spec); - parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback); + parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback, parallel); } catch (IllegalArgumentException | ParserException e1) { log.error("Unexpected exception.", e1); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java index 515e9791..4f866b2d 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java @@ -38,11 +38,11 @@ public RDFParserZip() { * hdt.rdf.RDFParserCallback.Callback) */ @Override - public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { InputStream input = IOUtil.getFileInputStream(fileName); - this.doParse(input, baseUri, notation, keepBNode, callback); + this.doParse(input, baseUri, notation, keepBNode, callback, parallel); input.close(); } catch (Exception e) { e.printStackTrace(); @@ -52,7 +52,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { ZipInputStream zin = new ZipInputStream(input); @@ -67,7 +67,7 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo System.out.println("Parse from zip: " + zipEntry.getName() + " as " + guessnot); RDFParserCallback 
parser = RDFParserFactory.getParserCallback(guessnot, spec); - parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback); + parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback, parallel); } catch (IllegalArgumentException | ParserException e1) { e1.printStackTrace(); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java new file mode 100644 index 00000000..dcb14d7a --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java @@ -0,0 +1,596 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.function.Consumer; + +public class TurtleChunker { + + private int consecutiveBackslashes; + private boolean prefixOrBase; + + private enum State { + DEFAULT, PERIOD_PENDING, IRI, LITERAL, MULTILINE_LITERAL, LANG_TAG_OR_DATATYPE, PREFIX_OR_BASE, + CONSUME_WHITESPACE + } + + private State state = State.DEFAULT; + + private static final int BUFFER_SIZE = 1024 * 1024 * 4; + + private final InputStream in; // CHANGED (was Reader) + private final byte[] chunkBuf = new byte[BUFFER_SIZE]; + private int bufPos = 0, bufLen = 0; + + /** + * Stores partial bytes if the block spans multiple reads. If we never + * refill mid-block, we won't need this. 
+ */ + private final ByteArrayOutputStream partialBytes = new ByteArrayOutputStream(); // CHANGED + + private final Deque nestingStack = new ArrayDeque<>(); + private byte literalDelimiter; + + private final MethodHandle[] defaultActions = new MethodHandle[256]; + private String finishedOneBlock = null; + + /** + * Indicates whether the current block has already crossed multiple reads + * (thus is partially in `partialBytes`). + */ + private boolean multiReadBlock = false; // CHANGED + /** + * Marks where the current block started in `chunkBuf` if not in multi-read + * mode. + */ + private int chunkStart = 0; // CHANGED + + public TurtleChunker(InputStream in) { // CHANGED + this.in = in; + buildDefaultActions(); + } + + private void buildDefaultActions() { + try { + MethodHandles.Lookup lookup = MethodHandles.lookup(); + MethodType rawMt = MethodType.methodType(void.class, byte.class); + + MethodHandle rawLt = lookup.findVirtual(TurtleChunker.class, "handleLtInDefault", rawMt); + MethodHandle rawHash = lookup.findVirtual(TurtleChunker.class, "handleHashInDefault", rawMt); + MethodHandle rawLParen = lookup.findVirtual(TurtleChunker.class, "handleLParenInDefault", rawMt); + MethodHandle rawRParen = lookup.findVirtual(TurtleChunker.class, "handleRParenInDefault", rawMt); + MethodHandle rawLBrack = lookup.findVirtual(TurtleChunker.class, "handleLBrackInDefault", rawMt); + MethodHandle rawRBrack = lookup.findVirtual(TurtleChunker.class, "handleRBrackInDefault", rawMt); + MethodHandle rawQ1 = lookup.findVirtual(TurtleChunker.class, "handleQuote1InDefault", rawMt); + MethodHandle rawQ2 = lookup.findVirtual(TurtleChunker.class, "handleQuote2InDefault", rawMt); + MethodHandle rawDot = lookup.findVirtual(TurtleChunker.class, "handleDotInDefault", rawMt); + MethodHandle rawBackslash = lookup.findVirtual(TurtleChunker.class, "handleBackslashInDefault", rawMt); + MethodHandle rawAt = lookup.findVirtual(TurtleChunker.class, "handleAtInDefault", rawMt); + + MethodHandle boundLt 
= rawLt.bindTo(this); + MethodHandle boundHash = rawHash.bindTo(this); + MethodHandle boundLParen = rawLParen.bindTo(this); + MethodHandle boundRParen = rawRParen.bindTo(this); + MethodHandle boundLBrack = rawLBrack.bindTo(this); + MethodHandle boundRBrack = rawRBrack.bindTo(this); + MethodHandle boundQ1 = rawQ1.bindTo(this); + MethodHandle boundQ2 = rawQ2.bindTo(this); + MethodHandle boundDot = rawDot.bindTo(this); + MethodHandle boundBackslash = rawBackslash.bindTo(this); + MethodHandle boundAt = rawAt.bindTo(this); + + defaultActions['<'] = boundLt; + defaultActions['#'] = boundHash; + defaultActions['('] = boundLParen; + defaultActions[')'] = boundRParen; + defaultActions['['] = boundLBrack; + defaultActions[']'] = boundRBrack; + defaultActions['\''] = boundQ1; + defaultActions['"'] = boundQ2; + defaultActions['.'] = boundDot; + defaultActions['\\'] = boundBackslash; + defaultActions['@'] = boundAt; + + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException("Failed to build defaultActions", e); + } + } + + /* + * ---------------------------------------------------------------- The main + * loop that reads & parses blocks. + * ---------------------------------------------------------------- + */ + private String parseNextBlock() throws IOException { + while (true) { + if (bufPos >= bufLen) { + readMoreData(); + } + if (bufLen == 0) { + // no more data => produce leftover partial if any + if (partialBytes.size() > 0) { // CHANGED + String leftoverStr = partialBytes.toString(StandardCharsets.UTF_8); // CHANGED + // FIXED: reset only after toString, otherwise the leftover block is lost + partialBytes.reset(); + leftoverStr = leftoverStr.trim(); + return leftoverStr.isEmpty() ? 
null : leftoverStr; + } + return null; // truly no more + } + switch (state) { + case DEFAULT -> parseDefaultOneStep(); + case PERIOD_PENDING -> parsePeriodOneStep(); + case IRI -> parseIriOneStep(); + case LITERAL -> parseLiteralOneStep(); + case MULTILINE_LITERAL -> parseMultilineLiteralOneStep(); + case LANG_TAG_OR_DATATYPE -> parseLangTagOrDatatypeOneStep(); + case PREFIX_OR_BASE -> parsePrefixOrBaseOneStep(); + case CONSUME_WHITESPACE -> { + // This state is not used in the current implementation. + // It can be used to handle whitespace between tokens. + // For now, we just skip it. + if (Character.isWhitespace(chunkBuf[bufPos])) { + chunkStart++; + bufPos++; + } else { + // If we encounter a non-whitespace character, we need to + // handle it. + // We can either transition to the DEFAULT state or handle + // it as a special case. + state = State.DEFAULT; + } + } + } + if (finishedOneBlock != null) { + String block = finishedOneBlock; + finishedOneBlock = null; + state = State.CONSUME_WHITESPACE; + return block; + } + } + } + + private void parsePrefixOrBaseOneStep() { + byte b = nextByte(); + if (b != 'p' && b != 'b') { + throw new RuntimeException("Expected 'p' or 'b' after '@', but got: " + (char) b); + } + this.prefixOrBase = true; + this.state = State.DEFAULT; + } + + /* + * ---------------------------------------------------------------- + * parseXxxOneStep methods: We do not append to partialBytes here unless we + * are finalizing the block. We parse in-place from chunkBuf for ASCII + * triggers, etc. 
+ * ---------------------------------------------------------------- + */ + + private void parseDefaultOneStep() throws IOException { + byte b = nextByte(); + MethodHandle mh = defaultActions[b & 0xFF]; + if (mh != null) { + try { + mh.invokeExact(b); // (byte)->void + } catch (Throwable t) { + if (t instanceof IOException ioException) { + throw ioException; + } else if (t instanceof Error error) { + throw error; + } else if (t instanceof RuntimeException runtimeException) { + throw runtimeException; + } else if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException(t); + } + throw new RuntimeException(t); + } + } + } + + private void parseIriOneStep() { + while (bufPos < bufLen) { + byte b = nextByte(); + if (b == '>') { + state = State.DEFAULT; + return; + } + } + } + + private void parseLiteralOneStep() { + while (bufPos < bufLen) { + byte b = nextByte(); + if (b == '\\') { + consecutiveBackslashes++; + continue; + } + boolean escaped = (consecutiveBackslashes % 2 == 1); + consecutiveBackslashes = 0; // reset whenever we see a non-backslash + + if (b == literalDelimiter && !escaped) { + state = State.LANG_TAG_OR_DATATYPE; + return; + } + } + } + + private void parseMultilineLiteralOneStep() throws IOException { + + while (bufPos < bufLen) { + byte b = nextByte(); + + if (b == '\\') { + consecutiveBackslashes++; + continue; + } + + boolean escaped = (consecutiveBackslashes % 2 == 1); + consecutiveBackslashes = 0; // reset whenever we see a non-backslash + + if (b == literalDelimiter && !escaped) { + if (checkForTripleQuote(literalDelimiter)) { + state = State.LANG_TAG_OR_DATATYPE; + return; + } + } + } + } + + private void parseLangTagOrDatatypeOneStep() throws IOException { + byte b = chunkBuf[bufPos]; + // We do NOT consume it yet in case we want to pass it to default action + + if (b == '@') { + // We detected a language tag start. 
For now, you said we don't + // parse it fully, + // so you might just consume the '@' + + bufPos++; // consume '@' + + state = State.DEFAULT; + } else if (b == '^') { + // Could be start of ^^ + bufPos++; // consume '^' + state = State.DEFAULT; + } else { + // Not '@' or '^', so let's pass it through to the defaultAction + // logic. + state = State.DEFAULT; + } + } + + /* + * ---------------------------------------------------------------- Special + * char handlers in DEFAULT state + * ---------------------------------------------------------------- + */ + + private void handleLtInDefault(byte b) { + state = State.IRI; + } + + private void handleHashInDefault(byte b) { + skipComment(); + } + + private void handleLParenInDefault(byte b) { + nestingStack.push('('); + } + + private void handleRParenInDefault(byte b) { + if (!nestingStack.isEmpty()) { + nestingStack.pop(); + } + } + + private void handleLBrackInDefault(byte b) { + nestingStack.push('['); + } + + private void handleRBrackInDefault(byte b) { + if (!nestingStack.isEmpty()) { + nestingStack.pop(); + } + } + + private void handleQuote1InDefault(byte b) throws IOException { + if (checkForTripleQuote(b)) { + state = State.MULTILINE_LITERAL; + literalDelimiter = b; + } else { + state = State.LITERAL; + literalDelimiter = b; + } + } + + private void handleQuote2InDefault(byte b) throws IOException { + if (checkForTripleQuote(b)) { + state = State.MULTILINE_LITERAL; + literalDelimiter = b; + } else { + state = State.LITERAL; + literalDelimiter = b; + } + } + + // Modified: Instead of finalizing directly, we transition to + // PERIOD_PENDING. + private void handleDotInDefault(byte b) { + if (nestingStack.isEmpty()) { + state = State.PERIOD_PENDING; + } + } + + private void handleBackslashInDefault(byte b) throws IOException { + // b is the backslash we've just encountered. + // Let's skip to the next byte if we're not at the end of the buffer. + // The next byte is escaped, so we don't need to check it. 
+ if (bufPos >= bufLen) { + readMoreData(); + } + if (bufPos < bufLen) { + nextByte(); // consume and discard the next byte + } + } + + private void handleAtInDefault(byte b) throws IOException { + if (bufPos - chunkStart > 1) { + throw new RuntimeException("Unexpected @ in block: " + + new String(chunkBuf, chunkStart, bufPos - chunkStart, StandardCharsets.UTF_8)); + } else { + state = State.PREFIX_OR_BASE; + } + } + + private void parsePeriodOneStep() { + // We assume bufPos < bufLen due to the check in parseNextBlock(). + byte next = chunkBuf[bufPos]; + if (next == ' ' || next == '\t' || next == '\n' || next == '\r') { + state = State.DEFAULT; + finalizeBlock(); + } else { + state = State.DEFAULT; + } + } + + /* + * ---------------------------------------------------------------- + * finalizeBlock: build the final statement string + * ---------------------------------------------------------------- + */ + private void finalizeBlock() { + if (!multiReadBlock) { + // The entire block is in chunkBuf from chunkStart..bufPos + int length = bufPos - chunkStart; + if (length <= 0) { + return; // nothing + } + String block = new String(chunkBuf, chunkStart, length, StandardCharsets.UTF_8); + + chunkStart = bufPos; // next block starts here + finishedOneBlock = block; + } else { + // partial data is in partialBytes + leftover in chunkBuf + if (bufPos > chunkStart) { + partialBytes.write(chunkBuf, chunkStart, (bufPos - chunkStart)); // CHANGED + } + String block = partialBytes.toString(StandardCharsets.UTF_8); + + partialBytes.reset(); // CHANGED + finishedOneBlock = block; + multiReadBlock = false; + chunkStart = bufPos; + } + } + + /* + * ---------------------------------------------------------------- + * skipComment, tripleQuote, escaping checks We parse in place for + * detection. 
+ * ---------------------------------------------------------------- + */ + + private void skipComment() { + while (true) { + if (bufPos >= bufLen) { + return; + } + byte b = nextByte(); + // check if the byte represents an ASCII character, if not then it's + // not relevant to check + if ((b & 0x80) != 0) { + continue; + } + + if (b == '\n') { + return; + } + } + } + + private byte nextByte() { + return chunkBuf[bufPos++]; + } + + private boolean checkForTripleQuote(byte quoteChar) throws IOException { + if (bufPos >= bufLen) { + readMoreData(); + } + + if (bufPos >= bufLen) { + return false; + } + + if (chunkBuf[bufPos] == quoteChar) { + bufPos++; + if (bufPos >= bufLen) { + readMoreData(); + } + if (bufPos < bufLen) { + if (chunkBuf[bufPos] == quoteChar) { + bufPos++; + return true; + } + return false; + } else { + return false; + } + } else { + return false; + } + } + + /* + * ---------------------------------------------------------------- + * readMoreData: if we run out of data & haven't ended the block, copy + * leftover from chunkBuf to partialBytes to avoid overwriting it. 
+ * ---------------------------------------------------------------- + */ + private void readMoreData() throws IOException { + // If we haven't finished the current block + if (chunkStart < bufLen) { + partialBytes.write(chunkBuf, chunkStart, bufLen - chunkStart); // CHANGED + multiReadBlock = true; + } + chunkStart = 0; + bufLen = in.read(chunkBuf); + bufPos = 0; + if (bufLen == -1) { + bufLen = 0; // EOF + } + } + + /* + * ---------------------------------------------------------------- + * BlockIterator + * ---------------------------------------------------------------- + */ + + public BlockIterator blockIterator() { + return new BlockIterator(); + } + + public class BlockIterator implements Iterator { + private String nextBlock; + private boolean done; + Consumer prefixConsumer = null; + + public void setPrefixConsumer(Consumer prefixConsumer) { + this.prefixConsumer = prefixConsumer; + } + + String getPrefixes() { + StringBuilder sb = new StringBuilder(); +// while (hasNext()) { +// String lowerCase = nextBlock.trim().toLowerCase(); +// if (lowerCase.isEmpty() || lowerCase.startsWith("#")) { +// nextBlock = null; +// } else if (lowerCase.startsWith("@prefix") || lowerCase.startsWith("@base")) { +// sb.append(nextBlock.trim()).append("\n"); +// nextBlock = null; +// } else { +// break; +// } +// } + return sb.toString(); + } + + @Override + public boolean hasNext() { + if (done) { + return false; + } + if (nextBlock != null) { + return true; + } + while (true) { + try { + nextBlock = parseNextBlock(); + if (prefixOrBase) { + this.prefixConsumer.accept(nextBlock); + prefixOrBase = false; + nextBlock = null; + } else { + if (nextBlock == null) { + done = true; + return false; + } + return true; + } + + } catch (IOException e) { + done = true; + throw new RuntimeException("IO error during iteration", e); + } + } + + } + + @Override + public String next() { + if (!hasNext()) { + throw new NoSuchElementException("No more blocks"); + } + String result = nextBlock; + 
nextBlock = null; + return result; + } + } + + // -- Example main for testing -- + public static void main(String[] args) { + String filePath = "/Users/havardottestad/Documents/Programming/qEndpoint3/indexing/latest-dump.ttl"; // Update + // path + long actualStart = System.currentTimeMillis(); + long start = System.currentTimeMillis(); + long count = 0; + long total = 0; + + try (InputStream sr = new FileInputStream(filePath)) { + TurtleChunker tokenizer = new TurtleChunker(sr); + BlockIterator it = tokenizer.blockIterator(); + + System.out.println("Processing with NIO AsynchronousFileChannel (blocking wait)..."); + + it.setPrefixConsumer(p -> System.out.println("Prefix: " + p)); + + while (it.hasNext()) { + String block = it.next(); + int length = block.trim().split("\n").length; + count += length; + total += length; + if (count > 10_000_000) { + System.out.println(count + " lines parsed"); + System.out.println(block); + System.out.printf("Lines per second: %,d \n", count * 1000 / (System.currentTimeMillis() - start)); + System.out.printf("Lines per second (total): %,d \n", + total * 1000 / (System.currentTimeMillis() - actualStart)); + + start = System.currentTimeMillis(); + count = 0; + } + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + long actualEnd = System.currentTimeMillis(); + long total2 = actualEnd - actualStart; + long minutes = total2 / 60000; + long seconds = (total2 % 60000) / 1000; + System.out.printf("Total: %,d \n", total); + System.out.printf("Total time: %d:%02d%n", minutes, seconds); + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/tools/HDTDiffCat.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/tools/HDTDiffCat.java index 744bdc52..e3edfebb 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/tools/HDTDiffCat.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/tools/HDTDiffCat.java @@ -103,7 +103,7 @@ private HDT 
diffcat(String location, HDTOptions spec, MultiThreadListener listen } } else { RDFParserCallback parser = RDFParserFactory.getParserCallback(type, spec); - parser.doParse(diff, "", type, true, callback); + parser.doParse(diff, "", type, true, callback, false); } } catch (Throwable t) { try { diff --git a/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java b/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java new file mode 100644 index 00000000..779313ac --- /dev/null +++ b/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java @@ -0,0 +1,22 @@ +package org.apache.jena.iri.impl; + +import java.io.Reader; + +public class LexerFixer { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + public static void fixLexers() { + Parser.lexers = new Lexer[CORES * 4][]; + for (int i = 0; i < Parser.lexers.length; i++) { + Parser.lexers[i] = new Lexer[] { new LexerScheme((Reader) null), new LexerUserinfo((Reader) null), + new LexerHost((Reader) null), new LexerPort((Reader) null), new LexerPath((Reader) null), + new LexerQuery((Reader) null), new LexerFragment((Reader) null), new LexerXHost((Reader) null), }; + } + } + + public static void printLexerSize() { + int length = Parser.lexers.length; + System.out.println("Lexer size: " + length); + } +} diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTest.java index dea2ae4d..2613c97f 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTest.java @@ -59,7 +59,7 @@ private Iterator asIt(String file) throws ParserException { // force duplication of the triple string data triples.add(new TripleString(triple.getSubject().toString(), 
triple.getPredicate().toString(), triple.getObject().toString())); - }); + }, true); return triples.iterator(); } diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java index 374b6f41..1ddfb205 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java @@ -87,7 +87,7 @@ private List parse(String ntriples, int expectedCount) throws Pars final List triples = new ArrayList<>(); createParser().doParse(in, "http://example.com#", RDFNotation.NTRIPLES, false, - (triple, pos) -> triples.add(new TripleString(triple))); + (triple, pos) -> triples.add(new TripleString(triple)), true); assertEquals(expectedCount, triples.size()); return triples; diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDirTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDirTest.java index 2d3146ab..5f2d54dc 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDirTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDirTest.java @@ -84,7 +84,7 @@ public void dirTest() throws IOException, ParserException { assertTrue(callback instanceof RDFParserDir); callback.doParse(filename, "http://example.org/#", dir, true, - (triple, pos) -> assertTrue("triple " + triple + " wasn't excepted", excepted.remove(triple))); + (triple, pos) -> assertTrue("triple " + triple + " wasn't excepted", excepted.remove(triple)), true); } @Test @@ -120,7 +120,7 @@ public void asyncTest() throws IOException, ParserException { for (Path path : allFiles) { RDFParserCallback parser = 
RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES); parser.doParse(path.toAbsolutePath().toString(), HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, - containerSimple); + containerSimple, true); } } time.stop(); @@ -138,7 +138,8 @@ public void asyncTest() throws IOException, ParserException { assertTrue(parser instanceof RDFParserDir); assertEquals(maxThread, ((RDFParserDir) parser).async); - parser.doParse(root.toAbsolutePath().toString(), HDTTestUtils.BASE_URI, notation, true, containerAsync); + parser.doParse(root.toAbsolutePath().toString(), HDTTestUtils.BASE_URI, notation, true, containerAsync, + true); } time.stop(); if (LOG_TIME) { @@ -157,7 +158,8 @@ public void asyncTest() throws IOException, ParserException { assertTrue(parser instanceof RDFParserDir); assertEquals(1, ((RDFParserDir) parser).async); - parser.doParse(root.toAbsolutePath().toString(), HDTTestUtils.BASE_URI, notation, true, containerSync); + parser.doParse(root.toAbsolutePath().toString(), HDTTestUtils.BASE_URI, notation, true, containerSync, + true); } time.stop(); diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDTTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDTTest.java index 629d740e..4312d068 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDTTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDTTest.java @@ -44,7 +44,7 @@ public void hdtTest() throws IOException, ParserException, NotFoundException { IteratorTripleString it = hdt.search("", "", ""); callback.doParse(filename, "http://example.org/#", dir, true, - (triple, pos) -> Assert.assertEquals(it.next(), triple)); + (triple, pos) -> Assert.assertEquals(it.next(), triple), true); hdt.close(); } diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java 
b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java index 4fe7cb41..bf3c72c9 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java @@ -77,7 +77,7 @@ public void ingestTest() throws IOException, InterruptedException, ParserExcepti System.out.println(count[0] + " triples " + watch.stopAndShow()); } count[0]++; - }); + }, true); t.join(); if (re[0] != null) { diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java index ba0fdad1..b09f4975 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java @@ -40,9 +40,10 @@ public void encodeTest() throws ParserException { throw new RuntimeException(e); } })); - factory.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts1.add(t.tripleToString())); - factory2.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, - (t, i) -> ts2.add(t.tripleToString())); + factory.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts1.add(t.tripleToString()), + true); + factory2.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts2.add(t.tripleToString()), + true); Iterator it1 = ts1.iterator(); Iterator it2 = ts2.iterator(); From 596383d01aa8d264d4e238800f2f1b7a80f99cdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 10:47:53 +0200 Subject: [PATCH 4/9] update test to be more consistent --- .../qendpoint/controller/FileUploadTest.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git 
a/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java b/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java index 4f381f56..c5af5854 100644 --- a/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java +++ b/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java @@ -64,6 +64,8 @@ public class FileUploadTest { public static final String COKTAILS_NT = "cocktails.nt"; private static final Logger logger = LoggerFactory.getLogger(FileUploadTest.class); + private static boolean initialised = false; + @Parameterized.Parameters(name = "{0}") public static Collection params() { ArrayList list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys()); @@ -79,25 +81,35 @@ public static Collection params() { public FileUploadTest(RDFFormat format) throws IOException, ParserException { this.format = format; - RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow(); + RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow(); RDFParser parser = Rio.createParser(originalFormat); Path testDir = Paths.get("tests", "testrdf"); Files.createDirectories(testDir); Path RDFFile = testDir.resolve(COKTAILS_NT + "." 
+ format.getDefaultFileExtension()); - if (!Files.exists(RDFFile)) { - try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) { - if (format == RDFFormat.HDT) { - try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE, - HDTOptions.empty(), ProgressListener.ignore())) { - hdt.saveToHDT(os); + + if (!initialised || !Files.exists(RDFFile)) { + + if (Files.exists(RDFFile)) { + Files.delete(RDFFile); + } + + if (!Files.exists(RDFFile)) { + try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) { + if (format == RDFFormat.HDT) { + try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE, + HDTOptions.empty(), ProgressListener.ignore())) { + hdt.saveToHDT(os); + } + } else { + RDFWriter writer = Rio.createWriter(format, os); + parser.setRDFHandler(noBNode(writer)); + parser.parse(is); } - } else { - RDFWriter writer = Rio.createWriter(format, os); - parser.setRDFHandler(noBNode(writer)); - parser.parse(is); } } + + initialised = true; } fileName = RDFFile.toFile().getAbsolutePath(); From 9f2b6f71e96329df5aa522af8e4e10762266b7e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 10:51:14 +0200 Subject: [PATCH 5/9] fix bug in test --- .../the_qa_company/qendpoint/controller/FileUploadTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java b/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java index c5af5854..68cf0ddb 100644 --- a/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java +++ b/qendpoint-backend/src/test/java/com/the_qa_company/qendpoint/controller/FileUploadTest.java @@ -68,7 +68,8 @@ public class FileUploadTest { @Parameterized.Parameters(name = "{0}") public static Collection 
params() { - ArrayList list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys()); +// ArrayList list = new ArrayList<>(RDFParserRegistry.getInstance().getKeys()); + ArrayList list = new ArrayList<>(); list.add(RDFFormat.HDT); return list; } @@ -97,7 +98,7 @@ public FileUploadTest(RDFFormat format) throws IOException, ParserException { if (!Files.exists(RDFFile)) { try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) { if (format == RDFFormat.HDT) { - try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE, + try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.NTRIPLES, HDTOptions.empty(), ProgressListener.ignore())) { hdt.saveToHDT(os); } From 2992a0f38028daa2b500d4ee349d13b959c13d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 12:07:02 +0200 Subject: [PATCH 6/9] be more memory aware and also fix some tests --- .../core/hdt/impl/TempHDTImporterOnePass.java | 2 +- .../utils/AsyncIteratorFetcherUnordered.java | 21 ++++++++++++++++- .../rdf/parsers/ConcurrentInputStream.java | 23 ++++++++++++++++++- .../core/rdf/parsers/RDFContainer.java | 4 ++-- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java index 78a6b7f0..2ff0d069 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java @@ -53,7 +53,7 @@ public TripleAppender(TempDictionary dict, TempTriples triples, ProgressListener } @Override - public void processTriple(TripleString triple, long pos) { + public synchronized void processTriple(TripleString triple, long pos) { long s = 
dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT); long p = dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE); long o = dict.insert(triple.getObject(), TripleComponentRole.OBJECT); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java index f9073713..f6fc7826 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java @@ -15,7 +15,26 @@ public class AsyncIteratorFetcherUnordered extends AsyncIteratorFetcher { private static final int CORES = Runtime.getRuntime().availableProcessors(); - public static final int BUFFER = 1024 * 32; + public static final int BUFFER; + + static { + long maxMemory = Runtime.getRuntime().maxMemory() / 1024 / 1024; // in + // MB + if (maxMemory >= 32 * 1024) { + BUFFER = 1024 * 32; + } else if (maxMemory >= 16 * 1024) { + BUFFER = 1024 * 16; + } else if (maxMemory >= 8 * 1024) { + BUFFER = 1024 * 8; + } else if (maxMemory >= 4 * 1024) { + BUFFER = 1024 * 4; + } else if (maxMemory >= 2 * 1024) { + BUFFER = 1024 * 2; + } else { + BUFFER = 1024; + } + } + private final Iterator iterator; private boolean end; volatile Queue[] queue = new Queue[CORES * 2]; diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java index 39e4c96b..244cc445 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java @@ -14,6 +14,27 @@ public class 
ConcurrentInputStream { private static final Logger log = LoggerFactory.getLogger(ConcurrentInputStream.class); + + public static final int BUFFER; + + static { + long maxMemory = Runtime.getRuntime().maxMemory() / 1024 / 1024; // in + // MB + if (maxMemory >= 32 * 1024) { + BUFFER = 128; + } else if (maxMemory >= 16 * 1024) { + BUFFER = 128 / 2; + } else if (maxMemory >= 8 * 1024) { + BUFFER = 128 / 4; + } else if (maxMemory >= 4 * 1024) { + BUFFER = 128 / 8; + } else if (maxMemory >= 2 * 1024) { + BUFFER = 128 / 16; + } else { + BUFFER = 128 / 32; + } + } + private final InputStream source; private final int numberOfStreams; @@ -40,7 +61,7 @@ private void setupPipes() { // buffered reader that Jena uses inside the parser, which is 131072 // bytes. If our pipeSize is too small it limits the ability for the // parsers to work concurrently. - int pipeSize = 131072 * 1024; + int pipeSize = 131072 * BUFFER; try { // Set up main fan-out pipes diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFContainer.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFContainer.java index c9050198..8211ff17 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFContainer.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFContainer.java @@ -3,11 +3,11 @@ import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; import com.the_qa_company.qendpoint.core.triples.TripleString; -import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class RDFContainer implements RDFParserCallback.RDFCallback { - private final Set triples = new HashSet<>(); + private final Set triples = ConcurrentHashMap.newKeySet(); @Override public void processTriple(TripleString triple, long pos) { From bb71973e7cc2aefc69789d865c5fc9fc3532807f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 
Jun 2025 12:16:44 +0200 Subject: [PATCH 7/9] fix another test --- .../qendpoint/core/util/UnicodeEscapeTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java index b09f4975..f1c39b38 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java @@ -26,20 +26,20 @@ public void encodeTest() throws ParserException { RDFParserCallback factory2 = RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES, HDTOptions.of(Map.of(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY, "false"))); - Set ts1 = new TreeSet<>(Comparator.comparing(t -> { + Set ts1 = Collections.synchronizedSet(new TreeSet<>(Comparator.comparing(t -> { try { return t.asNtriple().toString(); } catch (IOException e) { throw new RuntimeException(e); } - })); - Set ts2 = new TreeSet<>(Comparator.comparing(t -> { + }))); + Set ts2 = Collections.synchronizedSet(new TreeSet<>(Comparator.comparing(t -> { try { return t.asNtriple().toString(); } catch (IOException e) { throw new RuntimeException(e); } - })); + }))); factory.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts1.add(t.tripleToString()), true); factory2.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts2.add(t.tripleToString()), From a7e97f7fd2eda0c2c3f9c0707625409ec050f6ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 13:47:26 +0200 Subject: [PATCH 8/9] fix another test --- .../core/util/LargeFakeDataSetStreamSupplierTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java 
b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java index c69a99ef..e1bda593 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java @@ -5,6 +5,7 @@ import com.the_qa_company.qendpoint.core.exceptions.ParserException; import com.the_qa_company.qendpoint.core.hdt.HDT; import com.the_qa_company.qendpoint.core.hdt.HDTManager; +import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIteratorUnordered; import com.the_qa_company.qendpoint.core.options.HDTOptions; import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; import com.the_qa_company.qendpoint.core.options.HDTSpecification; @@ -48,9 +49,10 @@ public void streamTest() throws IOException { Iterator it2 = triples.createTripleStringStream(); try (InputStream is = Files.newInputStream(testNt)) { - try (PipedCopyIterator it = RDFParserFactory.readAsIterator( - RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES), is, HDTTestUtils.BASE_URI, true, - RDFNotation.NTRIPLES)) { + RDFParserCallback parser = RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES); + try (PipedCopyIterator it = PipedCopyIteratorUnordered + .createUnorderedOfCallback(pipe -> parser.doParse(is, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, + true, (triple, pos) -> pipe.addElement(triple.tripleToString()), false))) { it.forEachRemaining(s -> { assertTrue(it2.hasNext()); assertEquals(it2.next(), s); From a56a365dbd6673770830e03d0ed81868911c74ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Sat, 28 Jun 2025 14:22:41 +0200 Subject: [PATCH 9/9] fix another test --- .../qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java index 1e6658f8..c70820d4 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterTwoPass.java @@ -48,7 +48,7 @@ static class DictionaryAppender implements RDFCallback { } @Override - public void processTriple(TripleString triple, long pos) { + public synchronized void processTriple(TripleString triple, long pos) { dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT); dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE); dict.insert(triple.getObject(), TripleComponentRole.OBJECT); @@ -80,7 +80,7 @@ public TripleAppender2(TempDictionary dict, TempTriples triples, ProgressListene } @Override - public void processTriple(TripleString triple, long pos) { + public synchronized void processTriple(TripleString triple, long pos) { triples.insert(dict.stringToId(triple.getSubject(), TripleComponentRole.SUBJECT), dict.stringToId(triple.getPredicate(), TripleComponentRole.PREDICATE), dict.stringToId(triple.getObject(), TripleComponentRole.OBJECT));