Skip to content

Commit b2802cc

Browse files
committed
cover empty eventime case in record
1 parent fc3dafe commit b2802cc

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ GET `admin/v2/functions/connectors` displays the nar is loaded successfully as
1212
```
1313

1414
```
15-
$ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml
15+
$ bin/pulsar-admin sinks create --archive ./connectors/pulsar-io-s3-1.0.nar --inputs aws-s3-input-topic --name aws-s3-test --sink-config-file ./connectors/pulsar-s3-io.yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest
1616
"Created successfully"
1717
1818
$ bin/pulsar-admin sinks list

s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,10 @@ public class AWSS3Sink implements Sink<byte[]> {
8484
@Override
8585
public void write(Record<byte[]> record) throws Exception {
8686
synchronized (this) {
87-
//int len = record.getValue().length;
88-
89-
this.lastRecordEpoch = record.getEventTime().get();
87+
Optional<Long> eventTimeOptional = record.getEventTime();
88+
if (eventTimeOptional.isPresent()) {
89+
this.lastRecordEpoch = eventTimeOptional.get();
90+
}
9091
Long ledgerId = getLedgerId(record.getRecordSequence().get());
9192
LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
9293
// Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();

0 commit comments

Comments
 (0)