@@ -47,6 +47,7 @@ namespace mtconnect::pipeline {
4747 DATA_SET,
4848 TIMESTAMP,
4949 ASSET,
50+ VALUE_ERROR,
5051
5152 VALUE,
5253 KEY,
@@ -174,6 +175,60 @@ namespace mtconnect::pipeline {
174175 Forward m_forward;
175176 std::list<pair<DataItemPtr, entity::Properties>> m_queue;
176177 };
178+
179+ // / @brief consume value in case of error
180+ struct ErrorHandler : rj::BaseReaderHandler<rj::UTF8<>, ErrorHandler>
181+ {
182+ ErrorHandler (int depth = 0 ) : m_depth(depth) {}
183+
184+ bool Default ()
185+ {
186+ return true ;
187+ }
188+ bool Key (const Ch *str, rj::SizeType length, bool copy)
189+ {
190+ return true ;
191+ }
192+ bool StartObject ()
193+ {
194+ m_depth++;
195+ return true ;
196+ }
197+ bool EndObject (rj::SizeType memberCount)
198+ {
199+ m_depth--;
200+ return true ;
201+ }
202+ bool StartArray ()
203+ {
204+ m_depth++;
205+ return true ;
206+ }
207+ bool EndArray (rj::SizeType elementCount)
208+ {
209+ m_depth--;
210+ return true ;
211+ }
212+
213+ bool operator ()(rj::Reader &reader, rj::StringStream &buff)
214+ {
215+ LOG (warning) << " Consuming value due to error" ;
216+
217+ if (!reader.IterativeParseNext <rj::kParseNanAndInfFlag >(buff, *this ))
218+ return false ;
219+
220+ while (m_depth > 0 && !reader.IterativeParseComplete ())
221+ {
222+ // Read the key
223+ if (!reader.IterativeParseNext <rj::kParseNanAndInfFlag >(buff, *this ))
224+ return false ;
225+ }
226+
227+ return true ;
228+ }
229+
230+ int m_depth {0 };
231+ };
177232
178233 // / @brief SAX Parser handler for JSON Parsing
179234 struct DataSetHandler : rj::BaseReaderHandler<rj::UTF8<>, DataSetHandler>
@@ -410,6 +465,7 @@ namespace mtconnect::pipeline {
410465
411466 bool StartArray ()
412467 {
468+ m_depth++;
413469 if (m_dataItem->isTimeSeries () || m_dataItem->isThreeSpace ())
414470 {
415471 auto &value = m_props[" VALUE" ];
@@ -420,11 +476,13 @@ namespace mtconnect::pipeline {
420476 else
421477 {
422478 LOG (warning) << " Unexpected vector type for data item " << m_dataItem->getId ();
423- return false ;
479+ m_expectation = Expectation::VALUE_ERROR;
480+ return true ;
424481 }
425482 }
426483 bool EndArray (rj::SizeType elementCount)
427484 {
485+ m_depth--;
428486 if (m_expectation == Expectation::VECTOR)
429487 {
430488 m_vector = nullptr ;
@@ -434,7 +492,8 @@ namespace mtconnect::pipeline {
434492 else
435493 {
436494 LOG (warning) << " Unexpected vector type for data item " << m_dataItem->getId ();
437- return false ;
495+ m_expectation = Expectation::VALUE_ERROR;
496+ return true ;
438497 }
439498 }
440499
@@ -449,7 +508,8 @@ namespace mtconnect::pipeline {
449508 if (f == e)
450509 {
451510 LOG (warning) << " Unexpected key: " << sv << " for condition " << m_dataItem->getId ();
452- return false ;
511+ m_expectation = Expectation::VALUE_ERROR;
512+ return true ;
453513 }
454514 }
455515 else
@@ -470,7 +530,8 @@ namespace mtconnect::pipeline {
470530 else
471531 {
472532 LOG (warning) << " Unexpected key " << sv << " for data item " << m_dataItem->getId ();
473- return false ;
533+ m_expectation = Expectation::VALUE_ERROR;
534+ return true ;
474535 }
475536 m_key = sv;
476537 }
@@ -480,13 +541,15 @@ namespace mtconnect::pipeline {
480541
481542 bool StartObject ()
482543 {
544+ m_depth++;
483545 m_object = true ;
484546 m_expectation = Expectation::KEY;
485547 return true ;
486548 }
487549
488550 bool EndObject (rj::SizeType memberCount)
489551 {
552+ m_depth--;
490553 m_done = true ;
491554 return true ;
492555 }
@@ -509,7 +572,15 @@ namespace mtconnect::pipeline {
509572
510573 if (!m_done)
511574 {
512- if (m_expectation == Expectation::DATA_SET)
575+ if (m_expectation == Expectation::VALUE_ERROR)
576+ {
577+ ErrorHandler handler (m_depth);
578+ if (!handler (reader, buff))
579+ return false ;
580+ m_props.clear ();
581+ return true ;
582+ }
583+ else if (m_expectation == Expectation::DATA_SET)
513584 {
514585 auto &value = m_props[" VALUE" ];
515586 DataSet &set = value.emplace <DataSet>();
@@ -547,6 +618,7 @@ namespace mtconnect::pipeline {
547618 bool m_object {false };
548619 std::string m_key;
549620 Expectation m_expectation {Expectation::NONE};
621+ int m_depth{0 };
550622 };
551623
552624 struct TimestampHandler : rj::BaseReaderHandler<rj::UTF8<>, TimestampHandler>
@@ -690,7 +762,8 @@ namespace mtconnect::pipeline {
690762 bool Default ()
691763 {
692764 LOG (warning) << " Expecting a key" ;
693- return false ;
765+ m_expectation = Expectation::VALUE_ERROR;
766+ return true ;
694767 }
695768
696769 bool Key (const Ch *str, rj::SizeType length, bool copy)
@@ -789,6 +862,16 @@ namespace mtconnect::pipeline {
789862 m_expectation = Expectation::KEY;
790863 break ;
791864 }
865+
866+ case Expectation::VALUE_ERROR:
867+ {
868+ ErrorHandler handler;
869+ if (!handler (reader, buff))
870+ return false ;
871+
872+ m_expectation = Expectation::KEY;
873+ break ;
874+ }
792875
793876 case Expectation::KEY:
794877 case Expectation::NONE:
@@ -802,7 +885,8 @@ namespace mtconnect::pipeline {
802885 PropertiesHandler props (m_dataItem, m_props);
803886 if (!props (reader, buff))
804887 return false ;
805- m_context.send (m_dataItem, m_props);
888+ if (m_props.size () > 0 )
889+ m_context.send (m_dataItem, m_props);
806890 }
807891 else
808892 {
0 commit comments