diff --git a/docs/config/dev.toml b/docs/config/dev.toml
index 1fb582a64..8582d666b 100644
--- a/docs/config/dev.toml
+++ b/docs/config/dev.toml
@@ -2,6 +2,7 @@
max_channels = 512
channels_find_limit = 200
+campaigns_find_limit = 200
wait_time = 500
# V4 Deprecated
diff --git a/docs/config/prod.toml b/docs/config/prod.toml
index b41656965..34f5b502d 100644
--- a/docs/config/prod.toml
+++ b/docs/config/prod.toml
@@ -2,6 +2,8 @@
max_channels = 512
channels_find_limit = 512
+campaigns_find_limit = 512
+
wait_time = 40000
# V4 Deprecated
diff --git a/primitives/src/config.rs b/primitives/src/config.rs
index df7bb8b21..bd76e0b44 100644
--- a/primitives/src/config.rs
+++ b/primitives/src/config.rs
@@ -25,6 +25,7 @@ pub struct TokenInfo {
pub struct Config {
pub max_channels: u32,
pub channels_find_limit: u32,
+ pub campaigns_find_limit: u32,
pub wait_time: u32,
#[deprecated = "redundant V4 value. No aggregates are needed for V5"]
pub aggr_throttle: u32,
diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs
index 62016c0c8..ef51bbbbc 100644
--- a/primitives/src/sentry.rs
+++ b/primitives/src/sentry.rs
@@ -351,6 +351,9 @@ pub mod campaign {
pub creator: Option<Address>,
/// filters the campaigns containing a specific validator if provided
pub validator: Option<ValidatorId>,
+ /// filters the campaigns where the provided validator is a leader if true
+ /// if no validator is provided, but is_leader is true, it uses Auth to obtain a validator
+ pub is_leader: Option<bool>,
}
}
diff --git a/sentry/src/db.rs b/sentry/src/db.rs
index bf9dbf580..5e1ce60b7 100644
--- a/sentry/src/db.rs
+++ b/sentry/src/db.rs
@@ -1,7 +1,10 @@
use deadpool_postgres::{Manager, ManagerConfig, RecyclingMethod};
use redis::aio::MultiplexedConnection;
-use std::env;
-use tokio_postgres::NoTls;
+use std::{env, str::FromStr};
+use tokio_postgres::{
+ types::{accepts, FromSql, Type},
+ NoTls,
+};
use lazy_static::lazy_static;
@@ -53,6 +56,21 @@ lazy_static! {
};
}
+pub struct TotalCount(pub u64);
+impl<'a> FromSql<'a> for TotalCount {
+ fn from_sql(
+ ty: &Type,
+ raw: &'a [u8],
+ ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+ let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
+
+ Ok(Self(u64::from_str(str_slice)?))
+ }
+
+ // Use a varchar or text, since otherwise `int8` fails deserialization
+ accepts!(VARCHAR, TEXT);
+}
+
pub async fn redis_connection(url: &str) -> Result {
let client = redis::Client::open(url)?;
diff --git a/sentry/src/db/campaign.rs b/sentry/src/db/campaign.rs
index 1074dc78e..96b9fdfd6 100644
--- a/sentry/src/db/campaign.rs
+++ b/sentry/src/db/campaign.rs
@@ -1,6 +1,10 @@
-use crate::db::{DbPool, PoolError};
-use primitives::{Campaign, CampaignId, ChannelId};
-use tokio_postgres::types::Json;
+use crate::db::{DbPool, PoolError, TotalCount};
+use chrono::{DateTime, Utc};
+use primitives::{
+ sentry::{campaign::CampaignListResponse, Pagination},
+ Address, Campaign, CampaignId, ChannelId, ValidatorId,
+};
+use tokio_postgres::types::{Json, ToSql};
pub use campaign_remaining::CampaignRemaining;
@@ -58,6 +62,98 @@ pub async fn fetch_campaign(
Ok(row.as_ref().map(Campaign::from))
}
+pub async fn list_campaigns(
+ pool: &DbPool,
+ skip: u64,
+ limit: u32,
+ creator: Option<Address>,
+ validator: Option<ValidatorId>,
+ is_leader: Option<bool>,
+ active_to_ge: &DateTime<Utc>,
+) -> Result<CampaignListResponse, PoolError> {
+ let client = pool.get().await?;
+
+ let (where_clauses, params) =
+ campaign_list_query_params(&creator, &validator, is_leader, active_to_ge);
+ let total_count_params = (where_clauses.clone(), params.clone());
+
+ // To understand why we use Order by, see Postgres Documentation: https://www.postgresql.org/docs/8.1/queries-limit.html
+ let statement = format!("SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM campaigns INNER JOIN channels ON campaigns.channel_id=channels.id WHERE {} ORDER BY campaigns.created ASC LIMIT {} OFFSET {}", where_clauses.join(" AND "), limit, skip);
+ let stmt = client.prepare(&statement).await?;
+ let rows = client.query(&stmt, params.as_slice()).await?;
+ let campaigns = rows.iter().map(Campaign::from).collect();
+
+ let total_count =
+ list_campaigns_total_count(pool, (&total_count_params.0, total_count_params.1)).await?;
+
+ // fast ceil for total_pages
+ let total_pages = if total_count == 0 {
+ 1
+ } else {
+ 1 + ((total_count - 1) / limit as u64)
+ };
+
+ let pagination = Pagination {
+ total_pages,
+ total: total_pages,
+ page: skip / limit as u64,
+ };
+
+ Ok(CampaignListResponse {
+ pagination,
+ campaigns,
+ })
+}
+
+fn campaign_list_query_params<'a>(
+ creator: &'a Option<Address>,
+ validator: &'a Option<ValidatorId>,
+ is_leader: Option<bool>,
+ active_to_ge: &'a DateTime<Utc>,
+) -> (Vec<String>, Vec<&'a (dyn ToSql + Sync)>) {
+ let mut where_clauses = vec!["active_to >= $1".to_string()];
+ let mut params: Vec<&(dyn ToSql + Sync)> = vec![active_to_ge];
+
+ if let Some(creator) = creator {
+ where_clauses.push(format!("creator = ${}", params.len() + 1));
+ params.push(creator);
+ }
+
+ // if clause for is_leader is true, the other clause is also always true
+ match (validator, is_leader) {
+ (Some(validator), Some(true)) => {
+ where_clauses.push(format!("channels.leader = ${}", params.len() + 1));
+ params.push(validator);
+ }
+ (Some(validator), _) => {
+ where_clauses.push(format!(
+ "(channels.leader = ${x} OR channels.follower = ${x})",
+ x = params.len() + 1,
+ ));
+ params.push(validator);
+ }
+ _ => (),
+ }
+
+ (where_clauses, params)
+}
+
+async fn list_campaigns_total_count<'a>(
+ pool: &DbPool,
+ (where_clauses, params): (&'a [String], Vec<&'a (dyn ToSql + Sync)>),
+) -> Result<u64, PoolError> {
+ let client = pool.get().await?;
+
+ let statement = format!(
+ "SELECT COUNT(campaigns.id)::varchar FROM campaigns INNER JOIN channels ON campaigns.channel_id=channels.id WHERE {}",
+ where_clauses.join(" AND ")
+ );
+ let stmt = client.prepare(&statement).await?;
+ let row = client.query_one(&stmt, params.as_slice()).await?;
+
+ Ok(row.get::<_, TotalCount>(0).0)
+}
+
// TODO: We might need to use LIMIT to implement pagination
/// ```text
/// SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to,
@@ -386,22 +482,25 @@ mod campaign_remaining {
#[cfg(test)]
mod test {
+ use crate::db::{
+ insert_channel,
+ tests_postgres::{setup_test_migrations, DATABASE_POOL},
+ };
+ use chrono::TimeZone;
use primitives::{
campaign,
+ campaign::Validators,
event_submission::{RateLimit, Rule},
sentry::campaign_create::ModifyCampaign,
targeting::Rules,
- util::tests::prep_db::{DUMMY_AD_UNITS, DUMMY_CAMPAIGN},
- EventSubmission, UnifiedNum,
+ util::tests::prep_db::{
+ ADDRESSES, DUMMY_AD_UNITS, DUMMY_CAMPAIGN, DUMMY_VALIDATOR_FOLLOWER, IDS,
+ },
+ EventSubmission, UnifiedNum, ValidatorDesc,
};
- use std::time::Duration;
+ use std::{convert::TryFrom, time::Duration};
use tokio_postgres::error::SqlState;
- use crate::db::{
- insert_channel,
- tests_postgres::{setup_test_migrations, DATABASE_POOL},
- };
-
use super::*;
#[tokio::test]
@@ -485,4 +584,233 @@ mod test {
);
}
}
+
+ // Campaigns are sorted in ascending order when retrieved
+ // Therefore the last campaign inserted will come up first in results
+ #[tokio::test]
+ async fn it_lists_campaigns_properly() {
+ let database = DATABASE_POOL.get().await.expect("Should get a DB pool");
+
+ setup_test_migrations(database.pool.clone())
+ .await
+ .expect("Migrations should succeed");
+
+ let campaign = DUMMY_CAMPAIGN.clone();
+ let mut channel_with_different_leader = DUMMY_CAMPAIGN.channel;
+ channel_with_different_leader.leader = IDS["user"];
+
+ insert_channel(&database, DUMMY_CAMPAIGN.channel)
+ .await
+ .expect("Should insert");
+ insert_channel(&database, channel_with_different_leader)
+ .await
+ .expect("Should insert");
+
+ let mut campaign_new_id = DUMMY_CAMPAIGN.clone();
+ campaign_new_id.id = CampaignId::new();
+ campaign_new_id.created = Utc.ymd(2020, 2, 1).and_hms(7, 0, 0); // 1 year before previous
+
+ // campaign with a different creator
+ let mut campaign_new_creator = DUMMY_CAMPAIGN.clone();
+ campaign_new_creator.id = CampaignId::new();
+ campaign_new_creator.creator = ADDRESSES["tester"];
+ campaign_new_creator.created = Utc.ymd(2019, 2, 1).and_hms(7, 0, 0); // 1 year before previous
+
+ let mut campaign_new_leader = DUMMY_CAMPAIGN.clone();
+ campaign_new_leader.id = CampaignId::new();
+ campaign_new_leader.created = Utc.ymd(2018, 2, 1).and_hms(7, 0, 0); // 1 year before previous
+
+ let different_leader: ValidatorDesc = ValidatorDesc {
+ id: ValidatorId::try_from("0x20754168c00a6e58116ccfd0a5f7d1bb66c5de9d")
+ .expect("Failed to parse DUMMY_VALIDATOR_DIFFERENT_LEADER id"),
+ url: "http://localhost:8005".to_string(),
+ fee: 100.into(),
+ fee_addr: None,
+ };
+ campaign_new_leader.channel = channel_with_different_leader;
+ campaign_new_leader.validators =
+ Validators::new((different_leader.clone(), DUMMY_VALIDATOR_FOLLOWER.clone()));
+
+ insert_campaign(&database, &campaign)
+ .await
+ .expect("Should insert"); // fourth
+ insert_campaign(&database, &campaign_new_id)
+ .await
+ .expect("Should insert"); // third
+ insert_campaign(&database, &campaign_new_creator)
+ .await
+ .expect("Should insert"); // second
+ insert_campaign(&database, &campaign_new_leader)
+ .await
+ .expect("Should insert"); // first
+
+ // 2 out of 3 results
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 2,
+ Some(ADDRESSES["creator"]),
+ None,
+ None,
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(
+ first_page.campaigns,
+ vec![campaign_new_leader.clone(), campaign_new_id.clone()]
+ );
+
+ // 3rd result
+ let second_page = list_campaigns(
+ &database.pool,
+ 2,
+ 2,
+ Some(ADDRESSES["creator"]),
+ None,
+ None,
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(second_page.campaigns, vec![campaign.clone()]);
+
+ // No results past limit
+ let third_page = list_campaigns(
+ &database.pool,
+ 4,
+ 2,
+ Some(ADDRESSES["creator"]),
+ None,
+ None,
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(third_page.campaigns.len(), 0);
+
+ // Test with a different creator
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 2,
+ Some(ADDRESSES["tester"]),
+ None,
+ None,
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(first_page.campaigns, vec![campaign_new_creator.clone()]);
+
+ // Test with validator
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ None,
+ Some(IDS["follower"]),
+ None,
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(
+ first_page.campaigns,
+ vec![
+ campaign_new_leader.clone(),
+ campaign_new_creator.clone(),
+ campaign_new_id.clone(),
+ campaign.clone()
+ ]
+ );
+
+ // Test with validator and is_leader
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ None,
+ Some(IDS["leader"]),
+ Some(true),
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(
+ first_page.campaigns,
+ vec![
+ campaign_new_creator.clone(),
+ campaign_new_id.clone(),
+ campaign.clone()
+ ]
+ );
+
+ // Test with a different validator and is_leader
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ None,
+ Some(IDS["user"]),
+ Some(true),
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(first_page.campaigns, vec![campaign_new_leader.clone()]);
+
+ // Test with validator and is_leader but validator isn't the leader of any campaign
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ None,
+ Some(IDS["follower"]),
+ Some(true),
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(first_page.campaigns.len(), 0);
+
+ // Test with is_leader set to false
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ None,
+ Some(IDS["follower"]),
+ Some(false),
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(
+ first_page.campaigns,
+ vec![
+ campaign_new_leader.clone(),
+ campaign_new_creator.clone(),
+ campaign_new_id.clone(),
+ campaign.clone()
+ ]
+ );
+
+ // Test with creator, provided validator and is_leader set to true
+ let first_page = list_campaigns(
+ &database.pool,
+ 0,
+ 5,
+ Some(ADDRESSES["creator"]),
+ Some(IDS["leader"]),
+ Some(true),
+ &Utc::now(),
+ )
+ .await
+ .expect("should fetch");
+ assert_eq!(
+ first_page.campaigns,
+ vec![campaign_new_id.clone(), campaign.clone()]
+ );
+ }
}
diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs
index b44787cce..491629016 100644
--- a/sentry/src/db/channel.rs
+++ b/sentry/src/db/channel.rs
@@ -98,25 +98,8 @@ mod list_channels {
sentry::{channel_list::ChannelListResponse, Pagination},
Channel, ValidatorId,
};
- use std::str::FromStr;
- use tokio_postgres::types::{accepts, FromSql, Type};
- use crate::db::{DbPool, PoolError};
-
- struct TotalCount(pub u64);
- impl<'a> FromSql<'a> for TotalCount {
- fn from_sql(
- ty: &Type,
- raw: &'a [u8],
- ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
- let str_slice = <&str as FromSql>::from_sql(ty, raw)?;
-
- Ok(Self(u64::from_str(str_slice)?))
- }
-
- // Use a varchar or text, since otherwise `int8` fails deserialization
- accepts!(VARCHAR, TEXT);
- }
+ use crate::db::{DbPool, PoolError, TotalCount};
/// Lists the `Channel`s in `ASC` order.
/// This makes sure that if a new `Channel` is added
diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs
index 5defdfbb8..cfb2c2a4e 100644
--- a/sentry/src/lib.rs
+++ b/sentry/src/lib.rs
@@ -22,7 +22,7 @@ use {
db::{CampaignRemaining, DbPool},
routes::{
campaign,
- campaign::{create_campaign, update_campaign},
+ campaign::{campaign_list, create_campaign, update_campaign},
cfg::config,
channel::{
channel_list, create_validator_messages, get_all_spender_limits, get_spender_limits,
@@ -204,6 +204,10 @@ async fn campaigns_router(
// }
Err(ResponseError::NotFound)
+ } else if method == Method::POST && path == "/v5/campaign/list" {
+ req = AuthRequired.call(req, app).await?;
+
+ campaign_list(req, app).await
} else {
Err(ResponseError::NotFound)
}
diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs
index bf5dc620f..9e2973dde 100644
--- a/sentry/src/routes/campaign.rs
+++ b/sentry/src/routes/campaign.rs
@@ -1,7 +1,7 @@
use crate::{
db::{
accounting::{get_accounting, Side},
- campaign::{get_campaigns_by_channel, update_campaign},
+ campaign::{get_campaigns_by_channel, list_campaigns, update_campaign},
insert_campaign, insert_channel,
spendable::update_spendable,
CampaignRemaining, DbPool, RedisError,
@@ -14,6 +14,7 @@ use primitives::{
adapter::{Adapter, AdapterErrorKind, Error as AdapterError},
campaign_validator::Validator,
config::TokenInfo,
+ sentry::campaign::CampaignListQuery,
sentry::campaign_create::{CreateCampaign, ModifyCampaign},
spender::Spendable,
Address, Campaign, Channel, Deposit, UnifiedNum,
@@ -217,6 +218,38 @@ pub async fn create_campaign(
Ok(success_response(serde_json::to_string(&campaign)?))
}
+pub async fn campaign_list<A: Adapter>(
+ req: Request<Body>,
+ app: &Application<A>,
+) -> Result<Response<Body>, ResponseError> {
+ let mut query =
+ serde_urlencoded::from_str::<CampaignListQuery>(req.uri().query().unwrap_or(""))?;
+
+ query.validator = match (query.validator, query.is_leader, req.extensions().get::<Auth>()) {
+ (None, Some(true), Some(session)) => Some(session.uid), // only case where session.uid is used
+ (Some(validator), _, _) => Some(validator), // for all cases with a validator passed
+ _ => None, // default, no filtration by validator
+ };
+
+ let limit = app.config.campaigns_find_limit;
+ let skip = query
+ .page
+ .checked_mul(limit.into())
+ .ok_or_else(|| ResponseError::BadRequest("Page and/or limit is too large".into()))?;
+ let list_response = list_campaigns(
+ &app.pool,
+ skip,
+ limit,
+ query.creator,
+ query.validator,
+ query.is_leader,
+ &query.active_to_ge,
+ )
+ .await?;
+
+ Ok(success_response(serde_json::to_string(&list_response)?))
+}
+
pub mod update_campaign {
use primitives::Config;
diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs
index 649244bd2..487387b29 100644
--- a/validator_worker/src/sentry_interface.rs
+++ b/validator_worker/src/sentry_interface.rs
@@ -382,6 +382,7 @@ pub mod campaigns {
active_to_ge: Utc::now(),
creator: None,
validator: Some(validator),
+ is_leader: None,
};
let endpoint = sentry_url