diff --git a/Cargo.lock b/Cargo.lock index 853d21533..6b9be523c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1100,6 +1100,19 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3916,6 +3929,7 @@ dependencies = [ "colored", "config", "crossbeam-skiplist", + "dashmap", "derive_more", "fern", "futures", @@ -4022,6 +4036,7 @@ dependencies = [ "async-std", "criterion", "crossbeam-skiplist", + "dashmap", "futures", "rstest", "tokio", diff --git a/Cargo.toml b/Cargo.toml index f440799cc..ef0c39d4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ clap = { version = "4", features = ["derive", "env"] } colored = "2" config = "0" crossbeam-skiplist = "0.1" +dashmap = "5.5.3" derive_more = "0" fern = "0" futures = "0" @@ -77,7 +78,7 @@ url = "2" uuid = { version = "1", features = ["v4"] } [package.metadata.cargo-machete] -ignored = ["serde_bytes", "crossbeam-skiplist"] +ignored = ["serde_bytes", "crossbeam-skiplist", "dashmap"] [dev-dependencies] local-ip-address = "0" diff --git a/cSpell.json b/cSpell.json index 0480590af..24ef6b0a0 100644 --- a/cSpell.json +++ b/cSpell.json @@ -163,6 +163,7 @@ "Weidendorfer", "Werror", "whitespaces", + "Xacrimon", "XBTT", "Xdebug", "Xeon", diff --git a/packages/torrent-repository/Cargo.toml b/packages/torrent-repository/Cargo.toml index 5f1a20d32..6bc8bfcdd 100644 --- a/packages/torrent-repository/Cargo.toml +++ b/packages/torrent-repository/Cargo.toml @@ -17,6 +17,7 @@ version.workspace = true [dependencies] crossbeam-skiplist = "0.1" +dashmap = "5.5.3" futures = "0.3.29" tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] } torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" } diff --git a/packages/torrent-repository/benches/repository_benchmark.rs b/packages/torrent-repository/benches/repository_benchmark.rs index 65608c86c..58cd70d9a 100644 --- a/packages/torrent-repository/benches/repository_benchmark.rs +++ b/packages/torrent-repository/benches/repository_benchmark.rs @@ -4,8 +4,8 @@ mod helpers; use criterion::{criterion_group, criterion_main, Criterion}; use torrust_tracker_torrent_repository::{ - TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, - TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, + TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; use crate::helpers::{asyn, sync}; @@ -49,6 +49,10 @@ fn add_one_torrent(c: &mut Criterion) { b.iter_custom(sync::add_one_torrent::); }); + group.bench_function("DashMapMutexStd", |b| { + b.iter_custom(sync::add_one_torrent::); + }); + group.finish(); } @@ -98,6 +102,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("DashMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -147,6 +156,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); }); + group.bench_function("DashMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_one_torrent_in_parallel::(&rt, iters, None)); + }); + group.finish(); } @@ -197,6 +211,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) { .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); }); + group.bench_function("DashMapMutexStd", |b| { + b.to_async(&rt) + .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::(&rt, iters, None)); + }); + group.finish(); } diff --git a/packages/torrent-repository/src/lib.rs b/packages/torrent-repository/src/lib.rs index ccaf579e3..7a6d209b9 100644 --- a/packages/torrent-repository/src/lib.rs +++ b/packages/torrent-repository/src/lib.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use repository::dash_map_mutex_std::XacrimonDashMap; use repository::rw_lock_std::RwLockStd; use repository::rw_lock_tokio::RwLockTokio; use repository::skip_map_mutex_std::CrossbeamSkipList; @@ -20,6 +21,7 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio; pub type TorrentsRwLockTokioMutexTokio = RwLockTokio; pub type TorrentsSkipMapMutexStd = CrossbeamSkipList; +pub type TorrentsDashMapMutexStd = XacrimonDashMap; /// This code needs to be copied into each crate. /// Working version, for production. diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs new file mode 100644 index 000000000..67c47973e --- /dev/null +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -0,0 +1,106 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use dashmap::DashMap; +use torrust_tracker_configuration::TrackerPolicy; +use torrust_tracker_primitives::info_hash::InfoHash; +use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; +use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; + +use super::Repository; +use crate::entry::{Entry, EntrySync}; +use crate::{EntryMutexStd, EntrySingle}; + +#[derive(Default, Debug)] +pub struct XacrimonDashMap { + pub torrents: DashMap, +} + +impl Repository for XacrimonDashMap +where + EntryMutexStd: EntrySync, + EntrySingle: Entry, +{ + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + if let Some(entry) = self.torrents.get(info_hash) { + entry.insert_or_update_peer_and_get_stats(peer) + } else { + let _unused = self.torrents.insert(*info_hash, Arc::default()); + + match self.torrents.get(info_hash) { + Some(entry) => entry.insert_or_update_peer_and_get_stats(peer), + None => (false, SwarmMetadata::zeroed()), + } + } + } + + fn get(&self, key: &InfoHash) -> Option { + let maybe_entry = self.torrents.get(key); + maybe_entry.map(|entry| entry.clone()) + } + + fn get_metrics(&self) -> TorrentsMetrics { + let mut metrics = TorrentsMetrics::default(); + + for entry in &self.torrents { + let stats = entry.value().lock().expect("it should get a lock").get_stats(); + metrics.complete += u64::from(stats.complete); + metrics.downloaded += u64::from(stats.downloaded); + metrics.incomplete += u64::from(stats.incomplete); + metrics.torrents += 1; + } + + metrics + } + + fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> { + match pagination { + Some(pagination) => self + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + None => self + .torrents + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect(), + } + } + + fn import_persistent(&self, persistent_torrents: &PersistentTorrents) { + for (info_hash, completed) in persistent_torrents { + if self.torrents.contains_key(info_hash) { + continue; + } + + let entry = EntryMutexStd::new( + EntrySingle { + peers: BTreeMap::default(), + downloaded: *completed, + } + .into(), + ); + + self.torrents.insert(*info_hash, entry); + } + } + + fn remove(&self, key: &InfoHash) -> Option { + self.torrents.remove(key).map(|(_key, value)| value.clone()) + } + + fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { + for entry in &self.torrents { + entry.value().remove_inactive_peers(current_cutoff); + } + } + + fn remove_peerless_torrents(&self, policy: &TrackerPolicy) { + self.torrents.retain(|_, entry| entry.is_good(policy)); + } +} diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index 975a876d8..c7c64c54a 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -5,6 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +pub mod dash_map_mutex_std; pub mod rw_lock_std; pub mod rw_lock_std_mutex_std; pub mod rw_lock_std_mutex_tokio; diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 5a86aa3cf..5a6eddf97 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -6,8 +6,8 @@ use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ - EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, - TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, + EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, + TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd, }; #[derive(Debug)] @@ -19,6 +19,7 @@ pub(crate) enum Repo { RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd), RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio), SkipMapMutexStd(TorrentsSkipMapMutexStd), + DashMapMutexStd(TorrentsDashMapMutexStd), } impl Repo { @@ -31,6 +32,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), + Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()), } } @@ -43,6 +45,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await, Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await, Repo::SkipMapMutexStd(repo) => repo.get_metrics(), + Repo::DashMapMutexStd(repo) => repo.get_metrics(), } } @@ -82,6 +85,11 @@ impl Repo { .iter() .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) .collect(), + Repo::DashMapMutexStd(repo) => repo + .get_paginated(pagination) + .iter() + .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone())) + .collect(), } } @@ -94,6 +102,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await, Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await, Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents), + Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents), } } @@ -106,6 +115,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()), Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()), Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), + Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()), } } @@ -118,6 +128,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await, Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), + Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff), } } @@ -130,6 +141,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await, Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await, Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy), + Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy), } } @@ -146,6 +158,7 @@ impl Repo { Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), + Repo::DashMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), } } @@ -172,6 +185,9 @@ impl Repo { Repo::SkipMapMutexStd(repo) => { repo.torrents.insert(*info_hash, torrent.into()); } + Repo::DashMapMutexStd(repo) => { + repo.torrents.insert(*info_hash, torrent.into()); + } }; self.get(info_hash).await } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index ab9648584..a6784bf57 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -8,6 +8,7 @@ use torrust_tracker_primitives::info_hash::InfoHash; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; +use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::XacrimonDashMap; use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd; use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio; use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList; @@ -51,6 +52,11 @@ fn skip_list_std() -> Repo { Repo::SkipMapMutexStd(CrossbeamSkipList::default()) } +#[fixture] +fn dash_map_std() -> Repo { + Repo::DashMapMutexStd(XacrimonDashMap::default()) +} + type Entries = Vec<(InfoHash, EntrySingle)>; #[fixture] @@ -239,7 +245,8 @@ async fn it_should_get_a_torrent_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries, @@ -370,7 +377,8 @@ async fn it_should_get_metrics( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries, @@ -411,7 +419,8 @@ async fn it_should_import_persistent_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries, @@ -449,7 +458,8 @@ async fn it_should_remove_an_entry( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries, @@ -485,7 +495,8 @@ async fn it_should_remove_inactive_peers( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries, @@ -567,7 +578,8 @@ async fn it_should_remove_peerless_torrents( tokio_std(), tokio_mutex(), tokio_tokio(), - skip_list_std() + skip_list_std(), + dash_map_std() )] repo: Repo, #[case] entries: Entries,