
Commit 5b17c8b

Merge pull request #2 from kafkaesque-io/s3RolloverSchedule
s3 object time based rollover
2 parents 8f7d79d + 1c63621 commit 5b17c8b

File tree

7 files changed, +144 -46 lines


README.md

Lines changed: 27 additions & 0 deletions
@@ -35,6 +35,33 @@ $ bin/pulsar-admin sinks delete --name aws-s3-test
 "Deleted successfully"
 ```

+### Sink configuration
+AWS S3 configuration such as accessKeyId, secretAccessKey, region, and bucketName needs to be specified in [the configuration yaml](./s3/config/pulsar-s3-io.yaml).
+
+When `logLevel: "debug"` is set, the sink prints debug logs.
+
+Pulsar messages with the same ledger ID are grouped into a single S3 object. The S3 object name follows the convention of the input topic as prefix followed by the ledger ID, and all messages with that ledger ID are written to that object.
+
+Since the sink uses the latest Pulsar message's ledger ID to detect a ledger rollover, a time-based S3 object rollover is also required to write the last ledger's messages to S3 in case messages on a topic stop permanently. `s3ObjectRolloverMinutes` in the config must be greater than `managedLedgerMaxLedgerRolloverTimeMinutes` in Pulsar's broker.conf.
+
+Because the ledger ID is used to identify an S3 object, the sink currently supports only a single input topic.
+
+How Pulsar manages ledgers is configurable. Here are the default settings (from broker.conf):
+```
+# Max number of entries to append to a ledger before triggering a rollover
+# A ledger rollover is triggered on these conditions
+# * Either the max rollover time has been reached
+# * or max entries have been written to the ledger and at least min-time
+#   has passed
+managedLedgerMaxEntriesPerLedger=50000
+
+# Minimum time between ledger rollover for a topic
+managedLedgerMinLedgerRolloverTimeMinutes=10
+
+# Maximum time before forcing a ledger rollover for a topic
+managedLedgerMaxLedgerRolloverTimeMinutes=240
+```
+
 ### Topic schema registry
 It is mandatory that a schema is enforced on the input topic. The sink fails with a fatal error while creating the Parquet output if it receives messages with different schemas.

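The time-based rollover described in the README addition boils down to a scheduled task that compares the time elapsed since the last written record against the configured interval and, once it is exceeded, commits the current S3 object. Below is a minimal standalone sketch of that scheduling pattern; the class name and the `commitCurrentObject` hook are illustrative only, and the actual logic is `AWSS3Sink.triggerS3ObjectRollover()` further down in this commit.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the time-based rollover loop (illustrative names only).
public class RolloverSketch {
    private static final long MILLIS_IN_MINUTE = 60 * 1000;

    private final long rolloverMinutes;          // s3ObjectRolloverMinutes from the sink config
    private volatile long lastRecordEpoch = 0;   // updated on every write
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    public RolloverSketch(long rolloverMinutes) {
        this.rolloverMinutes = rolloverMinutes;
        // Check once per interval; if nothing arrived within the interval, roll the object over.
        executor.scheduleAtFixedRate(this::maybeRollover, 0, rolloverMinutes, TimeUnit.MINUTES);
    }

    public void recordWritten() {
        lastRecordEpoch = System.currentTimeMillis();
    }

    private void maybeRollover() {
        if (lastRecordEpoch == 0) {
            return; // nothing has been written yet
        }
        if (System.currentTimeMillis() - lastRecordEpoch > MILLIS_IN_MINUTE * rolloverMinutes) {
            commitCurrentObject(); // stands in for recordWriter.commit()
        }
    }

    private void commitCurrentObject() {
        System.out.println("rolling over the current S3 object");
    }
}
```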
s3/config/pulsar-s3-io.yaml

Lines changed: 2 additions & 0 deletions
@@ -4,3 +4,5 @@ configs:
   awsregion: "us-east-2"
   bucketName: "bucket-name"
   partSize: 5242880
+  s3ObjectRolloverMinutes: 10
+  logLevel: "debug"

s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Config.java

Lines changed: 26 additions & 6 deletions
@@ -65,13 +65,33 @@ public void setSecretAccessKey(String secretKey) {
         this.secretAccessKey = secretKey;
     }
 
-    // support trigger types are ledger, time based, size based
-    private String triggerType = "ledger";
-    public String getTriggerType() {
-        return this.triggerType;
+    // timer interval for S3 object rollover, in minutes
+    private int s3ObjectRolloverMinutes = 10;
+    public int getS3ObjectRolloverMinutes() {
+        return this.s3ObjectRolloverMinutes;
+    }
+    public void setS3ObjectRolloverMinutes(int s3ObjectRolloverMinutes) {
+        if (s3ObjectRolloverMinutes > 0) {
+            this.s3ObjectRolloverMinutes = s3ObjectRolloverMinutes;
+        }
+    }
+
+    private boolean isDebug = false;
+    public boolean debugLoglevel() {
+        return isDebug;
+    }
+
+    // currently only the debug level is supported
+    private String logLevel = "";
+    public String getLogLevel() {
+        return this.logLevel;
     }
-    public void setTriggerType(String triggerType) {
-        this.triggerType = triggerType;
+    /**
+     * @param logLevel the logLevel to set
+     */
+    public void setLogLevel(String logLevel) {
+        this.logLevel = logLevel;
+        this.isDebug = this.logLevel.equalsIgnoreCase("debug");
     }
 
     @FieldDoc(

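The two new knobs added above behave as follows: a non-positive rollover interval is ignored (the 10-minute default is kept), and any case variant of "debug" enables debug logging. A small usage sketch, assuming `AWSS3Config` can be instantiated directly through a no-arg constructor (this commit does not show how the sink actually builds the config object):

```java
import com.kesque.pulsar.sink.s3.AWSS3Config;

// Hypothetical usage of the setters added above; assumes a public no-arg constructor.
public class ConfigUsageSketch {
    public static void main(String[] args) {
        AWSS3Config cfg = new AWSS3Config();

        cfg.setS3ObjectRolloverMinutes(-5);                    // ignored: only values > 0 are accepted
        System.out.println(cfg.getS3ObjectRolloverMinutes());  // 10, the default

        cfg.setLogLevel("DEBUG");                              // comparison is case-insensitive
        System.out.println(cfg.debugLoglevel());               // true
    }
}
```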
s3/src/main/java/com/kesque/pulsar/sink/s3/AWSS3Sink.java

Lines changed: 32 additions & 26 deletions
@@ -13,6 +13,7 @@
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.google.common.collect.Lists;
@@ -40,9 +41,8 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * A Simple Redis sink, which stores the key/value records from Pulsar in redis.
- * Note that records from Pulsar with null keys or values will be ignored.
- * This class expects records from Pulsar to have a key and value that are stored as bytes or a string.
+ * This is an AWS S3 sink that receives JSON messages and stores them
+ * in Apache Parquet format in AWS S3.
  */
 @Connector(
     name = "aws-s3",
@@ -54,15 +54,13 @@ public class AWSS3Sink implements Sink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AWSS3Sink.class);
 
-
     private AWSS3Config s3Config;
     private String bucketName;
     private String filePrefix = "";
 
     private int fileSizeBytes = 10 * 1024 * 1024;
 
-    private List<Record<byte[]>> incomingList;
-    private ScheduledExecutorService flushExecutor;
+    private ScheduledExecutorService s3RolloverExecutor;
 
     private SchemaInfo schemaInfo;
     private org.apache.avro.Schema avroSchema;
@@ -73,35 +71,36 @@
 
     private long lastRecordEpoch = 0;
 
-    private Duration timeTriggerDuration = Duration.ofHours(1);
+    private long s3ObjectRolloverMinutes = 10;
+    private long MILLIS_IN_MINUTE = 60 * 1000;
 
     /**
-     * Write a message to Sink
-     * @param inputRecordContext Context of input record from the source
-     * @param record record to write to sink
-     * @throws Exception
-     */
+     * Write a message to Sink
+     *
+     * @param inputRecordContext Context of input record from the source
+     * @param record record to write to sink
+     * @throws Exception
+     */
     @Override
     public void write(Record<byte[]> record) throws Exception {
         synchronized (this) {
-            Optional<Long> eventTimeOptional = record.getEventTime();
-            if (eventTimeOptional.isPresent()) {
-                this.lastRecordEpoch = eventTimeOptional.get();
-            }
+            this.lastRecordEpoch = Util.getNowMilli();
             Long ledgerId = getLedgerId(record.getRecordSequence().get());
-            LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
+            if (this.s3Config.debugLoglevel()) {
+                LOG.info("ledgerID {} event time {}", ledgerId, this.lastRecordEpoch);
+            }
             // Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
             // LOG.error("message option isPresent {}", msgOption.isPresent());
 
             this.filename = getFilename(this.filePrefix, ledgerId);
             this.recordWriter.write(record, this.filename);
         }
-
     }
 
     @Override
     public void close() throws IOException {
         LOG.info("s3 sink stopped...");
+        s3RolloverExecutor.shutdown();
     }
 
     /**
@@ -122,12 +121,13 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
         }
         LOG.info("filePrefix is " + this.filePrefix);
 
-        incomingList = Lists.newArrayList();
-
-        flushExecutor = Executors.newScheduledThreadPool(1);
-
         S3Storage storage = new S3Storage(this.s3Config, "");
         this.recordWriter = RecordWriterProvider.createParquetRecordWriter(s3Config, storage);
+
+        this.s3ObjectRolloverMinutes = s3Config.getS3ObjectRolloverMinutes();
+        LOG.info("s3 object rollover interval {} minutes", this.s3ObjectRolloverMinutes);
+        s3RolloverExecutor = Executors.newScheduledThreadPool(1);
+        s3RolloverExecutor.scheduleAtFixedRate(() -> triggerS3ObjectRollover(), 0, s3Config.getS3ObjectRolloverMinutes(), TimeUnit.MINUTES);
     }
 
     public static long getLedgerId(long sequenceId) {
@@ -149,7 +149,13 @@ private static String getFilename(String prefix, Long ledger) {
         return prefix + Long.toString(ledger);
     }
 
-    private boolean isTimeTriggered() {
-        return Util.isOver(Instant.ofEpochMilli(lastRecordEpoch), timeTriggerDuration);
+    private void triggerS3ObjectRollover() {
+        if (this.lastRecordEpoch == 0) {
+            return;
+        }
+
+        if (MILLIS_IN_MINUTE * s3ObjectRolloverMinutes < (Util.getNowMilli() - this.lastRecordEpoch)) {
+            this.recordWriter.commit();
+        }
     }
 }

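Note that `getLedgerId` itself is untouched by this commit and its body is not shown in this diff. As background, Pulsar IO record sequences are commonly packed from the message ID as `(ledgerId << 28) | entryId`; under that assumption (not confirmed by this diff), recovering the ledger ID that groups records into one S3 object is a single right shift, as in this sketch:

```java
// Assumption only: if the record sequence is packed as (ledgerId << 28) | entryId,
// the ledger ID used to group records into an S3 object is carried in the upper bits.
public final class LedgerIdSketch {
    static long ledgerIdFromSequence(long sequenceId) {
        return sequenceId >>> 28;
    }

    public static void main(String[] args) {
        long sequenceId = (12345L << 28) | 7L;                 // hypothetical ledger 12345, entry 7
        System.out.println(ledgerIdFromSequence(sequenceId));  // prints 12345
    }
}
```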
s3/src/main/java/com/kesque/pulsar/sink/s3/Util.java

Lines changed: 17 additions & 8 deletions
@@ -3,20 +3,21 @@
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Calendar;
 
 public class Util {
 
-    public static String ensureValidBucketName(String bucketName) {
-        String formatted = bucketName.replaceAll("\\s+","_");
+    public static String ensureValidBucketName(String bucketName) {
+        String formatted = bucketName.replaceAll("\\s+", "_");
         int length = bucketName.length();
-        if(length >= 62)
+        if (length >= 62)
             length = 62;
-        formatted = formatted.substring(0,length);
-        formatted = formatted.replace(".","d");
+        formatted = formatted.substring(0, length);
+        formatted = formatted.replace(".", "d");
         formatted = formatted.toLowerCase();
-        if(formatted.endsWith("-"))
-            formatted = formatted.substring(0,length - 1);
-
+        if (formatted.endsWith("-"))
+            formatted = formatted.substring(0, length - 1);
+
         return formatted;
     }
 
@@ -32,11 +33,19 @@ public static String getMinuteTimestamp(long epoch) {
 
     /**
      * Check if the current time is over the duration limit since the start.
+     *
      * @param start
      * @param limit
      * @return
      */
     public static boolean isOver(Instant start, Duration limit) {
         return Duration.between(start, Instant.now()).compareTo(limit) > 0;
     }
+
+    /**
+     * Get the current time in milliseconds.
+     */
+    public static long getNowMilli() {
+        return Calendar.getInstance().getTimeInMillis();
+    }
 }

s3/src/main/java/com/kesque/pulsar/sink/s3/format/RecordWriter.java

Lines changed: 1 addition & 2 deletions
@@ -21,9 +21,8 @@ public interface RecordWriter extends Closeable {
     void close();
 
     /**
-     * Flush writer's data and commit the records in Kafka. Optionally, this operation might also
+     * Flush writer's data and commit the records in Pulsar. Optionally, this operation might also
      * close the writer.
      */
     void commit();
-
 }

s3/src/main/java/com/kesque/pulsar/sink/s3/format/parquet/ParquetRecordWriter.java

Lines changed: 39 additions & 4 deletions
@@ -73,15 +73,19 @@ public void write(Record<byte[]> record, String file) {
         String convJson = new String(data); // StandardCharsets.UTF_8);
         JsonNode datum = JsonUtil.parse(convJson);
         this.avroSchema = JsonUtil.inferSchema(JsonUtil.parse(convJson), "schemafromjson");
-        log.info(avroSchema.toString());
+        if (this.config.debugLoglevel()) {
+            log.info(avroSchema.toString());
+        }
 
         GenericData.Record convertedRecord = (org.apache.avro.generic.GenericData.Record) JsonUtil.convertToAvro(GenericData.get(), datum, avroSchema);
         writeParquet(convertedRecord, file);
         this.currentRecord = record;
     }
 
     private synchronized void writeParquet(GenericData.Record record, String file) {
-        log.info("currentFile is {} file name is {}", this.currentFile, file);
+        if (this.config.debugLoglevel()) {
+            log.info("currentFile is {} file name is {}", this.currentFile, file);
+        }
         String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
         Record<byte[]> lastRecord = this.currentRecord; // ditto save a copy
         if (Strings.isNotBlank(lastFile) && !file.equals(lastFile)) {
@@ -152,8 +156,39 @@ public void close() {
     }
 
     @Override
-    public void commit() {
+    public synchronized void commit() {
         log.info("ParquetRecordWriter commit()");
+        if (Strings.isBlank(this.currentFile)) {
+            return;
+        }
+        // TODO: reduce the synchronized block to only protect these two variables
+        String lastFile = this.currentFile; // save a copy because currentFile can be replaced in the main thread
+        Record<byte[]> lastRecord = this.currentRecord; // ditto save a copy
+
+        ParquetWriter<GenericData.Record> writer = writerMap.get(lastFile);
+        if (writer == null) {
+            log.error("fatal error - failed to find parquet writer to match file {}", lastFile);
+            return;
+        }
+        S3ParquetOutputFile s3ParquetOutputFile = s3ParquetOutputFileMap.get(lastFile);
+        if (s3ParquetOutputFile == null) {
+            log.error("fatal error - failed to find s3ParquetOutputFile to match file {}", lastFile);
+            return;
+        }
+
+        // when a new file and parquet writer is required
+        s3ParquetOutputFile.s3out.setCommit();
+        try {
+            writer.close();
+        } catch (IOException e) {
+            log.error("close parquet writer exception {}", e.getMessage());
+            e.printStackTrace();
+        }
+        writerMap.remove(lastFile);
+        s3ParquetOutputFileMap.remove(lastFile);
+        log.info("cumulative ack all pulsar messages and write to existing parquet writer, map size {}", writerMap.size());
+        lastRecord.ack(); // depends on cumulative ack
+
+        this.currentFile = "";
     }
-
 }
