Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/czar/ActiveWorker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
{
// Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
// message to send to the worker.
jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime);
jsWorkerReqPtr = _wqsData->toJson(maxLifetime);
}

// Always send the message as it's a way to inform the worker that this
Expand Down
10 changes: 8 additions & 2 deletions src/protojson/ScanTableInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void ScanInfo::sortTablesSlowestFirst() {
std::sort(infoTables.begin(), infoTables.end(), func);
}

nlohmann::json ScanInfo::serializeJson() const {
nlohmann::json ScanInfo::toJson() const {
auto jsScanInfo = json({{"infoscanrating", scanRating}, {"infotables", json::array()}});

auto& jsInfoTables = jsScanInfo["infotables"];
Expand Down Expand Up @@ -127,6 +127,7 @@ ScanInfo::Ptr ScanInfo::createFromJson(nlohmann::json const& siJson) {
auto lockInMem = http::RequestBodyJSON::required<bool>(jsElem, "silockinmem");
iTbls.emplace_back(db, table, lockInMem, sRating);
}
siPtr->sortTablesSlowestFirst();

return siPtr;
}
Expand All @@ -137,8 +138,13 @@ std::ostream& operator<<(std::ostream& os, ScanTableInfo const& tbl) {
return os;
}

/// Write a one-line textual summary of this ScanInfo to `os`.
/// The output contains the scan rating followed by the printable form of the table list.
/// @param os stream to write to.
/// @return `os`, so the call can be chained.
std::ostream& ScanInfo::dump(std::ostream& os) const {
    os << "ScanInfo{speed=" << scanRating;
    os << " tables: " << util::printable(infoTables) << "}";
    return os;
}

std::ostream& operator<<(std::ostream& os, ScanInfo const& info) {
os << "ScanInfo{speed=" << info.scanRating << " tables: " << util::printable(info.infoTables) << "}";
info.dump(os);
return os;
}

Expand Down
4 changes: 3 additions & 1 deletion src/protojson/ScanTableInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ class ScanInfo {
static Ptr createFromJson(nlohmann::json const& ujJson);

/// Return a json version of the contents of this class.
nlohmann::json serializeJson() const;
nlohmann::json toJson() const;

void sortTablesSlowestFirst();
int compareTables(ScanInfo const& rhs);

ScanTableInfo::ListOf infoTables;
int scanRating{Rating::FASTEST};

std::ostream& dump(std::ostream& os) const;
};

std::ostream& operator<<(std::ostream& os, ScanTableInfo const& tbl);
Expand Down
109 changes: 28 additions & 81 deletions src/protojson/UberJobMsg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,25 @@ UberJobMsg::UberJobMsg(unsigned int metaVersion, std::string const& replicationI
}
}

json UberJobMsg::serializeJson() const {
json UberJobMsg::toJson() const {
json ujmJson = {{"version", _metaVersion},
{"instance_id", _replicationInstanceId},
{"auth_key", _replicationAuthKey},
{"worker", _workerId},
{"queryid", _qId},
{"uberjobid", _ujId},
{"czarinfo", _czInfo->serializeJson()},
{"czarinfo", _czInfo->toJson()},
{"rowlimit", _rowLimit},
{"subqueries_map", _jobSubQueryTempMap->serializeJson()},
{"dbtables_map", _jobDbTablesMap->serializeJson()},
{"subqueries_map", _jobSubQueryTempMap->toJson()},
{"dbtables_map", _jobDbTablesMap->toJson()},
{"maxtablesizemb", _maxTableSizeMB},
{"scaninfo", _scanInfo->serializeJson()},
{"scaninfo", _scanInfo->toJson()},
{"scaninteractive", _scanInteractive},
{"jobs", json::array()}};

auto& jsJobs = ujmJson["jobs"];
for (auto const& jbMsg : *_jobMsgVect) {
jsJobs.emplace_back(jbMsg->serializeJson());
jsJobs.emplace_back(jbMsg->toJson());
}

LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " ujmJson=" << ujmJson);
Expand Down Expand Up @@ -138,7 +138,7 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) {
ujmPtr->_jobSubQueryTempMap = JobSubQueryTempMap::createFromJson(jsSubQueriesMap);

auto jsDbTablesMap = http::RequestBodyJSON::required<json>(ujmJson, "dbtables_map");
ujmPtr->_jobDbTablesMap = JobDbTablesMap::createFromJson(jsDbTablesMap);
ujmPtr->_jobDbTablesMap = JobDbTableMap::createFromJson(jsDbTablesMap);

for (auto const& jsUjJob : jsUjJobs) {
JobMsg::Ptr jobMsgPtr =
Expand All @@ -154,13 +154,13 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) {

JobMsg::Ptr JobMsg::create(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
JobDbTablesMap::Ptr const& jobDbTablesMap) {
JobDbTableMap::Ptr const& jobDbTablesMap) {
auto jMsg = Ptr(new JobMsg(jobPtr, jobSubQueryTempMap, jobDbTablesMap));
return jMsg;
}

JobMsg::JobMsg(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& jobDbTablesMap)
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTableMap::Ptr const& jobDbTablesMap)
: _jobSubQueryTempMap(jobSubQueryTempMap), _jobDbTablesMap(jobDbTablesMap) {
auto const descr = jobPtr->getDescription();
if (descr == nullptr) {
Expand All @@ -172,18 +172,11 @@ JobMsg::JobMsg(std::shared_ptr<qdisp::JobQuery> const& jobPtr,
_chunkQuerySpecDb = chunkQuerySpec->db;
_chunkId = chunkQuerySpec->chunkId;

// Add scan tables (TODO:UJ Verify this is the same for all jobs.)
for (auto const& sTbl : chunkQuerySpec->scanInfo->infoTables) {
int index = jobDbTablesMap->findDbTable(make_pair(sTbl.db, sTbl.table));
jobDbTablesMap->setScanRating(index, sTbl.scanRating, sTbl.lockInMemory);
_chunkScanTableIndexes.push_back(index);
}

// Add fragments
_jobFragments = JobFragment::createVect(*chunkQuerySpec, jobSubQueryTempMap, jobDbTablesMap);
}

nlohmann::json JobMsg::serializeJson() const {
nlohmann::json JobMsg::toJson() const {
auto jsJobMsg = nlohmann::json({{"jobId", _jobId},
{"attemptCount", _attemptCount},
{"querySpecDb", _chunkQuerySpecDb},
Expand All @@ -200,13 +193,13 @@ nlohmann::json JobMsg::serializeJson() const {

auto& jsqFrags = jsJobMsg["queryFragments"];
for (auto& jFrag : *_jobFragments) {
jsqFrags.emplace_back(jFrag->serializeJson());
jsqFrags.emplace_back(jFrag->toJson());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you already have a complete JSON object to add to the collection, this should be push_back. Method emplace_back is used for constructing new objects in place during insertion. Here is an example:

// When to use push_back
std::vector<std::string> v;
std::string s = "hello";
v.push_back(s);       // Copies 's' into the vector
v.push_back("world"); // Constructs a temporary string "world", then moves it into the vector

// When to use emplace_back
std::vector<std::string> v;
v.emplace_back("hello"); // Constructs a string "hello" directly in the vector
v.emplace_back(5, 'c');  // Constructs a string "ccccc" directly in the vector

Method std::vector::emplace_back is conceptually like std::make_shared. For example:

struct Point {
    Point(double x, double y, double z);
};

// One can construct a pointer like this:
auto ptr = std::shared_ptr<Point>(new Point(1, 2, 3));

// Or using:
auto ptr = std::make_shared<Point>(1, 2, 3);

The second method invokes the constructor of Point and passes 3 parameters to the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's all true, and I changed it to push_back, but as far as I know, using push_back is no more efficient than emplace_back in this case. After a little looking around, it seems push_back may compile slightly faster than emplace_back. Otherwise, if there is a constructor that can build the object directly in the container, emplace_back will be faster. So, there doesn't seem to be any real harm in using emplace_back in this case, or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should construct a new json object directly in the jsqFrags json object thanks to RVO.
jsqFrags.emplace_back(jFrag->toJson());
while this may construct a new object and then move it.
jsqFrags.push_back(jFrag->toJson());

emplace_back should really be a bit faster.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance-wise, and in this particular case, both methods should be the same. My comment was mostly about the semantic differences of the operations. I'm not really insisting on any changes here.

}

return jsJobMsg;
}

JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& jobDbTablesMap,
JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTableMap::Ptr const& jobDbTablesMap,
JobId jobId, int attemptCount, std::string const& chunkQuerySpecDb, int chunkId)
: _jobId(jobId),
_attemptCount(attemptCount),
Expand All @@ -217,7 +210,7 @@ JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap

JobMsg::Ptr JobMsg::createFromJson(nlohmann::json const& ujJson,
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
JobDbTablesMap::Ptr const& jobDbTablesMap) {
JobDbTableMap::Ptr const& jobDbTablesMap) {
JobId jobId = http::RequestBodyJSON::required<JobId>(ujJson, "jobId");
int attemptCount = http::RequestBodyJSON::required<int>(ujJson, "attemptCount");
string chunkQuerySpecDb = http::RequestBodyJSON::required<string>(ujJson, "querySpecDb");
Expand All @@ -231,11 +224,10 @@ JobMsg::Ptr JobMsg::createFromJson(nlohmann::json const& ujJson,
jMsgPtr->_chunkScanTableIndexes = jsChunkTblIndexes.get<std::vector<int>>();
jMsgPtr->_jobFragments =
JobFragment::createVectFromJson(jsQFrags, jMsgPtr->_jobSubQueryTempMap, jMsgPtr->_jobDbTablesMap);

return jMsgPtr;
}

json JobSubQueryTempMap::serializeJson() const {
json JobSubQueryTempMap::toJson() const {
// std::map<int, std::string> _qTemplateMap;
json jsSubQueryTemplateMap = {{"subquerytemplate_map", json::array()}};
auto& jsSqtMap = jsSubQueryTemplateMap["subquerytemplate_map"];
Expand Down Expand Up @@ -280,7 +272,7 @@ int JobSubQueryTempMap::findSubQueryTemp(string const& qTemp) {
return index;
}

int JobDbTablesMap::findDbTable(pair<string, string> const& dbTablePair) {
int JobDbTableMap::findDbTable(pair<string, string> const& dbTablePair) {
// The expected number of templates is expected to be small, less than 4,
// so this shouldn't be horribly expensive.
for (auto const& [key, dbTbl] : _dbTableMap) {
Expand All @@ -295,89 +287,44 @@ int JobDbTablesMap::findDbTable(pair<string, string> const& dbTablePair) {
return index;
}

json JobDbTablesMap::serializeJson() const {
json jsDbTablesMap = {{"dbtable_map", json::array()}, {"scanrating_map", json::array()}};

auto& jsDbTblMap = jsDbTablesMap["dbtable_map"];
json JobDbTableMap::toJson() const {
auto jsDbTblMap = json::array();
for (auto const& [key, valPair] : _dbTableMap) {
json jsDbTbl = {{"index", key}, {"db", valPair.first}, {"table", valPair.second}};
jsDbTblMap.push_back(jsDbTbl);
}

auto& jsScanRatingMap = jsDbTablesMap["scanrating_map"];
for (auto const& [key, valPair] : _scanRatingMap) {
json jsScanR = {{"index", key}, {"scanrating", valPair.first}, {"lockinmem", valPair.second}};
jsScanRatingMap.push_back(jsScanR);
}

LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " " << jsDbTablesMap);
return jsDbTablesMap;
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " " << jsDbTblMap);
return jsDbTblMap;
}

JobDbTablesMap::Ptr JobDbTablesMap::createFromJson(nlohmann::json const& ujJson) {
JobDbTableMap::Ptr JobDbTableMap::createFromJson(nlohmann::json const& ujJson) {
Ptr dbTablesMapPtr = create();
auto& dbTblMap = dbTablesMapPtr->_dbTableMap;
auto& scanRMap = dbTablesMapPtr->_scanRatingMap;

LOGS(_log, LOG_LVL_TRACE, "JobDbTablesMap::createFromJson " << ujJson);
LOGS(_log, LOG_LVL_TRACE, "JobDbTableMap::createFromJson " << ujJson);

json const& jsDbTbl = ujJson["dbtable_map"];
for (auto const& jsElem : jsDbTbl) {
for (auto const& jsElem : ujJson) {
int index = http::RequestBodyJSON::required<int>(jsElem, "index");
string db = http::RequestBodyJSON::required<string>(jsElem, "db");
string tbl = http::RequestBodyJSON::required<string>(jsElem, "table");
auto res = dbTblMap.insert(make_pair(index, make_pair(db, tbl)));
if (!res.second) {
throw invalid_argument(dbTablesMapPtr->cName(__func__) + " index=" + to_string(index) + "=" + db +
+"." + tbl + " index already found in " + to_string(jsDbTbl));
}
}

json const& jsScanR = ujJson["scanrating_map"];
for (auto const& jsElem : jsScanR) {
int index = http::RequestBodyJSON::required<int>(jsElem, "index");
int scanR = http::RequestBodyJSON::required<int>(jsElem, "scanrating");
bool lockInMem = http::RequestBodyJSON::required<bool>(jsElem, "lockinmem");
auto res = scanRMap.insert(make_pair(index, make_pair(scanR, lockInMem)));
if (!res.second) {
throw invalid_argument(dbTablesMapPtr->cName(__func__) + " index=" + to_string(index) + "=" +
to_string(scanR) + +", " + to_string(lockInMem) +
" index already found in " + to_string(jsDbTbl));
+"." + tbl + " index already found in " + to_string(ujJson));
}
}

return dbTablesMapPtr;
}

/// Record the scan rating and lock-in-memory flag for the table at `index`.
///
/// If no entry exists for `index`, the given values are stored. If an entry
/// exists and either value differs, an error is logged and the stored entry is
/// replaced only when the new `scanRating` is greater than the stored one.
/// @param index        key into the scan rating map (and the db/table map used for the log message).
/// @param scanRating   scan rating to record.
/// @param lockInMemory lock-in-memory flag to record alongside the rating.
void JobDbTablesMap::setScanRating(int index, int scanRating, bool lockInMemory) {
    auto found = _scanRatingMap.find(index);
    if (found == _scanRatingMap.end()) {
        // First time this index is seen: just store the values.
        _scanRatingMap[index] = make_pair(scanRating, lockInMemory);
        return;
    }

    auto& [sRating, lInMem] = found->second;
    if (sRating == scanRating && lInMem == lockInMemory) {
        // Nothing changed, nothing to do.
        return;
    }

    // The values differ from what was stored earlier: report it.
    auto [dbName, tblName] = getDbTable(index);
    LOGS(_log, LOG_LVL_ERROR,
         cName(__func__) << " unexpected change in scanRating for " << dbName << "." << tblName
                         << " from " << sRating << " to " << scanRating << " lockInMemory from "
                         << lInMem << " to " << lockInMemory);

    // Only overwrite the stored entry when the new rating is higher.
    if (scanRating > sRating) {
        sRating = scanRating;
        lInMem = lockInMemory;
    }
}

JobFragment::JobFragment(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
JobDbTablesMap::Ptr const& jobDbTablesMap)
JobDbTableMap::Ptr const& jobDbTablesMap)
: _jobSubQueryTempMap(jobSubQueryTempMap), _jobDbTablesMap(jobDbTablesMap) {}

JobFragment::VectPtr JobFragment::createVect(qproc::ChunkQuerySpec const& chunkQuerySpec,
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
JobDbTablesMap::Ptr const& jobDbTablesMap) {
JobDbTableMap::Ptr const& jobDbTablesMap) {
VectPtr jFragments{new Vect()};
if (chunkQuerySpec.nextFragment.get()) {
qproc::ChunkQuerySpec const* sPtr = &chunkQuerySpec;
Expand All @@ -401,7 +348,7 @@ JobFragment::VectPtr JobFragment::createVect(qproc::ChunkQuerySpec const& chunkQ
void JobFragment::_addFragment(std::vector<Ptr>& jFragments, DbTableSet const& subChunkTables,
std::vector<int> const& subchunkIds, std::vector<std::string> const& queries,
JobSubQueryTempMap::Ptr const& subQueryTemplates,
JobDbTablesMap::Ptr const& dbTablesMap) {
JobDbTableMap::Ptr const& dbTablesMap) {
LOGS(_log, LOG_LVL_TRACE, "JobFragment::_addFragment start");
Ptr jFrag = Ptr(new JobFragment(subQueryTemplates, dbTablesMap));

Expand Down Expand Up @@ -448,7 +395,7 @@ string JobFragment::dump() const {
return os.str();
}

nlohmann::json JobFragment::serializeJson() const {
nlohmann::json JobFragment::toJson() const {
json jsFragment = {{"subquerytemplate_indexes", _jobSubQueryTempIndexes},
{"dbtables_indexes", _jobDbTablesIndexes},
{"subchunkids", _subchunkIds}};
Expand All @@ -459,7 +406,7 @@ nlohmann::json JobFragment::serializeJson() const {

JobFragment::VectPtr JobFragment::createVectFromJson(nlohmann::json const& jsFrags,
JobSubQueryTempMap::Ptr const& jobSubQueryTempMap,
JobDbTablesMap::Ptr const& dbTablesMap) {
JobDbTableMap::Ptr const& dbTablesMap) {
LOGS(_log, LOG_LVL_TRACE, "JobFragment::createVectFromJson " << jsFrags);

JobFragment::VectPtr jobFragments{new JobFragment::Vect()};
Expand Down
Loading