Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
name = "fib"
harness = false

[[bench]]
name = "integrate"
harness = false

[[bench]]
name = "cold_boot"
harness = false
49 changes: 49 additions & 0 deletions benches/integrate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::path::Path;

use scheme_rs::{
ast::DefinitionBody,
cps::Compile,
env::Environment,
proc::Closure,
registry::Library,
runtime::Runtime,
syntax::{Span, Syntax},
};

use criterion::*;

async fn integrate_fn() -> Closure {
let rt = Runtime::new();
let prog = Library::new_program(&rt, Path::new("integrate.scm"));
let env = Environment::Top(prog);

let sexprs = Syntax::from_str(include_str!("integrate.scm"), Some("integrate.scm")).unwrap();
let base = DefinitionBody::parse_lib_body(&rt, &sexprs, &env, &Span::default())
.await
.unwrap();
let compiled = base.compile_top_level();
rt.compile_expr(compiled).await
}

fn integrate_benchmark(c: &mut Criterion) {
// Set up and compile the closure
let runtime = tokio::runtime::Runtime::new().unwrap();
let closure = runtime.block_on(async move { integrate_fn().await });

c.bench_function("integrate", |b| {
b.to_async(&runtime).iter(|| {
let val = closure.clone();
async move { val.call(&[]).await }
})
});
}

criterion_group!(
name = benches;
config = Criterion::default()
.sample_size(10)
.measurement_time(std::time::Duration::from_secs(26));
targets = integrate_benchmark
);

criterion_main!(benches);
85 changes: 85 additions & 0 deletions benches/integrate.scm
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
(import (rnrs))

(define integrate-system
(lambda (system-derivative initial-state h)
(let ((next (runge-kutta-4 system-derivative h)))
(letrec ((states
(cons initial-state
(lambda ()
(map-streams next states)))))
states))))

(define runge-kutta-4
(lambda (f h)
(let ((*h (scale-vector h))
(*2 (scale-vector 2))
(*1/2 (scale-vector (/ 1 2)))
(*1/6 (scale-vector (/ 1 6))))
(lambda (y)
(let* ((k0 (*h (f y)))
(k1 (*h (f (add-vectors y (*1/2 k0)))))
(k2 (*h (f (add-vectors y (*1/2 k1)))))
(k3 (*h (f (add-vectors y k2)))))
(add-vectors y
(*1/6 (add-vectors k0
(*2 k1)
(*2 k2)
k3))))))))

(define elementwise
(lambda (f)
(lambda vectors
(generate-vector
(vector-length (car vectors))
(lambda (i)
(apply f
(map (lambda (v) (vector-ref v i))
vectors)))))))

(define generate-vector
(lambda (size proc)
(let ((ans (make-vector size)))
(letrec ((loop
(lambda (i)
(cond ((= i size) ans)
(else
(vector-set! ans i (proc i))
(loop (+ i 1)))))))
(loop 0)))))

(define add-vectors (elementwise +))

(define scale-vector
(lambda (s)
(elementwise (lambda (x) (* x s)))))

(define map-streams
(lambda (f s)
(cons (f (head s))
(lambda () (map-streams f (tail s))))))

(define head car)

(define tail
(lambda (stream) ((cdr stream))))

(define damped-oscillator
(lambda (R L C)
(lambda (state)
(let ((Vc (vector-ref state 0))
(Il (vector-ref state 1)))
(vector (- 0 (+ (/ Vc (* R C)) (/ Il C)))
(/ Vc L))))))

(define the-states
(integrate-system
(damped-oscillator 10000 1000 .001)
'#(1 0)
.01))

(letrec ((loop
(lambda (s i)
(cond ((= i 0) (head s))
(else
(loop (tail s) (- i 1)))))))
(loop the-states 50))
6 changes: 3 additions & 3 deletions proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ fn derive_trace_struct(
_ => {
return quote! {
unsafe impl ::scheme_rs::gc::Trace for #name {
unsafe fn visit_children(&self, visitor: unsafe fn(::scheme_rs::gc::OpaqueGcPtr)) {}
unsafe fn visit_children(&self, visitor: &mut dyn FnMut(::scheme_rs::gc::OpaqueGcPtr)) {}
}
};
}
Expand Down Expand Up @@ -367,7 +367,7 @@ fn derive_trace_struct(
unsafe impl<#params> ::scheme_rs::gc::Trace for #name <#unbound_params>
#where_clause
{
unsafe fn visit_children(&self, visitor: unsafe fn(::scheme_rs::gc::OpaqueGcPtr)) {
unsafe fn visit_children(&self, visitor: &mut dyn FnMut(::scheme_rs::gc::OpaqueGcPtr)) {
#(
#field_visits
)*
Expand Down Expand Up @@ -486,7 +486,7 @@ fn derive_trace_enum(
unsafe impl<#params> ::scheme_rs::gc::Trace for #name <#unbound_params>
#where_clause
{
unsafe fn visit_children(&self, visitor: unsafe fn(::scheme_rs::gc::OpaqueGcPtr)) {
unsafe fn visit_children(&self, visitor: &mut dyn FnMut(::scheme_rs::gc::OpaqueGcPtr)) {
match self {
#( #visit_match_clauses, )*
_ => (),
Expand Down
100 changes: 72 additions & 28 deletions src/gc/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::{
ptr::NonNull,
sync::{Mutex, OnceLock},
sync::{Mutex, OnceLock, atomic::AtomicUsize},
time::{Duration, Instant},
};
use tokio::{
Expand Down Expand Up @@ -42,6 +42,8 @@ struct MutationBuffer {
mutation_buffer_rx: Mutex<Option<UnboundedReceiver<Mutation>>>,
}

static PENDING_MUTATIONS: AtomicUsize = AtomicUsize::new(0);

unsafe impl Sync for MutationBuffer {}

impl Default for MutationBuffer {
Expand All @@ -56,6 +58,16 @@ impl Default for MutationBuffer {

static MUTATION_BUFFER: OnceLock<MutationBuffer> = OnceLock::new();

static MAX_PENDING_MUTATIONS_ALLOWED: usize = 100_000;

pub(crate) async fn yield_until_gc_cleared() {
while PENDING_MUTATIONS.load(std::sync::atomic::Ordering::Relaxed)
> MAX_PENDING_MUTATIONS_ALLOWED
{
tokio::task::yield_now().await
}
}

pub(super) fn inc_rc<T: ?Sized>(gc: NonNull<GcInner<T>>) {
// Disregard any send errors. If the receiver was dropped then the process
// is exiting and we don't care if we leak.
Expand Down Expand Up @@ -83,13 +95,13 @@ pub fn init_gc() {
.get_or_init(|| tokio::task::spawn(async { unsafe { run_garbage_collector().await } }));
}

const MIN_MUTATIONS_PER_EPOCH: usize = 10;
const MAX_MUTATIONS_PER_EPOCH: usize = 10_000; // No idea what a good value is here.
const MIN_MUTATIONS_PER_EPOCH: usize = 1_000;
const AVG_MUTATIONS_PER_EPOCH: usize = MAX_PENDING_MUTATIONS_ALLOWED >> 1; // 10_000; // No idea what a good value is here.

async unsafe fn run_garbage_collector() {
unsafe {
let mut last_epoch = Instant::now();
let mut mutation_buffer: Vec<_> = Vec::with_capacity(MAX_MUTATIONS_PER_EPOCH);
let mut mutation_buffer: Vec<_> = Vec::with_capacity(AVG_MUTATIONS_PER_EPOCH);
let mut mutation_buffer_rx = MUTATION_BUFFER
.get_or_init(MutationBuffer::default)
.mutation_buffer_rx
Expand Down Expand Up @@ -138,6 +150,11 @@ async unsafe fn process_mutation_buffer(
) {
// It is very important that we do not delay any mutations that
// have occurred at this point by an extra epoch.
PENDING_MUTATIONS.store(
mutation_buffer_rx.len(),
std::sync::atomic::Ordering::Relaxed,
);

let to_recv = mutation_buffer_rx.len().max(MIN_MUTATIONS_PER_EPOCH);

mutation_buffer_rx.recv_many(mutation_buffer, to_recv).await;
Expand Down Expand Up @@ -176,7 +193,7 @@ unsafe fn decrement(s: OpaqueGcPtr) {

unsafe fn release(s: OpaqueGcPtr) {
unsafe {
for_each_child(s, decrement);
for_each_child(s, &mut |c| decrement(c));
s.set_color(Color::Black);
if !s.buffered() {
free(s);
Expand Down Expand Up @@ -255,51 +272,78 @@ unsafe fn collect_roots() {
}
}

enum MarkGrayPhase {
MarkGray(OpaqueGcPtr),
SetCrc(OpaqueGcPtr),
}

unsafe fn mark_gray(s: OpaqueGcPtr) {
unsafe {
let mut stack = Vec::new();
if s.color() != Color::Gray {
s.set_color(Color::Gray);
s.set_crc(s.rc() as isize);
for_each_child(s, |t| {
mark_gray(t);
let t_crc = t.crc();
if t_crc > 0 {
t.set_crc(t_crc - 1);
for_each_child(s, &mut |t| stack.push(MarkGrayPhase::MarkGray(t)))
}
while let Some(s) = stack.pop() {
match s {
MarkGrayPhase::MarkGray(s) => {
if s.color() != Color::Gray {
s.set_color(Color::Gray);
s.set_crc(s.rc() as isize);
for_each_child(s, &mut |t| stack.push(MarkGrayPhase::MarkGray(t)))
}
stack.push(MarkGrayPhase::SetCrc(s))
}
});
MarkGrayPhase::SetCrc(s) => {
let s_crc = s.crc();
if s_crc > 0 {
s.set_crc(s_crc - 1);
}
}
}
}
}
}

unsafe fn scan(s: OpaqueGcPtr) {
unsafe {
if s.color() == Color::Gray {
if s.crc() == 0 {
s.set_color(Color::White);
for_each_child(s, scan);
} else {
scan_black(s);
let mut stack = vec![s];
while let Some(s) = stack.pop() {
if s.color() == Color::Gray {
if s.crc() == 0 {
s.set_color(Color::White);
for_each_child(s, &mut |c| stack.push(c));
} else {
scan_black(s);
}
}
}
}
}

unsafe fn scan_black(s: OpaqueGcPtr) {
unsafe {
if s.color() != Color::Black {
s.set_color(Color::Black);
for_each_child(s, scan_black);
let mut stack = vec![s];
while let Some(s) = stack.pop() {
if s.color() != Color::Black {
s.set_color(Color::Black);
for_each_child(s, &mut |c| stack.push(c));
}
}
}
}

unsafe fn collect_white(s: OpaqueGcPtr) {
unsafe {
if s.color() == Color::White {
s.set_color(Color::Orange);
s.set_buffered(true);
(&raw mut CURRENT_CYCLE).as_mut().unwrap().push(s);
for_each_child(s, collect_white);
let mut stack = vec![s];
while let Some(s) = stack.pop() {
if s.color() == Color::White {
s.set_color(Color::Orange);
s.set_buffered(true);
(&raw mut CURRENT_CYCLE).as_mut().unwrap().push(s);
for_each_child(s, &mut |c| stack.push(c));
}
}
}
}
Expand All @@ -312,7 +356,7 @@ unsafe fn sigma_preparation() {
n.set_crc(n.rc() as isize);
}
for n in c {
for_each_child(*n, |m| {
for_each_child(*n, &mut |m| {
if m.color() == Color::Red && m.crc() > 0 {
m.set_crc(m.crc() - 1);
}
Expand Down Expand Up @@ -397,7 +441,7 @@ unsafe fn free_cycle(c: &[OpaqueGcPtr]) {
n.set_color(Color::Red);
}
for n in c {
for_each_child(*n, cyclic_decrement);
for_each_child(*n, &mut |c| cyclic_decrement(c));
}
for n in c {
free(*n);
Expand All @@ -418,7 +462,7 @@ unsafe fn cyclic_decrement(m: OpaqueGcPtr) {
}
}

unsafe fn for_each_child(s: OpaqueGcPtr, visitor: unsafe fn(OpaqueGcPtr)) {
unsafe fn for_each_child(s: OpaqueGcPtr, visitor: &mut dyn FnMut(OpaqueGcPtr)) {
unsafe {
let lock = s.lock().read().unwrap();
(s.visit_children())(s.data(), visitor);
Expand Down
Loading