Skip to content

Commit e83f01a

Browse files
committed
Implement DevAddr / JoinEUI prefix filters.
This makes it possible to not only forward data of certain Gateway IDs to a server, but also only forward data in case it matches the configured DevAddr / JoinEUI filters.
1 parent 01e0ba7 commit e83f01a

14 files changed

+470
-20
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
"derive",
1818
] }
1919
serde = { version = "1.0", features = ["derive"] }
20+
serde_json = "1.0"
21+
base64 = "0.22"
2022
toml = { version = "0.9", default-features = false, features = [
2123
"std",
2224
"parse",

src/cmd/configfile.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,36 @@ pub fn run(config: &Configuration) {
4545
# # If not set, data of all gateways will be forwarded. If set, only data
4646
# # from gateways with a matching Gateway ID will be forwarded.
4747
# #
48-
# # Examplex:
48+
# # Example:
4949
# # * "0102030405060708/32": Exact match (all 32 bits of the filter must match)
5050
# # * "0102030400000000/16": All gateway IDs starting with "01020304" (filter on 16 most significant bits)
5151
# gateway_id_prefixes=[]
52-
{{#each multiplexer.servers}}
52+
#
53+
# # Filter configuration.
54+
# [multiplexer.server.filters]
55+
56+
# # DevAddr prefix filters.
57+
# #
58+
# # Example configuration:
59+
# # dev_addr_prefixes=["0000ff00/24"]
60+
# #
61+
# # The above filter means that the 24MSB of 0000ff00 will be used to
62+
# # filter DevAddrs. Uplinks with DevAddrs that do not match any of the
63+
# # configured filters will not be forwarded. Leaving this option empty
64+
# # disables filtering on DevAddr.
65+
# dev_addr_prefixes=[]
66+
67+
# # JoinEUI prefix filters.
68+
# #
69+
# # Example configuration:
70+
# # join_eui_prefixes=["0000ff0000000000/24"]
71+
# #
72+
# # The above filter means that the 24MSB of 0000ff0000000000 will be used
73+
# # to filter JoinEUIs. Uplinks with JoinEUIs that do not match any of the
74+
# # configured filters will not be forwarded. Leaving this option empty
75+
# # disables filtering on JoinEUI.
76+
# join_eui_prefixes=[]
77+
{{#each multiplexer.server}}
5378
[[multiplexer.server]]
5479
server="{{this.server}}"
5580
uplink_only={{this.uplink_only}}
@@ -59,6 +84,18 @@ pub fn run(config: &Configuration) {
5984
{{/each}}
6085
]
6186
87+
[multiplexer.server.filters]
88+
dev_addr_prefixes=[
89+
{{#each this.filters.dev_addr_prefixes}}
90+
"{{this}}",
91+
{{/each}}
92+
]
93+
94+
join_eui_prefixes=[
95+
{{#each this.filters.join_eui_prefixes}}
96+
"{{this}}",
97+
{{/each}}
98+
]
6299
{{/each}}
63100
64101

src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ pub struct Server {
6666
pub server: String,
6767
pub uplink_only: bool,
6868
pub gateway_id_prefixes: Vec<lrwn_filters::EuiPrefix>,
69+
pub filters: Filters,
70+
}
71+
72+
#[derive(Serialize, Deserialize, Default, Clone)]
73+
#[serde(default)]
74+
pub struct Filters {
75+
pub dev_addr_prefixes: Vec<lrwn_filters::DevAddrPrefix>,
76+
pub join_eui_prefixes: Vec<lrwn_filters::EuiPrefix>,
6977
}
7078

7179
#[derive(Default, Serialize, Deserialize, Clone)]

src/forwarder.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tracing::{Instrument, debug, error, info, trace, warn};
1010

1111
use crate::config;
1212
use crate::monitoring::{inc_server_udp_received_count, inc_server_udp_sent_count};
13-
use crate::packets::{GatewayId, PacketType, get_random_token};
13+
use crate::packets::{GatewayId, PacketType, PushData, get_random_token};
1414
use crate::traits::PrintFullError;
1515

1616
static SERVERS: OnceCell<RwLock<Vec<Server>>> = OnceCell::const_new();
@@ -19,6 +19,7 @@ struct Server {
1919
server: String,
2020
uplink_only: bool,
2121
gateway_id_prefixes: Vec<lrwn_filters::EuiPrefix>,
22+
filters: lrwn_filters::Filters,
2223
downlink_tx: UnboundedSender<(GatewayId, Vec<u8>)>,
2324
sockets: HashMap<GatewayId, ServerSocket>,
2425
}
@@ -106,6 +107,10 @@ pub async fn setup(
106107
server.server.clone(),
107108
server.uplink_only,
108109
server.gateway_id_prefixes.clone(),
110+
lrwn_filters::Filters {
111+
dev_addr_prefixes: server.filters.dev_addr_prefixes.clone(),
112+
join_eui_prefixes: server.filters.join_eui_prefixes.clone(),
113+
},
109114
downlink_tx.clone(),
110115
)
111116
.await?;
@@ -139,19 +144,31 @@ async fn handle_uplink_packet(gateway_id: GatewayId, data: &[u8]) -> Result<()>
139144
continue;
140145
}
141146

147+
let filters = server.filters.clone();
142148
let socket = server.get_server_socket(gateway_id).await?;
143149
socket.last_uplink = SystemTime::now();
144150

145151
let span = tracing::info_span!("", addr = %socket.socket.peer_addr().unwrap());
146152
let _enter = span.enter();
147153

148154
match packet_type {
149-
PacketType::PushData => {
150-
info!(packet_type = %packet_type, "Sending UDP packet");
151-
socket.push_data_token = Some(random_token);
152-
socket.socket.send(data).await.context("Send UDP packet")?;
153-
inc_server_udp_sent_count(&server.server, packet_type).await;
154-
}
155+
PacketType::PushData => match PushData::from_slice(data) {
156+
Ok(mut push_data) => {
157+
push_data.payload.filter_rxpk(&filters);
158+
159+
if push_data.payload.is_empty() {
160+
debug!("Nothing to send, UDP packet is empty or does not match filters");
161+
} else {
162+
info!(packet_type = %packet_type, "Sending UDP packet");
163+
socket.push_data_token = Some(random_token);
164+
socket.socket.send(data).await.context("Send UDP packet")?;
165+
inc_server_udp_sent_count(&server.server, packet_type).await;
166+
}
167+
}
168+
Err(e) => {
169+
error!("Decode PushData payload error: {}", e);
170+
}
171+
},
155172
PacketType::PullData => {
156173
info!(packet_type = %packet_type, "Sending UDP packet");
157174
socket.pull_data_token = Some(random_token);
@@ -278,6 +295,7 @@ async fn add_server(
278295
server: String,
279296
uplink_only: bool,
280297
gateway_id_prefixes: Vec<lrwn_filters::EuiPrefix>,
298+
filters: lrwn_filters::Filters,
281299
downlink_tx: UnboundedSender<(GatewayId, Vec<u8>)>,
282300
) -> Result<()> {
283301
info!(
@@ -296,6 +314,7 @@ async fn add_server(
296314
server,
297315
uplink_only,
298316
gateway_id_prefixes,
317+
filters,
299318
downlink_tx,
300319
sockets: HashMap::new(),
301320
});

src/packets.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::collections::HashMap;
12
use std::fmt;
23

34
use anyhow::{Result, anyhow};
5+
use serde::{Deserialize, Serialize};
46

57
#[derive(Clone, Copy, Debug)]
68
pub enum PacketType {
@@ -111,3 +113,95 @@ pub fn get_random_token(v: &[u8]) -> Result<u16> {
111113

112114
Ok(u16::from_be_bytes([v[1], v[2]]))
113115
}
116+
117+
pub struct PushData {
118+
pub protocol_version: u8,
119+
pub random_token: u16,
120+
pub gateway_id: [u8; 8],
121+
pub payload: PushDataPayload,
122+
}
123+
124+
impl PushData {
125+
pub fn from_slice(b: &[u8]) -> Result<Self> {
126+
if b.len() < 14 {
127+
return Err(anyhow!("At least 14 bytes are expected"));
128+
}
129+
130+
Ok(PushData {
131+
protocol_version: b[0],
132+
random_token: u16::from_be_bytes([b[1], b[2]]),
133+
gateway_id: {
134+
let mut gateway_id: [u8; 8] = [0; 8];
135+
gateway_id.copy_from_slice(&b[4..12]);
136+
gateway_id
137+
},
138+
payload: serde_json::from_slice(&b[12..])?,
139+
})
140+
}
141+
142+
pub fn to_bytes(&self) -> Vec<u8> {
143+
let mut b = vec![self.protocol_version];
144+
145+
b.append(&mut self.random_token.to_be_bytes().to_vec());
146+
b.push(0x00);
147+
b.append(&mut self.gateway_id.to_vec());
148+
149+
let mut j = serde_json::to_vec(&self.payload).unwrap();
150+
b.append(&mut j);
151+
152+
b
153+
}
154+
}
155+
156+
#[derive(Default, Serialize, Deserialize)]
157+
#[serde(default)]
158+
pub struct PushDataPayload {
159+
pub rxpk: Vec<RxPk>,
160+
161+
// Capture all the other fields.
162+
#[serde(flatten)]
163+
other: HashMap<String, serde_json::Value>,
164+
}
165+
166+
impl PushDataPayload {
167+
pub fn filter_rxpk(&mut self, filter: &lrwn_filters::Filters) {
168+
self.rxpk = self
169+
.rxpk
170+
.drain(..)
171+
.filter(|v| lrwn_filters::matches(&v.data, filter))
172+
.collect();
173+
}
174+
175+
pub fn is_empty(&self) -> bool {
176+
self.rxpk.is_empty() && self.other.is_empty()
177+
}
178+
}
179+
180+
#[derive(Default, Serialize, Deserialize)]
181+
#[serde(default)]
182+
pub struct RxPk {
183+
#[serde(with = "base64_codec")]
184+
pub data: Vec<u8>,
185+
186+
// Capture all the other fields.
187+
#[serde(flatten)]
188+
other: HashMap<String, serde_json::Value>,
189+
}
190+
191+
mod base64_codec {
192+
use base64::{Engine as _, engine::general_purpose};
193+
use serde::{Deserialize, Serialize};
194+
use serde::{Deserializer, Serializer};
195+
196+
pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
197+
let base64 = general_purpose::STANDARD.encode(v);
198+
String::serialize(&base64, s)
199+
}
200+
201+
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
202+
let base64 = String::deserialize(d)?;
203+
general_purpose::STANDARD
204+
.decode(base64.as_bytes())
205+
.map_err(serde::de::Error::custom)
206+
}
207+
}

tests/test_filtered_server.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ async fn test() {
4646
let gw_sock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
4747
gw_sock.connect("localhost:1710").await.unwrap();
4848

49-
// Send PUSH_DATA.
49+
// Send PUSH_DATA (unconfirmed uplink with DevAddr 01020304).
5050
gw_sock
5151
.send(&[
52-
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x7d,
52+
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x22,
53+
0x72, 0x78, 0x70, 0x6b, 0x22, 0x3a, 0x5b, 0x7b, 0x22, 0x64, 0x61, 0x74, 0x61, 0x22,
54+
0x3a, 0x22, 0x51, 0x41, 0x51, 0x44, 0x41, 0x67, 0x45, 0x3d, 0x22, 0x7d, 0x5d, 0x7d,
5355
])
5456
.await
5557
.unwrap();
@@ -58,7 +60,9 @@ async fn test() {
5860
let size = server1_sock.recv(&mut buffer).await.unwrap();
5961
assert_eq!(
6062
&[
61-
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x7d,
63+
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x22,
64+
0x72, 0x78, 0x70, 0x6b, 0x22, 0x3a, 0x5b, 0x7b, 0x22, 0x64, 0x61, 0x74, 0x61, 0x22,
65+
0x3a, 0x22, 0x51, 0x41, 0x51, 0x44, 0x41, 0x67, 0x45, 0x3d, 0x22, 0x7d, 0x5d, 0x7d,
6266
],
6367
&buffer[..size]
6468
);
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::str::FromStr;
2+
3+
use tokio::net::UdpSocket;
4+
use tracing_subscriber::prelude::*;
5+
6+
use chirpstack_packet_multiplexer::{config, forwarder, listener};
7+
8+
#[tokio::test]
9+
async fn test() {
10+
tracing_subscriber::registry()
11+
.with(tracing_subscriber::fmt::layer())
12+
.init();
13+
14+
let conf = config::Configuration {
15+
multiplexer: config::Multiplexer {
16+
bind: "0.0.0.0:1710".into(),
17+
servers: vec![config::Server {
18+
server: "localhost:1711".into(),
19+
filters: config::Filters {
20+
dev_addr_prefixes: vec![
21+
lrwn_filters::DevAddrPrefix::from_str("01000000/8").unwrap(),
22+
],
23+
..Default::default()
24+
},
25+
..Default::default()
26+
}],
27+
},
28+
..Default::default()
29+
};
30+
31+
let (downlink_tx, uplink_rx) = listener::setup(&conf.multiplexer.bind).await.unwrap();
32+
forwarder::setup(downlink_tx, uplink_rx, conf.multiplexer.servers.clone())
33+
.await
34+
.unwrap();
35+
let mut buffer: [u8; 65535] = [0; 65535];
36+
37+
// Server socket.
38+
let server_sock = UdpSocket::bind("0.0.0.0:1711").await.unwrap();
39+
40+
// Gateway socket.
41+
let gw_sock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
42+
gw_sock.connect("localhost:1710").await.unwrap();
43+
44+
// Send PUSH_DATA (unconfirmed uplink with DevAddr 01020304).
45+
gw_sock
46+
.send(&[
47+
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x22,
48+
0x72, 0x78, 0x70, 0x6b, 0x22, 0x3a, 0x5b, 0x7b, 0x22, 0x64, 0x61, 0x74, 0x61, 0x22,
49+
0x3a, 0x22, 0x51, 0x41, 0x51, 0x44, 0x41, 0x67, 0x45, 0x3d, 0x22, 0x7d, 0x5d, 0x7d,
50+
])
51+
.await
52+
.unwrap();
53+
54+
// Expect PUSH_ACK.
55+
let size = gw_sock.recv(&mut buffer).await.unwrap();
56+
assert_eq!(&[0x02, 0x01, 0x02, 0x01], &buffer[..size]);
57+
58+
// Expect PUSH_DATA forwarded to server.
59+
let size = server_sock.recv(&mut buffer).await.unwrap();
60+
assert_eq!(
61+
&[
62+
0x02, 0x01, 0x02, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x7b, 0x22,
63+
0x72, 0x78, 0x70, 0x6b, 0x22, 0x3a, 0x5b, 0x7b, 0x22, 0x64, 0x61, 0x74, 0x61, 0x22,
64+
0x3a, 0x22, 0x51, 0x41, 0x51, 0x44, 0x41, 0x67, 0x45, 0x3d, 0x22, 0x7d, 0x5d, 0x7d,
65+
],
66+
&buffer[..size]
67+
);
68+
}

0 commit comments

Comments
 (0)