Skip to content

Commit df36447

Browse files
committed
cassandra astra sink
1 parent 5b17c8b commit df36447

File tree

7 files changed

+630
-0
lines changed

7 files changed

+630
-0
lines changed

cassandra-astra/README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Pulsar Cassandra Astra sink
2+
3+
Pulsar Cassandra Astra sink receives JSON messages over Pulsar topics with the same schema and add a new row per Pulsar message in Datastax Astra's Cassandra database.
4+
5+
## Operations
6+
### Deployment
7+
Copy the nar file to ./pulsar/connectors directory
8+
9+
GET `admin/v2/functions/connectors` displays the nar is loaded successfully as
10+
```
11+
{"name":"astra-cassandra","description":"write data to cassandra astra","sinkClass":"com.kesque.pulsar.sink.cassandra.astra.AstraSink"}
12+
```
13+
14+
Create a sink from a preloaded nar file.
15+
```
16+
$ bin/pulsar-admin sinks create --sink-type "astra-cassandra" --inputs astra-input-topic --name astra-test --sink-config-file ./connectors/pulsar-astra-sink.yaml --processing-guarantees EFFECTIVELY_ONCE --subs-position Earliest --subs-name auniquename
17+
"Created successfully"
18+
19+
$ bin/pulsar-admin sinks list
20+
[
21+
"astra-test"
22+
]
23+
24+
$ bin/pulsar-admin sinks delete --name astra-test
25+
"Deleted successfully"
26+
```
27+
28+
### Sink configuration
29+
Astra sink configuration requires these parameters to generate auth token.
30+
```
31+
ASTRA_CLUSTER_ID=
32+
ASTRA_CLUSTER_REGION=
33+
ASTRA_DB_USERNAME=
34+
ASTRA_DB_PASSWORD=
35+
```
36+
Additionally, `keyspace` and `table name` are mandatory to add rows into Astra Cassandra database.
37+
38+
`table schema` is optional, however, the table schema has to be created on the table before any row can be added if the schema is not specified in the configuration.
39+
40+
These configuration must be specifed in [the configuration yaml](./config/pulsar-s3-io.yaml)
41+
42+
When set `logLevel: "debug"`, debug logs will be printed by the sink.
43+
44+
### Topic schema registry
45+
It is mandatory a schema is enforced over the input topics. The Sink would have fatal error to create parquet format when it receives messages with different schemas.
46+
47+
## Build
48+
The command to build a nar file.
49+
```
50+
$ cd cassandra-astra
51+
$ mvn clean install
52+
```
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
configs:
2+
username: ""
3+
password: ""
4+
keySpace: ""
5+
clusterId: ""
6+
clusterRegion: "us-east1"
7+
tableName: "products"
8+
tableSchema: "{\"name\":\"products\",\"ifNotExists\":true,\"columnDefinitions\": [ {\"name\":\"id\",\"typeDefinition\":\"uuid\",\"static\":false}, {\"name\":\"name\",\"typeDefinition\":\"text\",\"static\":false}, {\"name\":\"description\",\"typeDefinition\":\"text\",\"static\":false}, {\"name\":\"price\",\"typeDefinition\":\"decimal\",\"static\":false}, {\"name\":\"created\",\"typeDefinition\":\"timestamp\",\"static\":false}],\"primaryKey\": {\"partitionKey\":[\"id\"]},\"tableOptions\":{\"defaultTimeToLive\":0}}"
9+
logLevel: "debug"

cassandra-astra/pom.xml

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<groupId>com.kesque.pulsar</groupId>
5+
<artifactId>pulsar-io-astra-cassandra</artifactId>
6+
<packaging>jar</packaging>
7+
<version>0.0.1</version>
8+
<name>pulsar-io-astra-cassandra: :: cassandra</name>
9+
<url>http://maven.apache.org</url>
10+
<properties>
11+
<maven.compiler.source>1.8</maven.compiler.source>
12+
<maven.compiler.target>1.8</maven.compiler.target>
13+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
14+
<pulsar.version>2.5.0</pulsar.version>
15+
<hadoop.version>2.7.0</hadoop.version>
16+
<parquet.hadoop.version>1.10.0</parquet.hadoop.version>
17+
<avro.version>1.8.2</avro.version>
18+
<parquet.avro.version>1.10.0</parquet.avro.version>
19+
</properties>
20+
<dependencies>
21+
<dependency>
22+
<groupId>org.apache.pulsar</groupId>
23+
<artifactId>pulsar-io-core</artifactId>
24+
<version>2.6.0</version>
25+
</dependency>
26+
27+
<dependency>
28+
<groupId>org.apache.pulsar</groupId>
29+
<artifactId>pulsar-functions-instance</artifactId>
30+
<version>2.6.0</version>
31+
<scope>provided</scope>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>com.fasterxml.jackson.core</groupId>
36+
<artifactId>jackson-core</artifactId>
37+
<version>2.9.6</version>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.kitesdk</groupId>
42+
<artifactId>kite-data-core</artifactId>
43+
<version>1.1.0</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>com.fasterxml.jackson.core</groupId>
48+
<artifactId>jackson-annotations</artifactId>
49+
<version>2.9.6</version>
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>com.fasterxml.jackson.core</groupId>
54+
<artifactId>jackson-databind</artifactId>
55+
<version>2.9.6</version>
56+
</dependency>
57+
58+
<dependency>
59+
<groupId>junit</groupId>
60+
<artifactId>junit</artifactId>
61+
<version>3.8.1</version>
62+
<scope>test</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>org.projectlombok</groupId>
66+
<artifactId>lombok</artifactId>
67+
<version>1.18.12</version>
68+
<scope>provided</scope>
69+
</dependency>
70+
<!-- dependency>
71+
<groupId>org.apache.pulsar</groupId>
72+
<artifactId>pulsar-client</artifactId>
73+
<version>${pulsar.version}</version>
74+
</dependency -->
75+
<dependency>
76+
<groupId>org.apache.pulsar</groupId>
77+
<artifactId>pulsar-broker-common</artifactId>
78+
<version>${pulsar.version}</version>
79+
<exclusions>
80+
<exclusion>
81+
<groupId>io.grpc</groupId>
82+
<artifactId>*</artifactId>
83+
</exclusion>
84+
</exclusions>
85+
</dependency>
86+
87+
<dependency>
88+
<groupId>org.apache.bookkeeper</groupId>
89+
<artifactId>stream-storage-server</artifactId>
90+
<version>4.9.1</version>
91+
<exclusions>
92+
<exclusion>
93+
<groupId>io.grpc</groupId>
94+
<artifactId>grpc-all</artifactId>
95+
</exclusion>
96+
</exclusions>
97+
</dependency>
98+
99+
<dependency>
100+
<groupId>com.google.guava</groupId>
101+
<artifactId>guava</artifactId>
102+
<version>21.0</version>
103+
</dependency>
104+
105+
<dependency>
106+
<groupId>com.datastax.oss</groupId>
107+
<artifactId>java-driver-core</artifactId>
108+
<version>4.6.0</version>
109+
</dependency>
110+
111+
<dependency>
112+
<groupId>com.konghq</groupId>
113+
<artifactId>unirest-java</artifactId>
114+
<version>3.7.04</version>
115+
</dependency>
116+
117+
<!-- dependency>
118+
<groupId>org.apache.hadoop</groupId>
119+
<artifactId>hadoop-client</artifactId>
120+
<version>${hadoop.version}</version>
121+
</dependency -->
122+
<dependency>
123+
<groupId>org.apache.avro</groupId>
124+
<artifactId>avro</artifactId>
125+
<version>${avro.version}</version>
126+
</dependency>
127+
128+
<dependency>
129+
<groupId>org.apache.hadoop</groupId>
130+
<artifactId>hadoop-common</artifactId>
131+
<version>${hadoop.version}</version>
132+
<exclusions>
133+
<exclusion>
134+
<groupId>log4j</groupId>
135+
<artifactId>log4j</artifactId>
136+
</exclusion>
137+
<exclusion>
138+
<groupId>org.slf4j</groupId>
139+
<artifactId>slf4j-log4j12</artifactId>
140+
</exclusion>
141+
<exclusion>
142+
<groupId>org.apache.avro</groupId>
143+
<artifactId>avro</artifactId>
144+
</exclusion>
145+
<exclusion>
146+
<groupId>com.google.guava</groupId>
147+
<artifactId>guava</artifactId>
148+
</exclusion>
149+
<exclusion>
150+
<groupId>org.apache.htrace</groupId>
151+
<artifactId>htrace-core</artifactId>
152+
</exclusion>
153+
<exclusion>
154+
<groupId>org.mortbay.jetty</groupId>
155+
<artifactId>jetty-util</artifactId>
156+
</exclusion>
157+
<exclusion>
158+
<groupId>commons-beanutils</groupId>
159+
<artifactId>commons-beanutils-core</artifactId>
160+
</exclusion>
161+
<exclusion>
162+
<groupId>org.codehaus.jackson</groupId>
163+
<artifactId>jackson-mapper-asl</artifactId>
164+
</exclusion>
165+
</exclusions>
166+
</dependency>
167+
</dependencies>
168+
169+
<build>
170+
<plugins>
171+
<plugin>
172+
<groupId>org.apache.nifi</groupId>
173+
<artifactId>nifi-nar-maven-plugin</artifactId>
174+
<extensions>true</extensions>
175+
<version>1.1.0</version>
176+
<executions>
177+
<execution>
178+
<id>default-nar</id>
179+
<phase>package</phase>
180+
<goals>
181+
<goal>nar</goal>
182+
</goals>
183+
</execution>
184+
</executions>
185+
</plugin>
186+
</plugins>
187+
</build>
188+
189+
<!-- repositories>
190+
<repository>
191+
<id>gitlab-maven</id>
192+
<url>https://gitlab.com/api/v4/projects/16426331/packages/maven</url>
193+
</repository>
194+
</repositories -->
195+
<distributionManagement>
196+
<repository>
197+
<id>github</id>
198+
<name>GitHub Apache Maven Packages</name>
199+
<url>https://maven.pkg.github.com/kafkaesque-io/pulsar-object-storage-io</url>
200+
</repository>
201+
</distributionManagement>
202+
</project>
203+
<!-- repositories>
204+
<repository>
205+
<id>mvnrepository</id>
206+
<name>mvnrepository</name>
207+
<url>https://repo.maven.apache.org/maven2</url>
208+
</repository>
209+
</repositories-->
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.kesque.pulsar.sink.cassandra.astra;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
5+
6+
import com.google.common.base.Preconditions;
7+
import com.google.common.collect.Lists;
8+
import com.google.common.net.HostAndPort;
9+
10+
import lombok.Data;
11+
import lombok.experimental.Accessors;
12+
import org.apache.commons.lang3.StringUtils;
13+
14+
import java.io.File;
15+
import java.io.Serializable;
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
public class AstraConfig implements Serializable {
21+
22+
private static final long serialVersionUID = 1L;
23+
24+
private String username = "";
25+
public String getUsername() {
26+
return this.username;
27+
}
28+
public void setUsername(String username) {
29+
this.username = username;
30+
}
31+
32+
private String password = "";
33+
public String getPassword() {
34+
return this.password;
35+
}
36+
public void setPassword(String password) {
37+
this.password = password;
38+
}
39+
40+
private String keySpace = "";
41+
public String getKeySpace() {
42+
return this.keySpace;
43+
}
44+
public void setKeySpace(String keySpace) {
45+
this.keySpace = keySpace;
46+
}
47+
48+
private String clusterRegion = "us-east1";
49+
public String getClusterRegion() {
50+
return this.clusterRegion;
51+
}
52+
public void setClusterRegion(String clusterRegion) {
53+
this.clusterRegion = clusterRegion;
54+
}
55+
56+
private String clusterId = "";
57+
public String getClusterId() {
58+
return this.clusterId;
59+
}
60+
public void setClusterId(String clusterId) {
61+
this.clusterId = clusterId;
62+
}
63+
64+
private String tableName = "";
65+
public String getTableName() {
66+
return this.tableName;
67+
}
68+
public void setTableName(String tableName) {
69+
this.tableName = tableName;
70+
}
71+
72+
private String tableSchema = "";
73+
public String getTableSchema() {
74+
return this.tableSchema;
75+
}
76+
public void setTableSchema(String tableSchema) {
77+
this.tableSchema = tableSchema;
78+
}
79+
80+
private boolean isDebug = false;
81+
public boolean debugLoglevel() {
82+
return isDebug;
83+
}
84+
85+
// currently only support debug level
86+
private String logLevel = "";
87+
public String getLogLevel() {
88+
return this.logLevel;
89+
}
90+
91+
public static AstraConfig load(String yamlFile) throws IOException {
92+
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
93+
return mapper.readValue(new File(yamlFile), AstraConfig.class);
94+
}
95+
96+
public static AstraConfig load(Map<String, Object> map) throws IOException {
97+
ObjectMapper mapper = new ObjectMapper();
98+
return mapper.readValue(new ObjectMapper().writeValueAsString(map), AstraConfig.class);
99+
}
100+
101+
}

0 commit comments

Comments
 (0)