Skip to content

Commit fc3dafe

Browse files
committed
keep the last record for cumulative ack
1 parent 78b8d29 commit fc3dafe

File tree

6 files changed

+50
-15
lines changed

6 files changed

+50
-15
lines changed

s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ public void setSecretAccessKey(String secretKey) {
6565
this.secretAccessKey = secretKey;
6666
}
6767

68+
// supported trigger types are ledger, time-based, and size-based
69+
private String triggerType = "ledger";
70+
public String getTriggerType() {
71+
return this.triggerType;
72+
}
73+
public void setTriggerType(String triggerType) {
74+
this.triggerType = triggerType;
75+
}
76+
6877
@FieldDoc(
6978
required = false,
7079
defaultValue = "",

s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import java.io.IOException;
44
import java.nio.charset.StandardCharsets;
5+
import java.time.Duration;
6+
import java.time.Instant;
7+
58
import static java.nio.charset.StandardCharsets.UTF_8;
69

710
import java.util.HashMap;
@@ -66,12 +69,11 @@ public class AWSS3Sink implements Sink<byte[]> {
6669

6770
private ParquetRecordWriter recordWriter;
6871

69-
private String filename;
72+
private volatile String filename;
73+
74+
private long lastRecordEpoch = 0;
7075

71-
// debug
72-
private int groupCounter = 0;
73-
private int fileSuffix = 0;
74-
// end of debug
76+
private Duration timeTriggerDuration = Duration.ofHours(1);
7577

7678
/**
7779
* Write a message to Sink
@@ -82,18 +84,18 @@ public class AWSS3Sink implements Sink<byte[]> {
8284
@Override
8385
public void write(Record<byte[]> record) throws Exception {
8486
synchronized (this) {
85-
int len = record.getValue().length;
87+
//int len = record.getValue().length;
8688

89+
this.lastRecordEpoch = record.getEventTime().get();
8790
Long ledgerId = getLedgerId(record.getRecordSequence().get());
88-
LOG.info("ledgerID {} and value's length {}", ledgerId, len);
91+
LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
8992
// Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
9093
// LOG.error("message option isPresent {}", msgOption.isPresent());
9194

9295
this.filename = getFilename(this.filePrefix, ledgerId);
9396
this.recordWriter.write(record, this.filename);
9497
}
9598

96-
//bytes to generic data ??
9799
}
98100

99101
@Override
@@ -117,7 +119,6 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
117119
for (String topicName : sinkContext.getInputTopics()){
118120
filePrefix = topicName + "-" + filePrefix;
119121
}
120-
System.out.println("filePrefix " + this.filePrefix);
121122
LOG.info("filePrefix is " + this.filePrefix);
122123

123124
incomingList = Lists.newArrayList();
@@ -146,4 +147,8 @@ private boolean isAvroSchema(SchemaData schemaData) {
146147
private static String getFilename(String prefix, Long ledger) {
147148
return prefix + Long.toString(ledger);
148149
}
150+
151+
private boolean isTimeTriggered() {
152+
return Util.isOver(Instant.ofEpochMilli(lastRecordEpoch), timeTriggerDuration);
153+
}
149154
}

s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.kesque.pulsar.sink.s3;
22

3+
import java.text.SimpleDateFormat;
4+
import java.time.Duration;
5+
import java.time.Instant;
6+
37
public class Util {
48

59
public static String ensureValidBucketName(String bucketName) {
@@ -15,4 +19,24 @@ public static String ensureValidBucketName(String bucketName) {
1519

1620
return formatted;
1721
}
22+
23+
public static String getHourlyTimestamp(long epoch) {
24+
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH");
25+
return format.format(epoch);
26+
}
27+
28+
public static String getMinuteTimestamp(long epoch) {
29+
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
30+
return format.format(epoch);
31+
}
32+
33+
/**
34+
* Check if the current time is over the duration limit since the start.
35+
* @param start
36+
* @param limit
37+
* @return
38+
*/
39+
public static boolean isOver(Instant start, Duration limit) {
40+
return Duration.between(start, Instant.now()).compareTo(limit) > 0;
41+
}
1842
}

s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ParquetRecordWriter implements RecordWriter {
4343
private Configuration parquetWriterConfig;
4444
private Schema avroSchema;
4545
private volatile String currentFile = "";
46-
private volatile Record<byte[]> lastRecord; // kept for batch ack
46+
private volatile Record<byte[]> currentRecord; // kept for batch ack
4747

4848
// parallel writer size
4949
int WRITER_LIMIT = 4;
@@ -77,12 +77,13 @@ public void write(Record<byte[]> record, String file) {
7777

7878
GenericData.Record convertedRecord = (org.apache.avro.generic.GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);
7979
writeParquet(convertedRecord, file);
80-
this.lastRecord = record;
80+
this.currentRecord = record;
8181
}
8282

8383
private synchronized void writeParquet(GenericData.Record record, String file) {
8484
log.info("currentFile is {} file name is {}", this.currentFile, file);
8585
String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
86+
Record<byte[]> lastRecord = this.currentRecord; // ditto save a copy
8687
if (Strings.isNotBlank(lastFile) && !file.equals(lastFile)) {
8788
uploaderExecutor.execute(() -> {
8889
ParquetWriter<GenericData.Record> writer = writerMap.get(lastFile);
@@ -108,9 +109,7 @@ private synchronized void writeParquet(GenericData.Record record, String file) {
108109
s3ParquetOutputFileMap.remove(lastFile);
109110
log.info("cumulative ack all pulsar messages and write to existing parquet writer, map size {}", writerMap.size());
110111
lastRecord.ack(); // depends on cumulative ack
111-
112112
});
113-
114113
}
115114
this.currentFile = file; // for the next write
116115

s3/src/main/java/com/kesque/pulsar/sink/s3/storage/S3ParquetOutputFile.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ public S3ParquetOutputFile(S3Storage storage, String filename) {
1818

1919
@Override
2020
public PositionOutputStream create(long blockSizeHint) throws IOException {
21-
System.out.println("S3ParquetOutputStream create PositionOutputStream");
2221
s3out = (S3ParquetOutputStream) storage.create(filename, true);
2322
return s3out;
2423
}

s3/src/main/java/com/kesque/pulsar/sink/s3/storage/S3Storage.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ public S3OutputStream create(String path, boolean overwrite) {
185185
throw new IllegalArgumentException("Path can not be empty!");
186186
}
187187

188-
System.out.println("S3OutputStream . create() ... ");
189188
return new S3ParquetOutputStream(path, this.conf, s3);
190189
}
191190
}

0 commit comments

Comments
 (0)