diff --git a/.gitignore b/.gitignore
index deeb8432d..495a5eba9 100755
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,6 @@ data
wikidata
qendpoint-store/wdbench-indexes
wdbench-results
+testing
+indexing
+wdbench-indexes
diff --git a/pom.xml b/pom.xml
index e4845c971..ab40b6ff1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,10 +63,10 @@
UTF-8
- 17
- 17
- 17
- 17
+ 21
+ 21
+ 21
+ 21
diff --git a/qendpoint-backend/pom.xml b/qendpoint-backend/pom.xml
index 514353b79..c4affaa42 100644
--- a/qendpoint-backend/pom.xml
+++ b/qendpoint-backend/pom.xml
@@ -146,6 +146,16 @@
${java.source.version}
${java.target.version}
+
+
+
+ --add-exports
+ java.base/jdk.internal.misc=ALL-UNNAMED
+ --add-exports
+ java.base/jdk.internal.util=ALL-UNNAMED
+ --add-modules
+ jdk.incubator.vector
+
diff --git a/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java b/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java
index 7b25ce0ca..629b80440 100644
--- a/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java
+++ b/qendpoint-backend/src/main/java/com/the_qa_company/qendpoint/controller/Sparql.java
@@ -34,8 +34,8 @@
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebInputException;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
diff --git a/qendpoint-core/pom.xml b/qendpoint-core/pom.xml
index d390abb25..f3d7ba238 100644
--- a/qendpoint-core/pom.xml
+++ b/qendpoint-core/pom.xml
@@ -17,8 +17,18 @@
org.apache.maven.plugins
maven-compiler-plugin
- 17
- 17
+ 21
+ 21
+
+
+
+ --add-exports
+ java.base/jdk.internal.misc=ALL-UNNAMED
+ --add-exports
+ java.base/jdk.internal.util=ALL-UNNAMED
+ --add-modules
+ jdk.incubator.vector
+
@@ -75,7 +85,7 @@
org.apache.commons
commons-compress
- 1.21
+ 1.27.1
org.apache.jena
@@ -113,5 +123,22 @@
RoaringBitmap
${roaringbitmap.version}
+
+ it.unimi.dsi
+ fastutil
+ 8.5.15
+
+
+ org.spf4j
+ spf4j-core
+ 8.10.0
+
+
+ org.apache.avro
+ avro
+
+
+
+
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiRoaringBitmap.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiRoaringBitmap.java
index d63415a78..7f302abca 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiRoaringBitmap.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/bitmap/MultiRoaringBitmap.java
@@ -3,13 +3,13 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.hdt.HDTVocabulary;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
+import org.spf4j.io.BufferedInputStream;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.util.io.CloseMappedByteBuffer;
import com.the_qa_company.qendpoint.core.util.io.Closer;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;
import org.roaringbitmap.RoaringBitmap;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/sequence/SequenceLog64Map.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/sequence/SequenceLog64Map.java
index e3ce9942d..60bda9327 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/sequence/SequenceLog64Map.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/compact/sequence/SequenceLog64Map.java
@@ -25,6 +25,7 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.hdt.HDTVocabulary;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
+import org.spf4j.io.BufferedInputStream;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
import com.the_qa_company.qendpoint.core.util.crc.CRC8;
@@ -35,7 +36,6 @@
import com.the_qa_company.qendpoint.core.util.io.CountInputStream;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionBig.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionBig.java
index 9a8a0fb54..0c74a2363 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionBig.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionBig.java
@@ -19,8 +19,6 @@
package com.the_qa_company.qendpoint.core.dictionary.impl.section;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -39,6 +37,8 @@
import com.the_qa_company.qendpoint.core.exceptions.IllegalFormatException;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
+import org.spf4j.io.BufferedInputStream;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.Mutable;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionMap.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionMap.java
index 1a0775aac..918ded014 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionMap.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/dictionary/impl/section/PFCDictionarySectionMap.java
@@ -25,6 +25,7 @@
import com.the_qa_company.qendpoint.core.exceptions.IllegalFormatException;
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
+import org.spf4j.io.BufferedInputStream;
import com.the_qa_company.qendpoint.core.util.io.BigMappedByteBuffer;
import com.the_qa_company.qendpoint.core.compact.integer.VByte;
import com.the_qa_company.qendpoint.core.compact.sequence.Sequence;
@@ -40,7 +41,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/HDTManagerImpl.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/HDTManagerImpl.java
index 492421d91..11b3ea6a3 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/HDTManagerImpl.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/HDTManagerImpl.java
@@ -20,6 +20,7 @@
import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory;
import com.the_qa_company.qendpoint.core.rdf.TripleWriter;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.Profiler;
@@ -35,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
index 5b20e8378..384236ab2 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java
@@ -13,6 +13,7 @@
import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
+import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
@@ -100,10 +101,15 @@ public HDTDiskImporter(HDTOptions hdtFormat, ProgressListener progressListener,
 			throw new IllegalArgumentException("Number of workers should be positive!");
 		}
 		// maximum size of a chunk
-		chunkSize = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_CHUNK_SIZE_KEY, () -> getMaxChunkSize(this.workers));
+		long chunkSize = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_CHUNK_SIZE_KEY,
+				() -> getMaxChunkSize(this.workers));
 		if (chunkSize < 0) {
 			throw new IllegalArgumentException("Negative chunk size!");
 		}
+		// align the configured chunk size down to a multiple of 32 MiB
+		// NOTE(review): this truncates to 0 when chunkSize < 32 MiB — confirm a
+		// minimum chunk size is enforced downstream
+		this.chunkSize = ((((chunkSize / 1024 / 1024) / 32) * 32) * 1024 * 1024);
 		long maxFileOpenedLong = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_MAX_FILE_OPEN_KEY, 1024);
 		int maxFileOpened;
 		if (maxFileOpenedLong < 0 || maxFileOpenedLong > Integer.MAX_VALUE) {
@@ -178,11 +183,10 @@ public CompressTripleMapper compressDictionary(Iterator iterator)
 			throw new IllegalArgumentException("Dictionary already built! Use another importer instance!");
 		}
 		listener.notifyProgress(0,
-				"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with "
-						+ ways + "ways and " + workers + " worker(s)");
-
-		AsyncIteratorFetcher source = new AsyncIteratorFetcher<>(iterator);
+				"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, false)
+						+ "iB with " + ways + " ways and " + workers + " worker(s)");
+		AsyncIteratorFetcherUnordered source = new AsyncIteratorFetcherUnordered<>(iterator);
 		profiler.pushSection("section compression");
 		CompressionResult compressionResult;
 		try {
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTImpl.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTImpl.java
index 868e63db4..92c9742b6 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTImpl.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTImpl.java
@@ -52,6 +52,8 @@
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.options.HDTSpecification;
+import org.spf4j.io.BufferedInputStream;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.triples.DictionaryEntriesDiff;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleID;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleString;
@@ -74,8 +76,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -528,7 +528,7 @@ public void loadOrCreateIndex(ProgressListener listener, HDTOptions spec) throws
// SAVE
if (this.hdtFileName != null) {
- BufferedOutputStream out = null;
+ OutputStream out = null;
try {
out = new BufferedOutputStream(new FileOutputStream(versionName));
ci.clear();
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/WriteHDTImpl.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/WriteHDTImpl.java
index e9f6cf28e..fd481e10d 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/WriteHDTImpl.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/WriteHDTImpl.java
@@ -8,13 +8,13 @@
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleString;
import com.the_qa_company.qendpoint.core.triples.TriplesPrivate;
import com.the_qa_company.qendpoint.core.triples.impl.WriteBitmapTriples;
import com.the_qa_company.qendpoint.core.util.io.CloseSuppressPath;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
index 05b47197f..72bb8388c 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java
@@ -18,6 +18,7 @@
import com.the_qa_company.qendpoint.core.util.listener.IntermediateListener;
import com.the_qa_company.qendpoint.core.util.string.ByteString;
import com.the_qa_company.qendpoint.core.util.string.CompactString;
+import org.apache.jena.base.Sys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl source,
MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
@@ -58,7 +60,8 @@ public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher fetcher, CloseSuppressPath out
}
if (tripleID % 100_000 == 0) {
- listener.notifyProgress(10, "reading triples " + tripleID);
+ // use start to measure how many triples are read per second
+ int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0));
+
+ listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
}
// too much ram allowed?
if (subjects.size() == Integer.MAX_VALUE - 6) {
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterHDT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterHDT.java
index b8260f78b..643649496 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterHDT.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterHDT.java
@@ -1,6 +1,5 @@
package com.the_qa_company.qendpoint.core.hdt.writer;
-import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -18,6 +17,7 @@
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.rdf.TripleWriter;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.triples.TempTriples;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.StopWatch;
@@ -42,7 +42,7 @@ public TripleWriterHDT(String baseUri, HDTOptions spec, String outFile, boolean
this.out = new BufferedOutputStream(
new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(outFile))));
} else {
- this.out = new BufferedOutputStream(new FileOutputStream(outFile));
+ this.out = new BufferedOutputStream(new FileOutputStream(outFile), 4 * 1024 * 1024);
}
close = true;
init();
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterNtriples.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterNtriples.java
index f7d0ea669..6b3abde17 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterNtriples.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/writer/TripleWriterNtriples.java
@@ -1,6 +1,5 @@
package com.the_qa_company.qendpoint.core.hdt.writer;
-import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileWriter;
@@ -11,6 +10,7 @@
import java.util.zip.GZIPOutputStream;
import com.the_qa_company.qendpoint.core.rdf.TripleWriter;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.triples.TripleString;
public class TripleWriterNtriples implements TripleWriter {
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncExceptionIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncExceptionIterator.java
new file mode 100644
index 000000000..40971700c
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncExceptionIterator.java
@@ -0,0 +1,12 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The asynchronous iterator interface. Its {@link #nextFuture()} returns a
+ * CompletableFuture that completes with null when the iterator is
+ * exhausted, or completes exceptionally if something goes wrong.
+ */
+public interface AsyncExceptionIterator<T> {
+	CompletableFuture<T> nextFuture();
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java
index 788b3ab8a..8eb9e77eb 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcher.java
@@ -1,6 +1,9 @@
package com.the_qa_company.qendpoint.core.iterator.utils;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -13,9 +16,10 @@
*/
public class AsyncIteratorFetcher implements Supplier {
private final Iterator iterator;
- private final Lock lock = new ReentrantLock();
private boolean end;
+ private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>();
+
public AsyncIteratorFetcher(Iterator iterator) {
this.iterator = iterator;
}
@@ -23,17 +27,48 @@ public AsyncIteratorFetcher(Iterator iterator) {
/**
* @return an element from the iterator, this method is thread safe
*/
+// @Override
+// public E get() {
+// lock.lock();
+// try {
+// if (iterator.hasNext()) {
+// return iterator.next();
+// }
+// end = true;
+// return null;
+// } finally {
+// lock.unlock();
+// }
+// }
+
@Override
public E get() {
- lock.lock();
- try {
- if (iterator.hasNext()) {
- return iterator.next();
+ E poll = queue.poll();
+ if (poll != null) {
+ return poll;
+ }
+
+ synchronized (this) {
+ poll = queue.poll();
+ if (poll != null) {
+ return poll;
}
+
+ ConcurrentLinkedQueue newqueue = new ConcurrentLinkedQueue<>();
+
+			for (int i = 0; i < 128 && iterator.hasNext(); i++) {
+				// consume exactly one element per hasNext() check: the original
+				// called iterator.next() twice on the first pass, which throws
+				// NoSuchElementException when only one element remains
+				if (poll == null)
+					poll = iterator.next();
+				else
+					newqueue.add(iterator.next());
+			}
+ this.queue = newqueue;
+ if (poll != null) {
+ return poll;
+ }
+
end = true;
return null;
- } finally {
- lock.unlock();
}
}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java
new file mode 100644
index 000000000..fa9930039
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java
@@ -0,0 +1,130 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Synchronise an iterator
+ *
+ * @param iterator type
+ * @author Antoine Willerval
+ */
+public class AsyncIteratorFetcherUnordered extends AsyncIteratorFetcher {
+
+ private static final int CORES = Runtime.getRuntime().availableProcessors();
+
+ public static final int BUFFER = 1024 * 4;
+ private final Iterator iterator;
+	private volatile boolean end; // read by other threads via isEnd()
+ volatile Queue[] queue = new Queue[CORES * 2];
+
+ {
+ for (int i = 0; i < queue.length; i++) {
+ queue[i] = new ArrayDeque<>(BUFFER);
+ }
+ }
+
+ public AsyncIteratorFetcherUnordered(Iterator iterator) {
+ super(iterator);
+ this.iterator = iterator;
+ }
+
+ /**
+ * @return an element from the iterator, this method is thread safe
+ */
+ @Override
+ public E get() {
+
+ int index = (int) (Thread.currentThread().getId() % queue.length);
+
+ Queue es = queue[index];
+ if (es == null) {
+ for (Queue eQueue : queue) {
+ if (eQueue != null) {
+ synchronized (eQueue) {
+ E poll = eQueue.poll();
+
+ if (poll != null) {
+ return poll;
+ }
+ }
+ }
+ }
+ }
+
+ if (es != null) {
+ // With this approach there is some risk that a queue is filled but
+ // never emptied. Maybe we should look for another queue to read
+ // from
+ // before filling our own queue?
+ synchronized (es) {
+ E poll = es.poll();
+
+ if (poll != null) {
+ return poll;
+ }
+
+ synchronized (this) {
+ es = queue[index];
+ if (es != null) {
+
+ poll = es.poll();
+ if (poll == null) {
+ if (iterator.hasNext()) {
+ poll = iterator.next();
+ for (int i = 0; i < BUFFER && iterator.hasNext(); i++) {
+ es.add(iterator.next());
+ }
+ }
+
+ }
+
+ if (poll == null) {
+ queue[index] = null;
+ } else {
+ return poll;
+ }
+ }
+ }
+ }
+ }
+
+ for (Queue eQueue : queue) {
+ if (eQueue != null) {
+
+ synchronized (eQueue) {
+ synchronized (this) {
+ E poll = eQueue.poll();
+
+ if (poll != null) {
+ return poll;
+ }
+ }
+ }
+ }
+ }
+
+ synchronized (this) {
+ if (iterator.hasNext()) {
+ E poll = iterator.next();
+ return poll;
+ }
+ }
+
+ end = true;
+ return null;
+
+ }
+
+ /**
+ * @return is the end
+ */
+ public boolean isEnd() {
+ return end;
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncPreFetchIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncPreFetchIterator.java
new file mode 100644
index 000000000..f435123c0
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncPreFetchIterator.java
@@ -0,0 +1,116 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A wrapper that pre-fetches from a synchronous ExceptionIterator
+ * asynchronously. - Creates or uses a provided Executor (potentially a
+ * VirtualThreadPerTaskExecutor). - Encourages calling cancel() or close() if
+ * partial consumption is likely.
+ */
+class AsyncPreFetchIterator implements AsyncExceptionIterator, AutoCloseable {
+
+ private final ExceptionIterator iterator;
+
+ // Holds the current fetch (chained) so it happens strictly in sequence.
+ private CompletableFuture currentFetch;
+
+ private final AtomicBoolean hasMore = new AtomicBoolean(true);
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+ private final AtomicReference exception = new AtomicReference<>(null);
+
+ private final Executor executor;
+ private final boolean ownExecutor;
+
+ public AsyncPreFetchIterator(ExceptionIterator iterator) {
+ this(iterator, null);
+ }
+
+ public AsyncPreFetchIterator(ExceptionIterator iterator, Executor executor) {
+ this.iterator = iterator;
+ if (executor == null) {
+ this.executor = Executors.newVirtualThreadPerTaskExecutor();
+ this.ownExecutor = true;
+ } else {
+ this.executor = executor;
+ this.ownExecutor = false;
+ }
+ // Schedule the initial fetch once
+ this.currentFetch = fetchNext(null);
+ }
+
+ /**
+ * nextFuture() returns the current future, then updates currentFetch so
+ * that the next fetch is chained after the current future completes.
+ */
+ @Override
+ public synchronized CompletableFuture nextFuture() {
+ CompletableFuture result = currentFetch;
+
+ // Chain the "next" fetch to happen strictly after the current result is
+ // done:
+ currentFetch = result.thenCompose(ignored -> fetchNext(null));
+ return result;
+ }
+
+ /**
+ * A peek method if needed to see the current element without advancing.
+ * This is safe: we do not schedule an additional fetch unless nextFuture()
+ * is called.
+ */
+ public synchronized CompletableFuture peekFuture() {
+ return currentFetch;
+ }
+
+ /**
+ * fetchNext() returns a future that (when run) checks hasNext() & next().
+ * It's always chained after the previous fetch, preventing parallel calls.
+ */
+ private CompletableFuture fetchNext(T ignored) {
+ // If already cancelled or exception set, do not schedule more tasks.
+		if (cancelled.getAcquire() || exception.getAcquire() != null || !hasMore.getAcquire()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ if (iterator.hasNext()) {
+ return iterator.next();
+ } else {
+ hasMore.setRelease(false);
+ return null;
+ }
+ } catch (Exception ex) {
+ exception.compareAndSet(null, ex);
+ cancelled.setRelease(true);
+ throw new CompletionException(ex);
+ }
+ }, executor);
+ }
+
+ /**
+ * Cancel and prevent further scheduling.
+ */
+ public void cancel() {
+ cancelled.setRelease(true);
+ if (currentFetch != null) {
+ currentFetch.cancel(true);
+ }
+ }
+
+ /**
+ * Closes resources. If we own the executor, we shut it down.
+ */
+ @Override
+ public void close() {
+ cancel();
+ if (ownExecutor && executor instanceof ExecutorService) {
+ ((ExecutorService) executor).shutdownNow();
+ }
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncToSyncExceptionIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncToSyncExceptionIterator.java
new file mode 100644
index 000000000..5bf0f9d30
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncToSyncExceptionIterator.java
@@ -0,0 +1,60 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletionException;
+
+public class AsyncToSyncExceptionIterator implements ExceptionIterator {
+
+ private final AsyncExceptionIterator asyncIterator;
+ private T nextValue;
+ private boolean hasPrefetched = false;
+ private boolean finished = false;
+
+ public AsyncToSyncExceptionIterator(AsyncExceptionIterator asyncIterator) {
+ this.asyncIterator = asyncIterator;
+ }
+
+ @Override
+ public boolean hasNext() throws E {
+ if (finished) {
+ return false;
+ }
+ // If we haven't prefetched yet, do so
+ if (!hasPrefetched) {
+ fetchNext();
+ hasPrefetched = true;
+ }
+ return !finished;
+ }
+
+ @Override
+ public T next() throws E {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Iterator exhausted");
+ }
+ // Return the prefetched value
+ T valueToReturn = nextValue;
+ // Immediately fetch the next one
+ fetchNext();
+ return valueToReturn;
+ }
+
+ private void fetchNext() throws E {
+ try {
+ T result = asyncIterator.nextFuture().join();
+ if (result == null) {
+ finished = true;
+ nextValue = null;
+ } else {
+ nextValue = result;
+ }
+ } catch (CompletionException ce) {
+ Throwable cause = ce.getCause();
+ if (cause instanceof Exception) {
+ // noinspection unchecked
+ throw (E) cause;
+ }
+ throw ce;
+ }
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeExceptionIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeExceptionIterator.java
index fbf7bf933..16ba7495a 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeExceptionIterator.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeExceptionIterator.java
@@ -1,5 +1,6 @@
package com.the_qa_company.qendpoint.core.iterator.utils;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -154,51 +155,56 @@ public MergeExceptionIterator(ExceptionIterator in1, ExceptionIterator implements ExceptionIterator {
+
+ private static final Logger log = LoggerFactory.getLogger(MergeExceptionParallelIterator.class);
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param itFunction a function to create an iterator from an element
+ * @param comp comparator for the merge iterator
+ * @param array the elements
+ * @param length the number of elements
+ * @param input of the element
+ * @param type of the element in the iterator
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static ExceptionIterator buildOfTree(
+ Function> itFunction, Comparator comp, I[] array, int length) {
+ return buildOfTree(itFunction, comp, array, 0, length);
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param itFunction a function to create an iterator from an element
+ * @param comp comparator for the merge iterator
+ * @param array the elements
+ * @param start the start of the array (inclusive)
+ * @param end the end of the array (exclusive)
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static ExceptionIterator buildOfTree(
+ Function> itFunction, Comparator comp, I[] array, int start, int end) {
+ return buildOfTree(itFunction, comp, Arrays.asList(array), start, end);
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param itFunction a function to create an iterator from an element
+ * @param comp comparator for the merge iterator
+ * @param array the elements
+ * @param start the start of the array (inclusive)
+ * @param end the end of the array (exclusive)
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static ExceptionIterator buildOfTree(
+ Function> itFunction, Comparator comp, List array, int start, int end) {
+ return buildOfTree((index, o) -> itFunction.apply(o), comp, array, start, end);
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param itFunction a function to create an iterator from an element
+ * @param array the elements
+ * @param start the start of the array (inclusive)
+ * @param end the end of the array (exclusive)
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static , E extends Exception> ExceptionIterator buildOfTree(
+ Function> itFunction, List array, int start, int end) {
+ return buildOfTree((index, o) -> itFunction.apply(o), Comparable::compareTo, array, start, end);
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param array the elements
+ * @param start the start of the array (inclusive)
+ * @param end the end of the array (exclusive)
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static , E extends Exception> ExceptionIterator buildOfTree(
+ List> array, int start, int end) {
+ return buildOfTree(Function.identity(), Comparable::compareTo, array, start, end);
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param array the elements
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static , E extends Exception> ExceptionIterator buildOfTree(
+ List> array) {
+ return MergeExceptionParallelIterator.buildOfTree(Function.identity(), Comparable::compareTo, array, 0,
+ array.size());
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param array the elements
+ * @param comparator comparator for the merge iterator
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static ExceptionIterator buildOfTree(List> array,
+ Comparator comparator) {
+ return buildOfTree(Function.identity(), comparator, array, 0, array.size());
+ }
+
+ /**
+ * Create a tree of merge iterators from an array of element
+ *
+ * @param itFunction a function to create an iterator from an element
+ * @param comp comparator for the merge iterator
+ * @param array the elements
+ * @param start the start of the array (inclusive)
+ * @param end the end of the array (exclusive)
+ * @param type of the element
+ * @param exception returned by the iterator
+ * @return the iterator
+ */
+ public static ExceptionIterator buildOfTree(
+ BiFunction> itFunction, Comparator comp, List array, int start,
+ int end) {
+ int length = end - start;
+ if (length <= 0) {
+ return ExceptionIterator.empty();
+ }
+ if (length == 1) {
+ return itFunction.apply(start, array.get(start));
+ }
+ int mid = (start + end) / 2;
+ return new MergeExceptionParallelIterator<>(buildOfTree(itFunction, comp, array, start, mid),
+ buildOfTree(itFunction, comp, array, mid, end), comp);
+ }
+
+ private final ExceptionIterator in1;
+ private final ExceptionIterator in2;
+ private final Comparator comp;
+ private final int chunkSize = 4096;
+ private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();
+
+ // Each child's buffered items (at most chunkSize). We'll treat these like
+ // queues.
+ private final Deque buffer1 = new ArrayDeque<>();
+ private final Deque buffer2 = new ArrayDeque<>();
+
+ // Futures for the next chunk fetch (if currently in progress)
+ private CompletableFuture> future1 = null;
+ private CompletableFuture> future2 = null;
+
+ public MergeExceptionParallelIterator(ExceptionIterator in1, ExceptionIterator in2,
+ Comparator comp) {
+ this.in1 = in1;
+ this.in2 = in2;
+ this.comp = comp;
+ }
+
+ @Override
+ public boolean hasNext() throws E {
+ // Attempt to ensure we have at least one item available
+ prepareNextItem();
+ // If both buffers are empty now, we really have no more data
+ return !(buffer1.isEmpty() && buffer2.isEmpty());
+ }
+
+ @Override
+ public T next() throws E {
+ if (!hasNext()) {
+ return null; // or throw NoSuchElementException
+ }
+ // We know there's at least one item in buffer1 or buffer2
+ T result;
+ if (buffer1.isEmpty()) {
+ // Must come from buffer2
+ result = buffer2.pollFirst();
+ } else if (buffer2.isEmpty()) {
+ // Must come from buffer1
+ result = buffer1.pollFirst();
+ } else {
+ // Compare the heads
+ T head1 = buffer1.peekFirst();
+ T head2 = buffer2.peekFirst();
+ if (comp.compare(head1, head2) <= 0) {
+ result = buffer1.pollFirst();
+ } else {
+ result = buffer2.pollFirst();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public long getSize() {
+ long s1 = in1.getSize();
+ long s2 = in2.getSize();
+ if (s1 == -1 || s2 == -1) {
+ return -1;
+ }
+ return s1 + s2;
+ }
+
+ /**
+ * Ensures at least one buffer is non-empty if data remains. If both are
+ * empty, we fetch from both children in parallel.
+ */
+ private void prepareNextItem() throws E {
+ // If both buffers are already non-empty, nothing to do
+ if (!buffer1.isEmpty() || !buffer2.isEmpty()) {
+ return;
+ }
+
+ // We may need to start or finish a fetch for each child:
+ boolean need1 = buffer1.isEmpty() && in1.hasNext();
+ boolean need2 = buffer2.isEmpty() && in2.hasNext();
+
+// if (need1 && !need2) {
+// if (buffer2.size() < chunkSize / 2 && in2.hasNext()) {
+// need2 = true;
+// }
+// }
+// if (need2 && !need1) {
+// if (buffer1.size() < chunkSize / 2 && in1.hasNext()) {
+// need1 = true;
+// }
+// }
+
+ // If buffer1 is empty and child1 has data, ensure we have a future
+ if (need1 && future1 == null) {
+ future1 = fetchChunkAsync(in1, chunkSize);
+ }
+ // If buffer2 is empty and child2 has data, ensure we have a future
+ if (need2 && future2 == null) {
+ future2 = fetchChunkAsync(in2, chunkSize);
+ }
+
+ // If we started any future(s), wait for them all at once
+ if (future1 != null || future2 != null) {
+ CompletableFuture> f1 = (future1 != null) ? future1 : CompletableFuture.completedFuture(null);
+ CompletableFuture> f2 = (future2 != null) ? future2 : CompletableFuture.completedFuture(null);
+
+ // Wait for both to complete (parallel fetch)
+ CompletableFuture.allOf(f1, f2).join();
+
+ // Drain each completed future into its buffer
+ if (future1 != null) {
+ addToBuffer(future1, buffer1);
+ future1 = null;
+ }
+ if (future2 != null) {
+ addToBuffer(future2, buffer2);
+ future2 = null;
+ }
+ }
+ }
+
+ /**
+ * Helper to move the fetched chunk from a completed future into the buffer.
+ * Handles exceptions properly.
+ */
+ private void addToBuffer(CompletableFuture> future, Deque buffer) throws E {
+ List chunk;
+ try {
+ chunk = future.get(); // already done, so non-blocking
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while fetching chunk", ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof Exception ex) {
+ throw asE(ex);
+ } else {
+ throw new RuntimeException("Error in parallel chunk fetch", cause);
+ }
+ }
+ chunk.forEach(buffer::addLast);
+ }
+
+ /**
+ * Asynchronously fetch up to 'n' items from 'iter' on the executor.
+ */
+ private CompletableFuture> fetchChunkAsync(ExceptionIterator iter, int n) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return fetchChunk(iter, n);
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }, executor);
+ }
+
+ /**
+ * Synchronous fetch of up to 'n' items.
+ */
+ private List fetchChunk(ExceptionIterator iter, int n) throws E {
+ List chunk = new ArrayList<>(n);
+ while (chunk.size() < n && iter.hasNext()) {
+ chunk.add(iter.next());
+ }
+ return chunk;
+ }
+
+ @SuppressWarnings("unchecked")
+ private E asE(Exception ex) {
+ return (E) ex;
+ }
+
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeJoinZipper.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeJoinZipper.java
new file mode 100644
index 000000000..46d1d5332
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/MergeJoinZipper.java
@@ -0,0 +1,80 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * MergeJoinZipper builds a balanced merge tree from a list of sorted
+ * ExceptionIterator instances. The resulting ExceptionIterator is synchronous,
+ * but internally uses asynchronous prefetching and merging.
+ */
+public class MergeJoinZipper {
+
+ /**
+ * @param iterators A list of sorted, synchronous ExceptionIterator
+ * objects.
+ * @param comparator The comparator used to merge them.
+ * @return A final ExceptionIterator that merges all input in sorted order.
+ */
+ public static ExceptionIterator buildMergeTree(
+ List> iterators, Comparator comparator) {
+ if (iterators.isEmpty()) {
+ return new EmptyExceptionIterator<>();
+ }
+ if (iterators.size() == 1) {
+ return wrapAsync(iterators.get(0));
+ }
+
+ // 1) Wrap each synchronous iterator in an AsyncPreFetchIterator,
+ // but store them in a list as AsyncExceptionIterator.
+ List> asyncIters = new ArrayList<>();
+ for (ExceptionIterator it : iterators) {
+ asyncIters.add(new AsyncPreFetchIterator<>(it));
+ }
+
+ // 2) Pairwise merge them until only one remains
+ while (asyncIters.size() > 1) {
+ List> merged = new ArrayList<>();
+ for (int i = 0; i < asyncIters.size(); i += 2) {
+ if (i + 1 < asyncIters.size()) {
+ AsyncExceptionIterator left = asyncIters.get(i);
+ AsyncExceptionIterator right = asyncIters.get(i + 1);
+ // Now you can merge them in a ZipperAsyncIterator
+ merged.add(new ParallelZipperAsyncIterator<>(left, right, comparator));
+ } else {
+ merged.add(asyncIters.get(i));
+ }
+ }
+ asyncIters = merged;
+ }
+
+ // 3) Wrap the final AsyncExceptionIterator in a synchronous
+ // AsyncToSyncExceptionIterator
+ return new AsyncToSyncExceptionIterator<>(asyncIters.get(0));
+ }
+
+ /**
+ * Helper method for the single-iterator case.
+ */
+ private static ExceptionIterator wrapAsync(ExceptionIterator iterator) {
+ AsyncPreFetchIterator async = new AsyncPreFetchIterator<>(iterator);
+ return new AsyncToSyncExceptionIterator<>(async);
+ }
+}
+
+/**
+ * A simple empty iterator implementation (used if the list of iterators is
+ * empty).
+ */
+class EmptyExceptionIterator implements ExceptionIterator {
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public T next() {
+ throw new java.util.NoSuchElementException("Empty iterator");
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelMerge.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelMerge.java
new file mode 100644
index 000000000..ff7246187
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelMerge.java
@@ -0,0 +1,378 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+/**
+ * Abstraction for a concurrent stream of T. nextElement() => returns a Future
+ * that completes with the next item, or null if done.
+ */
/**
 * Abstraction for a concurrent stream of T.
 * <p>
 * {@code nextElement()} returns a Future that completes with the next item, or
 * with {@code null} once the source is done.
 *
 * @param <T> element type
 */
interface MergeSource<T> {
	/** Future completing with the next item, or null when exhausted. */
	Future<T> nextElement() throws IOException;

	/** Whether more items may still be produced. */
	boolean hasMore() throws IOException;

	/** Whether the source is definitively exhausted. */
	boolean exhausted();
}
+
+/**
+ * Leaf node that pulls from a single Iterator. Each nextElement() spawns a
+ * task that does iterator.next().
+ */
+class LeafSource implements MergeSource {
+ private final ExceptionIterator it;
+ private final ForkJoinPool pool;
+ private volatile boolean exhausted = false;
+
+ LeafSource(ExceptionIterator it, ForkJoinPool pool) {
+ this.it = it;
+ this.pool = pool;
+ }
+
+ @Override
+ public synchronized boolean hasMore() {
+ return !exhausted;
+ }
+
+ @Override
+ public boolean exhausted() {
+ if (exhausted)
+ return true;
+ try {
+ if (!it.hasNext()) {
+ return true;
+ }
+ } catch (Exception e) {
+
+ }
+ return exhausted;
+ }
+
+ @Override
+ public Future nextElement() {
+ if (!hasMore()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ CompletableFuture cf = new CompletableFuture<>();
+ pool.submit(() -> {
+ T val = null;
+ synchronized (LeafSource.this) {
+ try {
+ if (!exhausted && it.hasNext()) {
+ val = it.next();
+ } else {
+ exhausted = true;
+ }
+ } catch (Exception e) {
+ exhausted = true;
+ cf.completeExceptionally(e);
+ return;
+ }
+ }
+ cf.complete(val); // Will be null if exhausted
+ });
+ return cf;
+ }
+}
+
+/**
+ * A MergeNode that merges two children in parallel by prefetching into small
+ * queues.
+ */
+class MergeNode implements MergeSource {
+ private final MergeSource left;
+ private final MergeSource right;
+ private final Comparator super T> comp;
+ private final ForkJoinPool pool;
+
+ // Bounded queues to hold pre-fetched items from each child.
+ // In practice you might pick a different capacity or structure.
+ private final BlockingQueue leftQueue;
+ private final BlockingQueue rightQueue;
+
+ // Flags to indicate if we've exhausted each side.
+ private volatile boolean leftExhausted = false;
+ private volatile boolean rightExhausted = false;
+
+ // Constant to define how many items we prefetch from each child at a time.
+ private static final int PREFETCH_CAPACITY = 4;
+
+ MergeNode(MergeSource left, MergeSource right, Comparator super T> comp, ForkJoinPool pool) {
+ this.left = left;
+ this.right = right;
+ this.comp = comp;
+ this.pool = pool;
+ // A small queue for each side:
+ this.leftQueue = new LinkedBlockingQueue<>(PREFETCH_CAPACITY);
+ this.rightQueue = new LinkedBlockingQueue<>(PREFETCH_CAPACITY);
+
+ // Kick off initial fill
+ ensurePrefetch(left, leftQueue, () -> leftExhausted);
+ ensurePrefetch(right, rightQueue, () -> rightExhausted);
+ }
+
+ /**
+ * We have more if either queue is non-empty or that side can still produce
+ * more.
+ */
+ @Override
+ public boolean hasMore() {
+ if (!leftQueue.isEmpty() || !rightQueue.isEmpty()) {
+ return true;
+ }
+ if ((leftExhausted || left.exhausted()) && (rightExhausted || right.exhausted())) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean exhausted() {
+ return !hasMore();
+ }
+
+ @Override
+ public Future nextElement() throws IOException {
+ if (!hasMore()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture cf = new CompletableFuture<>();
+
+ // We'll pick from the heads of both queues (blocking if empty).
+ // But to remain asynchronous, we do that in a pool thread:
+ pool.submit(() -> {
+ try {
+ // Wait for an item from each queue if available, or null if
+ // side is exhausted:
+ left.exhausted();
+ T leftVal = pollOrNull(leftQueue, () -> leftExhausted || left.exhausted());
+ T rightVal = pollOrNull(rightQueue, () -> rightExhausted || right.exhausted());
+
+ // If both sides are null => everything is exhausted
+ if (leftVal == null && rightVal == null) {
+ cf.complete(null);
+ return;
+ }
+ if (leftVal != null && rightVal == null) {
+ // only left side had an item
+ cf.complete(leftVal);
+ // Re‐prefetch next from left
+ ensurePrefetch(left, leftQueue, () -> leftExhausted);
+ return;
+ }
+ if (leftVal == null && rightVal != null) {
+ // only right side had an item
+ cf.complete(rightVal);
+ ensurePrefetch(right, rightQueue, () -> rightExhausted);
+ return;
+ }
+ // Both are non-null. Pick the smaller, put the other back in
+ // its queue
+ if (comp.compare(leftVal, rightVal) <= 0) {
+ // leftVal is chosen
+ cf.complete(leftVal);
+ // Put the rightVal back into the rightQueue (front)
+ rightQueue.put(rightVal);
+ // Now we can refill left side again
+ ensurePrefetch(left, leftQueue, () -> leftExhausted);
+ } else {
+ // rightVal is chosen
+ cf.complete(rightVal);
+ // Put the leftVal back
+ leftQueue.put(leftVal);
+ // Refill right side
+ ensurePrefetch(right, rightQueue, () -> rightExhausted);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ cf.completeExceptionally(e);
+ } catch (Exception e) {
+ cf.completeExceptionally(e);
+ }
+ });
+
+ return cf;
+ }
+
+ /**
+ * Poll one item from the queue. If the queue is empty but not exhausted, we
+ * block. If it's exhausted and empty, return null.
+ */
+ private T pollOrNull(BlockingQueue queue, Supplier isExhausted) throws InterruptedException {
+ // If queue is non-empty, take() won't block long.
+ // If it's empty but not exhausted, we might wait for the next item
+ // (unless no more is coming).
+ while (true) {
+ if (!queue.isEmpty()) {
+ return queue.take();
+ }
+ if (isExhausted.get()) {
+ // The child can't produce more
+ return null;
+ }
+ // If not exhausted and the queue is empty,
+ // we wait a bit to see if new items arrive from prefetch
+ // (Though typically ensurePrefetch will produce them soon.)
+ Thread.sleep(1); // simplistic small sleep; or use e.g.
+ // queue.poll(timeout)
+ }
+ }
+
+ /**
+ * Ensures each child is prefetching new items up to the queue's capacity,
+ * asynchronously.
+ */
+ private void ensurePrefetch(MergeSource child, BlockingQueue queue, Supplier exhaustedFlag) {
+ if (exhaustedFlag.get()) {
+ return; // already exhausted
+ }
+
+ // While the queue still has capacity, request the next item.
+ // We'll do this in a loop (but asynchronously) so that we fill up to
+ // capacity.
+ pool.submit(() -> {
+ try {
+ while (!exhaustedFlag.get() && !exhausted() && queue.remainingCapacity() > 0) {
+
+ // fetch next item
+ Future fut = child.nextElement();
+ T val = fut.get(10, TimeUnit.SECONDS); // block in a pool
+ // thread
+ if (val == null) {
+ // child exhausted
+ setExhausted(exhaustedFlag);
+ break;
+ }
+ queue.put(val);
+
+ }
+ } catch (Exception e) {
+ // Mark exhausted or propagate error somehow
+ setExhausted(exhaustedFlag);
+ }
+ });
+ }
+
+ private synchronized void setExhausted(Supplier exhaustedFlag) {
+ // Ugly but easy: if exhaustedFlag points to leftExhausted, set it, else
+ // set rightExhausted
+ // A better design might store a boolean or do a callback.
+ Supplier isLeftExhausted = this::isLeftExhausted;
+ if (exhaustedFlag == isLeftExhausted) {
+ leftExhausted = true;
+ } else {
+ rightExhausted = true;
+ }
+ }
+
+ private boolean isLeftExhausted() {
+ return leftExhausted;
+ }
+
+ private boolean isRightExhausted() {
+ return rightExhausted;
+ }
+}
+
+/**
+ * Build a balanced merge tree from a list of Iterators.
+ */
+class ParallelMergeBuilder {
+ public static MergeSource buildMergeTree(
+ List> iterators, Comparator super T> comparator, ForkJoinPool pool) {
+
+ int n = iterators.size();
+ if (n == 0) {
+ return new MergeSource<>() {
+ @Override
+ public Future nextElement() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public boolean hasMore() {
+ return false;
+ }
+
+ @Override
+ public boolean exhausted() {
+ return true;
+ }
+ };
+ }
+ if (n == 1) {
+ return new LeafSource<>(iterators.get(0), pool);
+ }
+ // Split in half
+ int mid = n / 2;
+ MergeSource left = buildMergeTree(iterators.subList(0, mid), comparator, pool);
+ MergeSource right = buildMergeTree(iterators.subList(mid, n), comparator, pool);
+ return new MergeNode<>(left, right, comparator, pool);
+ }
+}
+
+/**
+ * Convert a MergeSource into a normal Iterator.
+ */
+class ParallelMergeIterator implements ExceptionIterator {
+ private final MergeSource root;
+ private T nextItem;
+
+ ParallelMergeIterator(MergeSource root) {
+ this.root = root;
+// fetchNext();
+ }
+
+ private void fetchNext() throws IOException {
+ if (!root.hasMore()) {
+ nextItem = null;
+ return;
+ }
+ try {
+ nextItem = root.nextElement().get();
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextItem != null;
+ }
+
+ @Override
+ public T next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ T ret = nextItem;
+ fetchNext();
+ return ret;
+ }
+}
+
+public class ParallelMerge {
+
+ public static ExceptionIterator parallelMergeJoin(
+ List> iterators, Comparator comparator) {
+
+ ForkJoinPool pool = ForkJoinPool.commonPool();
+ MergeSource root = ParallelMergeBuilder.buildMergeTree(iterators, comparator, pool);
+ return new ParallelMergeIterator<>(root);
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelZipperAsyncIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelZipperAsyncIterator.java
new file mode 100644
index 000000000..c91c6e13e
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ParallelZipperAsyncIterator.java
@@ -0,0 +1,114 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * ParallelZipperAsyncIterator merges two sorted AsyncExceptionIterator sources
+ * into a single sorted stream. Both sides always have exactly one fetch in
+ * progress, which maximizes concurrency. The main steps in nextFuture(): 1.
+ * Wait for leftCurrent and rightCurrent to complete, giving (leftVal,
+ * rightVal). 2. Create a Result object indicating which value to return and
+ * which side(s) to advance. 3. Then compose (flatten) that into a single future
+ * that schedules new fetches for whichever side was consumed, and returns the
+ * chosen value to the caller.
+ */
+public class ParallelZipperAsyncIterator implements AsyncExceptionIterator {
+
+ private final AsyncExceptionIterator left; // The left input stream
+ private final AsyncExceptionIterator right; // The right input stream
+ private final Comparator comparator; // Comparator for sorting
+
+ // Each side holds one "current" item in flight
+ private CompletableFuture leftCurrent;
+ private CompletableFuture rightCurrent;
+
+ /**
+ * Constructs a ParallelZipperAsyncIterator from two AsyncExceptionIterator
+ * sources. Immediately fetches one element from each side.
+ */
+ public ParallelZipperAsyncIterator(AsyncExceptionIterator left, AsyncExceptionIterator right,
+ Comparator comparator) {
+ this.left = left;
+ this.right = right;
+ this.comparator = comparator;
+
+ // Start fetching one item on each side
+ this.leftCurrent = left.nextFuture();
+ this.rightCurrent = right.nextFuture();
+ }
+
+ /**
+ * nextFuture(): 1. Waits for both sides' current futures to complete ->
+ * (leftVal, rightVal). 2. Decides which item to return and which side to
+ * advance, building a small Result object. 3. thenCompose on that Result to
+ * schedule side fetches if needed and return the chosen value.
+ */
+ @Override
+ public CompletableFuture nextFuture() {
+ // Combine the two futures to get leftVal and rightVal once both
+ // complete
+ CompletableFuture combined = leftCurrent.thenCombine(rightCurrent, (leftVal, rightVal) -> {
+ if (leftVal == null && rightVal == null) {
+ // Both sides exhausted
+ return new Result(null, false, false);
+ } else if (leftVal == null) {
+ // Left exhausted, return rightVal, advance right
+ return new Result(rightVal, false, true);
+ } else if (rightVal == null) {
+ // Right exhausted, return leftVal, advance left
+ return new Result(leftVal, true, false);
+ } else {
+ // Both non-null, pick the smaller
+ if (comparator.compare(leftVal, rightVal) <= 0) {
+ // Use left
+ return new Result(leftVal, true, false);
+ } else {
+ // Use right
+ return new Result(rightVal, false, true);
+ }
+ }
+ });
+
+ // Now we flatten combined (a Future) into a Future.
+ // In the .thenCompose, we schedule new fetches for whichever side was
+ // consumed
+ // and return the chosen value (which might be null if both exhausted).
+ return combined.thenCompose(res -> {
+ // If res.value == null => both are exhausted
+ if (res.value == null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ // If we used left side => schedule a new fetch for left
+ if (res.advanceLeft) {
+ leftCurrent = left.nextFuture();
+ }
+ // If we used right side => schedule a new fetch for right
+ if (res.advanceRight) {
+ rightCurrent = right.nextFuture();
+ }
+ return CompletableFuture.completedFuture(res.value);
+ }
+ }).exceptionally(ex -> {
+ // If an exception occurs, rethrow it as a CompletionException
+ throw new CompletionException(ex);
+ });
+ }
+
+ /**
+ * A small helper class that indicates which item we decided to return, and
+ * whether we want to advance the left or right side.
+ */
+ private class Result {
+ final T value;
+ final boolean advanceLeft;
+ final boolean advanceRight;
+
+ Result(T value, boolean advanceLeft, boolean advanceRight) {
+ this.value = value;
+ this.advanceLeft = advanceLeft;
+ this.advanceRight = advanceRight;
+ }
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java
index ab9aa05a0..da510fd15 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIterator.java
@@ -110,7 +110,7 @@ public T get() {
}
}
- private final ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(16);
+ private final ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(128);
private T next;
private boolean end;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java
new file mode 100644
index 000000000..c8e426615
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java
@@ -0,0 +1,383 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/**
+ * A {@link PipedCopyIterator} variant backed by several internal queues so
+ * that multiple producer threads can push elements with less contention.
+ * Elements are NOT returned in insertion order (hence "unordered").
+ * <p>
+ * Producers are mapped to a queue by their thread identity; the consumer
+ * first drains the last productive queue (the "focus" queue) and otherwise
+ * scans all queues. Designed for a single consumer thread.
+ *
+ * @param <T> the iterator type
+ * @author Antoine Willerval
+ */
+public class PipedCopyIteratorUnordered<T> extends PipedCopyIterator<T> {
+
+	private static final int CORES = Runtime.getRuntime().availableProcessors();
+
+	/**
+	 * RuntimeException generated by the PipedCopyIterator
+	 *
+	 * @author Antoine Willerval
+	 */
+	public static class PipedIteratorException extends RuntimeException {
+		public PipedIteratorException(String message, Throwable t) {
+			super(message, t);
+		}
+	}
+
+	/**
+	 * Callback for the
+	 * {@link #createOfCallback(PipedCopyIteratorUnordered.PipeCallBack)} method
+	 *
+	 * @param <T> the iterator type
+	 * @author Antoine Willerval
+	 */
+	@FunctionalInterface
+	public interface PipeCallBack<T> {
+		/**
+		 * method called from the new thread to generate the new data, at the
+		 * end of the callback, the pipe is closed with or without exception
+		 *
+		 * @param pipe the pipe to fill
+		 * @throws Exception any exception returned by the generator
+		 */
+		void createPipe(PipedCopyIteratorUnordered<T> pipe) throws Exception;
+	}
+
+	/**
+	 * create a piped iterator from a callback runner, the call to the callback
+	 * should be made in the callbackRunner
+	 *
+	 * @param callbackRunner the callback runner
+	 * @param <T>            type of the iterator
+	 * @return the iterator
+	 */
+	public static <T> PipedCopyIteratorUnordered<T> createOfCallback(PipeCallBack<T> callbackRunner) {
+		PipedCopyIteratorUnordered<T> pipe = new PipedCopyIteratorUnordered<>();
+
+		Thread thread = new Thread(() -> {
+			try {
+				callbackRunner.createPipe(pipe);
+				pipe.closePipe();
+			} catch (Throwable e) {
+				pipe.closePipe(e);
+			}
+		}, "PipeIterator");
+		thread.start();
+
+		// close the thread at end
+		pipe.attachThread(thread);
+
+		return pipe;
+	}
+
+	// Queue protocol: either a payload element or an end-of-stream marker.
+	private interface QueueObject<T> {
+		boolean end();
+
+		T get();
+	}
+
+	private class ElementQueueObject implements QueueObject<T> {
+		private final T obj;
+
+		private ElementQueueObject(T obj) {
+			this.obj = obj;
+		}
+
+		@Override
+		public boolean end() {
+			return false;
+		}
+
+		@Override
+		public T get() {
+			return obj;
+		}
+	}
+
+	private class EndQueueObject implements QueueObject<T> {
+		@Override
+		public boolean end() {
+			return true;
+		}
+
+		@Override
+		public T get() {
+			// an end marker carries no payload
+			throw new IllegalStateException("end marker has no element");
+		}
+	}
+
+	// one bounded queue per producer "slot" to reduce contention
+	@SuppressWarnings("unchecked")
+	private final ArrayBlockingQueue<QueueObject<T>>[] queue = new ArrayBlockingQueue[CORES * 2];
+
+	{
+		for (int i = 0; i < queue.length; i++) {
+			queue[i] = new ArrayBlockingQueue<>(16 * 1024);
+		}
+	}
+
+	// queueEnd[i] becomes true once queue[i] has delivered its end marker
+	private final AtomicBoolean[] queueEnd = new AtomicBoolean[queue.length];
+
+	{
+		for (int i = 0; i < queueEnd.length; i++) {
+			queueEnd[i] = new AtomicBoolean(false);
+		}
+	}
+
+	private T next;
+	private boolean end;
+	private PipedIteratorException exception;
+
+	private Thread thread;
+
+	// last queue that produced an element; checked first by the consumer
+	volatile ArrayBlockingQueue<QueueObject<T>> focusQueue;
+
+	/**
+	 * Map the current thread to a queue index. {@link Math#floorMod} is used
+	 * because {@code hashCode()} may be negative, which would make a plain
+	 * {@code %} produce a negative (out of bounds) index.
+	 */
+	private int threadQueueIndex() {
+		return Math.floorMod(Thread.currentThread().hashCode(), queue.length);
+	}
+
+	@Override
+	public boolean hasNext() {
+		if (end) {
+			return false;
+		}
+		if (next != null) {
+			return true;
+		}
+
+		QueueObject<T> obj;
+		try {
+			// fast path: the queue that produced last time
+			obj = useFocusQueue();
+
+			if (obj == null) {
+				obj = useThreadBasedQueue();
+			}
+
+		} catch (InterruptedException e) {
+			throw new PipedIteratorException("Can't read pipe", e);
+		}
+
+		if (obj == null || obj.end()) {
+			// confirm the end by draining every queue that is still active
+			obj = checkAllQueues(obj);
+		}
+
+		if (obj.end()) {
+			end = true;
+			if (exception != null) {
+				throw exception;
+			}
+			return false;
+		}
+		next = obj.get();
+		return true;
+	}
+
+	private QueueObject<T> useThreadBasedQueue() throws InterruptedException {
+		int l = threadQueueIndex();
+		QueueObject<T> obj = queue[l].poll();
+		if (obj == null) {
+			obj = iterateThroughAllQueues(obj);
+		} else if (obj.end()) {
+			setQueueEnd(queue[l]);
+		} else if (focusQueue == null) {
+			focusQueue = queue[l];
+		}
+		return obj;
+	}
+
+	/**
+	 * Drain all still-active queues until each of them either yields an
+	 * element (returned immediately) or delivers its end marker. Returns an
+	 * end marker once every queue has ended.
+	 */
+	private QueueObject<T> checkAllQueues(QueueObject<T> originalObj) {
+		QueueObject<T> obj = null;
+		boolean done;
+
+		do {
+			done = true;
+			for (int i = 0; i < queue.length; i++) {
+				if (queueEnd[i].get()) {
+					continue;
+				}
+				done = false;
+				ArrayBlockingQueue<QueueObject<T>> queueObjects = queue[i];
+				obj = queueObjects.poll();
+				if (obj == null) {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException e) {
+						throw new RuntimeException(e);
+					}
+				} else if (!obj.end()) {
+					return obj;
+				} else {
+					queueEnd[i].set(true);
+				}
+			}
+		} while (!done);
+
+		if (obj == null) {
+			obj = originalObj;
+		}
+		if (obj == null) {
+			// every queue had already ended: report end-of-stream instead of
+			// letting the caller hit a NullPointerException
+			obj = new EndQueueObject();
+		}
+
+		return obj;
+	}
+
+	/**
+	 * Poll every active queue until one yields an object. Terminates with an
+	 * end marker when all queues have ended (previously this spun forever).
+	 */
+	private QueueObject<T> iterateThroughAllQueues(QueueObject<T> obj) throws InterruptedException {
+		while (obj == null) {
+			boolean anyActive = false;
+			for (int i = 0; i < queue.length; i++) {
+				if (queueEnd[i].get()) {
+					continue;
+				}
+				anyActive = true;
+				ArrayBlockingQueue<QueueObject<T>> queueObjects = queue[i];
+				obj = queueObjects.poll();
+				if (obj != null) {
+					if (obj.end()) {
+						queueEnd[i].set(true);
+					} else if (focusQueue == null) {
+						focusQueue = queueObjects;
+					}
+					return obj;
+				}
+			}
+			if (!anyActive) {
+				// all queues delivered their end marker: end-of-stream
+				return new EndQueueObject();
+			}
+			Thread.sleep(10);
+		}
+		return obj;
+	}
+
+	private QueueObject<T> useFocusQueue() throws InterruptedException {
+		QueueObject<T> obj;
+		var focusQueue = this.focusQueue;
+		if (focusQueue != null) {
+			QueueObject<T> poll = focusQueue.poll();
+			if (poll != null) {
+				obj = poll;
+				if (obj.end()) {
+					setQueueEnd(focusQueue);
+				}
+			} else {
+				// focus queue went dry, drop it and fall back to scanning
+				obj = null;
+				this.focusQueue = null;
+			}
+		} else {
+			obj = null;
+		}
+		return obj;
+	}
+
+	private void setQueueEnd(ArrayBlockingQueue<QueueObject<T>> focusQueue) {
+		for (int i = 0; i < queue.length; i++) {
+			if (queue[i] == focusQueue) {
+				queueEnd[i].set(true);
+				break;
+			}
+		}
+	}
+
+	@Override
+	public T next() {
+		if (!hasNext()) {
+			return null;
+		}
+		T next = this.next;
+		this.next = null;
+		return next;
+	}
+
+	public void closePipe() {
+		closePipe(null);
+	}
+
+	/**
+	 * Close the producer side, optionally with a failure. An end marker is put
+	 * on every queue; the consumer rethrows {@code e} (wrapped) once it drains
+	 * to the end.
+	 */
+	public void closePipe(Throwable e) {
+		if (e != null) {
+			// clear the queue to force the exception
+			for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+				queueObjects.clear();
+			}
+			if (e instanceof PipedIteratorException) {
+				this.exception = (PipedIteratorException) e;
+			} else {
+				this.exception = new PipedIteratorException("closing exception", e);
+			}
+		}
+		try {
+			for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+				queueObjects.put(new EndQueueObject());
+			}
+		} catch (InterruptedException ee) {
+			throw new PipedIteratorException("Can't close pipe", ee);
+		}
+	}
+
+	/**
+	 * map this iterator to another type
+	 *
+	 * @param mappingFunction the mapping function
+	 * @param <E>             the future type
+	 * @return mapped iterator
+	 */
+	public <E> Iterator<E> map(Function<T, E> mappingFunction) {
+		return new MapIterator<>(this, mappingFunction);
+	}
+
+	/**
+	 * map this iterator to another type
+	 *
+	 * @param mappingFunction the mapping function
+	 * @param <E>             the future type
+	 * @return mapped iterator
+	 */
+	public <E> Iterator<E> mapWithId(MapIterator.MapWithIdFunction<T, E> mappingFunction) {
+		return new MapIterator<>(this, mappingFunction);
+	}
+
+	// NOTE(review): unused — candidate for removal
+	AtomicInteger index = new AtomicInteger(0);
+
+	/**
+	 * Add an element from a producer thread. Tries the thread's own queue
+	 * first; if it is full, points the consumer at it and spreads the element
+	 * to any queue with room.
+	 */
+	public void addElement(T node) {
+		int l = threadQueueIndex();
+		try {
+			boolean success = queue[l].offer(new ElementQueueObject(node));
+			if (!success) {
+				focusQueue = queue[l];
+				while (!success) {
+					for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
+						success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS);
+						if (success) {
+							break;
+						}
+					}
+				}
+			}
+
+		} catch (InterruptedException ee) {
+			throw new PipedIteratorException("Can't add element to pipe", ee);
+		}
+	}
+
+	/**
+	 * attach a thread to interrupt with this iterator
+	 *
+	 * @param thread the thread
+	 */
+	public void attachThread(Thread thread) {
+		Objects.requireNonNull(thread, "thread can't be null!");
+		if (this.thread != null && this.thread != thread) {
+			throw new IllegalArgumentException("Thread already attached");
+		}
+		this.thread = thread;
+	}
+
+	/**
+	 * Allow receiving again elements after an end node
+	 */
+	public void reset() {
+		this.end = false;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (thread != null) {
+			thread.interrupt();
+		}
+	}
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ZipperAsyncIterator.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ZipperAsyncIterator.java
new file mode 100644
index 000000000..920825956
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/ZipperAsyncIterator.java
@@ -0,0 +1,82 @@
+package com.the_qa_company.qendpoint.core.iterator.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * ZipperAsyncIterator merges two sorted AsyncExceptionIterator streams into a
+ * single sorted stream. It stores a "buffer" future for each side, retrieves
+ * them in parallel, and compares their values.
+ *
+ * NOTE(review): a null element is used as the end-of-stream marker, so the
+ * source iterators must never emit null as a real value — confirm against the
+ * AsyncExceptionIterator contract.
+ *
+ * NOTE(review): leftBuffer/rightBuffer are plain (non-volatile) fields that
+ * are reassigned inside the thenCombine callback, which may run on a
+ * different thread. This looks safe only if the caller awaits each returned
+ * future before calling nextFuture() again (strictly sequential use) —
+ * confirm callers do not pipeline calls.
+ */
+public class ZipperAsyncIterator implements AsyncExceptionIterator {
+ private final AsyncExceptionIterator left;
+ private final AsyncExceptionIterator right;
+ // order used to decide which buffered element is emitted first
+ private final Comparator comparator;
+
+ // Buffers for each side. Each call to nextFuture() will compare these
+ // values and advance the used side.
+ private CompletableFuture leftBuffer;
+ private CompletableFuture rightBuffer;
+
+ /**
+ * Constructs a ZipperAsyncIterator from two AsyncExceptionIterators and a
+ * comparator. We initialize each side's buffer by calling nextFuture()
+ * once.
+ */
+ public ZipperAsyncIterator(AsyncExceptionIterator left, AsyncExceptionIterator right,
+ Comparator comparator) {
+ this.left = left;
+ this.right = right;
+ this.comparator = comparator;
+
+ // Initialize each buffer with one fetched value (eager prefetch of one
+ // element per side).
+ this.leftBuffer = left.nextFuture();
+ this.rightBuffer = right.nextFuture();
+ }
+
+ /**
+ * nextFuture() returns a future that, when complete, yields the next merged
+ * element (or null if both sides are exhausted). We compare the two
+ * buffered values and advance only the side whose element is chosen.
+ */
+ @Override
+ public CompletableFuture nextFuture() {
+ // Combine the two buffer futures into a single future-of-a-future.
+ // When both buffers resolve, compare them to see which side's value to
+ // consume.
+ CompletableFuture> combined = leftBuffer.thenCombine(rightBuffer, (leftVal, rightVal) -> {
+ if (leftVal == null && rightVal == null) {
+ // Both sides are exhausted
+ return CompletableFuture.completedFuture(null);
+ } else if (leftVal == null) {
+ // Left is exhausted, so return rightVal and advance right side
+ CompletableFuture toReturn = CompletableFuture.completedFuture(rightVal);
+ rightBuffer = right.nextFuture(); // fetch the next from the
+ // right
+ return toReturn;
+ } else if (rightVal == null) {
+ // Right is exhausted, so return leftVal and advance left side
+ CompletableFuture toReturn = CompletableFuture.completedFuture(leftVal);
+ leftBuffer = left.nextFuture(); // fetch the next from the left
+ return toReturn;
+ } else {
+ // Both sides have a value; compare them
+ if (comparator.compare(leftVal, rightVal) <= 0) {
+ // left is smaller (or equal)
+ CompletableFuture toReturn = CompletableFuture.completedFuture(leftVal);
+ leftBuffer = left.nextFuture(); // refill from left side
+ return toReturn;
+ } else {
+ // right is smaller
+ CompletableFuture toReturn = CompletableFuture.completedFuture(rightVal);
+ rightBuffer = right.nextFuture(); // refill from right side
+ return toReturn;
+ }
+ }
+ });
+
+ // combined is a Future>. We flatten it to Future with
+ // thenCompose.
+ return combined.thenCompose(f -> f);
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/ControlInfo.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/ControlInfo.java
index d1c1696ef..743804610 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/ControlInfo.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/ControlInfo.java
@@ -19,7 +19,8 @@
package com.the_qa_company.qendpoint.core.options;
-import java.io.BufferedOutputStream;
+import org.spf4j.io.BufferedOutputStream;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -43,7 +44,7 @@ enum Type {
void setFormat(String format);
default void save(Path filename) throws IOException {
- try (BufferedOutputStream os = new BufferedOutputStream(Files.newOutputStream(filename))) {
+ try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(filename))) {
save(os);
}
}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsFile.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsFile.java
index 8f93c5f39..9c6d56201 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsFile.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/options/HDTOptionsFile.java
@@ -2,13 +2,13 @@
import com.the_qa_company.qendpoint.core.compact.integer.VByte;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
+import org.spf4j.io.BufferedInputStream;
+import org.spf4j.io.BufferedOutputStream;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
import com.the_qa_company.qendpoint.core.util.crc.CRCInputStream;
import com.the_qa_company.qendpoint.core.util.crc.CRCOutputStream;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
index 6653451f8..9aaa41bcb 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java
@@ -21,6 +21,7 @@
import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
+import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIteratorUnordered;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser;
@@ -96,8 +97,9 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio
*/
public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream,
String baseUri, boolean keepBNode, RDFNotation notation) {
- return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode,
- (triple, pos) -> pipe.addElement(triple.tripleToString())));
+ return PipedCopyIteratorUnordered
+ .createOfCallback((PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(stream,
+ baseUri, notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
}
/**
@@ -111,8 +113,9 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p
*/
public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri,
boolean keepBNode, RDFNotation notation) {
- return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode,
- (triple, pos) -> pipe.addElement(triple.tripleToString())));
+ return PipedCopyIteratorUnordered
+ .createOfCallback((PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(file,
+ baseUri, notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString())));
}
}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java
new file mode 100644
index 000000000..39e4c96b7
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java
@@ -0,0 +1,126 @@
+package com.the_qa_company.qendpoint.core.rdf.parsers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Fans a line-oriented input stream out to several piped streams that can be
+ * consumed by concurrent parsers: lines containing "_:" go to a single
+ * dedicated blank-node pipe, all other lines are distributed round-robin over
+ * numberOfStreams pipes. A single daemon thread performs all reading and
+ * writing.
+ *
+ * NOTE(review): the "_:" test also matches the sequence inside literals, so
+ * such lines are routed to the bnode pipe as well — they are still parsed, it
+ * only reduces parallelism; confirm this is acceptable.
+ *
+ * NOTE(review): a read error is logged and the pipes are closed, which the
+ * consumers observe as a normal EOF — truncated input is silent. Consider a
+ * way to propagate the failure to the consumers.
+ */
+public class ConcurrentInputStream {
+
+ private static final Logger log = LoggerFactory.getLogger(ConcurrentInputStream.class);
+ private final InputStream source;
+ private final int numberOfStreams;
+
+ // one pipe pair per consumer stream, filled round-robin
+ private PipedInputStream[] pipedInputStreams;
+ private PipedOutputStream[] pipedOutputStreams;
+
+ // dedicated pipe for lines containing blank nodes ("_:")
+ private PipedInputStream bnodeInputStream;
+ private PipedOutputStream bnodeOutputStream;
+
+ private Thread readerThread;
+
+ public ConcurrentInputStream(InputStream stream, int numberOfStreams) {
+ this.source = stream;
+ this.numberOfStreams = numberOfStreams;
+ setupPipes();
+ startReadingThread();
+ }
+
+ private void setupPipes() {
+ pipedInputStreams = new PipedInputStream[numberOfStreams];
+ pipedOutputStreams = new PipedOutputStream[numberOfStreams];
+
+ // The size of the pipes needs to be larger than the buffer of the
+ // buffered reader that Jena uses inside the parser, which is 131072
+ // bytes. If our pipeSize is too small it limits the ability for the
+ // parsers to work concurrently.
+ // NOTE(review): 131072 * 1024 = 128 MiB per pipe, allocated eagerly
+ // for each of the numberOfStreams + 1 pipes — confirm this memory
+ // footprint is intended.
+ int pipeSize = 131072 * 1024;
+
+ try {
+ // Set up main fan-out pipes
+ for (int i = 0; i < numberOfStreams; i++) {
+ pipedOutputStreams[i] = new PipedOutputStream();
+ pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], pipeSize);
+ }
+
+ // Set up bnode pipe
+ bnodeOutputStream = new PipedOutputStream();
+ bnodeInputStream = new PipedInputStream(bnodeOutputStream, pipeSize);
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error creating pipes", e);
+ }
+ }
+
+ private void startReadingThread() {
+ readerThread = new Thread(new ReaderThread());
+
+ readerThread.setName("ConcurrentInputStream reader");
+ // daemon: must not keep the JVM alive if consumers abandon the pipes
+ readerThread.setDaemon(true);
+ readerThread.start();
+ }
+
+ /**
+ * Returns the stream for blank-node lines only.
+ */
+ public InputStream getBnodeStream() {
+ return bnodeInputStream;
+ }
+
+ /**
+ * Returns the array of InputStreams that share all concurrently read data.
+ */
+ public InputStream[] getStreams() {
+ return pipedInputStreams;
+ }
+
+ // Single producer thread: reads lines, routes them, closes pipes at EOF.
+ private class ReaderThread implements Runnable {
+ @Override
+ public void run() {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) {
+ String line;
+ int currentStreamIndex = 0;
+ while ((line = reader.readLine()) != null) {
+ if (line.isEmpty()) {
+ continue; // Skip empty lines
+ }
+
+ byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8);
+
+ if (line.contains("_:")) {
+ // Write to bnodeOutputStream only
+ bnodeOutputStream.write(data);
+ } else {
+ // Write to a single stream from pipedOutputStreams in a
+ // round-robin manner
+ pipedOutputStreams[currentStreamIndex].write(data);
+ currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length;
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error reading input stream", e);
+ // If there's a read error, close everything.
+ } finally {
+ // Close all output streams to signal EOF
+ for (PipedOutputStream out : pipedOutputStreams) {
+ try {
+ out.close();
+ } catch (IOException ignored) {
+ }
+ }
+
+ try {
+ bnodeOutputStream.close();
+ } catch (IOException e) {
+ log.error("Error closing bnodeOutputStream", e);
+ }
+ }
+ }
+ }
+}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java
index a89ec1e12..ecac78e85 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java
@@ -18,35 +18,91 @@
package com.the_qa_company.qendpoint.core.rdf.parsers;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-
+import com.the_qa_company.qendpoint.core.enums.RDFNotation;
+import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
+import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.quad.QuadString;
+import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
+import com.the_qa_company.qendpoint.core.triples.TripleString;
+import com.the_qa_company.qendpoint.core.util.io.IOUtil;
import org.apache.jena.graph.Triple;
+import org.apache.jena.iri.impl.LexerFixer;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.lang.LabelToNode;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.sparql.core.Quad;
-import com.the_qa_company.qendpoint.core.enums.RDFNotation;
-import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
-import com.the_qa_company.qendpoint.core.exceptions.ParserException;
-import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
-import com.the_qa_company.qendpoint.core.triples.TripleString;
-import com.the_qa_company.qendpoint.core.util.io.IOUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
/**
* @author mario.arias
*/
public class RDFParserRIOT implements RDFParserCallback {
private static final Logger log = LoggerFactory.getLogger(RDFParserRIOT.class);
+ private static final int CORES = Runtime.getRuntime().availableProcessors();
+
private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer) {
+
+ if (lang != Lang.NQUADS && lang != Lang.NTRIPLES) {
+ if (keepBNode) {
+ RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
+ .parse(buffer);
+ } else {
+ RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer);
+ }
+ return;
+ }
+
if (keepBNode) {
- RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
- .parse(buffer);
+ LexerFixer.fixLexers();
+
+ ConcurrentInputStream cs = new ConcurrentInputStream(stream, CORES - 1);
+
+ InputStream bnodes = cs.getBnodeStream();
+
+ var threads = new ArrayList();
+
+ Thread e1 = new Thread(() -> {
+ RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
+ .parse(buffer);
+ });
+ e1.setName("BNode parser");
+ threads.add(e1);
+
+ InputStream[] streams = cs.getStreams();
+ int i = 0;
+ for (InputStream s : streams) {
+ int temp = i + 1;
+ Thread e = new Thread(() -> {
+ RDFParser.source(s).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
+ .parse(buffer);
+ });
+ i++;
+ e.setName("Stream parser " + i);
+ threads.add(e);
+
+ }
+
+ threads.forEach(Thread::start);
+ for (Thread thread : threads) {
+ try {
+ while (thread.isAlive()) {
+ thread.join(1000);
+ }
+
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+// RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
+// .parse(buffer);
} else {
RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer);
}
@@ -75,14 +131,13 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole
public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode,
RDFCallback callback) throws ParserException {
try {
- ElemStringBuffer buffer = new ElemStringBuffer(callback);
switch (notation) {
- case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, buffer);
- case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, buffer);
- case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, buffer);
- case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, buffer);
- case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, buffer);
- case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, buffer);
+ case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, new ElemStringBuffer(callback));
+ case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, new ElemStringBuffer(callback));
+ case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, new ElemStringBuffer(callback));
+ case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, new ElemStringBuffer(callback));
+ case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, new ElemStringBuffer(callback));
+ case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, new ElemStringBuffer(callback));
default -> throw new NotImplementedException("Parser not found for format " + notation);
}
} catch (Exception e) {
@@ -91,17 +146,16 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo
}
}
- private static class ElemStringBuffer implements StreamRDF {
- private final TripleString triple = new TripleString();
- private final QuadString quad = new QuadString();
+ public static class ElemStringBuffer implements StreamRDF {
private final RDFCallback callback;
- private ElemStringBuffer(RDFCallback callback) {
+ public ElemStringBuffer(RDFCallback callback) {
this.callback = callback;
}
@Override
public void triple(Triple parsedTriple) {
+ TripleString triple = new TripleString();
triple.setAll(JenaNodeFormatter.format(parsedTriple.getSubject()),
JenaNodeFormatter.format(parsedTriple.getPredicate()),
JenaNodeFormatter.format(parsedTriple.getObject()));
@@ -110,6 +164,7 @@ public void triple(Triple parsedTriple) {
@Override
public void quad(Quad parsedQuad) {
+ QuadString quad = new QuadString();
quad.setAll(JenaNodeFormatter.format(parsedQuad.getSubject()),
JenaNodeFormatter.format(parsedQuad.getPredicate()),
JenaNodeFormatter.format(parsedQuad.getObject()), JenaNodeFormatter.format(parsedQuad.getGraph()));
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java
new file mode 100644
index 000000000..3c40c192a
--- /dev/null
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/TurtleChunker.java
@@ -0,0 +1,502 @@
+package com.the_qa_company.qendpoint.core.rdf.parsers;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A preprocessor for Turtle files, which reads the file and returns an iterator
+ * of chunks where each chunk is a block of turtle data with a . termination.
+ * This might be a single statement, or multiple statements.
+ */
public class TurtleChunker {

	/**
	 * Number of consecutive backslashes seen immediately before the current
	 * byte while inside a literal. A closing quote is escaped iff this count
	 * is odd.
	 */
	private int consecutiveBackslashes;

	/** Lexical mode of the scanner. */
	private enum State {
		DEFAULT, IRI, LITERAL, MULTILINE_LITERAL
	}

	private State state = State.DEFAULT;

	/** 4 MiB read buffer. */
	private static final int BUFFER_SIZE = 1024 * 1024 * 4;

	private final InputStream in;
	private final byte[] chunkBuf = new byte[BUFFER_SIZE];
	private int bufPos = 0, bufLen = 0;

	/**
	 * Accumulates the bytes of a block that spans more than one buffer fill.
	 * Stays empty as long as the current block fits inside {@code chunkBuf}.
	 */
	private final ByteArrayOutputStream partialBytes = new ByteArrayOutputStream();

	/** Open '(' / '[' groups; a '.' only terminates a block when this is empty. */
	private final Deque<Character> nestingStack = new ArrayDeque<>();
	/** Quote character that opened the current (multiline) literal. */
	private byte literalDelimiter;

	/** Per-byte dispatch table for the DEFAULT state, indexed by unsigned byte. */
	private final MethodHandle[] defaultActions = new MethodHandle[256];
	/** Set by {@link #finalizeBlock()} when a complete block has been cut. */
	private String finishedOneBlock = null;

	/** True when part of the current block already sits in {@code partialBytes}. */
	private boolean multiReadBlock = false;
	/** Offset in {@code chunkBuf} where the current block started (single-read mode). */
	private int chunkStart = 0;

	public TurtleChunker(InputStream in) {
		this.in = in;
		buildDefaultActions();
	}

	/**
	 * Populates {@link #defaultActions} with {@code this}-bound method handles
	 * for the characters that are significant in the DEFAULT state. The two
	 * quote characters share one handler (the previous two handlers were
	 * byte-identical duplicates).
	 */
	private void buildDefaultActions() {
		try {
			MethodHandles.Lookup lookup = MethodHandles.lookup();
			MethodType rawMt = MethodType.methodType(void.class, byte.class);

			defaultActions['<'] = lookup.findVirtual(TurtleChunker.class, "handleLtInDefault", rawMt).bindTo(this);
			defaultActions['#'] = lookup.findVirtual(TurtleChunker.class, "handleHashInDefault", rawMt).bindTo(this);
			defaultActions['('] = lookup.findVirtual(TurtleChunker.class, "handleLParenInDefault", rawMt).bindTo(this);
			defaultActions[')'] = lookup.findVirtual(TurtleChunker.class, "handleRParenInDefault", rawMt).bindTo(this);
			defaultActions['['] = lookup.findVirtual(TurtleChunker.class, "handleLBrackInDefault", rawMt).bindTo(this);
			defaultActions[']'] = lookup.findVirtual(TurtleChunker.class, "handleRBrackInDefault", rawMt).bindTo(this);
			MethodHandle quote = lookup.findVirtual(TurtleChunker.class, "handleQuoteInDefault", rawMt).bindTo(this);
			defaultActions['\''] = quote;
			defaultActions['"'] = quote;
			defaultActions['.'] = lookup.findVirtual(TurtleChunker.class, "handleDotInDefault", rawMt).bindTo(this);
		} catch (NoSuchMethodException | IllegalAccessException e) {
			throw new RuntimeException("Failed to build defaultActions", e);
		}
	}

	/**
	 * Reads and lexes bytes until one complete block (terminated by a
	 * top-level '.') is available, or EOF.
	 *
	 * @return the next block, the trimmed unterminated tail at EOF, or
	 *         {@code null} when the stream is exhausted
	 */
	private String parseNextBlock() throws IOException {
		while (true) {
			if (bufPos >= bufLen) {
				readMoreData();
			}
			if (bufLen == 0) {
				// EOF: hand back any unterminated trailing data as a last block.
				if (partialBytes.size() > 0) {
					// FIX: read the buffered bytes BEFORE resetting. The
					// previous order (reset() then toString()) always produced
					// an empty string and silently dropped the trailing block.
					String leftoverStr = partialBytes.toString(StandardCharsets.UTF_8).trim();
					partialBytes.reset();
					return leftoverStr.isEmpty() ? null : leftoverStr;
				}
				return null; // truly no more
			}

			switch (state) {
			case DEFAULT -> parseDefaultOneStep();
			case IRI -> parseIriOneStep();
			case LITERAL -> parseLiteralOneStep();
			case MULTILINE_LITERAL -> parseMultilineLiteralOneStep();
			}

			if (finishedOneBlock != null) {
				String block = finishedOneBlock;
				finishedOneBlock = null;
				return block;
			}
		}
	}

	/**
	 * Consumes one byte in the DEFAULT state, dispatching through
	 * {@link #defaultActions}. Checked exceptions smuggled out of
	 * {@code invokeExact} are unwrapped back to their real type.
	 */
	private void parseDefaultOneStep() throws IOException {
		byte b = nextByte();
		MethodHandle mh = defaultActions[b & 0xFF];
		if (mh == null) {
			return; // ordinary content byte, nothing to do
		}
		try {
			mh.invokeExact(b); // bound handle, exact type (byte)->void
		} catch (IOException | RuntimeException | Error e) {
			throw e;
		} catch (Throwable t) {
			if (t instanceof InterruptedException) {
				// restore the interrupt flag before wrapping
				Thread.currentThread().interrupt();
			}
			throw new RuntimeException(t);
		}
	}

	/** Scans for the closing '>' of an IRI; state survives buffer refills. */
	private void parseIriOneStep() {
		while (bufPos < bufLen) {
			if (nextByte() == '>') {
				state = State.DEFAULT;
				return;
			}
		}
	}

	/** Scans for the unescaped closing quote of a single-quoted literal. */
	private void parseLiteralOneStep() {
		while (bufPos < bufLen) {
			byte b = nextByte();

			if (b == '\\') {
				consecutiveBackslashes++;
				continue;
			}

			boolean escaped = (consecutiveBackslashes % 2 == 1);
			consecutiveBackslashes = 0; // reset whenever we see a non-backslash

			if (b == literalDelimiter && !escaped) {
				state = State.DEFAULT;
				return;
			}
		}
	}

	/** Scans for the unescaped closing triple quote of a long literal. */
	private void parseMultilineLiteralOneStep() throws IOException {
		while (bufPos < bufLen) {
			byte b = nextByte();

			if (b == '\\') {
				consecutiveBackslashes++;
				continue;
			}

			boolean escaped = (consecutiveBackslashes % 2 == 1);
			consecutiveBackslashes = 0; // reset whenever we see a non-backslash

			if (b == literalDelimiter && !escaped && checkForTripleQuote(literalDelimiter)) {
				state = State.DEFAULT;
				return;
			}
		}
	}

	/*
	 * ---------------------------------------------------------------- Special
	 * char handlers in DEFAULT state (looked up by name in
	 * buildDefaultActions — do not rename without updating the lookups).
	 * ----------------------------------------------------------------
	 */

	private void handleLtInDefault(byte b) {
		state = State.IRI;
	}

	private void handleHashInDefault(byte b) throws IOException {
		skipComment();
	}

	private void handleLParenInDefault(byte b) {
		nestingStack.push('(');
	}

	private void handleRParenInDefault(byte b) {
		if (!nestingStack.isEmpty()) {
			nestingStack.pop();
		}
	}

	private void handleLBrackInDefault(byte b) {
		nestingStack.push('[');
	}

	private void handleRBrackInDefault(byte b) {
		if (!nestingStack.isEmpty()) {
			nestingStack.pop();
		}
	}

	/** Shared handler for both ' and " — enters LITERAL or MULTILINE_LITERAL. */
	private void handleQuoteInDefault(byte b) throws IOException {
		literalDelimiter = b;
		state = checkForTripleQuote(b) ? State.MULTILINE_LITERAL : State.LITERAL;
	}

	private void handleDotInDefault(byte b) {
		if (nestingStack.isEmpty()) {
			finalizeBlock();
		}
	}

	/**
	 * Cuts the current block (everything from its start up to and including
	 * the terminating '.') into {@link #finishedOneBlock}.
	 */
	private void finalizeBlock() {
		if (!multiReadBlock) {
			// The entire block is in chunkBuf from chunkStart..bufPos
			int length = bufPos - chunkStart;
			if (length <= 0) {
				return; // nothing
			}
			finishedOneBlock = new String(chunkBuf, chunkStart, length, StandardCharsets.UTF_8);
			chunkStart = bufPos; // next block starts here
		} else {
			// earlier part of the block is in partialBytes, tail in chunkBuf
			if (bufPos > chunkStart) {
				partialBytes.write(chunkBuf, chunkStart, bufPos - chunkStart);
			}
			finishedOneBlock = partialBytes.toString(StandardCharsets.UTF_8);
			partialBytes.reset();
			multiReadBlock = false;
			chunkStart = bufPos;
		}
	}

	/**
	 * Skips to the end of a '#' comment. FIX: now refills the buffer when it
	 * runs dry — previously a comment spanning a buffer boundary returned to
	 * the DEFAULT state and its tail was lexed as data (a '.' in it would have
	 * wrongly ended the block). The comment bytes remain part of the block
	 * text, as before.
	 */
	private void skipComment() throws IOException {
		while (true) {
			if (bufPos >= bufLen) {
				readMoreData();
				if (bufLen == 0) {
					return; // EOF inside the comment
				}
			}
			byte b = nextByte();
			// non-ASCII bytes (high bit set) can never be '\n' in UTF-8
			if ((b & 0x80) != 0) {
				continue;
			}
			if (b == '\n') {
				return;
			}
		}
	}

	private byte nextByte() {
		return chunkBuf[bufPos++];
	}

	/**
	 * After one quote has been consumed, checks whether the next two bytes are
	 * the same quote (long-literal delimiter), refilling across buffer
	 * boundaries as needed. Consumes the extra quotes only on a match of each.
	 */
	private boolean checkForTripleQuote(byte quoteChar) throws IOException {
		if (bufPos >= bufLen) {
			readMoreData();
		}
		if (bufPos >= bufLen || chunkBuf[bufPos] != quoteChar) {
			return false;
		}
		bufPos++; // second quote
		if (bufPos >= bufLen) {
			readMoreData();
		}
		if (bufPos < bufLen && chunkBuf[bufPos] == quoteChar) {
			bufPos++; // third quote
			return true;
		}
		return false;
	}

	/**
	 * Refills {@code chunkBuf}. Any unfinished block bytes are first copied to
	 * {@link #partialBytes} so they survive the overwrite; EOF is normalized
	 * to {@code bufLen == 0}.
	 */
	private void readMoreData() throws IOException {
		// If we haven't finished the current block, save its bytes first
		if (chunkStart < bufLen) {
			partialBytes.write(chunkBuf, chunkStart, bufLen - chunkStart);
			multiReadBlock = true;
		}
		chunkStart = 0;
		bufLen = in.read(chunkBuf);
		bufPos = 0;
		if (bufLen == -1) {
			bufLen = 0; // EOF
		}
	}

	/*
	 * ----------------------------------------------------------------
	 * BlockIterator
	 * ----------------------------------------------------------------
	 */

	public BlockIterator blockIterator() {
		return new BlockIterator();
	}

	/** Pull-style iterator over the chunker's blocks. Not thread-safe. */
	public class BlockIterator implements Iterator<String> {
		private String nextBlock;
		private boolean done;

		/**
		 * Consumes leading comment/blank blocks and collects the {@code @prefix}
		 * / {@code @base} directives at the top of the file, one per line.
		 * Stops (without consuming) at the first ordinary statement.
		 */
		String getPrefixes() {
			StringBuilder sb = new StringBuilder();
			while (hasNext()) {
				String lowerCase = nextBlock.trim().toLowerCase();
				if (lowerCase.isEmpty() || lowerCase.startsWith("#")) {
					nextBlock = null; // skip comment-only block
				} else if (lowerCase.startsWith("@prefix") || lowerCase.startsWith("@base")) {
					sb.append(nextBlock.trim()).append("\n");
					nextBlock = null;
				} else {
					break; // first real statement, leave it for next()
				}
			}
			return sb.toString();
		}

		@Override
		public boolean hasNext() {
			if (done) {
				return false;
			}
			if (nextBlock != null) {
				return true;
			}
			try {
				nextBlock = parseNextBlock();
				if (nextBlock == null) {
					done = true;
					return false;
				}
				return true;
			} catch (IOException e) {
				done = true;
				throw new RuntimeException("IO error during iteration", e);
			}
		}

		@Override
		public String next() {
			if (!hasNext()) {
				throw new NoSuchElementException("No more blocks");
			}
			String result = nextBlock;
			nextBlock = null;
			return result;
		}
	}

	// -- Example main for manual benchmarking --
	public static void main(String[] args) {
		// Update this path before running (was hard-coded twice; now used once)
		String filePath = "/Users/havardottestad/Documents/Programming/qEndpoint3/indexing/latest-dump.ttl";
		long actualStart = System.currentTimeMillis();
		long start = System.currentTimeMillis();
		long count = 0;
		long total = 0;

		try (InputStream sr = new FileInputStream(filePath)) {
			TurtleChunker tokenizer = new TurtleChunker(sr);
			BlockIterator it = tokenizer.blockIterator();

			System.out.println("Processing with TurtleChunker...");

			String prefixes = it.getPrefixes();
			System.out.println("Prefixes:\n" + prefixes);

			while (it.hasNext()) {
				String block = it.next();
				int length = block.trim().split("\n").length;
				count += length;
				total += length;
				if (count > 10_000_000) {
					System.out.println(count + " lines parsed");
					System.out.println(block);
					System.out.printf("Lines per second: %,d \n", count * 1000 / (System.currentTimeMillis() - start));

					System.out.printf("Lines per second (total): %,d \n",
							total * 1000 / (System.currentTimeMillis() - actualStart));

					start = System.currentTimeMillis();
					count = 0;
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			long actualEnd = System.currentTimeMillis();
			long total2 = actualEnd - actualStart;

			long minutes = total2 / 60000;
			long seconds = (total2 % 60000) / 1000;
			System.out.printf("Total: %,d \n", total);
			System.out.printf("Total time: %d:%02d%n", minutes, seconds);
		}
	}
}
diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/storage/QEPMapIdSorter.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/storage/QEPMapIdSorter.java
index c53fb7db6..f8a4d44cb 100644
--- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/storage/QEPMapIdSorter.java
+++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/storage/QEPMapIdSorter.java
@@ -14,12 +14,13 @@
import com.the_qa_company.qendpoint.core.util.disk.LongArray;
import com.the_qa_company.qendpoint.core.util.io.CloseSuppressPath;
import com.the_qa_company.qendpoint.core.util.io.Closer;
+import org.spf4j.io.BufferedInputStream;
+import org.spf4j.io.BufferedOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -43,7 +44,7 @@ public int compareTo(QEPMapIds o) {
}
public static final long MAX_ELEMENT_SIZE_THRESHOLD = 500_000_000L; // max
- // 500MB
+ // 500MB
private final LongArray ids;
private long index;
private final CloseSuppressPath computeLocation;
@@ -92,7 +93,7 @@ public void sort() throws IOException {
CloseSuppressPath output = merger.waitResult().orElse(null);
if (output != null) {
- try (BufferedInputStream stream = new BufferedInputStream(Files.newInputStream(output))) {
+ try (InputStream stream = new BufferedInputStream(Files.newInputStream(output))) {
QEPMapReader reader = new QEPMapReader(stream);
long index = 0;
@@ -154,7 +155,7 @@ private record Merger(long chunkSize) implements KWayMerger.KWayMergerImpl flux, CloseSuppressPath output)
throws KWayMerger.KWayMergerException {
- try (BufferedOutputStream stream = new BufferedOutputStream(Files.newOutputStream(output))) {
+ try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(output))) {
QEPMapIds ids;
List idList = new ArrayList<>();
@@ -163,12 +164,25 @@ public void createChunk(Supplier