diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 5b3f14102b..38e3135979 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -95,9 +95,11 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar if (bLen < 0) { throw util::Bug(ERR_LOC, "MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName); } + util::InstanceCount ica(_tableName + "_Merge_flush_LDB_a"); switch (_state) { - case MsgState::HEADER_WAIT: + case MsgState::HEADER_WAIT: { + //&&&util::InstanceCount icl(_tableName + "_Merge_flush_LDB_l_header_wait"); _response->headerSize = static_cast((*bufPtr)[0]); if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) { std::string sErr = @@ -140,8 +142,10 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar _state = MsgState::RESULT_RECV; } } + } return true; case MsgState::RESULT_WAIT: { + //&&&util::InstanceCount icp(_tableName + "_Merge_flush_LDB_p_result_wait"); nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize(); auto jobQuery = getJobQuery().lock(); if (!_verifyResult(bufPtr, bLen)) { @@ -160,7 +164,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName); auto success = _merge(); + //&&&util::InstanceCount icpx(_tableName + "_Merge_flush_LDB_px"); _response.reset(new WorkerResponse()); + //&&&util::InstanceCount icpz(_tableName + "_Merge_flush_LDB_pz"); return success; } case MsgState::RESULT_RECV: @@ -169,6 +175,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar case MsgState::HEADER_ERR: [[fallthrough]]; case MsgState::RESULT_ERR: { + util::InstanceCount iczerror(_tableName + "_Merge_flush_LDB_zerror"); std::ostringstream eos; eos << "Unexpected message From:" << _wName << " flush state=" << getStateStr(_state) << " last=" << last; @@ -224,11 +231,14 @@ void MergingHandler::_initState() { } bool MergingHandler::_merge() { + util::InstanceCount ica(_tableName + "_Merge_merge_LDB_a"); if (auto job = getJobQuery().lock()) { if (_flushed) { throw util::Bug(ERR_LOC, "MergingRequester::_merge : already flushed"); } + //&&&util::InstanceCount icb(_tableName + "_Merge_merge_LDB_b"); bool success = _infileMerger->merge(_response); + //&&&util::InstanceCount icc(_tableName + "_Merge_merge_LDB_c"); if (!success) { LOGS(_log, LOG_LVL_WARN, "_merge() failed"); rproc::InfileMergerError const& err = _infileMerger->getError(); @@ -236,10 +246,11 @@ bool MergingHandler::_merge() { _state = MsgState::RESULT_ERR; } _response.reset(); - + //&&&util::InstanceCount icx(_tableName + "_Merge_merge_LDB_x"); return success; } LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL"); + //&&&util::InstanceCount icz(_tableName + "_Merge_merge_LDB_z"); return false; } diff --git a/src/qdisp/QueryRequest.cc b/src/qdisp/QueryRequest.cc index b8bfb6140e..bc74c5a9fc 100644 --- a/src/qdisp/QueryRequest.cc +++ b/src/qdisp/QueryRequest.cc @@ -79,6 +79,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { void action(util::CmdData* data) override { // If everything is ok, call GetResponseData to have XrdSsi ask the worker for the data. QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); + util::InstanceCount ica("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_a"); util::Timer tWaiting; util::Timer tTotal; PseudoFifo::Element::Ptr pseudoFifoElem; @@ -107,14 +108,22 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { pseudoFifoElem = _pseudoFifo->queueAndWait(); tWaiting.start(); + util::InstanceCount icb("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_b"); qr->GetResponseData(&buffer[0], buffer.size()); + util::InstanceCount icc("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_c"); } // Wait for XrdSsi to call ProcessResponseData with the data, - // which will notify this wait with a call to receivedProcessResponseDataParameters. + // which will notify this wait with a call to notifyDataSuccess. { LOGS(_log, LOG_LVL_TRACE, "GetResponseData called respC=" << _respCount); + util::InstanceCount icd("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_d"); std::unique_lock uLock(_mtx); + util::InstanceCount ice("QI=" + to_string(_qid) + ":" + to_string(_jobid) + + "_QReq_AfRDCmd_LDB_e"); // TODO: make timed wait, check for wedged, if weak pointers dead, log and give up. // The only purpose of the below being in a function is make this easier to find in gdb. _lockWaitQrA(uLock); @@ -137,6 +146,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // Actually process the data. // If more data needs to be sent, _processData will make a new AskForResponseDataCmd // object and queue it. + util::InstanceCount icf("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_f"); { auto jq = _jQuery.lock(); auto qr = _qRequest.lock(); @@ -149,6 +159,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // _processData will have created another AskForResponseDataCmd object if was needed. tTotal.stop(); } + util::InstanceCount icg("QI=" + to_string(_qid) + ":" + to_string(_jobid) + "_QReq_AfRDCmd_LDB_g"); _setState(State::DONE2); LOGS(_log, LOG_LVL_DEBUG, "Ask data is done wait=" << tWaiting.getElapsed() << " total=" << tTotal.getElapsed()); @@ -207,6 +218,148 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { bool _last = true; }; +// &&& Keep track of items which are being received but not yet finished +class QueryRequestStatus { +public: + using Ptr = shared_ptr; + static map queryReqStatMap; // &&& + static mutex reqStatMapMtx; // &&& + + static Ptr getPtr(string const& id_) { + Ptr ptr; + lock_guard lck(reqStatMapMtx); + auto iter = queryReqStatMap.find(id_); + if (iter == queryReqStatMap.end()) { + ptr = Ptr(new QueryRequestStatus(id_)); + } else { + ptr = iter->second; + } + return ptr; + } + + static void addProcessResp(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrProcessResp("addProcessResp"); + } + + static void addProcessRespData(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrProcessRespData("addProcessRespData"); + } + + static void addCancelled(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrCancelled("addCancelled"); + } + + static void addMarked(string const& id_) { + Ptr ptr = getPtr(id_); + ptr->incrMarked("addMarked"); + } + + static void addErr(string const& id_, string const& err, string const& note) { + Ptr ptr = getPtr(id_); + ptr->apendErr(err, note); + } + + static void addFinish(string const& id_, string const& note) { + Ptr ptr = getPtr(id_); + ptr->finito(note); + remove(id_); // &&& don't do this every time and remove old items that are finished?? + dumpMap(); + } + + static void dumpMap() { + stringstream os; + lock_guard lck(reqStatMapMtx); + os << "reqStatMap::"; + for (auto&& elem : queryReqStatMap) { + os << elem.second->dumpStr() << ";;"; + } + LOGS(_log, LOG_LVL_WARN, "&&& " << os.str()); + } + + static void remove(string const& id_) { + lock_guard lck(reqStatMapMtx); + queryReqStatMap.erase(id_); + } + + void incrProcessResp(string const& note) { + lock_guard lck(_mtx); + processRespCount++; + _dump(note); + } + + void incrProcessRespData(string const& note) { + lock_guard lck(_mtx); + processRespDataCount++; + _dump(note); + } + + void incrCancelled(string const& note) { + lock_guard lck(_mtx); + cancelledCount++; + _dump(note); + } + + void incrMarked(string const& note) { + lock_guard lck(_mtx); + cancelledCount++; + _dump(note); + } + + void apendErr(string const& err_, string const& note) { + lock_guard lck(_mtx); + if (err.length() < 200) { + err += err_; + } else { + err += "."; + } + _dump(note); + } + + void finito(string const& note) { + lock_guard lck(_mtx); + ++finished; + _dump(note); + } + + void dump() { + lock_guard lck(_mtx); + _dump(""); + } + + string dumpStr() { + lock_guard lck(_mtx); + return _dumpStr(""); + } + + string const id; + string err; + int processRespCount = 0; + int processRespDataCount = 0; + int cancelledCount = 0; + int markedCount = 0; + int finished = 0; + +private: + QueryRequestStatus(string const& id_) : id(id_) {} + + void _dump(string const& note) { LOGS(_log, LOG_LVL_WARN, _dumpStr("")); } + + string _dumpStr(string const& note) { + stringstream os; + os << id << " &&& " << note << " fin=" << finished << " markC=" << markedCount + << " RespC=" << processRespCount << " rData=" << processRespDataCount + << " cancelC=" << cancelledCount << " err=" << err; + return os.str(); + } + + mutex _mtx; +}; +map QueryRequestStatus::queryReqStatMap; // &&& +mutex QueryRequestStatus::reqStatMapMtx; // &&& + //////////////////////////////////////////////////////////////////////// // QueryRequest //////////////////////////////////////////////////////////////////////// @@ -231,7 +384,7 @@ QueryRequest::~QueryRequest() { } if (!_finishedCalled) { LOGS(_log, LOG_LVL_WARN, "~QueryRequest cleaning up calling Finished"); - Finished(true); + _qrFinished(true); } } @@ -258,6 +411,7 @@ char* QueryRequest::GetRequest(int& requestLength) { bool QueryRequest::ProcessResponse(XrdSsiErrInfo const& eInfo, XrdSsiRespInfo const& rInfo) { QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); LOGS(_log, LOG_LVL_DEBUG, "workerName=" << GetEndPoint() << " ProcessResponse"); + QueryRequestStatus::addProcessResp(_jobIdStr); //&&& string errorDesc = _jobIdStr + " "; if (isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, "QueryRequest::ProcessResponse job already cancelled"); @@ -303,9 +457,17 @@ bool QueryRequest::ProcessResponse(XrdSsiErrInfo const& eInfo, XrdSsiRespInfo co case XrdSsiRespInfo::isFile: // Local-only errorDesc += "Unexpected XrdSsiRespInfo.rType == isFile"; break; - case XrdSsiRespInfo::isStream: // All remote requests + case XrdSsiRespInfo::isStream: { // All remote requests jq->getStatus()->updateInfo(_jobIdStr, JobStatus::RESPONSE_READY, "SSI"); - return _importStream(jq); + //&&&return _importStream(jq); + bool importSuccess = _importStream(jq); + LOGS(_log, LOG_LVL_INFO, "&&& _importStream importSuccess=" << importSuccess); + if (!importSuccess) { + LOGS(_log, LOG_LVL_WARN, "ProcessResponse stream import failure."); + _errorFinish(); + } + return importSuccess; + } default: errorDesc += "Out of range XrdSsiRespInfo.rType"; } @@ -336,8 +498,10 @@ bool QueryRequest::_importStream(JobQuery::Ptr const& jq) { int nextBufSize = 0; bool last = false; int resultRows = 0; + util::InstanceCount ica(_jobIdStr + "_QReq_imStream_LDB_a"); bool flushOk = jq->getDescription()->respHandler()->flush(len, bufPtr, last, largeResult, nextBufSize, resultRows); + util::InstanceCount icb(_jobIdStr + "_QReq_imStream_LDB_b_last=" + to_string(last)); if (!flushOk) { LOGS(_log, LOG_LVL_ERROR, "_importStream not flushOk"); @@ -379,6 +543,7 @@ void QueryRequest::_queueAskForResponse(AskForResponseDataCmd::Ptr const& cmd, J /// Process an incoming error. bool QueryRequest::_importError(string const& msg, int code) { + QueryRequestStatus::addErr(_jobIdStr, msg, "_importError"); //&&& auto jq = _jobQuery; { lock_guard lock(_finishStatusMutex); @@ -408,6 +573,7 @@ void QueryRequest::ProcessResponseData(XrdSsiErrInfo const& eInfo, char* buff, i LOGS(_log, LOG_LVL_DEBUG, "ProcessResponseData with buflen=" << blen << " " << (last ? "(last)" : "(more)")); + QueryRequestStatus::addProcessRespData(_jobIdStr); //&&& if (_askForResponseDataCmd == nullptr) { LOGS(_log, LOG_LVL_ERROR, "ProcessResponseData called with invalid _askForResponseDataCmd!!!"); return; @@ -578,6 +744,7 @@ bool QueryRequest::cancel() { LOGS(_log, LOG_LVL_DEBUG, "QueryRequest::cancel already cancelled, ignoring"); return false; // Don't do anything if already cancelled. } + QueryRequestStatus::addCancelled(_jobIdStr); _cancelled = true; _retried = true; // Prevent retries. // Only call the following if the job is NOT already done. @@ -632,8 +799,22 @@ void QueryRequest::cleanup() { /// a local shared pointer for this QueryRequest and/or its owner JobQuery. /// See QueryRequest::cleanup() /// @return true if this QueryRequest object had the authority to make changes. + +atomic errorFinishFalseCount{0}; //&&& +atomic errorFinishTrueCount{0}; //&&& + bool QueryRequest::_errorFinish(bool stopTrying) { LOGS(_log, LOG_LVL_DEBUG, "_errorFinish() shouldCancel=" << stopTrying); + QueryRequestStatus::addErr(_jobIdStr, "_errorFinish " + to_string(stopTrying), + "_errorFinish " + to_string(stopTrying)); //&&& + if (stopTrying) { + ++errorFinishTrueCount; + } else { + ++errorFinishFalseCount; + } + LOGS(_log, LOG_LVL_WARN, + "&&&_errorFinish stopTrying=" << stopTrying << " tCount=" << errorFinishTrueCount + << " fCount=" << errorFinishFalseCount); auto jq = _jobQuery; { // Running _errorFinish more than once could cause errors. @@ -648,9 +829,10 @@ bool QueryRequest::_errorFinish(bool stopTrying) { } // Make the calls outside of the mutex lock. - LOGS(_log, LOG_LVL_DEBUG, "calling Finished(stopTrying=" << stopTrying << ")"); - bool ok = Finished(stopTrying); - _finishedCalled = true; + LOGS(_log, LOG_LVL_WARN, "calling Finished(stopTrying=" << stopTrying << ")"); + //&&&bool ok = &&&Finished(stopTrying); + bool ok = _qrFinished(true); // &&& + //&&&_finishedCalled = true; if (!ok) { LOGS(_log, LOG_LVL_ERROR, "QueryRequest::_errorFinish NOT ok"); } else { @@ -693,8 +875,8 @@ void QueryRequest::_finish() { _finishStatus = FINISHED; } - bool ok = Finished(); - _finishedCalled = true; + bool ok = _qrFinished(); + //&&&_finishedCalled = true; if (!ok) { LOGS(_log, LOG_LVL_ERROR, "QueryRequest::finish Finished() !ok "); } else { @@ -710,9 +892,20 @@ void QueryRequest::_callMarkComplete(bool success) { if (!_calledMarkComplete.exchange(true)) { auto jq = _jobQuery; if (jq != nullptr) jq->getMarkCompleteFunc()->operator()(success); + QueryRequestStatus::addMarked(_jobIdStr); } } +bool QueryRequest::_qrFinished(bool stopTrying) { + LOGS(_log, LOG_LVL_WARN, _jobIdStr << "_qrFinished A &&&stopTrying=" << stopTrying); + bool res = Finished(stopTrying); + _finishedCalled = true; + LOGS(_log, LOG_LVL_WARN, _jobIdStr << "_qrFinished B &&&stopTrying=" << stopTrying << " res=" << res); + QueryRequestStatus::addFinish(_jobIdStr, "_qrFinished stopTrying=" + to_string(stopTrying)); + + return res; +} + ostream& operator<<(ostream& os, QueryRequest const& qr) { os << "QueryRequest " << qr._jobIdStr; return os; diff --git a/src/qdisp/QueryRequest.h b/src/qdisp/QueryRequest.h index 9cefdb74ca..ef1a14625c 100644 --- a/src/qdisp/QueryRequest.h +++ b/src/qdisp/QueryRequest.h @@ -133,6 +133,8 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this const& response) if (!_queryIdStrSet) { _setQueryIdStr(QueryIdHelper::makeIdStr(response->result.queryid())); } + + util::InstanceCount ica(_getQueryIdStr() + "_InfMerge_LDB_a"); size_t resultSize = response->result.transmitsize(); LOGS(_log, LOG_LVL_TRACE, "Executing InfileMerger::merge(" diff --git a/src/util/InstanceCount.h b/src/util/InstanceCount.h index 9923b84b5b..877daddc90 100644 --- a/src/util/InstanceCount.h +++ b/src/util/InstanceCount.h @@ -5,15 +5,22 @@ // System headers #include +#include #include #include namespace lsst::qserv::util { /// This a utility class to track the number of instances of any class where it is a member. +/// Alternatively, it can be used to track functions or code blocks by giving the +/// InstanceCount a unique identifier in the block of code to track. +/// It can also be used to see how many threads are waiting on a mutex. +/// InstanceCount is a flexible debugging tool, but it is very noisy in the log. +/// Once a problem is solved, the local instances of InstanceCount should be removed. // class InstanceCount { public: + using Ptr = std::shared_ptr; InstanceCount(std::string const& className); InstanceCount(InstanceCount const& other); InstanceCount(InstanceCount&& origin); diff --git a/src/wbase/SendChannelShared.cc b/src/wbase/SendChannelShared.cc index 44de0c4bb6..fc57e23dbe 100644 --- a/src/wbase/SendChannelShared.cc +++ b/src/wbase/SendChannelShared.cc @@ -129,7 +129,7 @@ bool SendChannelShared::_addTransmit(bool cancelled, bool erred, bool lastIn, Tr bool reallyLast = _lastRecvd; string idStr(makeIdStr(qId, jId)); if (_icPtr == nullptr) { - _icPtr = std::make_shared(std::to_string(qId) + "_SCS_LDB"); + _icPtr = std::make_shared("QI=" + std::to_string(qId) + "_SCS_LDB"); } // If something bad already happened, just give up. @@ -160,7 +160,7 @@ util::TimerHistogram scsTransmitSend("scsTransmitSend", {0.01, 0.1, 1.0, 2.0, 5. bool SendChannelShared::_transmit(bool erred) { string idStr = "QID?"; - + util::InstanceCount::Ptr icPtrA; // Result data is transmitted in messages containing data and headers. // data - is the result data // header - contains information about the next chunk of result data, @@ -210,7 +210,11 @@ bool SendChannelShared::_transmit(bool erred) { // The first message needs to put its header data in metadata as there's // no previous message it could attach its header to. { + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_a"); //&&& lock_guard streamLock(_streamMutex); // Must keep meta and buffer together. + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_b"); //&&& if (_firstTransmit.exchange(false)) { // Put the header for the first message in metadata // _metaDataBuf must remain valid until Finished() is called. @@ -229,8 +233,12 @@ bool SendChannelShared::_transmit(bool erred) { { util::Timer sendTimer; sendTimer.start(); + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_c"); //&&& bool sent = _sendBuf(streamLock, streamBuf, reallyLast, "transmitLoop " + idStr + " " + seqStr, scsSeq); + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_d"); //&&& sendTimer.stop(); auto logMsgSend = scsTransmitSend.addTime(sendTimer.getElapsed(), idStr); LOGS(_log, LOG_LVL_INFO, logMsgSend); @@ -241,6 +249,8 @@ bool SendChannelShared::_transmit(bool erred) { } } } + icPtrA = std::make_shared(thisTransmit->getIdStr() + "_te_LDB_" + + to_string(reallyLast) + "_z"); //&&& // If that was the last message, break the loop. if (reallyLast) return true; } @@ -251,7 +261,9 @@ util::TimerHistogram transmitHisto("transmit Hist", {0.1, 1, 5, 10, 20, 40}); bool SendChannelShared::_sendBuf(lock_guard const& streamLock, xrdsvc::StreamBuffer::Ptr& streamBuf, bool last, string const& note, int scsSeq) { + util::InstanceCount ica(note + "_SCSStream_LDB_a"); bool sent = _sendChannel->sendStream(streamBuf, last, scsSeq); + util::InstanceCount icb(note + "_SCSStream_LDB_b"); if (!sent) { LOGS(_log, LOG_LVL_ERROR, "Failed to transmit " << note << "!"); return false; @@ -259,7 +271,9 @@ bool SendChannelShared::_sendBuf(lock_guard const& streamLock, xrdsvc::St util::Timer t; t.start(); LOGS(_log, LOG_LVL_INFO, "_sendbuf wait start " << note); + util::InstanceCount icc(note + "_SCSStream_LDB_c"); streamBuf->waitForDoneWithThis(); // Block until this buffer has been sent. + util::InstanceCount icd(note + "_SCSStream_LDB_d"); t.stop(); auto logMsg = transmitHisto.addTime(t.getElapsed(), note); LOGS(_log, LOG_LVL_DEBUG, logMsg); diff --git a/src/wbase/SendChannelShared.h b/src/wbase/SendChannelShared.h index 9981906c7c..0b9b8a640c 100644 --- a/src/wbase/SendChannelShared.h +++ b/src/wbase/SendChannelShared.h @@ -288,7 +288,7 @@ class SendChannelShared { std::shared_ptr _transmitData; ///< TransmitData object mutable std::mutex _tMtx; ///< protects _transmitData - std::shared_ptr _icPtr; ///< temporary for LockupDB + util::InstanceCount::Ptr _icPtr; ///< temporary for LockupDB }; } // namespace wbase diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 6591997d76..030c8a3f20 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -83,8 +83,9 @@ proto::Result* TransmitData::_createResult() { void TransmitData::attachNextHeader(TransmitData::Ptr const& nextTr, bool reallyLast, uint32_t seq, int scsSeq) { - _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast)); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_a"); lock_guard lock(_trMtx); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_b"); if (_result == nullptr) { throw util::Bug(ERR_LOC, _idStr + "_transmitLoop() had nullptr result!"); } @@ -105,6 +106,7 @@ void TransmitData::attachNextHeader(TransmitData::Ptr const& nextTr, bool really } // Append the next header to this data. _dataMsg += proto::ProtoHeaderWrap::wrap(nextHeaderString); + _icPtr = std::make_shared(_idStr + "_td_LDB_" + std::to_string(reallyLast) + "_c"); } string TransmitData::makeHeaderString(bool reallyLast, uint32_t seq, int scsSeq) { @@ -143,7 +145,7 @@ string TransmitData::getHeaderString(uint32_t seq, int scsSeq) { xrdsvc::StreamBuffer::Ptr TransmitData::getStreamBuffer() { lock_guard lock(_trMtx); // createWithMove invalidates _dataMsg - return xrdsvc::StreamBuffer::createWithMove(_dataMsg); + return xrdsvc::StreamBuffer::createWithMove(_dataMsg, getIdStr()); } void TransmitData::_buildHeader(bool largeResult) { diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index bff17fc96e..a7561af8a1 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -150,7 +150,7 @@ size_t QueryRunner::_getDesiredLimit() { util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40}); bool QueryRunner::runQuery() { - util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB + util::InstanceCount ic("QI=" + to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId()); LOGS(_log, LOG_LVL_INFO, "QueryRunner::runQuery() tid=" << _task->getIdStr() @@ -350,7 +350,7 @@ bool QueryRunner::_dispatchChannel() { // Pass all information on to the shared object to add on to // an existing message or build a new one as needed. - util::InstanceCount ica(to_string(_task->getQueryId()) + "_rqa_LDB"); // LockupDB + util::InstanceCount ica("QI=" + to_string(_task->getQueryId()) + "_rqa_LDB"); // LockupDB if (_task->getSendChannel()->buildAndTransmitResult(res, numFields, *_task, _largeResult, _multiError, _cancelled, readRowsOk)) { erred = true; @@ -371,14 +371,14 @@ bool QueryRunner::_dispatchChannel() { erred = true; } // IMPORTANT, do not leave this function before this check has been made. - util::InstanceCount icb(to_string(_task->getQueryId()) + "_rqb_LDB"); // LockupDB + util::InstanceCount icb("QI=" + to_string(_task->getQueryId()) + "_rqb_LDB"); // LockupDB if (needToFreeRes) { needToFreeRes = false; // All rows have been read out or there was an error. In // either case resources need to be freed. _mysqlConn->freeResult(); } - util::InstanceCount icc(to_string(_task->getQueryId()) + "_rqc_LDB"); // LockupDB + util::InstanceCount icc("QI=" + to_string(_task->getQueryId()) + "_rqc_LDB"); // LockupDB if (!readRowsOk) { // This means a there was a transmit error and there's no way to // send anything to the czar. However, there were mysql results diff --git a/src/wpublish/AddChunkGroupCommand.cc b/src/wpublish/AddChunkGroupCommand.cc index 3523027ce9..637322fa5b 100644 --- a/src/wpublish/AddChunkGroupCommand.cc +++ b/src/wpublish/AddChunkGroupCommand.cc @@ -69,7 +69,7 @@ void AddChunkGroupCommand::_reportError(proto::WorkerCommandChunkGroupR::Status _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); } void AddChunkGroupCommand::run() { @@ -122,7 +122,7 @@ void AddChunkGroupCommand::run() { } _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/ChunkListCommand.cc b/src/wpublish/ChunkListCommand.cc index a9502b0d50..3a01e4ce6a 100644 --- a/src/wpublish/ChunkListCommand.cc +++ b/src/wpublish/ChunkListCommand.cc @@ -76,7 +76,7 @@ void ChunkListCommand::_reportError(string const& message) { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); } void ChunkListCommand::run() { @@ -195,7 +195,7 @@ void ChunkListCommand::run() { } _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/GetChunkListCommand.cc b/src/wpublish/GetChunkListCommand.cc index ce6e550b8e..73954cd06c 100644 --- a/src/wpublish/GetChunkListCommand.cc +++ b/src/wpublish/GetChunkListCommand.cc @@ -71,7 +71,7 @@ void GetChunkListCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "GetChunkListCommand::" << __func__ << " ** SENT **"); } diff --git a/src/wpublish/GetStatusCommand.cc b/src/wpublish/GetStatusCommand.cc index 257495f9f5..279059eb1f 100644 --- a/src/wpublish/GetStatusCommand.cc +++ b/src/wpublish/GetStatusCommand.cc @@ -60,7 +60,7 @@ void GetStatusCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "GetStatusCommand::" << __func__ << " ** SENT **"); } diff --git a/src/wpublish/RemoveChunkGroupCommand.cc b/src/wpublish/RemoveChunkGroupCommand.cc index 5f54bf6d24..ae99e2da1d 100644 --- a/src/wpublish/RemoveChunkGroupCommand.cc +++ b/src/wpublish/RemoveChunkGroupCommand.cc @@ -73,7 +73,7 @@ void RemoveChunkGroupCommand::_reportError(proto::WorkerCommandChunkGroupR::Stat _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str); + auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str, "other"); _sendChannel->sendStream(streamBuffer, true); } @@ -149,7 +149,7 @@ void RemoveChunkGroupCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/SetChunkListCommand.cc b/src/wpublish/SetChunkListCommand.cc index 84df7b3f22..b2d06e9e99 100644 --- a/src/wpublish/SetChunkListCommand.cc +++ b/src/wpublish/SetChunkListCommand.cc @@ -94,7 +94,7 @@ void SetChunkListCommand::_reportError(proto::WorkerCommandSetChunkListR::Status _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str); + auto streamBuffer = xrdsvc::StreamBuffer::createWithMove(str, "other"); _sendChannel->sendStream(streamBuffer, true); } @@ -218,7 +218,7 @@ void SetChunkListCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, context << "** SENT **"); } diff --git a/src/wpublish/TestEchoCommand.cc b/src/wpublish/TestEchoCommand.cc index c73aad641b..a96be6c34f 100644 --- a/src/wpublish/TestEchoCommand.cc +++ b/src/wpublish/TestEchoCommand.cc @@ -61,7 +61,7 @@ void TestEchoCommand::run() { _frameBuf.serialize(reply); string str(_frameBuf.data(), _frameBuf.size()); - _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str, "other"), true); LOGS(_log, LOG_LVL_DEBUG, "TestEchoCommand::" << __func__ << " ** SENT **"); } diff --git a/src/xrdsvc/StreamBuffer.cc b/src/xrdsvc/StreamBuffer.cc index d13f0ff69a..67109ec586 100644 --- a/src/xrdsvc/StreamBuffer.cc +++ b/src/xrdsvc/StreamBuffer.cc @@ -65,18 +65,18 @@ double StreamBuffer::percentOfMaxTotalBytesUsed() { } // Factory function, because this should be able to delete itself when Recycle() is called. -StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input) { +StreamBuffer::Ptr StreamBuffer::createWithMove(std::string &input, string const &idStr) { unique_lock uLock(_createMtx); if (_totalBytes >= _maxTotalBytes) { LOGS(_log, LOG_LVL_WARN, "StreamBuffer at memory limit " << _totalBytes); } _createCv.wait(uLock, []() { return _totalBytes < _maxTotalBytes; }); - Ptr ptr(new StreamBuffer(input)); + Ptr ptr(new StreamBuffer(input, idStr)); ptr->_selfKeepAlive = ptr; return ptr; } -StreamBuffer::StreamBuffer(std::string &input) { +StreamBuffer::StreamBuffer(std::string &input, string const &idStr) : _idStr(idStr) { _dataStr = std::move(input); // TODO: try to make 'data' a const char* in xrootd code. // 'data' is not being changed after being passed, so hopefully not an issue. @@ -85,18 +85,21 @@ StreamBuffer::StreamBuffer(std::string &input) { next = 0; _totalBytes += _dataStr.size(); - LOGS(_log, LOG_LVL_DEBUG, "StreamBuffer::_totalBytes=" << _totalBytes << " thisSize=" << _dataStr.size()); + LOGS(_log, LOG_LVL_DEBUG, + _idStr << "StreamBuffer::_totalBytes=" << _totalBytes << " thisSize=" << _dataStr.size()); } StreamBuffer::~StreamBuffer() { _totalBytes -= _dataStr.size(); - LOGS(_log, LOG_LVL_DEBUG, "~StreamBuffer::_totalBytes=" << _totalBytes); + LOGS(_log, LOG_LVL_DEBUG, _idStr << "~StreamBuffer::_totalBytes=" << _totalBytes); } /// xrdssi calls this to recycle the buffer when finished. void StreamBuffer::Recycle() { { + util::InstanceCount ica(_idStr + "_streamBuf_LDB_a"); std::lock_guard lg(_mtx); + util::InstanceCount icb(_idStr + "_streamBuf_LDB_b"); _doneWithThis = true; } _cv.notify_all(); diff --git a/src/xrdsvc/StreamBuffer.h b/src/xrdsvc/StreamBuffer.h index 5ce3dd572a..9486bf64df 100644 --- a/src/xrdsvc/StreamBuffer.h +++ b/src/xrdsvc/StreamBuffer.h @@ -54,7 +54,7 @@ class StreamBuffer : public XrdSsiStream::Buffer { /// Factory function, because this should be able to delete itself when Recycle() is called. /// The constructor uses move to avoid copying the string. - static StreamBuffer::Ptr createWithMove(std::string &input); + static StreamBuffer::Ptr createWithMove(std::string &input, std::string const &idStr); /// Set the maximum number of bytes that can be used by all instances of this class. static void setMaxTotalBytes(int64_t maxBytes); @@ -81,9 +81,10 @@ class StreamBuffer : public XrdSsiStream::Buffer { private: /// This constructor will invalidate 'input'. - explicit StreamBuffer(std::string &input); + explicit StreamBuffer(std::string &input, std::string const &idStr); std::string _dataStr; + std::string const _idStr; std::mutex _mtx; std::condition_variable _cv; bool _doneWithThis = false;