src/main/java/org/elasticsearch/river/mongodb/Indexer.java (28 changes: 20 additions & 8 deletions)

@@ -27,6 +27,8 @@
 import org.elasticsearch.script.ExecutableScript;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.common.unit.TimeValue;
+import java.util.concurrent.BlockingQueue;
 
 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
@@ -60,24 +62,31 @@ public Indexer(MongoDBRiver river, MongoDBRiverDefinition definition, SharedCont
 
     @Override
     public void run() {
-        while (context.getStatus() == Status.RUNNING) {
+        BlockingQueue<QueueEntry> stream = context.getStream();
+        long refreshTime = definition.getBulk().getFlushInterval().millis();
+
+        while (context.getStatus() == Status.RUNNING) {
             try {
                 Timestamp<?> lastTimestamp = null;
 
                 // 1. Attempt to fill as much of the bulk request as possible
-                QueueEntry entry = context.getStream().take();
+                logger.trace("Queueing bulk entries, timeout is {}ms", refreshTime);
+                QueueEntry entry = stream.take();
                 lastTimestamp = processBlockingQueue(entry);
-                while ((entry = context.getStream().poll(definition.getBulk().getFlushInterval().millis(), MILLISECONDS)) != null) {
+                while ((entry = stream.poll(refreshTime, MILLISECONDS)) != null) {
                     logger.trace("Queueing bulk entry");
                     lastTimestamp = processBlockingQueue(entry);
                 }
 
+                logger.trace("Done queueing bulk entries, updating timestamp");
+
                 // 2. Update the timestamp
                 if (lastTimestamp != null) {
                     MongoDBRiver.setLastTimestamp(definition, lastTimestamp,
                             getBulkProcessor(definition.getIndexName(), definition.getTypeName()).getBulkProcessor());
                 }
 
+                logger.trace("Finished updating timestamp");
             } catch (InterruptedException e) {
                 logger.info("river-mongodb indexer interrupted");
                 releaseProcessors();
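The reworked loop hoists the queue and the bulk flush interval out of the loop, then batches with a blocking take() for the first entry followed by timed poll() calls, so a bulk request keeps filling until the queue stays quiet for one full flush interval. A minimal standalone sketch of that drain pattern, with illustrative names not taken from the river:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class BatchDrainSketch {
    // Block for the first entry, then poll with a timeout: the batch closes
    // once the queue has stayed empty for a full flush interval.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, long flushMillis) throws InterruptedException {
        List<T> batch = new ArrayList<T>();
        batch.add(queue.take()); // waits for at least one entry
        T next;
        while ((next = queue.poll(flushMillis, MILLISECONDS)) != null) {
            batch.add(next);
        }
        return batch;
    }
}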
@@ -107,7 +116,7 @@ private void releaseProcessors() {
     private Timestamp<?> processBlockingQueue(QueueEntry entry) {
         Operation operation = entry.getOperation();
         if (entry.getData().get(MongoDBRiver.MONGODB_ID_FIELD) == null
-                && (operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.DELETE)) {
+                && (operation == Operation.INSERT || operation == Operation.UPDATE || operation == Operation.UPDATE_ROW || operation == Operation.DELETE)) {
             logger.warn("Cannot get object id. Skip the current item: [{}]", entry.getData());
             return null;
         }
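With UPDATE_ROW added to the guard, it is treated like every other single-document operation: entries without an object id are skipped rather than indexed. A hypothetical helper, not part of the patch, expressing the same predicate:

// Hypothetical refactoring of the guard above; not part of this diff.
private static boolean requiresObjectId(Operation operation) {
    return operation == Operation.INSERT || operation == Operation.UPDATE
            || operation == Operation.UPDATE_ROW || operation == Operation.DELETE;
}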
@@ -245,15 +254,15 @@ private void updateBulkRequest(DBObject data, String objectId, Operation operati
             getBulkProcessor(index, type).addBulkRequest(objectId, build(data, objectId), routing, parent);
         }
         // UPDATE = DELETE + INSERT operation
-        if (operation == Operation.UPDATE) {
+        if (operation == Operation.UPDATE || operation == Operation.UPDATE_ROW) {
             if (logger.isTraceEnabled()) {
                 logger.trace("Update operation - id: {} - contains attachment: {}", objectId, isAttachment);
             }
             deleteBulkRequest(objectId, index, type, routing, parent);
             getBulkProcessor(index, type).addBulkRequest(objectId, build(data, objectId), routing, parent);
         }
         if (operation == Operation.DELETE) {
-            logger.info("Delete request [{}], [{}], [{}]", index, type, objectId);
+            logger.trace("Delete request [{}], [{}], [{}]", index, type, objectId);
             deleteBulkRequest(objectId, index, type, routing, parent);
         }
         if (operation == Operation.DROP_COLLECTION) {
@@ -264,6 +273,9 @@ private void updateBulkRequest(DBObject data, String objectId, Operati
                 logger.info("Ignore drop collection request [{}], [{}]. The option has been disabled.", index, type);
             }
         }
+        if (logger.isTraceEnabled()) {
+            logger.trace("Completed updateBulkRequest");
+        }
     }
 
     /*
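As the comment in the hunk above says, an update is replayed as a delete followed by a full re-index, and UPDATE_ROW now takes the same path. A sketch of the idea against the plain Elasticsearch bulk API, as a simplified stand-in for the river's deleteBulkRequest/addBulkRequest wrappers:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;

// Sketch only: replay an update as delete + index so no stale copy of the
// document survives the bulk flush.
void updateAsDeletePlusIndex(BulkProcessor bulk, String index, String type, String id, byte[] source) {
    bulk.add(new DeleteRequest(index, type, id));
    bulk.add(new IndexRequest(index, type, id).source(source));
}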
Expand Down Expand Up @@ -399,7 +411,7 @@ private XContentBuilder build(final DBObject data, final String objectId) throws

/**
* Map a DBObject for indexing
*
*
* @param base
* @param mapData
*/
@@ -423,7 +435,7 @@ private Map<String, Object> createObjectMap(DBObject base) {
 
     /**
      * Map a DBRef to a Map for indexing
-     * 
+     *
      * @param ref
      * @return
      */
src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

@@ -85,11 +85,14 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     public final static String MONGODB_ADMIN_DATABASE = "admin";
     public final static String MONGODB_CONFIG_DATABASE = "config";
     public final static String MONGODB_ID_FIELD = "_id";
+    public final static String MONGODB_OID_FIELD = "oid";
+    public final static String MONGODB_SEQ_FIELD = "seq";
     public final static String MONGODB_IN_OPERATOR = "$in";
     public final static String MONGODB_OR_OPERATOR = "$or";
     public final static String MONGODB_AND_OPERATOR = "$and";
     public final static String MONGODB_NATURAL_OPERATOR = "$natural";
     public final static String OPLOG_COLLECTION = "oplog.rs";
+    public final static String OPLOG_REFS_COLLECTION = "oplog.refs";
     public final static String OPLOG_NAMESPACE = "ns";
     public final static String OPLOG_NAMESPACE_COMMAND = "$cmd";
     public final static String OPLOG_ADMIN_COMMAND = "admin." + OPLOG_NAMESPACE_COMMAND;
@@ -110,6 +113,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
     public static final String OPLOG_OPS = "ops";
     public static final String OPLOG_CREATE_COMMAND = "create";
+    public static final String OPLOG_REF = "ref";
     public final static String GRIDFS_FILES_SUFFIX = ".files";
     public final static String GRIDFS_CHUNKS_SUFFIX = ".chunks";
     public final static String INSERTION_ORDER_KEY = "$natural";
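The new constants (oid, seq, oplog.refs, ref) point at oplog entries whose operations are stored out of line, as TokuMX does: an entry carries a ref value, and the actual operations live in oplog.refs documents keyed by { _id: { oid: <ref>, seq: <n> } }, each holding an ops array. A sketch of fetching those chunks in seq order; the layout is an assumption, since this diff only shows the constants:

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCursor;

// Assumed TokuMX-style layout, not confirmed by this diff: read the chunks of
// a referenced oplog entry from oplog.refs in seq order; each chunk carries
// an "ops" array of operations to replay.
DBCursor referencedOps(DB local, Object ref) {
    return local.getCollection(MongoDBRiver.OPLOG_REFS_COLLECTION)
            .find(new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD + "." + MongoDBRiver.MONGODB_OID_FIELD, ref))
            .sort(new BasicDBObject(MongoDBRiver.MONGODB_ID_FIELD + "." + MongoDBRiver.MONGODB_SEQ_FIELD, 1));
}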
@@ -444,7 +448,7 @@ public static Timestamp<?> getLastTimestamp(Client client, MongoDBRiverDefinitio
     /**
      * Adds an index request operation to a bulk request, updating the last
      * timestamp for a given namespace (ie: host:dbName.collectionName)
-     * 
+     *
      * @param bulk
      */
     static void setLastTimestamp(final MongoDBRiverDefinition definition, final Timestamp<?> time, final BulkProcessor bulkProcessor) {
src/main/java/org/elasticsearch/river/mongodb/Operation.java (13 changes: 8 additions & 5 deletions)

@@ -1,8 +1,14 @@
 package org.elasticsearch.river.mongodb;
 
 public enum Operation {
-    INSERT(MongoDBRiver.OPLOG_INSERT_OPERATION), UPDATE(MongoDBRiver.OPLOG_UPDATE_OPERATION), DELETE(MongoDBRiver.OPLOG_DELETE_OPERATION), DROP_COLLECTION(
-            "dc"), DROP_DATABASE("dd"), COMMAND(MongoDBRiver.OPLOG_COMMAND_OPERATION), UNKNOWN(null);
+    INSERT(MongoDBRiver.OPLOG_INSERT_OPERATION),
+    UPDATE(MongoDBRiver.OPLOG_UPDATE_OPERATION),
+    UPDATE_ROW(MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION),
+    DELETE(MongoDBRiver.OPLOG_DELETE_OPERATION),
+    DROP_COLLECTION("dc"),
+    DROP_DATABASE("dd"),
+    COMMAND(MongoDBRiver.OPLOG_COMMAND_OPERATION),
+    UNKNOWN(null);
 
     private String value;
@@ -21,9 +27,6 @@ public static Operation fromString(String value) {
                     return operation;
                 }
             }
-            if (MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION.equalsIgnoreCase(value)) {
-                return Operation.UPDATE;
-            }
         }
         return Operation.UNKNOWN;
     }
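Because UPDATE_ROW now carries MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION as its own value, the generic lookup loop in fromString resolves that opcode directly, and the special case that silently collapsed it into UPDATE is gone. That is a behavioral change: callers must match UPDATE_ROW explicitly, which is exactly what the Indexer changes above do. A usage sketch:

// Row updates now map to their own constant instead of plain UPDATE.
Operation op = Operation.fromString(MongoDBRiver.OPLOG_UPDATE_ROW_OPERATION);
assert op == Operation.UPDATE_ROW;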