@@ -11,6 +11,7 @@ import (
1111	"reflect" 
1212	"sync" 
1313	"syscall" 
14+ 	"time" 
1415
1516	"github.com/ethereum/go-ethereum/cmd/utils" 
1617	"github.com/ethereum/go-ethereum/event" 
@@ -31,7 +32,7 @@ import (
3132	"gopkg.in/urfave/cli.v1" 
3233)
3334
34- const  shardChainDbName  =  "shardchaindata" 
35+ const  shardChainDBName  =  "shardchaindata" 
3536
3637// ShardEthereum is a service that is registered and started when geth is launched. 
3738// it contains APIs and fields that handle the different components of the sharded 
@@ -43,9 +44,10 @@ type ShardEthereum struct {
4344	eventFeed    * event.Feed     // Used to enable P2P related interactions via different sharding actors. 
4445
4546	// Lifecycle and service stores. 
46- 	services  map [reflect.Type ]sharding.Service  // Service registry. 
47- 	lock      sync.RWMutex 
48- 	stop      chan  struct {} // Channel to wait for termination notifications 
47+ 	services      map [reflect.Type ]sharding.Service  // Service registry. 
48+ 	serviceTypes  []reflect.Type                     // Keeps an ordered slice of registered service types. 
49+ 	lock          sync.RWMutex 
50+ 	stop          chan  struct {} // Channel to wait for termination notifications 
4951}
5052
5153// New creates a new sharding-enabled Ethereum instance. This is called in the main 
@@ -81,12 +83,8 @@ func New(ctx *cli.Context) (*ShardEthereum, error) {
8183		return  nil , err 
8284	}
8385
84- 	// Should not trigger simulation requests if actor is a notary, as this 
85- 	// is supposed to "simulate" notaries sending requests via p2p. 
86- 	if  actorFlag  !=  "notary"  {
87- 		if  err  :=  shardEthereum .registerSimulatorService (shardEthereum .shardConfig , shardIDFlag ); err  !=  nil  {
88- 			return  nil , err 
89- 		}
86+ 	if  err  :=  shardEthereum .registerSimulatorService (actorFlag , shardEthereum .shardConfig , shardIDFlag ); err  !=  nil  {
87+ 		return  nil , err 
9088	}
9189
9290	if  err  :=  shardEthereum .registerSyncerService (shardEthereum .shardConfig , shardIDFlag ); err  !=  nil  {
@@ -102,9 +100,9 @@ func (s *ShardEthereum) Start() {
102100
103101	log .Info ("Starting sharding node" )
104102
105- 	for  _ , service  :=  range  s .services  {
106- 		// Start the next  service. 
107- 		service .Start ()
103+ 	for  _ , kind  :=  range  s .serviceTypes  {
104+ 		// Start each  service in order of registration . 
105+ 		s . services [ kind ] .Start ()
108106	}
109107
110108	stop  :=  s .stop 
@@ -148,53 +146,55 @@ func (s *ShardEthereum) Close() {
148146	close (s .stop )
149147}
150148
151- // Register  appends a service constructor function to the service registry of the 
149+ // registerService  appends a service constructor function to the service registry of the 
152150// sharding node. 
153- func  (s  * ShardEthereum ) Register (constructor  sharding.ServiceConstructor ) error  {
154- 	s .lock .Lock ()
155- 	defer  s .lock .Unlock ()
156- 
157- 	ctx  :=  & sharding.ServiceContext {
158- 		Services : make (map [reflect.Type ]sharding.Service ),
159- 	}
160- 
161- 	// Copy needed for threaded access. 
162- 	for  kind , s  :=  range  s .services  {
163- 		ctx .Services [kind ] =  s 
164- 	}
165- 
166- 	service , err  :=  constructor (ctx )
167- 	if  err  !=  nil  {
168- 		return  err 
169- 	}
170- 
151+ func  (s  * ShardEthereum ) registerService (service  sharding.Service ) error  {
171152	kind  :=  reflect .TypeOf (service )
172153	if  _ , exists  :=  s .services [kind ]; exists  {
173154		return  fmt .Errorf ("service already exists: %v" , kind )
174155	}
175156	s .services [kind ] =  service 
176- 
157+ 	 s . serviceTypes   =   append ( s . serviceTypes ,  kind ) 
177158	return  nil 
178159}
179160
161+ // fetchService takes in a struct pointer and sets the value of that pointer 
162+ // to a service currently stored in the service registry. This ensures the input argument is 
163+ // set to the right pointer that refers to the originally registered service. 
164+ func  (s  * ShardEthereum ) fetchService (service  interface {}) error  {
165+ 	if  reflect .TypeOf (service ).Kind () !=  reflect .Ptr  {
166+ 		return  fmt .Errorf ("input must be of pointer type, received value type instead: %T" , service )
167+ 	}
168+ 	element  :=  reflect .ValueOf (service ).Elem ()
169+ 	if  running , ok  :=  s .services [element .Type ()]; ok  {
170+ 		element .Set (reflect .ValueOf (running ))
171+ 		return  nil 
172+ 	}
173+ 	return  fmt .Errorf ("unknown service: %T" , service )
174+ }
175+ 
180176// registerShardChainDB attaches a LevelDB wrapped object to the shardEthereum instance. 
181177func  (s  * ShardEthereum ) registerShardChainDB (ctx  * cli.Context ) error  {
182178	path  :=  node .DefaultDataDir ()
183179	if  ctx .GlobalIsSet (utils .DataDirFlag .Name ) {
184180		path  =  ctx .GlobalString (utils .DataDirFlag .Name )
185181	}
186- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
187- 		return  database .NewShardDB (path , shardChainDbName , false )
188- 	})
182+ 	shardDB , err  :=  database .NewShardDB (path , shardChainDBName , false )
183+ 	if  err  !=  nil  {
184+ 		return  fmt .Errorf ("could not register shardDB service: %v" , err )
185+ 	}
186+ 	return  s .registerService (shardDB )
189187}
190188
191189// registerP2P attaches a p2p server to the ShardEthereum instance. 
192190// TODO: Design this p2p service and the methods it should expose as well as 
193191// its event loop. 
194192func  (s  * ShardEthereum ) registerP2P () error  {
195- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
196- 		return  p2p .NewServer ()
197- 	})
193+ 	shardp2p , err  :=  p2p .NewServer ()
194+ 	if  err  !=  nil  {
195+ 		return  fmt .Errorf ("could not register shardp2p service: %v" , err )
196+ 	}
197+ 	return  s .registerService (shardp2p )
198198}
199199
200200// registerMainchainClient 
@@ -214,9 +214,11 @@ func (s *ShardEthereum) registerMainchainClient(ctx *cli.Context) error {
214214	passwordFile  :=  ctx .GlobalString (utils .PasswordFileFlag .Name )
215215	depositFlag  :=  ctx .GlobalBool (utils .DepositFlag .Name )
216216
217- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
218- 		return  mainchain .NewSMCClient (endpoint , path , depositFlag , passwordFile )
219- 	})
217+ 	client , err  :=  mainchain .NewSMCClient (endpoint , path , depositFlag , passwordFile )
218+ 	if  err  !=  nil  {
219+ 		return  fmt .Errorf ("could not register smc client service: %v" , err )
220+ 	}
221+ 	return  s .registerService (client )
220222}
221223
222224// registerTXPool is only relevant to proposers in the sharded system. It will 
@@ -228,53 +230,101 @@ func (s *ShardEthereum) registerTXPool(actor string) error {
228230	if  actor  !=  "proposer"  {
229231		return  nil 
230232	}
231- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
232- 		var  p2p  * p2p.Server 
233- 		ctx .RetrieveService (& p2p )
234- 		return  txpool .NewTXPool (p2p )
235- 	})
233+ 	var  shardp2p  * p2p.Server 
234+ 	if  err  :=  s .fetchService (& shardp2p ); err  !=  nil  {
235+ 		return  err 
236+ 	}
237+ 	pool , err  :=  txpool .NewTXPool (shardp2p )
238+ 	if  err  !=  nil  {
239+ 		return  fmt .Errorf ("could not register shard txpool service: %v" , err )
240+ 	}
241+ 	return  s .registerService (pool )
236242}
237243
238244// Registers the actor according to CLI flags. Either notary/proposer/observer. 
239245func  (s  * ShardEthereum ) registerActorService (config  * params.Config , actor  string , shardID  int ) error  {
240- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
241- 
242- 		var  p2p  * p2p.Server 
243- 		ctx .RetrieveService (& p2p )
244- 		var  smcClient  * mainchain.SMCClient 
245- 		ctx .RetrieveService (& smcClient )
246- 		var  shardChainDB  * database.ShardDB 
247- 		ctx .RetrieveService (& shardChainDB )
248- 
249- 		if  actor  ==  "notary"  {
250- 			return  notary .NewNotary (config , smcClient , p2p , shardChainDB )
251- 		} else  if  actor  ==  "proposer"  {
252- 			var  txPool  * txpool.TXPool 
253- 			ctx .RetrieveService (& txPool )
254- 			return  proposer .NewProposer (config , smcClient , p2p , txPool , shardChainDB .DB (), shardID )
246+ 	var  shardp2p  * p2p.Server 
247+ 	if  err  :=  s .fetchService (& shardp2p ); err  !=  nil  {
248+ 		return  err 
249+ 	}
250+ 	var  client  * mainchain.SMCClient 
251+ 	if  err  :=  s .fetchService (& client ); err  !=  nil  {
252+ 		return  err 
253+ 	}
254+ 
255+ 	var  shardChainDB  * database.ShardDB 
256+ 	if  err  :=  s .fetchService (& shardChainDB ); err  !=  nil  {
257+ 		return  err 
258+ 	}
259+ 
260+ 	if  actor  ==  "notary"  {
261+ 		not , err  :=  notary .NewNotary (config , client , shardp2p , shardChainDB )
262+ 		if  err  !=  nil  {
263+ 			return  fmt .Errorf ("could not register notary service: %v" , err )
264+ 		}
265+ 		return  s .registerService (not )
266+ 	} else  if  actor  ==  "proposer"  {
267+ 
268+ 		var  pool  * txpool.TXPool 
269+ 		if  err  :=  s .fetchService (& pool ); err  !=  nil  {
270+ 			return  err 
271+ 		}
272+ 
273+ 		prop , err  :=  proposer .NewProposer (config , client , shardp2p , pool , shardChainDB , shardID )
274+ 		if  err  !=  nil  {
275+ 			return  fmt .Errorf ("could not register proposer service: %v" , err )
255276		}
256- 		return  observer .NewObserver (p2p , shardChainDB .DB (), shardID )
257- 	})
277+ 		return  s .registerService (prop )
278+ 	}
279+ 	obs , err  :=  observer .NewObserver (shardp2p , shardChainDB , shardID )
280+ 	if  err  !=  nil  {
281+ 		return  fmt .Errorf ("could not register observer service: %v" , err )
282+ 	}
283+ 	return  s .registerService (obs )
258284}
259285
260- func  (s  * ShardEthereum ) registerSimulatorService (config  * params.Config , shardID  int ) error  {
261- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
262- 		var  p2p  * p2p.Server 
263- 		ctx .RetrieveService (& p2p )
264- 		var  smcClient  * mainchain.SMCClient 
265- 		ctx .RetrieveService (& smcClient )
266- 		return  simulator .NewSimulator (config , smcClient , p2p , shardID , 15 ) // 15 second delay between simulator requests. 
267- 	})
286+ func  (s  * ShardEthereum ) registerSimulatorService (actorFlag  string , config  * params.Config , shardID  int ) error  {
287+ 	// Should not trigger simulation requests if actor is a notary, as this 
288+ 	// is supposed to "simulate" notaries sending requests via p2p. 
289+ 	if  actorFlag  ==  "notary"  {
290+ 		return  nil 
291+ 	}
292+ 
293+ 	var  shardp2p  * p2p.Server 
294+ 	if  err  :=  s .fetchService (& shardp2p ); err  !=  nil  {
295+ 		return  err 
296+ 	}
297+ 	var  client  * mainchain.SMCClient 
298+ 	if  err  :=  s .fetchService (& client ); err  !=  nil  {
299+ 		return  err 
300+ 	}
301+ 
302+ 	// 15 second delay between simulator requests. 
303+ 	sim , err  :=  simulator .NewSimulator (config , client , shardp2p , shardID , 15 * time .Second )
304+ 	if  err  !=  nil  {
305+ 		return  fmt .Errorf ("could not register simulator service: %v" , err )
306+ 	}
307+ 	return  s .registerService (sim )
268308}
269309
270310func  (s  * ShardEthereum ) registerSyncerService (config  * params.Config , shardID  int ) error  {
271- 	return  s .Register (func (ctx  * sharding.ServiceContext ) (sharding.Service , error ) {
272- 		var  p2p  * p2p.Server 
273- 		ctx .RetrieveService (& p2p )
274- 		var  smcClient  * mainchain.SMCClient 
275- 		ctx .RetrieveService (& smcClient )
276- 		var  shardChainDB  * database.ShardDB 
277- 		ctx .RetrieveService (& shardChainDB )
278- 		return  syncer .NewSyncer (config , smcClient , p2p , shardChainDB , shardID )
279- 	})
311+ 	var  shardp2p  * p2p.Server 
312+ 	if  err  :=  s .fetchService (& shardp2p ); err  !=  nil  {
313+ 		return  err 
314+ 	}
315+ 	var  client  * mainchain.SMCClient 
316+ 	if  err  :=  s .fetchService (& client ); err  !=  nil  {
317+ 		return  err 
318+ 	}
319+ 
320+ 	var  shardChainDB  * database.ShardDB 
321+ 	if  err  :=  s .fetchService (& shardChainDB ); err  !=  nil  {
322+ 		return  err 
323+ 	}
324+ 
325+ 	sync , err  :=  syncer .NewSyncer (config , client , shardp2p , shardChainDB , shardID )
326+ 	if  err  !=  nil  {
327+ 		return  fmt .Errorf ("could not register syncer service: %v" , err )
328+ 	}
329+ 	return  s .registerService (sync )
280330}
0 commit comments