Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ chrono = { version = "0", default-features = false, features = ["clock"] }
clap = { version = "4", features = ["derive", "env"] }
colored = "2"
config = "0"
crossbeam-skiplist = "0.1"
derive_more = "0"
fern = "0"
futures = "0"
Expand All @@ -63,8 +64,8 @@ serde_json = "1"
serde_repr = "0"
thiserror = "1"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "packages/clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "packages/configuration" }
torrust-tracker-contrib-bencode = { version = "3.0.0-alpha.12-develop", path = "contrib/bencode" }
torrust-tracker-located-error = { version = "3.0.0-alpha.12-develop", path = "packages/located-error" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "packages/primitives" }
Expand All @@ -76,7 +77,7 @@ url = "2"
uuid = { version = "1", features = ["v4"] }

[package.metadata.cargo-machete]
ignored = ["serde_bytes"]
ignored = ["serde_bytes", "crossbeam-skiplist"]

[dev-dependencies]
local-ip-address = "0"
Expand Down Expand Up @@ -105,4 +106,4 @@ opt-level = 3

[profile.release-debug]
inherits = "release"
debug = true
debug = true
1 change: 1 addition & 0 deletions cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"Shareaza",
"sharktorrent",
"SHLVL",
"skiplist",
"socketaddr",
"sqllite",
"subsec",
Expand Down
5 changes: 3 additions & 2 deletions packages/torrent-repository/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ rust-version.workspace = true
version.workspace = true

[dependencies]
crossbeam-skiplist = "0.1"
futures = "0.3.29"
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock" }
torrust-tracker-configuration = { version = "3.0.0-alpha.12-develop", path = "../configuration" }
torrust-tracker-primitives = { version = "3.0.0-alpha.12-develop", path = "../primitives" }

[dev-dependencies]
criterion = { version = "0", features = ["async_tokio"] }
Expand Down
21 changes: 20 additions & 1 deletion packages/torrent-repository/benches/repository_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod helpers;
use criterion::{criterion_group, criterion_main, Criterion};
use torrust_tracker_torrent_repository::{
TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
TorrentsRwLockTokioMutexTokio,
TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
};

use crate::helpers::{asyn, sync};
Expand Down Expand Up @@ -45,6 +45,10 @@ fn add_one_torrent(c: &mut Criterion) {
.iter_custom(asyn::add_one_torrent::<TorrentsRwLockTokioMutexTokio, _>);
});

group.bench_function("SkipMapMutexStd", |b| {
b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
});

group.finish();
}

Expand Down Expand Up @@ -89,6 +93,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::add_multiple_torrents_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -133,6 +142,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
.iter_custom(|iters| asyn::update_one_torrent_in_parallel::<TorrentsRwLockTokioMutexTokio, _>(&rt, iters, None));
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down Expand Up @@ -178,6 +192,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
});
});

group.bench_function("SkipMapMutexStd", |b| {
b.to_async(&rt)
.iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
});

group.finish();
}

Expand Down
17 changes: 11 additions & 6 deletions packages/torrent-repository/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::Arc;

use repository::rw_lock_std::RwLockStd;
use repository::rw_lock_tokio::RwLockTokio;
use repository::skip_map_mutex_std::CrossbeamSkipList;
use torrust_tracker_clock::clock;

pub mod entry;
Expand All @@ -9,12 +12,14 @@ pub type EntrySingle = entry::Torrent;
pub type EntryMutexStd = Arc<std::sync::Mutex<entry::Torrent>>;
pub type EntryMutexTokio = Arc<tokio::sync::Mutex<entry::Torrent>>;

pub type TorrentsRwLockStd = repository::RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = repository::RwLockStd<EntryMutexStd>;
pub type TorrentsRwLockStdMutexTokio = repository::RwLockStd<EntryMutexTokio>;
pub type TorrentsRwLockTokio = repository::RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio<EntryMutexTokio>;
pub type TorrentsRwLockStd = RwLockStd<EntrySingle>;
pub type TorrentsRwLockStdMutexStd = RwLockStd<EntryMutexStd>;
pub type TorrentsRwLockStdMutexTokio = RwLockStd<EntryMutexTokio>;
pub type TorrentsRwLockTokio = RwLockTokio<EntrySingle>;
pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;

pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;

/// This code needs to be copied into each crate.
/// Working version, for production.
Expand Down
35 changes: 1 addition & 34 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod rw_lock_std_mutex_tokio;
pub mod rw_lock_tokio;
pub mod rw_lock_tokio_mutex_std;
pub mod rw_lock_tokio_mutex_tokio;
pub mod skip_map_mutex_std;

use std::fmt::Debug;

Expand Down Expand Up @@ -40,37 +41,3 @@ pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
}

#[derive(Default, Debug)]
pub struct RwLockStd<T> {
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

#[derive(Default, Debug)]
pub struct RwLockTokio<T> {
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockStd<T> {
/// # Panics
///
/// Panics if unable to get a lock.
pub fn write(
&self,
) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>> {
self.torrents.write().expect("it should get lock")
}
}

impl<T> RwLockTokio<T> {
pub fn write(
&self,
) -> impl std::future::Future<
Output = tokio::sync::RwLockWriteGuard<
'_,
std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>,
>,
> {
self.torrents.write()
}
}
16 changes: 16 additions & 0 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ use super::Repository;
use crate::entry::Entry;
use crate::{EntrySingle, TorrentsRwLockStd};

#[derive(Default, Debug)]
pub struct RwLockStd<T> {
pub(crate) torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockStd<T> {
/// # Panics
///
/// Panics if unable to get a lock.
pub fn write(
&self,
) -> std::sync::RwLockWriteGuard<'_, std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>> {
self.torrents.write().expect("it should get lock")
}
}

impl TorrentsRwLockStd {
fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, EntrySingle>>
where
Expand Down
18 changes: 18 additions & 0 deletions packages/torrent-repository/src/repository/rw_lock_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,24 @@ use super::RepositoryAsync;
use crate::entry::Entry;
use crate::{EntrySingle, TorrentsRwLockTokio};

#[derive(Default, Debug)]
pub struct RwLockTokio<T> {
pub(crate) torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
}

impl<T> RwLockTokio<T> {
pub fn write(
&self,
) -> impl std::future::Future<
Output = tokio::sync::RwLockWriteGuard<
'_,
std::collections::BTreeMap<torrust_tracker_primitives::info_hash::InfoHash, T>,
>,
> {
self.torrents.write()
}
}

impl TorrentsRwLockTokio {
async fn get_torrents<'a>(&'a self) -> tokio::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, EntrySingle>>
where
Expand Down
106 changes: 106 additions & 0 deletions packages/torrent-repository/src/repository/skip_map_mutex_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use crossbeam_skiplist::SkipMap;
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct CrossbeamSkipList<T> {
pub torrents: SkipMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for CrossbeamSkipList<EntryMutexStd>
where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
let entry = self.torrents.get_or_insert(*info_hash, Arc::default());
entry.value().insert_or_update_peer_and_get_stats(peer)
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.value().clone())
}

fn get_metrics(&self) -> TorrentsMetrics {
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
metrics.torrents += 1;
}

metrics
}

fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
match pagination {
Some(pagination) => self
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
None => self
.torrents
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect(),
}
}

fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
for (info_hash, completed) in persistent_torrents {
if self.torrents.contains_key(info_hash) {
continue;
}

let entry = EntryMutexStd::new(
EntrySingle {
peers: BTreeMap::default(),
downloaded: *completed,
}
.into(),
);

// Since SkipMap is lock-free the torrent could have been inserted
// after checking if it exists.
self.torrents.get_or_insert(*info_hash, entry);
}
}

fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
self.torrents.remove(key).map(|entry| entry.value().clone())
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
for entry in &self.torrents {
entry.value().remove_inactive_peers(current_cutoff);
}
}

fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
for entry in &self.torrents {
if entry.value().is_good(policy) {
continue;
}

entry.remove();
}
}
}
Loading