Skip to content

Commit e6f77f6

Browse files
committed
fix astra debug log
1 parent 0cabebc commit e6f77f6

File tree

3 files changed

+20
-20
lines changed

3 files changed

+20
-20
lines changed

cassandra-astra/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ ASTRA_DB_PASSWORD=
3535
```
3636
Additionally, `keyspace` and `table name` are mandatory to add rows into Astra Cassandra database.
3737

38-
`table schema` is optional, however, the table schema has to be created on the table before any row can be added if the schema is not specified in the configuration.
38+
`table schema` is optional. However, the table schema has to be created on the table before any row can be added if the schema is not specified in the configuration.
3939

40-
These configuration must be specifed in [the configuration yaml](./config/pulsar-s3-io.yaml)
40+
These configuration must be specifed in [the configuration yaml](./config/pulsar-astra-sink.yaml)
4141

4242
When set `logLevel: "debug"`, debug logs will be printed by the sink.
4343

cassandra-astra/src/main/java/com/kesque/pulsar/sink/cassandra/astra/AstraConfig.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,18 @@ public void setBatchSize(int batchSize) {
8585
this.batchSize = batchSize;
8686
}
8787

88-
private boolean isDebug = false;
89-
public boolean debugLoglevel() {
90-
return isDebug;
91-
}
92-
9388
// currently only support debug level
9489
private String logLevel = "";
9590
public String getLogLevel() {
9691
return this.logLevel;
9792
}
93+
public void setLogLevel(String loglevel) {
94+
this.logLevel = loglevel;
95+
}
96+
97+
public boolean debugLoglevel() {
98+
return this.logLevel.equalsIgnoreCase("debug");
99+
}
98100

99101
public static AstraConfig load(String yamlFile) throws IOException {
100102
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

cassandra-astra/src/main/java/com/kesque/pulsar/sink/cassandra/astra/AstraSink.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,16 @@ public class AstraSink implements Sink<byte[]> {
7272
*/
7373
@Override
7474
public void write(Record<byte[]> record) throws Exception {
75-
//TODO: toAstra can be its own thread
76-
toAstra(record);
75+
//TODO: this can be its own thread
76+
try {
77+
addRow(buildColumnData(record), this.table);
78+
record.ack();
79+
if (astraConfig.debugLoglevel()) {
80+
log.info("successfully add row");
81+
}
82+
} catch (Exception e) {
83+
log.error("failed to send to astra ", e);
84+
}
7785
}
7886

7987
@Override
@@ -194,23 +202,13 @@ private String buildColumnData(Record<byte[]> record) {
194202
JSONObject obj = new JSONObject(new String(record.getValue()));
195203
String objStr = "{\"columns\":[";
196204
for (String key : obj.keySet()) {
197-
objStr = objStr + "{\"name\":\"" + key + "\",\"value\":\"" + obj.getString(key) + "\"},";
205+
objStr = objStr + "{\"name\":\"" + key + "\",\"value\":\"" + obj.getString(key) + "\"},";
198206
}
199207

200208
// remove the last comma
201209
return StringUtils.substring(objStr, 0, objStr.length() - 1) + "]}";
202210
}
203211

204-
private void toAstra(Record<byte[]> record) {
205-
try {
206-
addRow(buildColumnData(record), this.table);
207-
} catch (Exception e) {
208-
log.error("failed to send to astra ", e);
209-
}
210-
211-
record.ack();
212-
}
213-
214212
private String genUUID() {
215213
return UUID.randomUUID().toString();
216214
}

0 commit comments

Comments
 (0)