Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions docs/stream-filtering.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ Msg = amqp10_msg:set_message_annotations(
amqp10_client:send_msg(Sender, Msg),
```
</TabItem>

<TabItem value="javascript" label="JavaScript">
```javascript
const message = createAmqpMessage({
body: "Hello World!",
annotations: { "x-stream-filter-value": "invoices" }, // set Bloom filter value
})
await publisher.publish(message)
```
</TabItem>
</Tabs>

A receiver must use a filter with descriptor `rabbitmq:stream-filter`.
Expand Down Expand Up @@ -317,6 +327,30 @@ after 5000 -> exit(missing_msg)
end,
```
</TabItem>

<TabItem value="javascript" label="JavaScript">
```javascript
const consumer = await connection.createConsumer({
stream: {
name: "some-stream",
offset: Offset.first(),
matchUnfiltered: true,
filterValues: ["invoices", "orders"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
},
messageHandler: (context, message) => {
// This filter will be evaluated client-side per message (Stage 3).
if (
message.message_annotations &&
["invoices", "orders"].includes(message.message_annotations["x-stream-filter-value"])
) {
// message processing
}
context.accept()
},
})
consumer.start()
```
</TabItem>
</Tabs>


Expand Down Expand Up @@ -542,6 +576,28 @@ Filter = #{<<"filter-name-1">> =>
```
</TabItem>

<TabItem value="javascript" label="JavaScript">
```javascript
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
messagePropertiesFilter: {
subject: "&p:Order",
user_id: "John"
},
applicationPropertiesFilter: {
region: "emea",
},
},
messageHandler: (context, message) => {
// process the messages
},
})
consumer.start()
```
</TabItem>

</Tabs>

### SQL Filter Expressions
Expand Down Expand Up @@ -838,6 +894,23 @@ Filter = #{<<"sql-filter">> => #filter{descriptor = <<"amqp:sql-filter">>,
```
</TabItem>

<TabItem value="javascript" label="JavaScript">
```javascript
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
sqlFilter: "properties.user_id = 'John' AND"
+ "properties.subject LIKE 'Order%' AND region = 'emea'"
},
messageHandler: (context, message) => {
// process the messages
},
})
consumer.start()
```
</TabItem>

</Tabs>

### Error Handling
Expand Down Expand Up @@ -1013,6 +1086,27 @@ Filter = #{%% This Bloom filter will be evaluated server-side per chunk at stage
```
</TabItem>

<TabItem value="javascript" label="JavaScript">
```javascript
const consumer = await connection.createConsumer({
stream: {
name: "my-queue",
offset: Offset.first(),
filterValues: ["order.created"], // This Bloom filter will be evaluated server-side per chunk (Stage 1).
sqlFilter: "p.subject = 'order.created' AND " +
"p.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(h.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)", // This complex SQL filter expression will be evaluted server-side
// per message at stage 2.
},
messageHandler: (context, message) => {
// message processing
},
})
consumer.start()
```
</TabItem>

</Tabs>

If `order.created` events represent only a small percentage of all events, RabbitMQ can filter the stream efficiently because only a small fraction of messages need to be parsed and evaluated in memory.
Expand Down