@@ -5,31 +5,35 @@ import (
55 "sync"
66 "time"
77
8+ "emperror.dev/errors"
89 "github.com/sirupsen/logrus"
910
1011 "github.com/wework/grabbit/gbus"
12+ "github.com/wework/grabbit/gbus/deduplicator/implementation"
1113 "github.com/wework/grabbit/gbus/saga"
1214 "github.com/wework/grabbit/gbus/serialization"
1315 "github.com/wework/grabbit/gbus/tx/mysql"
1416)
1517
1618type defaultBuilder struct {
17- PrefetchCount uint
18- connStr string
19- purgeOnStartup bool
20- sagaStoreConnStr string
21- txnl bool
22- txConnStr string
23- txnlProvider string
24- workerNum uint
25- serializer gbus.Serializer
26- dlx string
27- defaultPolicies []gbus.MessagePolicy
28- confirm bool
29- dbPingTimeout time.Duration
30- usingPingTimeout bool
31- logger logrus.FieldLogger
32- busCfg gbus.BusConfiguration
19+ PrefetchCount uint
20+ connStr string
21+ purgeOnStartup bool
22+ sagaStoreConnStr string
23+ txnl bool
24+ txConnStr string
25+ txnlProvider string
26+ workerNum uint
27+ serializer gbus.Serializer
28+ dlx string
29+ defaultPolicies []gbus.MessagePolicy
30+ confirm bool
31+ dbPingTimeout time.Duration
32+ usingPingTimeout bool
33+ logger logrus.FieldLogger
34+ busCfg gbus.BusConfiguration
35+ deduplicationPolicy gbus.DeduplicationPolicy
36+ deduplicationRetentionAge time.Duration
3337}
3438
3539func (builder * defaultBuilder ) Build (svcName string ) gbus.Bus {
@@ -53,6 +57,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
5357 DefaultPolicies : builder .defaultPolicies ,
5458 DbPingTimeout : 3 ,
5559 Confirm : builder .confirm ,
60+ DeduplicationPolicy : builder .deduplicationPolicy ,
5661 }
5762
5863 var finalLogger logrus.FieldLogger
@@ -107,6 +112,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
107112 if builder .usingPingTimeout {
108113 gb .DbPingTimeout = builder .dbPingTimeout
109114 }
115+ gb .Deduplicator = implementation .NewDeduplicator (svcName , builder .deduplicationPolicy , gb .TxProvider , builder .deduplicationRetentionAge , gb .Log ())
110116
111117 //TODO move this into the NewSagaStore factory methods
112118 if builder .purgeOnStartup {
@@ -115,6 +121,11 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
115121 errMsg := fmt .Errorf ("grabbit: saga store faild to purge. error: %v" , err )
116122 panic (errMsg )
117123 }
124+ err = gb .Deduplicator .Purge ()
125+ if err != nil {
126+ errMsg := errors .NewWithDetails ("duplicator failed to purge" , "component" , "grabbit" , "feature" , "deduplicator" )
127+ panic (errMsg )
128+ }
118129 }
119130 glue := saga .NewGlue (gb , sagaStore , svcName , gb .TxProvider , gb .Log , timeoutManager )
120131 glue .SetLogger (gb .Log ())
@@ -206,6 +217,12 @@ func (builder *defaultBuilder) WithLogger(logger logrus.FieldLogger) gbus.Builde
206217 return builder
207218}
208219
220+ func (builder * defaultBuilder ) WithDeduplicationPolicy (policy gbus.DeduplicationPolicy , age time.Duration ) gbus.Builder {
221+ builder .deduplicationPolicy = policy
222+ builder .deduplicationRetentionAge = age
223+ return builder
224+ }
225+
209226//New :)
210227func New () Nu {
211228 return Nu {}
@@ -218,9 +235,11 @@ type Nu struct {
218235//Bus inits a new BusBuilder
219236func (Nu ) Bus (brokerConnStr string ) gbus.Builder {
220237 return & defaultBuilder {
221- busCfg : gbus.BusConfiguration {},
222- PrefetchCount : 1 ,
223- connStr : brokerConnStr ,
224- serializer : serialization .NewGobSerializer (),
225- defaultPolicies : make ([]gbus.MessagePolicy , 0 )}
238+ busCfg : gbus.BusConfiguration {},
239+ PrefetchCount : 1 ,
240+ connStr : brokerConnStr ,
241+ serializer : serialization .NewGobSerializer (),
242+ defaultPolicies : make ([]gbus.MessagePolicy , 0 ),
243+ deduplicationPolicy : gbus .DeduplicationPolicyNone ,
244+ }
226245}
0 commit comments