diff --git a/docs/config/dev.toml b/docs/config/dev.toml index 1fb582a64..8582d666b 100644 --- a/docs/config/dev.toml +++ b/docs/config/dev.toml @@ -2,6 +2,7 @@ max_channels = 512 channels_find_limit = 200 +campaigns_find_limit = 200 wait_time = 500 # V4 Deprecated diff --git a/docs/config/prod.toml b/docs/config/prod.toml index b41656965..34f5b502d 100644 --- a/docs/config/prod.toml +++ b/docs/config/prod.toml @@ -2,6 +2,8 @@ max_channels = 512 channels_find_limit = 512 +campaigns_find_limit = 512 + wait_time = 40000 # V4 Deprecated diff --git a/primitives/src/config.rs b/primitives/src/config.rs index df7bb8b21..bd76e0b44 100644 --- a/primitives/src/config.rs +++ b/primitives/src/config.rs @@ -25,6 +25,7 @@ pub struct TokenInfo { pub struct Config { pub max_channels: u32, pub channels_find_limit: u32, + pub campaigns_find_limit: u32, pub wait_time: u32, #[deprecated = "redundant V4 value. No aggregates are needed for V5"] pub aggr_throttle: u32, diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index 62016c0c8..ef51bbbbc 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -351,6 +351,9 @@ pub mod campaign { pub creator: Option
, /// filters the campaigns containing a specific validator if provided pub validator: Option, + /// filters the campaigns where the provided validator is a leader if true + /// if no validator is provided, but is_leader is true, it uses Auth to obtain a validator + pub is_leader: Option, } } diff --git a/sentry/src/db.rs b/sentry/src/db.rs index bf9dbf580..5e1ce60b7 100644 --- a/sentry/src/db.rs +++ b/sentry/src/db.rs @@ -1,7 +1,10 @@ use deadpool_postgres::{Manager, ManagerConfig, RecyclingMethod}; use redis::aio::MultiplexedConnection; -use std::env; -use tokio_postgres::NoTls; +use std::{env, str::FromStr}; +use tokio_postgres::{ + types::{accepts, FromSql, Type}, + NoTls, +}; use lazy_static::lazy_static; @@ -53,6 +56,21 @@ lazy_static! { }; } +pub struct TotalCount(pub u64); +impl<'a> FromSql<'a> for TotalCount { + fn from_sql( + ty: &Type, + raw: &'a [u8], + ) -> Result> { + let str_slice = <&str as FromSql>::from_sql(ty, raw)?; + + Ok(Self(u64::from_str(str_slice)?)) + } + + // Use a varchar or text, since otherwise `int8` fails deserialization + accepts!(VARCHAR, TEXT); +} + pub async fn redis_connection(url: &str) -> Result { let client = redis::Client::open(url)?; diff --git a/sentry/src/db/campaign.rs b/sentry/src/db/campaign.rs index 1074dc78e..96b9fdfd6 100644 --- a/sentry/src/db/campaign.rs +++ b/sentry/src/db/campaign.rs @@ -1,6 +1,10 @@ -use crate::db::{DbPool, PoolError}; -use primitives::{Campaign, CampaignId, ChannelId}; -use tokio_postgres::types::Json; +use crate::db::{DbPool, PoolError, TotalCount}; +use chrono::{DateTime, Utc}; +use primitives::{ + sentry::{campaign::CampaignListResponse, Pagination}, + Address, Campaign, CampaignId, ChannelId, ValidatorId, +}; +use tokio_postgres::types::{Json, ToSql}; pub use campaign_remaining::CampaignRemaining; @@ -58,6 +62,98 @@ pub async fn fetch_campaign( Ok(row.as_ref().map(Campaign::from)) } +pub async fn list_campaigns( + pool: &DbPool, + skip: u64, + limit: u32, + creator: Option
, + validator: Option, + is_leader: Option, + active_to_ge: &DateTime, +) -> Result { + let client = pool.get().await?; + + let (where_clauses, params) = + campaign_list_query_params(&creator, &validator, is_leader, active_to_ge); + let total_count_params = (where_clauses.clone(), params.clone()); + + // To understand why we use Order by, see Postgres Documentation: https://www.postgresql.org/docs/8.1/queries-limit.html + let statement = format!("SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM campaigns INNER JOIN channels ON campaigns.channel_id=channels.id WHERE {} ORDER BY campaigns.created ASC LIMIT {} OFFSET {}", where_clauses.join(" AND "), limit, skip); + let stmt = client.prepare(&statement).await?; + let rows = client.query(&stmt, params.as_slice()).await?; + let campaigns = rows.iter().map(Campaign::from).collect(); + + let total_count = + list_campaigns_total_count(pool, (&total_count_params.0, total_count_params.1)).await?; + + // fast ceil for total_pages + let total_pages = if total_count == 0 { + 1 + } else { + 1 + ((total_count - 1) / limit as u64) + }; + + let pagination = Pagination { + total_pages, + total: total_pages, + page: skip / limit as u64, + }; + + Ok(CampaignListResponse { + pagination, + campaigns, + }) +} + +fn campaign_list_query_params<'a>( + creator: &'a Option
, + validator: &'a Option, + is_leader: Option, + active_to_ge: &'a DateTime, +) -> (Vec, Vec<&'a (dyn ToSql + Sync)>) { + let mut where_clauses = vec!["active_to >= $1".to_string()]; + let mut params: Vec<&(dyn ToSql + Sync)> = vec![active_to_ge]; + + if let Some(creator) = creator { + where_clauses.push(format!("creator = ${}", params.len() + 1)); + params.push(creator); + } + + // if clause for is_leader is true, the other clause is also always true + match (validator, is_leader) { + (Some(validator), Some(true)) => { + where_clauses.push(format!("channels.leader = ${}", params.len() + 1)); + params.push(validator); + } + (Some(validator), _) => { + where_clauses.push(format!( + "(channels.leader = ${x} OR channels.follower = ${x})", + x = params.len() + 1, + )); + params.push(validator); + } + _ => (), + } + + (where_clauses, params) +} + +async fn list_campaigns_total_count<'a>( + pool: &DbPool, + (where_clauses, params): (&'a [String], Vec<&'a (dyn ToSql + Sync)>), +) -> Result { + let client = pool.get().await?; + + let statement = format!( + "SELECT COUNT(campaigns.id)::varchar FROM campaigns INNER JOIN channels ON campaigns.channel_id=channels.id WHERE {}", + where_clauses.join(" AND ") + ); + let stmt = client.prepare(&statement).await?; + let row = client.query_one(&stmt, params.as_slice()).await?; + + Ok(row.get::<_, TotalCount>(0).0) +} + // TODO: We might need to use LIMIT to implement pagination /// ```text /// SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, @@ -386,22 +482,25 @@ mod campaign_remaining { #[cfg(test)] mod test { + use crate::db::{ + insert_channel, + tests_postgres::{setup_test_migrations, DATABASE_POOL}, + }; + use chrono::TimeZone; use primitives::{ campaign, + campaign::Validators, event_submission::{RateLimit, Rule}, sentry::campaign_create::ModifyCampaign, targeting::Rules, - util::tests::prep_db::{DUMMY_AD_UNITS, DUMMY_CAMPAIGN}, - EventSubmission, UnifiedNum, + util::tests::prep_db::{ + ADDRESSES, DUMMY_AD_UNITS, DUMMY_CAMPAIGN, DUMMY_VALIDATOR_FOLLOWER, IDS, + }, + EventSubmission, UnifiedNum, ValidatorDesc, }; - use std::time::Duration; + use std::{convert::TryFrom, time::Duration}; use tokio_postgres::error::SqlState; - use crate::db::{ - insert_channel, - tests_postgres::{setup_test_migrations, DATABASE_POOL}, - }; - use super::*; #[tokio::test] @@ -485,4 +584,233 @@ mod test { ); } } + + // Campaigns are sorted in ascending order when retrieved + // Therefore the last campaign inserted will come up first in results + #[tokio::test] + async fn it_lists_campaigns_properly() { + let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + + setup_test_migrations(database.pool.clone()) + .await + .expect("Migrations should succeed"); + + let campaign = DUMMY_CAMPAIGN.clone(); + let mut channel_with_different_leader = DUMMY_CAMPAIGN.channel; + channel_with_different_leader.leader = IDS["user"]; + + insert_channel(&database, DUMMY_CAMPAIGN.channel) + .await + .expect("Should insert"); + insert_channel(&database, channel_with_different_leader) + .await + .expect("Should insert"); + + let mut campaign_new_id = DUMMY_CAMPAIGN.clone(); + campaign_new_id.id = CampaignId::new(); + campaign_new_id.created = Utc.ymd(2020, 2, 1).and_hms(7, 0, 0); // 1 year before previous + + // campaign with a different creator + let mut campaign_new_creator = DUMMY_CAMPAIGN.clone(); + campaign_new_creator.id = CampaignId::new(); + campaign_new_creator.creator = ADDRESSES["tester"]; + campaign_new_creator.created = Utc.ymd(2019, 2, 1).and_hms(7, 0, 0); // 1 year before previous + + let mut campaign_new_leader = DUMMY_CAMPAIGN.clone(); + campaign_new_leader.id = CampaignId::new(); + campaign_new_leader.created = Utc.ymd(2018, 2, 1).and_hms(7, 0, 0); // 1 year before previous + + let different_leader: ValidatorDesc = ValidatorDesc { + id: ValidatorId::try_from("0x20754168c00a6e58116ccfd0a5f7d1bb66c5de9d") + .expect("Failed to parse DUMMY_VALIDATOR_DIFFERENT_LEADER id"), + url: "http://localhost:8005".to_string(), + fee: 100.into(), + fee_addr: None, + }; + campaign_new_leader.channel = channel_with_different_leader; + campaign_new_leader.validators = + Validators::new((different_leader.clone(), DUMMY_VALIDATOR_FOLLOWER.clone())); + + insert_campaign(&database, &campaign) + .await + .expect("Should insert"); // fourth + insert_campaign(&database, &campaign_new_id) + .await + .expect("Should insert"); // third + insert_campaign(&database, &campaign_new_creator) + .await + .expect("Should insert"); // second + insert_campaign(&database, &campaign_new_leader) + .await + .expect("Should insert"); // first + + // 2 out of 3 results + let first_page = list_campaigns( + &database.pool, + 0, + 2, + Some(ADDRESSES["creator"]), + None, + None, + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!( + first_page.campaigns, + vec![campaign_new_leader.clone(), campaign_new_id.clone()] + ); + + // 3rd result + let second_page = list_campaigns( + &database.pool, + 2, + 2, + Some(ADDRESSES["creator"]), + None, + None, + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!(second_page.campaigns, vec![campaign.clone()]); + + // No results past limit + let third_page = list_campaigns( + &database.pool, + 4, + 2, + Some(ADDRESSES["creator"]), + None, + None, + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!(third_page.campaigns.len(), 0); + + // Test with a different creator + let first_page = list_campaigns( + &database.pool, + 0, + 2, + Some(ADDRESSES["tester"]), + None, + None, + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!(first_page.campaigns, vec![campaign_new_creator.clone()]); + + // Test with validator + let first_page = list_campaigns( + &database.pool, + 0, + 5, + None, + Some(IDS["follower"]), + None, + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!( + first_page.campaigns, + vec![ + campaign_new_leader.clone(), + campaign_new_creator.clone(), + campaign_new_id.clone(), + campaign.clone() + ] + ); + + // Test with validator and is_leader + let first_page = list_campaigns( + &database.pool, + 0, + 5, + None, + Some(IDS["leader"]), + Some(true), + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!( + first_page.campaigns, + vec![ + campaign_new_creator.clone(), + campaign_new_id.clone(), + campaign.clone() + ] + ); + + // Test with a different validator and is_leader + let first_page = list_campaigns( + &database.pool, + 0, + 5, + None, + Some(IDS["user"]), + Some(true), + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!(first_page.campaigns, vec![campaign_new_leader.clone()]); + + // Test with validator and is_leader but validator isn't the leader of any campaign + let first_page = list_campaigns( + &database.pool, + 0, + 5, + None, + Some(IDS["follower"]), + Some(true), + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!(first_page.campaigns.len(), 0); + + // Test with is_leader set to false + let first_page = list_campaigns( + &database.pool, + 0, + 5, + None, + Some(IDS["follower"]), + Some(false), + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!( + first_page.campaigns, + vec![ + campaign_new_leader.clone(), + campaign_new_creator.clone(), + campaign_new_id.clone(), + campaign.clone() + ] + ); + + // Test with creator, provided validator and is_leader set to true + let first_page = list_campaigns( + &database.pool, + 0, + 5, + Some(ADDRESSES["creator"]), + Some(IDS["leader"]), + Some(true), + &Utc::now(), + ) + .await + .expect("should fetch"); + assert_eq!( + first_page.campaigns, + vec![campaign_new_id.clone(), campaign.clone()] + ); + } } diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs index b44787cce..491629016 100644 --- a/sentry/src/db/channel.rs +++ b/sentry/src/db/channel.rs @@ -98,25 +98,8 @@ mod list_channels { sentry::{channel_list::ChannelListResponse, Pagination}, Channel, ValidatorId, }; - use std::str::FromStr; - use tokio_postgres::types::{accepts, FromSql, Type}; - use crate::db::{DbPool, PoolError}; - - struct TotalCount(pub u64); - impl<'a> FromSql<'a> for TotalCount { - fn from_sql( - ty: &Type, - raw: &'a [u8], - ) -> Result> { - let str_slice = <&str as FromSql>::from_sql(ty, raw)?; - - Ok(Self(u64::from_str(str_slice)?)) - } - - // Use a varchar or text, since otherwise `int8` fails deserialization - accepts!(VARCHAR, TEXT); - } + use crate::db::{DbPool, PoolError, TotalCount}; /// Lists the `Channel`s in `ASC` order. /// This makes sure that if a new `Channel` is added diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 5defdfbb8..cfb2c2a4e 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -22,7 +22,7 @@ use { db::{CampaignRemaining, DbPool}, routes::{ campaign, - campaign::{create_campaign, update_campaign}, + campaign::{campaign_list, create_campaign, update_campaign}, cfg::config, channel::{ channel_list, create_validator_messages, get_all_spender_limits, get_spender_limits, @@ -204,6 +204,10 @@ async fn campaigns_router( // } Err(ResponseError::NotFound) + } else if method == Method::POST && path == "/v5/campaign/list" { + req = AuthRequired.call(req, app).await?; + + campaign_list(req, app).await } else { Err(ResponseError::NotFound) } diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index bf5dc620f..9e2973dde 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -1,7 +1,7 @@ use crate::{ db::{ accounting::{get_accounting, Side}, - campaign::{get_campaigns_by_channel, update_campaign}, + campaign::{get_campaigns_by_channel, list_campaigns, update_campaign}, insert_campaign, insert_channel, spendable::update_spendable, CampaignRemaining, DbPool, RedisError, @@ -14,6 +14,7 @@ use primitives::{ adapter::{Adapter, AdapterErrorKind, Error as AdapterError}, campaign_validator::Validator, config::TokenInfo, + sentry::campaign::CampaignListQuery, sentry::campaign_create::{CreateCampaign, ModifyCampaign}, spender::Spendable, Address, Campaign, Channel, Deposit, UnifiedNum, @@ -217,6 +218,38 @@ pub async fn create_campaign( Ok(success_response(serde_json::to_string(&campaign)?)) } +pub async fn campaign_list( + req: Request, + app: &Application, +) -> Result, ResponseError> { + let mut query = + serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; + + query.validator = match (query.validator, query.is_leader, req.extensions().get::()) { + (None, Some(true), Some(session)) => Some(session.uid), // only case where session.uid is used + (Some(validator), _, _) => Some(validator), // for all cases with a validator passed + _ => None, // default, no filtration by validator + }; + + let limit = app.config.campaigns_find_limit; + let skip = query + .page + .checked_mul(limit.into()) + .ok_or_else(|| ResponseError::BadRequest("Page and/or limit is too large".into()))?; + let list_response = list_campaigns( + &app.pool, + skip, + limit, + query.creator, + query.validator, + query.is_leader, + &query.active_to_ge, + ) + .await?; + + Ok(success_response(serde_json::to_string(&list_response)?)) +} + pub mod update_campaign { use primitives::Config; diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs index 649244bd2..487387b29 100644 --- a/validator_worker/src/sentry_interface.rs +++ b/validator_worker/src/sentry_interface.rs @@ -382,6 +382,7 @@ pub mod campaigns { active_to_ge: Utc::now(), creator: None, validator: Some(validator), + is_leader: None, }; let endpoint = sentry_url