1- use  std:: { 
2-     convert:: TryInto , 
3-     marker:: PhantomData , 
4-     sync:: { Arc ,  Mutex } , 
5-     time:: Duration , 
6- } ; 
1+ use  std:: { convert:: TryInto ,  marker:: PhantomData ,  sync:: Arc ,  time:: Duration } ; 
72
83use  crossbeam:: channel; 
4+ use  parking_lot:: Mutex ; 
95use  sbp:: { 
106    messages:: { ConcreteMessage ,  SBPMessage ,  SBP } , 
117    time:: { GpsTime ,  GpsTimeError } , 
@@ -19,8 +15,6 @@ pub struct Broadcaster {
1915} 
2016
2117impl  Broadcaster  { 
22-     const  CHANNELS_LOCK_FAILURE :  & ' static  str  = "failed to aquire lock on channels" ; 
23- 
2418    pub  fn  new ( )  -> Self  { 
2519        Self  { 
2620            channels :  Arc :: new ( Mutex :: new ( HopSlotMap :: with_key ( ) ) ) , 
@@ -29,7 +23,7 @@ impl Broadcaster {
2923
3024    pub  fn  send ( & self ,  message :  & SBP ,  gps_time :  MaybeGpsTime )  { 
3125        let  msg_type = message. get_message_type ( ) ; 
32-         let  mut  channels = self . channels . lock ( ) . expect ( Self :: CHANNELS_LOCK_FAILURE ) ; 
26+         let  mut  channels = self . channels . lock ( ) ; 
3327        channels. retain ( |_,  chan| { 
3428            if  chan. msg_types . iter ( ) . any ( |ty| ty == & msg_type)  { 
3529                chan. inner . send ( ( message. clone ( ) ,  gps_time. clone ( ) ) ) . is_ok ( ) 
@@ -44,7 +38,7 @@ impl Broadcaster {
4438        E :  Event , 
4539    { 
4640        let  ( tx,  rx)  = channel:: unbounded ( ) ; 
47-         let  mut  channels = self . channels . lock ( ) . expect ( Self :: CHANNELS_LOCK_FAILURE ) ; 
41+         let  mut  channels = self . channels . lock ( ) ; 
4842        let  key = channels. insert ( Sender  { 
4943            inner :  tx, 
5044            msg_types :  E :: MESSAGE_TYPES , 
@@ -59,7 +53,7 @@ impl Broadcaster {
5953    } 
6054
6155    pub  fn  unsubscribe ( & self ,  key :  Key )  { 
62-         let  mut  channels = self . channels . lock ( ) . expect ( Self :: CHANNELS_LOCK_FAILURE ) ; 
56+         let  mut  channels = self . channels . lock ( ) ; 
6357        channels. remove ( key. inner ) ; 
6458    } 
6559
@@ -206,18 +200,69 @@ where
206200    } 
207201} 
208202
209- struct  Callback < ' a >  { 
210-     func :  Box < dyn  FnMut ( SBP ,  MaybeGpsTime )  + Send  + ' a > , 
211-     msg_types :  & ' static  [ u16 ] , 
203+ enum  Callback < ' a >  { 
204+     Id  { 
205+         func :  Box < dyn  FnMut ( SBP ,  MaybeGpsTime )  + Send  + ' a > , 
206+         msg_type :  u16 , 
207+     } , 
208+     Event  { 
209+         func :  Box < dyn  FnMut ( SBP ,  MaybeGpsTime )  + Send  + ' a > , 
210+         msg_types :  & ' static  [ u16 ] , 
211+     } , 
212+ } 
213+ 
214+ impl < ' a >  Callback < ' a >  { 
215+     fn  run ( & mut  self ,  msg :  SBP ,  time :  MaybeGpsTime )  { 
216+         match  self  { 
217+             Callback :: Id  {  func,  .. }  | Callback :: Event  {  func,  .. }  => ( func) ( msg,  time) , 
218+         } 
219+     } 
220+ 
221+     fn  should_run ( & self ,  msg :  u16 )  -> bool  { 
222+         match  self  { 
223+             Callback :: Id  {  msg_type,  .. }  => msg_type == & msg, 
224+             Callback :: Event  {  msg_types,  .. }  => msg_types. contains ( & msg) , 
225+         } 
226+     } 
227+ } 
228+ 
229+ pub  fn  with_link < ' env ,  F > ( f :  F ) 
230+ where 
231+     F :  FnOnce ( & LinkSource < ' env > ) , 
232+ { 
233+     let  source:  LinkSource < ' env >  = LinkSource  {  link :  Link :: new ( )  } ; 
234+     f ( & source) ; 
235+ } 
236+ 
237+ pub  struct  LinkSource < ' env >  { 
238+     link :  Link < ' env > , 
239+ } 
240+ 
241+ impl < ' env >  LinkSource < ' env >  { 
242+     pub  fn  link < ' scope > ( & self )  -> Link < ' scope > 
243+     where 
244+         ' env :  ' scope , 
245+     { 
246+         let  link:  Link < ' scope >  = unsafe  {  std:: mem:: transmute ( self . link . clone ( ) )  } ; 
247+         link
248+     } 
249+ 
250+     pub  fn  send ( & self ,  message :  & SBP ,  gps_time :  MaybeGpsTime )  -> bool  { 
251+         self . link . send ( message,  gps_time) 
252+     } 
253+ } 
254+ 
255+ impl < ' env >  Drop  for  LinkSource < ' env >  { 
256+     fn  drop ( & mut  self )  { 
257+         self . link . callbacks . lock ( ) . clear ( ) ; 
258+     } 
212259} 
213260
214261pub  struct  Link < ' a >  { 
215262    callbacks :  Arc < Mutex < DenseSlotMap < KeyInner ,  Callback < ' a > > > > , 
216263} 
217264
218265impl < ' a >  Link < ' a >  { 
219-     const  CB_LOCK_FAILURE :  & ' static  str  = "failed to aquire lock on callbacks" ; 
220- 
221266    pub  fn  new ( )  -> Self  { 
222267        Self  { 
223268            callbacks :  Arc :: new ( Mutex :: new ( DenseSlotMap :: with_key ( ) ) ) , 
@@ -226,25 +271,35 @@ impl<'a> Link<'a> {
226271
227272    pub  fn  send ( & self ,  message :  & SBP ,  gps_time :  MaybeGpsTime )  -> bool  { 
228273        let  msg_type = message. get_message_type ( ) ; 
229-         let  mut  cbs = self . callbacks . lock ( ) . expect ( Self :: CB_LOCK_FAILURE ) ; 
230-         let  to_call = cbs
231-             . values_mut ( ) 
232-             . filter ( |cb| cb. msg_types . is_empty ( )  || cb. msg_types . iter ( ) . any ( |ty| ty == & msg_type) ) ; 
274+         let  mut  cbs = self . callbacks . lock ( ) ; 
275+         let  to_call = cbs. values_mut ( ) . filter ( |cb| cb. should_run ( msg_type) ) ; 
233276        let  mut  called = false ; 
234277        for  cb in  to_call { 
235-             ( cb. func ) ( message. clone ( ) ,  gps_time. clone ( ) ) ; 
278+             cb. run ( message. clone ( ) ,  gps_time. clone ( ) ) ; 
236279            called = true ; 
237280        } 
238281        called
239282    } 
240283
241-     pub  fn  register_cb < H ,  E ,  K > ( & mut  self ,  mut  handler :  H )  -> Key 
284+     pub  fn  register_cb_by_id < H > ( & self ,  id :  u16 ,  mut  handler :  H )  -> Key 
285+     where 
286+         H :  FnMut ( SBP )  + Send  + ' a , 
287+     { 
288+         let  mut  cbs = self . callbacks . lock ( ) ; 
289+         let  inner = cbs. insert ( Callback :: Id  { 
290+             func :  Box :: new ( move  |msg,  _time| ( handler) ( msg) ) , 
291+             msg_type :  id, 
292+         } ) ; 
293+         Key  {  inner } 
294+     } 
295+ 
296+     pub  fn  register_cb < H ,  E ,  K > ( & self ,  mut  handler :  H )  -> Key 
242297    where 
243298        H :  Handler < E ,  K >  + Send  + ' a , 
244299        E :  Event , 
245300    { 
246-         let  mut  cbs = self . callbacks . lock ( ) . expect ( Self :: CB_LOCK_FAILURE ) ; 
247-         let  inner = cbs. insert ( Callback  { 
301+         let  mut  cbs = self . callbacks . lock ( ) ; 
302+         let  inner = cbs. insert ( Callback :: Event  { 
248303            func :  Box :: new ( move  |msg,  time| { 
249304                let  event = E :: from_sbp ( msg) ; 
250305                handler. run ( event,  time) 
@@ -255,7 +310,7 @@ impl<'a> Link<'a> {
255310    } 
256311
257312    pub  fn  unregister_cb ( & self ,  key :  Key )  { 
258-         let  mut  cbs = self . callbacks . lock ( ) . expect ( Self :: CB_LOCK_FAILURE ) ; 
313+         let  mut  cbs = self . callbacks . lock ( ) ; 
259314        cbs. remove ( key. inner ) ; 
260315    } 
261316} 
@@ -268,7 +323,7 @@ impl Clone for Link<'_> {
268323    } 
269324} 
270325
271- impl  Default  for  Link < ' _ >  { 
326+ impl < ' a >  Default  for  Link < ' a >  { 
272327    fn  default ( )  -> Self  { 
273328        Self :: new ( ) 
274329    } 
@@ -296,7 +351,7 @@ mod tests {
296351
297352    #[ test]  
298353    fn  test_dispatcher ( )  { 
299-         let  mut   d = Link :: new ( ) ; 
354+         let  d = Link :: new ( ) ; 
300355        let  mut  c = Counter  {  count :  0  } ; 
301356        d. register_cb ( |obs| c. obs ( obs) ) ; 
302357        d. send ( & make_msg_obs ( ) ,  None ) ; 
0 commit comments