Skip to content

Commit fb009f4

Browse files
committed
feat(discovery): discovery service now gather worker informations correctly
1 parent 0996389 commit fb009f4

File tree

4 files changed

+84
-32
lines changed

4 files changed

+84
-32
lines changed

core/cli/explorer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
1919
if err != nil {
2020
return err
2121
}
22-
appHTTP := http.Explorer(db)
2322

2423
ds := explorer.NewDiscoveryServer(db)
2524

2625
go ds.Start(context.Background())
26+
appHTTP := http.Explorer(db, ds)
2727

2828
return appHTTP.Listen(e.Address)
2929
}

core/explorer/discovery.go

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,47 @@ package explorer
33
import (
44
"context"
55
"fmt"
6+
"strings"
7+
"sync"
68
"time"
79

810
"github.com/mudler/LocalAI/core/p2p"
911
"github.com/mudler/edgevpn/pkg/blockchain"
1012
)
1113

1214
type DiscoveryServer struct {
15+
sync.Mutex
1316
database *Database
1417
networkState *NetworkState
1518
}
1619

1720
type NetworkState struct {
18-
Nodes map[string]map[string]p2p.NodeData
21+
Networks map[string]Network
22+
}
23+
24+
func (s *DiscoveryServer) NetworkState() *NetworkState {
25+
s.Lock()
26+
defer s.Unlock()
27+
return s.networkState
1928
}
2029

2130
func NewDiscoveryServer(db *Database) *DiscoveryServer {
2231
return &DiscoveryServer{
2332
database: db,
2433
networkState: &NetworkState{
25-
Nodes: map[string]map[string]p2p.NodeData{},
34+
Networks: map[string]Network{},
2635
},
2736
}
2837
}
2938

39+
type Network struct {
40+
Clusters []ClusterData
41+
}
42+
3043
func (s *DiscoveryServer) runBackground() {
3144
for _, token := range s.database.TokenList() {
32-
33-
fmt.Println("Checking token", token)
3445
c, cancel := context.WithTimeout(context.Background(), 50*time.Second)
3546
defer cancel()
36-
fmt.Println("Starting node", token)
3747

3848
// Connect to the network
3949
// Get the number of nodes
@@ -45,39 +55,38 @@ func (s *DiscoveryServer) runBackground() {
4555
continue
4656
}
4757

48-
fmt.Println("Starting network", token)
4958
err = n.Start(c)
5059
if err != nil {
5160
fmt.Println(err)
5261
continue
5362
}
54-
fmt.Println("ledger", token)
5563

5664
ledger, err := n.Ledger()
5765
if err != nil {
5866
fmt.Println(err)
5967
continue
6068
}
6169

62-
ledgerKeys := make(chan string)
63-
go s.getLedgerKeys(c, ledger, ledgerKeys)
70+
networkData := make(chan ClusterData)
6471

65-
ledgerK := []string{}
66-
fmt.Println("waiting for ledger keys", token)
72+
// get the network data - it takes the whole timeout
73+
// as we might not be connected to the network yet,
74+
// and few attempts would have to be made before bailing out
75+
go s.retrieveNetworkData(c, ledger, networkData)
6776

68-
LOOP:
69-
for {
70-
select {
71-
case <-c.Done():
72-
fmt.Println("Context exhausted")
73-
break LOOP
74-
case key := <-ledgerKeys:
75-
ledgerK = append(ledgerK, key)
76-
}
77+
ledgerK := []ClusterData{}
78+
for key := range networkData {
79+
ledgerK = append(ledgerK, key)
7780
}
7881

7982
fmt.Println("Token network", token)
80-
fmt.Println("Found the following ledger keys in the network", ledgerK)
83+
fmt.Println("Found the following workers in the network", ledgerK)
84+
85+
s.Lock()
86+
s.networkState.Networks[token] = Network{
87+
Clusters: ledgerK,
88+
}
89+
s.Unlock()
8190
// get new services, allocate and return to the channel
8291

8392
// TODO:
@@ -89,26 +98,69 @@ func (s *DiscoveryServer) runBackground() {
8998
}
9099
}
91100

92-
func (s *DiscoveryServer) getLedgerKeys(c context.Context, ledger *blockchain.Ledger, ledgerKeys chan string) {
93-
keys := map[string]struct{}{}
101+
type ClusterData struct {
102+
Workers []string
103+
Type string
104+
}
105+
106+
func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
107+
clusters := map[string]ClusterData{}
108+
109+
defer func() {
110+
fmt.Println("Defer clusters", clusters)
111+
112+
for _, n := range clusters {
113+
networkData <- n
114+
}
115+
close(networkData)
116+
}()
94117

95118
for {
96119
select {
97120
case <-c.Done():
121+
fmt.Println("Closing with ccluster")
122+
fmt.Println(clusters)
98123
return
99124
default:
100125
time.Sleep(5 * time.Second)
101126

102127
data := ledger.LastBlock().Storage
103-
for k, _ := range data {
104-
if _, ok := keys[k]; !ok {
105-
keys[k] = struct{}{}
106-
ledgerKeys <- k
128+
LEDGER:
129+
for d := range data {
130+
toScanForWorkers := false
131+
cd := ClusterData{}
132+
isWorkerCluster := d == p2p.WorkerID || (strings.Contains(d, "_") && strings.Contains(d, p2p.WorkerID))
133+
isFederatedCluster := d == p2p.FederatedID || (strings.Contains(d, "_") && strings.Contains(d, p2p.FederatedID))
134+
switch {
135+
case isWorkerCluster:
136+
toScanForWorkers = true
137+
cd.Type = "worker"
138+
case isFederatedCluster:
139+
toScanForWorkers = true
140+
cd.Type = "federated"
141+
142+
}
143+
144+
if !toScanForWorkers {
145+
continue LEDGER
107146
}
147+
148+
DATA:
149+
for _, v := range data[d] {
150+
nd := &p2p.NodeData{}
151+
if err := v.Unmarshal(nd); err != nil {
152+
continue DATA
153+
}
154+
155+
if nd.IsOnline() {
156+
(&cd).Workers = append(cd.Workers, nd.ID)
157+
}
158+
}
159+
160+
clusters[d] = cd
108161
}
109162
}
110163
}
111-
112164
}
113165

114166
// Start the discovery server. This is meant to be run in to a goroutine.

core/http/explorer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/mudler/LocalAI/core/http/routes"
77
)
88

9-
func Explorer(db *explorer.Database) *fiber.App {
9+
func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer) *fiber.App {
1010

1111
fiberCfg := fiber.Config{
1212
Views: renderEngine(),
@@ -18,7 +18,7 @@ func Explorer(db *explorer.Database) *fiber.App {
1818

1919
app := fiber.New(fiberCfg)
2020

21-
routes.RegisterExplorerRoutes(app, db)
21+
routes.RegisterExplorerRoutes(app, db, discoveryServer)
2222

2323
return app
2424
}

core/http/routes/explorer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/mudler/LocalAI/core/http/endpoints/explorer"
77
)
88

9-
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database) {
9+
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database, ds *coreExplorer.DiscoveryServer) {
1010
app.Get("/", explorer.Dashboard())
1111
app.Post("/network/add", explorer.AddNetwork(db))
1212
}

0 commit comments

Comments
 (0)