Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,882 changes: 1,514 additions & 1,368 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,4 @@ anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madnin
# beacon = { path = "../proto/beacon" }

# [patch.'https://github.com/helium/proto']
# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "fix-carrier-id" }
# helium-proto = { git = "https://www.github.com/helium/proto.git", branch = "kurotych/carrier-enablement" }
3 changes: 3 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub const VERIFIED_SUBSCRIBER_MAPPING_ACTIVITY_REPORT: &str =
"verified_subscriber_mapping_activity_report";
pub const MOBILE_BAN_REPORT: &str = "mobile_ban_report";
pub const VERIFIED_MOBILE_BAN_REPORT: &str = "verified_mobile_ban_report";
pub const ENABLED_CARRIERS_REPORT: &str = "enabled_carriers_report";

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -248,6 +249,7 @@ pub enum FileType {
VerifiedSubscriberMappingActivityReport,
MobileBanReport,
VerifiedMobileBanReport,
EnabledCarriersInfoReport,
}

impl fmt::Display for FileType {
Expand Down Expand Up @@ -343,6 +345,7 @@ impl FileType {
}
Self::MobileBanReport => MOBILE_BAN_REPORT,
Self::VerifiedMobileBanReport => VERIFIED_MOBILE_BAN_REPORT,
Self::EnabledCarriersInfoReport => ENABLED_CARRIERS_REPORT,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,8 @@ impl_file_sink!(
FileType::VerifiedSubscriberMappingActivityReport.to_str(),
"verified_subscriber_mapping_activity_report"
);
impl_file_sink!(
poc_mobile::EnabledCarriersInfoReportV1,
FileType::EnabledCarriersInfoReport.to_str(),
"enabled_carriers_report"
);
1 change: 1 addition & 0 deletions file_store/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl_msg_verify!(poc_mobile::RadioUsageStatsReqV1, signature);
impl_msg_verify!(poc_mobile::UniqueConnectionsReqV1, signature);
impl_msg_verify!(poc_mobile::SubscriberMappingActivityReqV1, signature);
impl_msg_verify!(poc_mobile::BanReqV1, signature);
impl_msg_verify!(poc_mobile::EnabledCarriersInfoReqV1, signature);

#[cfg(test)]
mod test {
Expand Down
57 changes: 56 additions & 1 deletion ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use helium_proto::services::poc_mobile::{
self, BanIngestReportV1, BanReqV1, BanRespV1, CellHeartbeatReqV1, CellHeartbeatRespV1,
CoverageObjectIngestReportV1, CoverageObjectReqV1, CoverageObjectRespV1,
DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1,
DataTransferSessionRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
DataTransferSessionRespV1, EnabledCarriersInfoReportV1, EnabledCarriersInfoReqV1,
EnabledCarriersInfoRespV1, HexUsageStatsIngestReportV1, HexUsageStatsReqV1, HexUsageStatsResV1,
InvalidatedRadioThresholdIngestReportV1, InvalidatedRadioThresholdReportReqV1,
InvalidatedRadioThresholdReportRespV1, RadioThresholdIngestReportV1, RadioThresholdReportReqV1,
RadioThresholdReportRespV1, RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1,
Expand Down Expand Up @@ -58,6 +59,7 @@ pub struct GrpcServer<AV> {
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
ban_sink: FileSinkClient<BanIngestReportV1>,
enabled_carriers_sink: FileSinkClient<EnabledCarriersInfoReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand Down Expand Up @@ -108,6 +110,7 @@ where
unique_connections_sink: FileSinkClient<UniqueConnectionsIngestReportV1>,
subscriber_mapping_activity_sink: FileSinkClient<SubscriberMappingActivityIngestReportV1>,
ban_sink: FileSinkClient<BanIngestReportV1>,
enabled_carriers_sink: FileSinkClient<EnabledCarriersInfoReportV1>,
required_network: Network,
address: SocketAddr,
api_token: MetadataValue<Ascii>,
Expand All @@ -128,6 +131,7 @@ where
unique_connections_sink,
subscriber_mapping_activity_sink,
ban_sink,
enabled_carriers_sink,
required_network,
address,
api_token,
Expand Down Expand Up @@ -609,6 +613,46 @@ where
let timestamp_ms = received_timestamp_ms;
Ok(Response::new(BanRespV1 { timestamp_ms }))
}

async fn submit_enabled_carriers_info(
&self,
request: Request<EnabledCarriersInfoReqV1>,
) -> GrpcResult<EnabledCarriersInfoRespV1> {
const MESSAGE_EXPIRATION_TIME_MS: u64 = 10 * 60 * 1000; // 10 minutes
let event = request.into_inner();
let received_timestamp_ms = Utc::now().timestamp_millis() as u64;

if event.timestamp_ms > received_timestamp_ms {
return Err(Status::invalid_argument(
"The timestamp_ms field is invalid, it can't be greater than now()",
));
}

if received_timestamp_ms - event.timestamp_ms > MESSAGE_EXPIRATION_TIME_MS {
return Err(Status::invalid_argument(format!(
"The message is expired. It is generated more than {} seconds ago",
MESSAGE_EXPIRATION_TIME_MS / 1000
)));
}
custom_tracing::record_b58("pub_key", &event.hotspot_pubkey);

let (verified_pubkey, event) = self
.verify_public_key(&event.signer_pubkey)
.and_then(|public_key| self.verify_network(public_key))
.and_then(|public_key| self.verify_signature(public_key, event))?;
self.verify_known_carrier_key(verified_pubkey).await?;

let report = EnabledCarriersInfoReportV1 {
received_timestamp_ms,
report: Some(event),
};

_ = self.enabled_carriers_sink.write(report, []).await;

Ok(Response::new(EnabledCarriersInfoRespV1 {
timestamp_ms: received_timestamp_ms,
}))
}
}

fn is_data_transfer_for_cbrs(event: &DataTransferSessionReqV1) -> bool {
Expand Down Expand Up @@ -752,6 +796,15 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
)
.await?;

let (enabled_carriers_sink, enabled_carriers_server) = EnabledCarriersInfoReportV1::file_sink(
&settings.cache,
file_upload.clone(),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let (subscriber_mapping_activity_sink, subscriber_mapping_activity_server) =
SubscriberMappingActivityIngestReportV1::file_sink(
&settings.cache,
Expand Down Expand Up @@ -789,6 +842,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
unique_connections_sink,
subscriber_mapping_activity_sink,
ban_sink,
enabled_carriers_sink,
settings.network,
settings.listen_addr,
api_token,
Expand Down Expand Up @@ -817,6 +871,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(unique_connections_server)
.add_task(subscriber_mapping_activity_server)
.add_task(ban_server)
.add_task(enabled_carriers_server)
.add_task(grpc_server)
.build()
.start()
Expand Down
51 changes: 50 additions & 1 deletion ingest/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use helium_crypto::{KeyTag, Keypair, Network, PublicKeyBinary, Sign};
use helium_proto::services::poc_mobile::{
BanIngestReportV1, BanReqV1, BanRespV1, CarrierIdV2, CellHeartbeatReqV1, CellHeartbeatRespV1,
DataTransferEvent, DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1,
DataTransferSessionReqV1, DataTransferSessionRespV1, HexUsageStatsIngestReportV1,
DataTransferSessionReqV1, DataTransferSessionRespV1, EnabledCarriersInfoReportV1,
EnabledCarriersInfoReqV1, EnabledCarriersInfoRespV1, HexUsageStatsIngestReportV1,
HexUsageStatsReqV1, HexUsageStatsResV1, RadioUsageCarrierTransferInfo,
RadioUsageStatsIngestReportV1, RadioUsageStatsReqV1, RadioUsageStatsResV1,
UniqueConnectionsIngestReportV1, UniqueConnectionsReqV1, UniqueConnectionsRespV1,
Expand All @@ -24,6 +25,7 @@ use mobile_config::client::authorization_client::AuthorizationVerifier;
use mobile_config::client::ClientError;
use prost::Message;
use rand::rngs::OsRng;
use std::str::FromStr;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::{net::TcpListener, sync::mpsc::Receiver, time::timeout};
Expand Down Expand Up @@ -78,6 +80,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
let (subscriber_mapping_activity_tx, _subscriber_mapping_activity_rx) =
tokio::sync::mpsc::channel(10);
let (ban_tx, ban_rx) = tokio::sync::mpsc::channel(10);
let (enabled_carriers_tx, enabled_carriers_rx) = tokio::sync::mpsc::channel(10);

tokio::spawn(async move {
let grpc_server = GrpcServer::new(
Expand All @@ -95,6 +98,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
FileSinkClient::new(unique_connections_tx, "noop"),
FileSinkClient::new(subscriber_mapping_activity_tx, "noop"),
FileSinkClient::new(ban_tx, "noop"),
FileSinkClient::new(enabled_carriers_tx, "enabled_carriers_sink"),
Network::MainNet,
socket_addr,
api_token,
Expand All @@ -114,6 +118,7 @@ pub async fn setup_mobile() -> anyhow::Result<(TestClient, Trigger)> {
unique_connections_rx,
ban_rx,
data_transfer_rx,
enabled_carriers_rx,
)
.await;

Expand All @@ -134,6 +139,7 @@ pub struct TestClient {
Receiver<file_store::file_sink::Message<UniqueConnectionsIngestReportV1>>,
ban_file_sink_rx: Receiver<file_store::file_sink::Message<BanIngestReportV1>>,
data_transfer_rx: Receiver<file_store::file_sink::Message<DataTransferSessionIngestReportV1>>,
enabled_carriers_rx: Receiver<file_store::file_sink::Message<EnabledCarriersInfoReportV1>>,
}

impl TestClient {
Expand All @@ -158,6 +164,7 @@ impl TestClient {
data_transfer_rx: Receiver<
file_store::file_sink::Message<DataTransferSessionIngestReportV1>,
>,
enabled_carriers_rx: Receiver<file_store::file_sink::Message<EnabledCarriersInfoReportV1>>,
) -> TestClient {
let client = (|| PocMobileClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
Expand All @@ -174,6 +181,7 @@ impl TestClient {
unique_connections_file_sink_rx,
ban_file_sink_rx,
data_transfer_rx,
enabled_carriers_rx,
}
}

Expand Down Expand Up @@ -281,6 +289,20 @@ impl TestClient {
}
}

pub async fn enabled_carriers_info_recv(
mut self,
) -> anyhow::Result<EnabledCarriersInfoReportV1> {
match timeout(Duration::from_secs(2), self.enabled_carriers_rx.recv()).await {
Ok(Some(msg)) => match msg {
file_store::file_sink::Message::Commit(_) => bail!("got Commit"),
file_store::file_sink::Message::Rollback(_) => bail!("got Rollback"),
file_store::file_sink::Message::Data(_, data) => Ok(data),
},
Ok(None) => bail!("got none"),
Err(reason) => bail!("got error {reason}"),
}
}

pub async fn submit_ban(&mut self, hotspot_pubkey: Vec<u8>) -> anyhow::Result<BanRespV1> {
use helium_proto::services::poc_mobile::BanType;
let mut req = BanReqV1 {
Expand Down Expand Up @@ -505,6 +527,33 @@ impl TestClient {

Ok(res.into_inner())
}

pub async fn submit_enabled_carriers_info(
&mut self,
keypair: &Keypair,
hotspot_pubkey: &str,
enabled_carriers: Vec<CarrierIdV2>,
request_timestamp: u64,
) -> anyhow::Result<EnabledCarriersInfoRespV1> {
let hotspot_pubkey = PublicKeyBinary::from_str(hotspot_pubkey)?;

let mut carrier_req = EnabledCarriersInfoReqV1 {
hotspot_pubkey: hotspot_pubkey.into(),
enabled_carriers: enabled_carriers.into_iter().map(|v| v.into()).collect(),
firmware_version: "v11".to_string(),
timestamp_ms: request_timestamp,
signer_pubkey: keypair.public_key().into(),
signature: vec![],
};
carrier_req.signature = keypair.sign(&carrier_req.encode_to_vec()).expect("sign");
let mut request = Request::new(carrier_req);
let metadata = request.metadata_mut();

metadata.insert("authorization", self.authorization.clone());

let res = self.client.submit_enabled_carriers_info(request).await?;
Ok(res.into_inner())
}
}

pub fn generate_keypair() -> Keypair {
Expand Down
54 changes: 53 additions & 1 deletion ingest/tests/mobile_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,66 @@ use chrono::{TimeZone, Utc};
use common::generate_keypair;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
DataTransferRadioAccessTechnology, RadioUsageCarrierTransferInfo,
CarrierIdV2, DataTransferRadioAccessTechnology, RadioUsageCarrierTransferInfo,
};
use std::str::FromStr;

mod common;

const PUBKEY1: &str = "113HRxtzxFbFUjDEJJpyeMRZRtdAW38LAUnB5mshRwi6jt7uFbt";

#[tokio::test]
async fn submit_enabled_carriers_info_valid() -> anyhow::Result<()> {
let keypair = generate_keypair();
let (mut client, trigger) = common::setup_mobile().await?;
client
.submit_enabled_carriers_info(
&keypair,
PUBKEY1,
vec![CarrierIdV2::Carrier0],
Utc::now().timestamp_millis() as u64,
)
.await?;

let report = client.enabled_carriers_info_recv().await?;
let inner_report = report.report.expect("inner report");

assert_eq!(
PublicKeyBinary::from(inner_report.hotspot_pubkey).to_string(),
PUBKEY1
);
assert_eq!(
inner_report.enabled_carriers,
vec![CarrierIdV2::Carrier0 as i32]
);

trigger.trigger();
Ok(())
}

#[tokio::test]
async fn submit_enabled_carriers_info_expired() -> anyhow::Result<()> {
let keypair = generate_keypair();
let (mut client, trigger) = common::setup_mobile().await?;
let res = client
.submit_enabled_carriers_info(
&keypair,
PUBKEY1,
vec![CarrierIdV2::Carrier0],
Utc::now().timestamp_millis() as u64 - (610 * 1000), // 11 min
)
.await;

let binding = res.unwrap_err();
let err = binding.downcast_ref::<tonic::Status>().unwrap();
assert_eq!(
err.message(),
"The message is expired. It is generated more than 600 seconds ago"
);
trigger.trigger();
Ok(())
}

#[tokio::test]
async fn submit_ban() -> anyhow::Result<()> {
let (mut client, trigger) = common::setup_mobile().await?;
Expand Down
2 changes: 1 addition & 1 deletion iot_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ task-manager = { path = "../task_manager" }

[dev-dependencies]
rand = { workspace = true }
backon = "0"
backon = { version = "0", features = ["tokio-sleep"] }
1 change: 0 additions & 1 deletion iot_config/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const EPOCH_CHAIN_LOOKUP_METRIC: &str = concat!(env!("CARGO_PKG_NAME"), "-", "ep

pub fn initialize() {
metrics::gauge!(STREAM_METRIC).set(0.0);
metrics::counter!(STREAM_THROTTLE_COUNT_METRIC);
}

pub fn count_request(service: &'static str, rpc: &'static str) {
Expand Down
2 changes: 1 addition & 1 deletion poc_entropy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ blake3 = { workspace = true }
http = { workspace = true }
tonic = { workspace = true }
hyper = "0"
jsonrpsee = { version = "0", features = ["async-client", "http-client"] }
jsonrpsee = { version = "0.19", features = ["async-client", "http-client"] }
tower = { version = "0.4" }
triggered = { workspace = true }
futures = { workspace = true }
Expand Down