diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
index 141e3df2..4dc9937f 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -27,6 +27,8 @@ import org.elasticsearch.script.ExecutableScript;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.common.unit.TimeValue;
+import java.util.concurrent.BlockingQueue;

 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
@@ -60,24 +62,31 @@ public Indexer(MongoDBRiver river, MongoDBRiverDefinition definition, SharedCont

     @Override
     public void run() {
-        while (context.getStatus() == Status.RUNNING) {
+        BlockingQueue stream = context.getStream();
+        long refreshTime = definition.getBulk().getFlushInterval().millis();
+        while (context.getStatus() == Status.RUNNING) {
             try {
                 Timestamp lastTimestamp = null;

                 // 1. Attempt to fill as much of the bulk request as possible
-                QueueEntry entry = context.getStream().take();
+                logger.trace("Queueing bulk entries, timeout is {}ms", refreshTime);
+                QueueEntry entry = stream.take();
                 lastTimestamp = processBlockingQueue(entry);

-                while ((entry = context.getStream().poll(definition.getBulk().getFlushInterval().millis(), MILLISECONDS)) != null) {
+                while ((entry = stream.poll(refreshTime, MILLISECONDS)) != null) {
+                    logger.trace("Queueing bulk entry");
                     lastTimestamp = processBlockingQueue(entry);
                 }

+                logger.trace("Done queueing bulk entries, updating timestamp");
+
                 // 2. Update the timestamp
                 if (lastTimestamp != null) {
                     MongoDBRiver.setLastTimestamp(definition, lastTimestamp,
                             getBulkProcessor(definition.getIndexName(), definition.getTypeName()).getBulkProcessor());
                 }
+                logger.trace("Finished updating timestamp");

             } catch (InterruptedException e) {
                 logger.info("river-mongodb indexer interrupted");
                 releaseProcessors();
@@ -107,7 +116,7 @@ private void releaseProcessors() {
     private Timestamp processBlockingQueue(QueueEntry entry) {
         Operation operation = entry.getOperation();
         if (entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD) == null
-                && (operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.DELETE)) {
+                && (operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.UPDATE_ROW || operation == Operation.DELETE)) {
Skip the current item: [{}]", entry.getData()); return null; } @@ -245,7 +254,7 @@ private void updateBulkRequest(DBObject data, String objectId, Operation operati getBulkProcessor(index, type).addBulkRequest(objectId, build(data, objectId), routing, parent); } // UPDATE = DELETE + INSERT operation - if (operation == Operation.UPDATE) { + if (operation == Operation.UPDATE || operation == Operation.UPDATE_ROW) { if (logger.isTraceEnabled()) { logger.trace("Update operation - id: {} - contains attachment: {}", objectId, isAttachment); } @@ -253,7 +262,7 @@ private void updateBulkRequest(DBObject data, String objectId, Operation operati getBulkProcessor(index, type).addBulkRequest(objectId, build(data, objectId), routing, parent); } if (operation == Operation.DELETE) { - logger.info("Delete request [{}], [{}], [{}]", index, type, objectId); + logger.trace("Delete request [{}], [{}], [{}]", index, type, objectId); deleteBulkRequest(objectId, index, type, routing, parent); } if (operation == Operation.DROP_COLLECTION) { @@ -264,6 +273,9 @@ private void updateBulkRequest(DBObject data, String objectId, Operation operati logger.info("Ignore drop collection request [{}], [{}]. The option has been disabled.", index, type); } } + if (logger.isTraceEnabled()) { + logger.trace("Completed updateBulkRequest"); + } } /* @@ -399,7 +411,7 @@ private XContentBuilder build(final DBObject data, final String objectId) throws /** * Map a DBObject for indexing - * + * * @param base * @param mapData */ @@ -423,7 +435,7 @@ private Map createObjectMap(DBObject base) { /** * Map a DBRef to a Map for indexing - * + * * @param ref * @return */ diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index 5ea95e60..84aafe2f 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -85,11 +85,14 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String MONGODB_ADMIN_DATABASE = "admin"; public final static String MONGODB_CONFIG_DATABASE = "config"; public final static String MONGODB_ID_FIELD = "_id"; + public final static String MONGODB_OID_FIELD = "oid"; + public final static String MONGODB_SEQ_FIELD = "seq"; public final static String MONGODB_IN_OPERATOR = "$in"; public final static String MONGODB_OR_OPERATOR = "$or"; public final static String MONGODB_AND_OPERATOR = "$and"; public final static String MONGODB_NATURAL_OPERATOR = "$natural"; public final static String OPLOG_COLLECTION = "oplog.rs"; + public final static String OPLOG_REFS_COLLECTION = "oplog.refs"; public final static String OPLOG_NAMESPACE = "ns"; public final static String OPLOG_NAMESPACE_COMMAND = "$cmd"; public final static String OPLOG_ADMIN_COMMAND = "admin." 
     public final static String OPLOG_ADMIN_COMMAND = "admin." + OPLOG_NAMESPACE_COMMAND;
@@ -110,6 +113,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
     public static final String OPLOG_OPS = "ops";
     public static final String OPLOG_CREATE_COMMAND = "create";
+    public static final String OPLOG_REF = "ref";
     public final static String GRIDFS_FILES_SUFFIX = ".files";
     public final static String GRIDFS_CHUNKS_SUFFIX = ".chunks";
     public final static String INSERTION_ORDER_KEY = "$natural";
@@ -444,7 +448,7 @@ public static Timestamp getLastTimestamp(Client client, MongoDBRiverDefinitio
     /**
      * Adds an index request operation to a bulk request, updating the last
      * timestamp for a given namespace (ie: host:dbName.collectionName)
-     * 
+     *
      * @param bulk
      */
     static void setLastTimestamp(final MongoDBRiverDefinition definition, final Timestamp time, final BulkProcessor bulkProcessor) {
diff --git a/src/main/java/org/elasticsearch/river/mongodb/Operation.java b/src/main/java/org/elasticsearch/river/mongodb/Operation.java
index 59c3c3fe..98f51d52 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Operation.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Operation.java
@@ -1,8 +1,14 @@
 package org.elasticsearch.river.mongodb;

 public enum Operation {
-    INSERT(MongoDBRiver.OPLOG_INSERT_OPERATION), UPDATE(MongoDBRiver.OPLOG_UPDATE_OPERATION), DELETE(MongoDBRiver.OPLOG_DELETE_OPERATION), DROP_COLLECTION(
-            "dc"), DROP_DATABASE("dd"), COMMAND(MongoDBRiver.OPLOG_COMMAND_OPERATION), UNKNOWN(null);
+    INSERT(MongoDBRiver.OPLOG_INSERT_OPERATION),
+    UPDATE(MongoDBRiver.OPLOG_UPDATE_OPERATION),
+    UPDATE_ROW(MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION),
+    DELETE(MongoDBRiver.OPLOG_DELETE_OPERATION),
+    DROP_COLLECTION("dc"),
+    DROP_DATABASE("dd"),
+    COMMAND(MongoDBRiver.OPLOG_COMMAND_OPERATION),
+    UNKNOWN(null);

     private String value;

@@ -21,9 +27,6 @@ public static Operation fromString(String value) {
                     return operation;
                 }
             }
-            if (MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION.equalsIgnoreCase(value)) {
-                return Operation.UPDATE;
-            }
         }
         return Operation.UNKNOWN;
     }
diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
index 6e99f340..eb321e06 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -18,6 +18,7 @@ import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;

 import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBList;
 import com.mongodb.Bytes;
 import com.mongodb.CommandResult;
 import com.mongodb.DB;
@@ -38,7 +39,7 @@ class Slurper implements Runnable {
     class SlurperException extends Exception {

         /**
-         * 
+         *
          */
         private static final long serialVersionUID = 1L;

@@ -61,7 +62,7 @@ class SlurperException extends Exception {
     private Mongo mongo;
     private DB slurpedDb;
     private DB oplogDb;
-    private DBCollection oplogCollection;
+    private DBCollection oplogCollection, oplogRefsCollection;
     private final AtomicLong totalDocuments = new AtomicLong();

     public Slurper(List mongoServers, MongoDBRiverDefinition definition, SharedContext context, Client client) {
@@ -117,21 +118,37 @@ public void run() {
             DBCursor cursor = null;
             try {
                 cursor = oplogCursor(startTimestamp);
+                logger.debug("Slurping with cursor {}", cursor);
                 if (cursor == null) {
+                    logger.debug("No cursor, processing full oplog");
                     cursor = processFullOplog();
                 }
-                while (cursor.hasNext()) {
-                    DBObject item = cursor.next();
-                    startTimestamp = processOplogEntry(item, startTimestamp);
+                while(true) {
+                    if(cursor.hasNext()) {
+                        DBObject item = cursor.next();
+                        if(item != null) {
+                            startTimestamp = processOplogEntry(item, startTimestamp);
+                        }
+                    } else {
+                        logger.debug("Waiting for 500 ms");
+                        Thread.sleep(500);
+                    }
+                    if(context.getStatus() != Status.RUNNING) {
+                        logger.debug("context.getStatus() = {} -- breaking", context.getStatus());
+                        break;
+                    }
                 }
-                logger.debug("Before waiting for 500 ms");
-                Thread.sleep(500);
             } catch (MongoException.CursorNotFound e) {
                 logger.info("Cursor {} has been closed. About to open a new cusor.", cursor.getCursorId());
                 logger.debug("Total document inserted [{}]", totalDocuments.get());
             } catch (SlurperException sEx) {
                 logger.warn("Exception in slurper", sEx);
                 break;
+            } catch (MongoException e) {
+                logger.error("Mongo gave an exception", e);
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException iEx) {}
             } catch (Exception ex) {
                 logger.warn("Exception while looping in cursor", ex);
                 Thread.currentThread().interrupt();
@@ -178,7 +195,7 @@ protected boolean isIndexEmpty() {
      * Does an initial sync the same way MongoDB does.
      * https://groups.google.com/
      * forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
-     * 
+     *
      * @return the last oplog timestamp before the import began
      * @throws InterruptedException
      *             if the blocking queue stream is interrupted while waiting
@@ -300,6 +317,7 @@ protected boolean assignCollections() {
             return false;
         }
         oplogCollection = oplogDb.getCollection(MongoDBRiver.OPLOG_COLLECTION);
+        oplogRefsCollection = oplogDb.getCollection(MongoDBRiver.OPLOG_REFS_COLLECTION);
         slurpedDb = mongo.getDB(definition.getMongoDb());

         if (!definition.getMongoAdminUser().isEmpty() && !definition.getMongoAdminPassword().isEmpty() && adminDb.isAuthenticated()) {
@@ -348,19 +366,35 @@ private DBCursor processFullOplog() throws InterruptedException, SlurperExceptio
         return oplogCursor(currentTimestamp);
     }

-    private Timestamp processOplogEntry(final DBObject entry, final Timestamp startTimestamp) throws InterruptedException {
-        // To support transactions, TokuMX wraps one or more operations in a single oplog entry, in a list.
-        // As long as clients are not transaction-aware, we can pretty safely assume there will only be one operation in the list.
-        // Supporting genuine multi-operation transactions will require a bit more logic here.
-        flattenOps(entry);
+    private Timestamp processOplogEntry(final DBObject entry, final Timestamp startTimestamp) throws InterruptedException, SlurperException {
+        return processOplogEntry(entry, startTimestamp, Timestamp.on(entry));
+    }

-        if (!isValidOplogEntry(entry, startTimestamp)) {
+    private Timestamp processOplogEntry(final DBObject entry, final Timestamp startTimestamp, Timestamp oplogTimestamp) throws InterruptedException, SlurperException {
+        if(entry.containsKey(MongoDBRiver.OPLOG_REF)) {
+            logger.debug("Processing oplog refs");
+            return processOplogRefs(entry, startTimestamp, oplogTimestamp);
+        } else {
+            Object ops = entry.get(MongoDBRiver.OPLOG_OPS);
+            if(ops != null) {
+                for (DBObject op : (List) ops) {
+                    oplogTimestamp = processSingleOp(op, startTimestamp, oplogTimestamp);
+                }
+            } else {
+                oplogTimestamp = processSingleOp(entry, startTimestamp, oplogTimestamp);
+            }
+            return oplogTimestamp;
+        }
+    }
+
+    private Timestamp processSingleOp(final DBObject entry, final Timestamp startTimestamp, final Timestamp oplogTimestamp) throws InterruptedException, SlurperException {
+        if (!isValidOplogEntry(entry, startTimestamp, oplogTimestamp)) {
             return startTimestamp;
         }
+
         Operation operation = Operation.fromString(entry.get(MongoDBRiver.OPLOG_OPERATION).toString());
-        String namespace = entry.get(MongoDBRiver.OPLOG_NAMESPACE).toString();
+        String namespace = (String)entry.get(MongoDBRiver.OPLOG_NAMESPACE);
         String collection = null;
-        Timestamp oplogTimestamp = Timestamp.on(entry);
         DBObject object = (DBObject) entry.get(MongoDBRiver.OPLOG_OBJECT);

         if (definition.isImportAllCollections()) {
@@ -403,7 +437,7 @@ private Timestamp processOplogEntry(final DBObject entry, final Timestamp
         String objectId = getObjectIdFromOplogEntry(entry);

         if (definition.isMongoGridFS() && namespace.endsWith(MongoDBRiver.GRIDFS_FILES_SUFFIX)
-                && (operation == Operation.INSERT || operation == Operation.UPDATE)) {
+                && (operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.UPDATE_ROW)) {
             if (objectId == null) {
                 throw new NullPointerException(MongoDBRiver.MONGODB_ID_FIELD);
             }
@@ -426,10 +460,24 @@ private Timestamp processOplogEntry(final DBObject entry, final Timestamp
                 }
                 addToStream(operation, oplogTimestamp, applyFieldFilter(object), collection);
             } else {
-                if (operation == Operation.UPDATE) {
-                    DBObject update = (DBObject) entry.get(MongoDBRiver.OPLOG_UPDATE);
-                    logger.debug("Updated item: {}", update);
-                    addQueryToStream(operation, oplogTimestamp, update, collection);
+                if (operation == Operation.UPDATE || operation == Operation.UPDATE_ROW) {
+                    DBObject update = null;
+                    if(operation == Operation.UPDATE) {
+                        update = (DBObject) entry.get( MongoDBRiver.OPLOG_UPDATE );
+                    } else if (operation == Operation.UPDATE_ROW) {
+                        update = (DBObject) entry.get( MongoDBRiver.OPLOG_OBJECT );
+                    }
+                    logger.trace("Updated item: {} ({})", update, entry);
+                    if(update != null) {
+                        DBObject updateQuery;
+                        if(update.containsField(MongoDBRiver.MONGODB_ID_FIELD)) {
+                            updateQuery = new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD, new ObjectId(update.get(MongoDBRiver.MONGODB_ID_FIELD).toString()));
+                        } else {
+                            updateQuery = update;
+                        }
+                        logger.debug("Processing update for {}", updateQuery);
+                        addQueryToStream(operation, oplogTimestamp, updateQuery, collection);
+                    }
                 } else {
                     if (operation == Operation.INSERT) {
                         addInsertToStream(oplogTimestamp, applyFieldFilter(object), collection);
@@ -441,24 +489,37 @@ private Timestamp processOplogEntry(final DBObject entry, final Timestamp
         return oplogTimestamp;
     }

@SuppressWarnings("unchecked") - private void flattenOps(DBObject entry) { - Object ops = entry.get(MongoDBRiver.OPLOG_OPS); - if (ops != null) { - try { - for (DBObject op : (List) ops) { - String operation = (String) op.get(MongoDBRiver.OPLOG_OPERATION); - if (operation.equals(MongoDBRiver.OPLOG_COMMAND_OPERATION)) { - DBObject object = (DBObject) op.get(MongoDBRiver.OPLOG_OBJECT); - if (object.containsField(MongoDBRiver.OPLOG_CREATE_COMMAND)) { - continue; - } - } - entry.putAll(op); + private Timestamp processOplogRefs(final DBObject entry, final Timestamp timestamp, final Timestamp oplogTimestamp) throws InterruptedException, SlurperException { + ObjectId ref = (ObjectId) entry.get(MongoDBRiver.OPLOG_REF); + long seq = 0; + if(ref != null) { + // Find the refs matching this oplog entry and update the docs they touch + // db.oplog.refs.find({_id: {$gt: {oid: id_from_oplog, seq: 0}}}) + while(true) { + BasicDBObject selector = new BasicDBObject(MongoDBRiver.MONGODB_OID_FIELD, ref).append(MongoDBRiver.MONGODB_SEQ_FIELD, seq); + BasicDBObject query = new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD, new BasicDBObject(QueryOperators.GT, selector)); + BasicDBObject refResult = (BasicDBObject)oplogRefsCollection.findOne(query); + // logger.debug("Finding refs for {}", query); + if(refResult == null) { + logger.debug("Got no refResult for {}, breaking...", query); + break; } - } catch (ClassCastException e) { - logger.error(e.toString(), e); + + BasicDBObject refId = (BasicDBObject)refResult.get(MongoDBRiver.MONGODB_ID_FIELD); + ObjectId refOid = (ObjectId)refId.get(MongoDBRiver.MONGODB_OID_FIELD); + if( !refOid.equals(ref) ) { + // logger.debug("{} != {}, breaking oplog ref loop...", ref, refOid); + break; + } + + logger.debug("Processing oplog.refs entry: {} seq {}", ref, seq); + + seq = refId.getLong(MongoDBRiver.MONGODB_SEQ_FIELD); + processOplogEntry(refResult, timestamp, oplogTimestamp); } + return timestamp; + } else { + throw new SlurperException("Invalid oplog entry - namespace is null, but ref field is missing"); } } @@ -487,12 +548,13 @@ private String getCollectionFromNamespace(String namespace) { return null; } - private boolean isValidOplogEntry(final DBObject entry, final Timestamp startTimestamp) { + private boolean isValidOplogEntry(final DBObject entry, final Timestamp startTimestamp, final Timestamp oplogTimestamp) { if (MongoDBRiver.OPLOG_NOOP_OPERATION.equals(entry.get(MongoDBRiver.OPLOG_OPERATION))) { logger.debug("[No-op Oplog Entry] - can be ignored. 
{}", entry); return false; } String namespace = (String) entry.get(MongoDBRiver.OPLOG_NAMESPACE); + // Initial support for sharded collection - // https://jira.mongodb.org/browse/SERVER-4333 // Not interested in operation from migration or sharding @@ -506,7 +568,6 @@ private boolean isValidOplogEntry(final DBObject entry, final Timestamp start } if (startTimestamp != null) { - Timestamp oplogTimestamp = Timestamp.on(entry); if (Timestamp.compare(oplogTimestamp, startTimestamp) < 0) { logger.debug("[Invalid Oplog Entry] - entry timestamp [{}] before startTimestamp [{}]", entry, startTimestamp); return false; @@ -536,7 +597,7 @@ private boolean isValidOplogEntry(final DBObject entry, final Timestamp start } } if (!validNamespace) { - logger.debug("[Invalid Oplog Entry] - namespace [{}] is not valid", namespace); + logger.trace("[Ignored Oplog Entry] - namespace [{}] is not matched", namespace); return false; } String operation = (String) entry.get(MongoDBRiver.OPLOG_OPERATION); @@ -604,19 +665,23 @@ private String getObjectIdFromOplogEntry(DBObject entry) { private DBCursor oplogCursor(final Timestamp timestampOverride) throws SlurperException { Timestamp time = timestampOverride == null ? MongoDBRiver.getLastTimestamp(client, definition) : timestampOverride; + DBObject indexFilter = time.getOplogFilter(); + logger.debug("Getting oplog cursor starting at time {}, oplogFilter is {}", time, indexFilter); if (indexFilter == null) { return null; } - int options = Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA | Bytes.QUERYOPTION_NOTIMEOUT; - // Using OPLOGREPLAY to improve performance: // https://jira.mongodb.org/browse/JAVA-771 - if (indexFilter.containsField(MongoDBRiver.OPLOG_TIMESTAMP)) { - options = options | Bytes.QUERYOPTION_OPLOGREPLAY; - } + int options = Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA | Bytes.QUERYOPTION_NOTIMEOUT | Bytes.QUERYOPTION_OPLOGREPLAY; + DBCursor cursor = oplogCollection.find(indexFilter).setOptions(options); + + // Toku sometimes get stuck without the hint. + if (indexFilter.containsField(MongoDBRiver.MONGODB_ID_FIELD)) { + cursor = cursor.hint("_id_"); + } isRiverStale(cursor, time); return cursor; }