1
1
package com .kesque .pulsar .sink .cassandra .astra ;
2
2
3
3
import java .io .IOException ;
4
- import java .nio .charset .StandardCharsets ;
5
- import java .nio .file .Paths ;
6
-
7
- import static java .nio .charset .StandardCharsets .UTF_8 ;
8
-
9
- import java .util .HashMap ;
10
4
import java .util .List ;
11
5
import java .util .Map ;
12
- import java .util .Optional ;
13
6
import java .util .UUID ;
14
7
import java .util .concurrent .Executors ;
15
8
import java .util .concurrent .ScheduledExecutorService ;
20
13
import org .apache .avro .SchemaParseException ;
21
14
import org .apache .avro .reflect .AvroSchema ;
22
15
import org .apache .commons .collections4 .CollectionUtils ;
16
+ import org .apache .commons .lang3 .StringUtils ;
23
17
import org .apache .logging .log4j .util .Strings ;
24
18
import org .apache .pulsar .client .api .Message ;
25
19
import org .apache .pulsar .client .api .Schema ;
39
33
40
34
import kong .unirest .HttpResponse ;
41
35
import kong .unirest .Unirest ;
42
- import kong .unirest .HttpResponse ;
43
36
import kong .unirest .JsonNode ;
44
- import kong .unirest .Unirest ;
45
37
import kong .unirest .json .JSONObject ;
46
38
47
39
/**
@@ -60,7 +52,6 @@ public class AstraSink implements Sink<byte[]> {
60
52
private static final Logger log = LoggerFactory .getLogger (AstraSink .class );
61
53
62
54
private AstraConfig astraConfig ;
63
- private String filePrefix = "" ;
64
55
65
56
private List <Record <byte []>> incomingList ;
66
57
private ScheduledExecutorService flushExecutor ;
@@ -81,15 +72,8 @@ public class AstraSink implements Sink<byte[]> {
81
72
*/
82
73
@ Override
83
74
public void write (Record <byte []> record ) throws Exception {
84
- synchronized (this ) {
85
- int len = record .getValue ().length ;
86
-
87
- Long ledgerId = getLedgerId (record .getRecordSequence ().get ());
88
- log .info ("ledgerID {} and value's length {}" , ledgerId , len );
89
- // Optional<Message<byte[]>> msgOption = record.getMessage(); //.get();
90
- // log.error("message option isPresent {}", msgOption.isPresent());
91
-
92
- }
75
+ //TODO: toAstra can be its own thread
76
+ toAstra (record );
93
77
}
94
78
95
79
@ Override
@@ -126,7 +110,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
126
110
127
111
String err = "clusterId, region, keyspace, table name, and user credentials must be specified." ;
128
112
log .error (err );
129
- throw new Exception (err );
113
+ throw new IllegalArgumentException (err );
130
114
}
131
115
this .baseURL = "https://" + astraConfig .getClusterId () + "-" + astraConfig .getClusterRegion ()
132
116
+ ".apps.astra.datastax.com/api/rest/v1/" ;
@@ -191,13 +175,13 @@ private boolean createTable(String schema) {
191
175
}
192
176
193
177
private void addRow (String rows , String table ) {
194
- String columns = "{ \" columns \" :[" + rows + "]}" ;
178
+ log . info ( "addRow {}" , rows ) ;
195
179
String url = this .baseURL +"keyspaces/" + astraConfig .getKeySpace () + "/tables/" + table + "/rows" ;
196
180
HttpResponse <String > response = Unirest .post (url )
197
181
.header ("x-cassandra-request-id" , genUUID ())
198
182
.header ("x-cassandra-token" , this .token )
199
183
.header ("Content-Type" , "application/json" )
200
- .body (columns )
184
+ .body (rows )
201
185
.asString ();
202
186
203
187
if (response .getStatus () != 201 ) {
@@ -206,6 +190,27 @@ private void addRow(String rows, String table) {
206
190
}
207
191
}
208
192
193
+ private String buildColumnData (Record <byte []> record ) {
194
+ JSONObject obj = new JSONObject (new String (record .getValue ()));
195
+ String objStr = "{\" columns\" :[" ;
196
+ for (String key : obj .keySet ()) {
197
+ objStr = objStr + "{\" name\" :\" " + key + "\" ,\" value\" :\" " + obj .getString (key ) + "\" }," ;
198
+ }
199
+
200
+ // remove the last comma
201
+ return StringUtils .substring (objStr , 0 , objStr .length () - 1 ) + "]}" ;
202
+ }
203
+
204
+ private void toAstra (Record <byte []> record ) {
205
+ try {
206
+ addRow (buildColumnData (record ), this .table );
207
+ } catch (Exception e ) {
208
+ log .error ("failed to send to astra " , e );
209
+ }
210
+
211
+ record .ack ();
212
+ }
213
+
209
214
private String genUUID () {
210
215
return UUID .randomUUID ().toString ();
211
216
}
0 commit comments