@@ -120,13 +120,19 @@ public interface Sink {
120120 public long incWriteFailureCount ();
121121 public Map <String ,String > getWriteFailures ();
122122 public void updateWriteFailures (String regionName , String serverName );
123+ public long getReadSuccessCount ();
124+ public long incReadSuccessCount ();
125+ public long getWriteSuccessCount ();
126+ public long incWriteSuccessCount ();
123127 }
124128
125129 // Simple implementation of canary sink that allows to plot on
126130 // file or standard output timings or failures.
127131 public static class StdOutSink implements Sink {
128132 private AtomicLong readFailureCount = new AtomicLong (0 ),
129- writeFailureCount = new AtomicLong (0 );
133+ writeFailureCount = new AtomicLong (0 ),
134+ readSuccessCount = new AtomicLong (0 ),
135+ writeSuccessCount = new AtomicLong (0 );
130136
131137 private Map <String , String > readFailures = new ConcurrentHashMap <String , String >();
132138 private Map <String , String > writeFailures = new ConcurrentHashMap <String , String >();
@@ -170,6 +176,26 @@ public Map<String, String> getWriteFailures() {
170176 public void updateWriteFailures (String regionName , String serverName ) {
171177 writeFailures .put (regionName , serverName );
172178 }
179+
180+ @ Override
181+ public long getReadSuccessCount () {
182+ return readSuccessCount .get ();
183+ }
184+
185+ @ Override
186+ public long incReadSuccessCount () {
187+ return readSuccessCount .incrementAndGet ();
188+ }
189+
190+ @ Override
191+ public long getWriteSuccessCount () {
192+ return writeSuccessCount .get ();
193+ }
194+
195+ @ Override
196+ public long incWriteSuccessCount () {
197+ return writeSuccessCount .incrementAndGet ();
198+ }
173199 }
174200
175201 public static class RegionServerStdOutSink extends StdOutSink {
@@ -202,6 +228,7 @@ public static class RegionStdOutSink extends StdOutSink {
202228
203229 private Map <String , AtomicLong > perTableReadLatency = new HashMap <>();
204230 private AtomicLong writeLatency = new AtomicLong ();
231+ private Map <String , RegionTaskResult > regionMap = new ConcurrentHashMap <>();
205232
206233 public void publishReadFailure (ServerName serverName , HRegionInfo region , Exception e ) {
207234 incReadFailureCount ();
@@ -215,6 +242,10 @@ public void publishReadFailure(ServerName serverName, HRegionInfo region, HColum
215242 }
216243
217244 public void publishReadTiming (ServerName serverName , HRegionInfo region , HColumnDescriptor column , long msTime ) {
245+ incReadSuccessCount ();
246+ RegionTaskResult res = this .regionMap .get (region .getRegionNameAsString ());
247+ res .setReadSuccess ();
248+ res .setReadLatency (msTime );
218249 LOG .info (String .format ("read from region %s on regionserver %s column family %s in %dms" ,
219250 region .getRegionNameAsString (), serverName , column .getNameAsString (), msTime ));
220251 }
@@ -231,6 +262,10 @@ public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColu
231262 }
232263
233264 public void publishWriteTiming (ServerName serverName , HRegionInfo region , HColumnDescriptor column , long msTime ) {
265+ incWriteSuccessCount ();
266+ RegionTaskResult res = this .regionMap .get (region .getRegionNameAsString ());
267+ res .setWriteSuccess ();
268+ res .setWriteLatency (msTime );
234269 LOG .info (String .format ("write to region %s on regionserver %s column family %s in %dms" ,
235270 region .getRegionNameAsString (), serverName , column .getNameAsString (), msTime ));
236271 }
@@ -252,6 +287,14 @@ public void initializeWriteLatency() {
252287 public AtomicLong getWriteLatency () {
253288 return this .writeLatency ;
254289 }
290+
291+ public Map <String , RegionTaskResult > getRegionMap () {
292+ return this .regionMap ;
293+ }
294+
295+ public int getTotalExpectedRegions () {
296+ return this .regionMap .size ();
297+ }
255298 }
256299
257300 static class ZookeeperTask implements Callable <Void > {
@@ -883,6 +926,96 @@ private void printUsageAndExit() {
883926 System .exit (USAGE_EXIT_CODE );
884927 }
885928
929+ /**
930+ * Canary region mode-specific data structure which stores information about each region
931+ * to be scanned
932+ */
933+ public static class RegionTaskResult {
934+ private HRegionInfo region ;
935+ private TableName tableName ;
936+ private ServerName serverName ;
937+ private AtomicLong readLatency = null ;
938+ private AtomicLong writeLatency = null ;
939+ private boolean readSuccess = false ;
940+ private boolean writeSuccess = false ;
941+
942+ public RegionTaskResult (HRegionInfo region , TableName tableName , ServerName serverName ) {
943+ this .region = region ;
944+ this .tableName = tableName ;
945+ this .serverName = serverName ;
946+ }
947+
948+ public HRegionInfo getRegionInfo () {
949+ return this .region ;
950+ }
951+
952+ public String getRegionNameAsString () {
953+ return this .region .getRegionNameAsString ();
954+ }
955+
956+ public TableName getTableName () {
957+ return this .tableName ;
958+ }
959+
960+ public String getTableNameAsString () {
961+ return this .tableName .getNameAsString ();
962+ }
963+
964+ public ServerName getServerName () {
965+ return this .serverName ;
966+ }
967+
968+ public String getServerNameAsString () {
969+ return this .serverName .getServerName ();
970+ }
971+
972+ public long getReadLatency () {
973+ if (this .readLatency == null ) {
974+ return -1 ;
975+ }
976+ return this .readLatency .get ();
977+ }
978+
979+ public void setReadLatency (long readLatency ) {
980+ if (this .readLatency != null ) {
981+ this .readLatency .set (readLatency );
982+ } else {
983+ this .readLatency = new AtomicLong (readLatency );
984+ }
985+ }
986+
987+ public long getWriteLatency () {
988+ if (this .writeLatency == null ) {
989+ return -1 ;
990+ }
991+ return this .writeLatency .get ();
992+ }
993+
994+ public void setWriteLatency (long writeLatency ) {
995+ if (this .writeLatency != null ) {
996+ this .writeLatency .set (writeLatency );
997+ } else {
998+ this .writeLatency = new AtomicLong (writeLatency );
999+ }
1000+ }
1001+
1002+ public boolean isReadSuccess () {
1003+ return this .readSuccess ;
1004+ }
1005+
1006+ public void setReadSuccess () {
1007+ this .readSuccess = true ;
1008+ }
1009+
1010+ public boolean isWriteSuccess () {
1011+ return this .writeSuccess ;
1012+ }
1013+
1014+ public void setWriteSuccess () {
1015+ this .writeSuccess = true ;
1016+ }
1017+ }
1018+
8861019 /**
8871020 * A Factory method for {@link Monitor}.
8881021 * Can be overridden by user.
@@ -1295,6 +1428,9 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
12951428 HRegionInfo region = location .getRegionInfo ();
12961429 tasks .add (new RegionTask (admin .getConnection (), region , rs , (RegionStdOutSink ) sink , taskType , rawScanEnabled ,
12971430 rwLatency ));
1431+ Map <String , RegionTaskResult > regionMap = ((RegionStdOutSink ) sink ).getRegionMap ();
1432+ regionMap .put (region .getRegionNameAsString (), new RegionTaskResult (region ,
1433+ region .getTable (), rs ));
12981434 }
12991435 } finally {
13001436 if (regionLocator != null ) {
0 commit comments