Skip to content

Commit 9c7f92c

Browse files
authored
feat(p2p): automatically sync installed models between instances (#6108)
* feat(p2p): sync models between federated nodes. This change makes sure that all the models are kept in sync between federated nodes. Note: this works exclusively with models belonging to a gallery. It does not sync files between the nodes, but rather it syncs the node setup. E.g., all the nodes need to have the same galleries configured and install models without any local editing. Signed-off-by: Ettore Di Giacinto <[email protected]> * Make nodes stable Signed-off-by: Ettore Di Giacinto <[email protected]> * Fixups on syncing Signed-off-by: Ettore Di Giacinto <[email protected]> * ui: improve p2p view Signed-off-by: Ettore Di Giacinto <[email protected]> --------- Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent 060037b commit 9c7f92c

File tree

15 files changed

+617
-145
lines changed

15 files changed

+617
-145
lines changed

core/application/application.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package application
22

33
import (
44
"github.com/mudler/LocalAI/core/config"
5+
"github.com/mudler/LocalAI/core/services"
56
"github.com/mudler/LocalAI/core/templates"
67
"github.com/mudler/LocalAI/pkg/model"
78
)
@@ -11,6 +12,7 @@ type Application struct {
1112
modelLoader *model.ModelLoader
1213
applicationConfig *config.ApplicationConfig
1314
templatesEvaluator *templates.Evaluator
15+
galleryService *services.GalleryService
1416
}
1517

1618
func newApplication(appConfig *config.ApplicationConfig) *Application {
@@ -22,7 +24,7 @@ func newApplication(appConfig *config.ApplicationConfig) *Application {
2224
}
2325
}
2426

25-
func (a *Application) BackendLoader() *config.ModelConfigLoader {
27+
func (a *Application) ModelConfigLoader() *config.ModelConfigLoader {
2628
return a.backendLoader
2729
}
2830

@@ -37,3 +39,19 @@ func (a *Application) ApplicationConfig() *config.ApplicationConfig {
3739
func (a *Application) TemplatesEvaluator() *templates.Evaluator {
3840
return a.templatesEvaluator
3941
}
42+
43+
func (a *Application) GalleryService() *services.GalleryService {
44+
return a.galleryService
45+
}
46+
47+
func (a *Application) start() error {
48+
galleryService := services.NewGalleryService(a.ApplicationConfig(), a.ModelLoader())
49+
err := galleryService.Start(a.ApplicationConfig().Context, a.ModelConfigLoader(), a.ApplicationConfig().SystemState)
50+
if err != nil {
51+
return err
52+
}
53+
54+
a.galleryService = galleryService
55+
56+
return nil
57+
}

core/application/startup.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func New(opts ...config.AppOption) (*Application, error) {
6868

6969
configLoaderOpts := options.ToConfigLoaderOptions()
7070

71-
if err := application.BackendLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil {
71+
if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil {
7272
log.Error().Err(err).Msg("error loading config files")
7373
}
7474

@@ -77,12 +77,12 @@ func New(opts ...config.AppOption) (*Application, error) {
7777
}
7878

7979
if options.ConfigFile != "" {
80-
if err := application.BackendLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil {
80+
if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil {
8181
log.Error().Err(err).Msg("error loading config file")
8282
}
8383
}
8484

85-
if err := application.BackendLoader().Preload(options.SystemState.Model.ModelsPath); err != nil {
85+
if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil {
8686
log.Error().Err(err).Msg("error downloading models")
8787
}
8888

@@ -99,7 +99,7 @@ func New(opts ...config.AppOption) (*Application, error) {
9999
}
100100

101101
if options.Debug {
102-
for _, v := range application.BackendLoader().GetAllModelsConfigs() {
102+
for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() {
103103
log.Debug().Msgf("Model: %s (config: %+v)", v.Name, v)
104104
}
105105
}
@@ -132,7 +132,7 @@ func New(opts ...config.AppOption) (*Application, error) {
132132

133133
if options.LoadToMemory != nil && !options.SingleBackend {
134134
for _, m := range options.LoadToMemory {
135-
cfg, err := application.BackendLoader().LoadModelConfigFileByNameDefaultOptions(m, options)
135+
cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, options)
136136
if err != nil {
137137
return nil, err
138138
}
@@ -152,6 +152,10 @@ func New(opts ...config.AppOption) (*Application, error) {
152152
// Watch the configuration directory
153153
startWatcher(options)
154154

155+
if err := application.start(); err != nil {
156+
return nil, err
157+
}
158+
155159
log.Info().Msg("core/startup process completed!")
156160
return application, nil
157161
}

core/cli/api/p2p.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77
"os"
88
"strings"
99

10+
"github.com/mudler/LocalAI/core/application"
1011
"github.com/mudler/LocalAI/core/p2p"
12+
"github.com/mudler/LocalAI/core/schema"
1113
"github.com/mudler/edgevpn/pkg/node"
1214

1315
"github.com/rs/zerolog/log"
1416
)
1517

16-
func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool) error {
18+
func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool, app *application.Application) error {
1719
var n *node.Node
1820
// Here we are avoiding creating multiple nodes:
1921
// - if the federated mode is enabled, we create a federated node and expose a service
@@ -39,6 +41,11 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa
3941
}
4042

4143
n = node
44+
45+
// start node sync in the background
46+
if err := p2p.Sync(ctx, node, app); err != nil {
47+
return err
48+
}
4249
}
4350

4451
// If the p2p mode is enabled, we start the service discovery
@@ -58,7 +65,7 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa
5865

5966
// Attach a ServiceDiscoverer to the p2p node
6067
log.Info().Msg("Starting P2P server discovery...")
61-
if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
68+
if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) {
6269
var tunnelAddresses []string
6370
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) {
6471
if v.IsOnline() {

core/cli/run.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
144144

145145
backgroundCtx := context.Background()
146146

147-
if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated); err != nil {
148-
return err
149-
}
150-
151147
idleWatchDog := r.EnableWatchdogIdle
152148
busyWatchDog := r.EnableWatchdogBusy
153149

@@ -216,5 +212,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
216212
return err
217213
}
218214

215+
if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated, app); err != nil {
216+
return err
217+
}
218+
219219
return appHTTP.Listen(r.Address)
220220
}

core/explorer/discovery.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/rs/zerolog/log"
1111

1212
"github.com/mudler/LocalAI/core/p2p"
13+
"github.com/mudler/LocalAI/core/schema"
1314
"github.com/mudler/edgevpn/pkg/blockchain"
1415
)
1516

@@ -177,7 +178,7 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch
177178
atLeastOneWorker := false
178179
DATA:
179180
for _, v := range data[d] {
180-
nd := &p2p.NodeData{}
181+
nd := &schema.NodeData{}
181182
if err := v.Unmarshal(nd); err != nil {
182183
continue DATA
183184
}

core/http/app.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -197,21 +197,15 @@ func API(application *application.Application) (*fiber.App, error) {
197197
router.Use(csrf.New())
198198
}
199199

200-
galleryService := services.NewGalleryService(application.ApplicationConfig(), application.ModelLoader())
201-
err = galleryService.Start(application.ApplicationConfig().Context, application.BackendLoader(), application.ApplicationConfig().SystemState)
202-
if err != nil {
203-
return nil, err
204-
}
205-
206-
requestExtractor := middleware.NewRequestExtractor(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())
200+
requestExtractor := middleware.NewRequestExtractor(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())
207201

208-
routes.RegisterElevenLabsRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())
209-
routes.RegisterLocalAIRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService)
202+
routes.RegisterElevenLabsRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())
203+
routes.RegisterLocalAIRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService())
210204
routes.RegisterOpenAIRoutes(router, requestExtractor, application)
211205
if !application.ApplicationConfig().DisableWebUI {
212-
routes.RegisterUIRoutes(router, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService)
206+
routes.RegisterUIRoutes(router, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService())
213207
}
214-
routes.RegisterJINARoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())
208+
routes.RegisterJINARoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())
215209

216210
// Define a custom 404 handler
217211
// Note: keep this at the bottom!

core/http/elements/p2p.go

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"github.com/chasefleming/elem-go"
88
"github.com/chasefleming/elem-go/attrs"
99
"github.com/microcosm-cc/bluemonday"
10-
"github.com/mudler/LocalAI/core/p2p"
10+
"github.com/mudler/LocalAI/core/schema"
1111
)
1212

1313
func renderElements(n []elem.Node) string {
@@ -18,23 +18,25 @@ func renderElements(n []elem.Node) string {
1818
return render
1919
}
2020

21-
func P2PNodeStats(nodes []p2p.NodeData) string {
21+
func P2PNodeStats(nodes []schema.NodeData) string {
2222
online := 0
2323
for _, n := range nodes {
2424
if n.IsOnline() {
2525
online++
2626
}
2727
}
2828

29-
class := "text-blue-400"
29+
class := "text-green-400"
3030
if online == 0 {
3131
class = "text-red-400"
32+
} else if online < len(nodes) {
33+
class = "text-yellow-400"
3234
}
3335

3436
nodesElements := []elem.Node{
3537
elem.Span(
3638
attrs.Props{
37-
"class": class + " font-bold text-xl",
39+
"class": class + " font-bold text-2xl",
3840
},
3941
elem.Text(fmt.Sprintf("%d", online)),
4042
),
@@ -49,77 +51,106 @@ func P2PNodeStats(nodes []p2p.NodeData) string {
4951
return renderElements(nodesElements)
5052
}
5153

52-
func P2PNodeBoxes(nodes []p2p.NodeData) string {
53-
nodesElements := []elem.Node{}
54+
func P2PNodeBoxes(nodes []schema.NodeData) string {
55+
if len(nodes) == 0 {
56+
return `<div class="col-span-full flex flex-col items-center justify-center py-12 text-center bg-gray-800/50 border border-gray-700/50 rounded-xl">
57+
<i class="fas fa-server text-gray-500 text-4xl mb-4"></i>
58+
<p class="text-gray-400 text-lg font-medium">No nodes available</p>
59+
<p class="text-gray-500 text-sm mt-2">Start some workers to see them here</p>
60+
</div>`
61+
}
5462

63+
render := ""
5564
for _, n := range nodes {
5665
nodeID := bluemonday.StrictPolicy().Sanitize(n.ID)
5766

5867
// Define status-specific classes
5968
statusIconClass := "text-green-400"
6069
statusText := "Online"
6170
statusTextClass := "text-green-400"
71+
cardHoverClass := "hover:shadow-green-500/20 hover:border-green-400/50"
6272

6373
if !n.IsOnline() {
6474
statusIconClass = "text-red-400"
6575
statusText = "Offline"
6676
statusTextClass = "text-red-400"
77+
cardHoverClass = "hover:shadow-red-500/20 hover:border-red-400/50"
6778
}
6879

69-
nodesElements = append(nodesElements,
80+
nodeCard := elem.Div(
81+
attrs.Props{
82+
"class": "bg-gradient-to-br from-gray-800/90 to-gray-900/80 border border-gray-700/50 rounded-xl p-5 shadow-xl transition-all duration-300 " + cardHoverClass + " backdrop-blur-sm",
83+
},
84+
// Header with node icon and status
7085
elem.Div(
7186
attrs.Props{
72-
"class": "bg-gray-800/80 border border-gray-700/50 rounded-xl p-4 shadow-lg transition-all duration-300 hover:shadow-blue-900/20 hover:border-blue-700/50",
87+
"class": "flex items-center justify-between mb-4",
7388
},
74-
// Node ID and status indicator in top row
89+
// Node info
7590
elem.Div(
7691
attrs.Props{
77-
"class": "flex items-center justify-between mb-3",
92+
"class": "flex items-center",
7893
},
79-
// Node ID with icon
8094
elem.Div(
8195
attrs.Props{
82-
"class": "flex items-center",
96+
"class": "w-10 h-10 bg-blue-500/20 rounded-lg flex items-center justify-center mr-3",
8397
},
8498
elem.I(
8599
attrs.Props{
86-
"class": "fas fa-server text-blue-400 mr-2",
87-
},
88-
),
89-
elem.Span(
90-
attrs.Props{
91-
"class": "text-white font-medium",
100+
"class": "fas fa-server text-blue-400 text-lg",
92101
},
93-
elem.Text(nodeID),
94102
),
95103
),
96-
// Status indicator
97104
elem.Div(
98-
attrs.Props{
99-
"class": "flex items-center",
100-
},
101-
elem.I(
105+
attrs.Props{},
106+
elem.H4(
102107
attrs.Props{
103-
"class": "fas fa-circle animate-pulse " + statusIconClass + " mr-1.5",
108+
"class": "text-white font-semibold text-sm",
104109
},
110+
elem.Text("Node"),
105111
),
106-
elem.Span(
112+
elem.P(
107113
attrs.Props{
108-
"class": statusTextClass,
114+
"class": "text-gray-400 text-xs font-mono break-all",
109115
},
110-
elem.Text(statusText),
116+
elem.Text(nodeID),
111117
),
112118
),
113119
),
114-
// Bottom section with timestamp
120+
// Status badge
115121
elem.Div(
116122
attrs.Props{
117-
"class": "text-xs text-gray-400 pt-1 border-t border-gray-700/30",
123+
"class": "flex items-center bg-gray-900/50 rounded-full px-3 py-1.5 border border-gray-700/50",
118124
},
119-
elem.Text("Last updated: "+time.Now().UTC().Format("2006-01-02 15:04:05")),
125+
elem.I(
126+
attrs.Props{
127+
"class": "fas fa-circle animate-pulse " + statusIconClass + " mr-2 text-xs",
128+
},
129+
),
130+
elem.Span(
131+
attrs.Props{
132+
"class": statusTextClass + " text-xs font-medium",
133+
},
134+
elem.Text(statusText),
135+
),
120136
),
121-
))
137+
),
138+
// Footer with timestamp
139+
elem.Div(
140+
attrs.Props{
141+
"class": "text-xs text-gray-500 pt-3 border-t border-gray-700/30 flex items-center",
142+
},
143+
elem.I(
144+
attrs.Props{
145+
"class": "fas fa-clock mr-2",
146+
},
147+
),
148+
elem.Text("Updated: "+time.Now().UTC().Format("15:04:05")),
149+
),
150+
)
151+
152+
render += nodeCard.Render()
122153
}
123154

124-
return renderElements(nodesElements)
155+
return render
125156
}

0 commit comments

Comments
 (0)