Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9dfa51d
feat(pubsub): implement core PubSub infrastructure and FFI integration
jbrinkman Oct 9, 2025
4cbd22a
feat(rust): add PubSub FFI functions for callback registration and me…
jbrinkman Oct 10, 2025
73a842c
feat(rust): implement proper PubSub callback storage and management
jbrinkman Oct 10, 2025
4697c40
feat: Add comprehensive integration tests for FFI PubSub callback flow
jbrinkman Oct 13, 2025
0a75eea
test(pubsub): Refactor PubSub FFI Callback Integration Tests
jbrinkman Oct 15, 2025
03dc000
refactor(pubsub): Implement instance-based PubSub callback architecture
jbrinkman Oct 17, 2025
0e84419
fix: resolve critical memory leak in PubSub FFI message processing
jbrinkman Oct 17, 2025
dd0c85d
feat(pubsub): add thread safety to PubSub handler access in BaseClient
jbrinkman Oct 17, 2025
426e1a9
feat(pubsub): replace Task.Run with channel-based message processing
jbrinkman Oct 17, 2025
069af2d
feat(pubsub): implement graceful shutdown coordination between Rust a…
jbrinkman Oct 18, 2025
da86a39
feat(pubsub): Add queue-based message retrieval and comprehensive int…
jbrinkman Oct 20, 2025
13ba6c4
refactor(pubsub): Remove unused PubSubConfigurationExtensions class
jbrinkman Oct 20, 2025
5868588
style: Apply code formatting to PubSub files
jbrinkman Oct 20, 2025
fa8bff7
chore(reports): Remove legacy reporting artifacts and unused files
jbrinkman Oct 20, 2025
600b48e
refactor(pubsub): Simplify synchronization primitives in PubSub messa…
jbrinkman Oct 20, 2025
d59edfd
fix: Address Lint configuration errors
jbrinkman Oct 20, 2025
60c2c30
fix: enable pattern subscriptions in cluster mode
jbrinkman Oct 20, 2025
d2eec0e
test: remove redundant and inaccurate PubSub tests
jbrinkman Oct 21, 2025
e12fad2
chore: remove doc created for development
jbrinkman Oct 31, 2025
34185e0
chore: cleanup temp script
jbrinkman Oct 31, 2025
5391029
fix: combine null and string empty check
jbrinkman Oct 31, 2025
cd18fbd
refactor(pubsub): implement lazy initialization for PubSubMessageQueue
jbrinkman Oct 31, 2025
8a3f237
refactor: improve PubSubMessageQueue documentation and clarity
jbrinkman Nov 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,7 @@ $RECYCLE.BIN/
_NCrunch*

glide-logs/

# Test results and coverage reports
testresults/
reports/
118 changes: 116 additions & 2 deletions rust/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,95 @@ pub struct ConnectionConfig {
pub protocol: redis::ProtocolVersion,
/// zero pointer is valid, means no client name is given (`None`)
pub client_name: *const c_char,
pub has_pubsub_config: bool,
pub pubsub_config: PubSubConfigInfo,
/*
TODO below
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
pub inflight_requests_limit: Option<u32>,
pub otel_endpoint: Option<String>,
pub otel_flush_interval_ms: Option<u64>,
*/
}

#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct PubSubConfigInfo {
pub channels_ptr: *const *const c_char,
pub channel_count: u32,
pub patterns_ptr: *const *const c_char,
pub pattern_count: u32,
pub sharded_channels_ptr: *const *const c_char,
pub sharded_channel_count: u32,
}

/// Convert a C string array to a Vec of Vec<u8>
///
/// # Safety
///
/// * `ptr` must point to an array of `count` valid C string pointers
/// * Each C string pointer must be valid and null-terminated
unsafe fn convert_string_array(ptr: *const *const c_char, count: u32) -> Vec<Vec<u8>> {
if ptr.is_null() || count == 0 {
return Vec::new();
}

let slice = unsafe { std::slice::from_raw_parts(ptr, count as usize) };
slice
.iter()
.map(|&str_ptr| {
let c_str = unsafe { CStr::from_ptr(str_ptr) };
c_str.to_bytes().to_vec()
})
.collect()
}

/// Convert PubSubConfigInfo to the format expected by glide-core
///
/// # Safety
///
/// * All pointers in `config` must be valid or null
/// * String arrays must contain valid C strings
unsafe fn convert_pubsub_config(
config: &PubSubConfigInfo,
) -> std::collections::HashMap<redis::PubSubSubscriptionKind, std::collections::HashSet<Vec<u8>>> {
use redis::PubSubSubscriptionKind;
use std::collections::{HashMap, HashSet};

let mut subscriptions = HashMap::new();

// Convert exact channels
if config.channel_count > 0 {
let channels = unsafe { convert_string_array(config.channels_ptr, config.channel_count) };
subscriptions.insert(
PubSubSubscriptionKind::Exact,
channels.into_iter().collect::<HashSet<_>>(),
);
}

// Convert patterns
if config.pattern_count > 0 {
let patterns = unsafe { convert_string_array(config.patterns_ptr, config.pattern_count) };
subscriptions.insert(
PubSubSubscriptionKind::Pattern,
patterns.into_iter().collect::<HashSet<_>>(),
);
}

// Convert sharded channels
if config.sharded_channel_count > 0 {
let sharded = unsafe {
convert_string_array(config.sharded_channels_ptr, config.sharded_channel_count)
};
subscriptions.insert(
PubSubSubscriptionKind::Sharded,
sharded.into_iter().collect::<HashSet<_>>(),
);
}

subscriptions
}

/// Convert connection configuration to a corresponding object.
///
/// # Safety
Expand Down Expand Up @@ -147,9 +226,18 @@ pub(crate) unsafe fn create_connection_request(
} else {
None
},
pubsub_subscriptions: if config.has_pubsub_config {
let subscriptions = unsafe { convert_pubsub_config(&config.pubsub_config) };
if subscriptions.is_empty() {
None
} else {
Some(subscriptions)
}
} else {
None
},
// TODO below
periodic_checks: None,
pubsub_subscriptions: None,
inflight_requests_limit: None,
lazy_connect: false,
}
Expand Down Expand Up @@ -593,3 +681,29 @@ pub(crate) unsafe fn get_pipeline_options(
PipelineRetryStrategy::new(info.retry_server_error, info.retry_connection_error),
)
}

/// FFI callback function type for PubSub messages.
/// This callback is invoked by Rust when a PubSub message is received.
/// The callback signature matches the C# expectations for marshaling PubSub data.
///
/// # Parameters
/// * `push_kind` - The type of push notification (message, pmessage, smessage, etc.)
/// * `message_ptr` - Pointer to the raw message bytes
/// * `message_len` - Length of the message data in bytes
/// * `channel_ptr` - Pointer to the raw channel name bytes
/// * `channel_len` - Length of the channel name in bytes
/// * `pattern_ptr` - Pointer to the raw pattern bytes (null if no pattern)
/// * `pattern_len` - Length of the pattern in bytes (0 if no pattern)
pub type PubSubCallback = unsafe extern "C" fn(
push_kind: u32,
message_ptr: *const u8,
message_len: i64,
channel_ptr: *const u8,
channel_len: i64,
pattern_ptr: *const u8,
pattern_len: i64,
);

// PubSub callback functions removed - using instance-based callbacks instead.
// The pubsub_callback parameter in create_client will be used to configure glide-core's
// PubSub message handler when full integration is implemented.
Loading