Skip to content

Commit 9e3e892

Browse files
authored
feat(p2p): add network explorer and community pools (#3125)
* WIP Signed-off-by: Ettore Di Giacinto <[email protected]> * Fixups Signed-off-by: Ettore Di Giacinto <[email protected]> * Wire up a simple explorer DB Signed-off-by: Ettore Di Giacinto <[email protected]> * wip Signed-off-by: Ettore Di Giacinto <[email protected]> * WIP Signed-off-by: Ettore Di Giacinto <[email protected]> * refactor: group services id so can be identified easily in the ledger table Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(discovery): discovery service now gather worker informations correctly Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): display network token Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): display form to add new networks Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): stop from overwriting networks Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): display only networks with active workers Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): list only clusters in a network if it has online workers Signed-off-by: Ettore Di Giacinto <[email protected]> * remove invalid and inactive networks if networks have no workers delete them from the database, similarly, if invalid. Signed-off-by: Ettore Di Giacinto <[email protected]> * ci: add workflow to deploy new explorer versions automatically Signed-off-by: Ettore Di Giacinto <[email protected]> * build-api: build with p2p tag Signed-off-by: Ettore Di Giacinto <[email protected]> * Allow to specify a connection timeout Signed-off-by: Ettore Di Giacinto <[email protected]> * logging Signed-off-by: Ettore Di Giacinto <[email protected]> * Better p2p defaults Signed-off-by: Ettore Di Giacinto <[email protected]> * Set loglevel Signed-off-by: Ettore Di Giacinto <[email protected]> * Fix dht enable Signed-off-by: Ettore Di Giacinto <[email protected]> * Default to info for loglevel Signed-off-by: Ettore Di Giacinto <[email protected]> * Add navbar Signed-off-by: Ettore Di Giacinto <[email protected]> * Slightly improve rendering Signed-off-by: Ettore Di Giacinto <[email protected]> * Allow to copy the token easily Signed-off-by: Ettore Di Giacinto <[email protected]> * ci fixups Signed-off-by: Ettore Di Giacinto <[email protected]> --------- Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent 5fcafc3 commit 9e3e892

File tree

19 files changed

+1082
-17
lines changed

19 files changed

+1082
-17
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
name: Explorer deployment
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
tags:
8+
- 'v*'
9+
10+
concurrency:
11+
group: ci-deploy-${{ github.head_ref || github.ref }}-${{ github.repository }}
12+
13+
jobs:
14+
build-linux:
15+
runs-on: ubuntu-latest
16+
steps:
17+
- name: Clone
18+
uses: actions/checkout@v4
19+
with:
20+
submodules: true
21+
- uses: actions/setup-go@v5
22+
with:
23+
go-version: '1.21.x'
24+
cache: false
25+
- name: Dependencies
26+
run: |
27+
sudo apt-get update
28+
sudo apt-get install -y wget curl build-essential ffmpeg protobuf-compiler ccache upx-ucl gawk cmake libgmock-dev
29+
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af
30+
go install google.golang.org/protobuf/cmd/[email protected]
31+
make protogen-go
32+
- name: Build api
33+
run: |
34+
make build-api
35+
- name: rm
36+
uses: appleboy/[email protected]
37+
with:
38+
host: ${{ secrets.EXPLORER_SSH_HOST }}
39+
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
40+
key: ${{ secrets.EXPLORER_SSH_KEY }}
41+
port: ${{ secrets.EXPLORER_SSH_PORT }}
42+
script: |
43+
sudo rm -rf local-ai/ || true
44+
- name: copy file via ssh
45+
uses: appleboy/[email protected]
46+
with:
47+
host: ${{ secrets.EXPLORER_SSH_HOST }}
48+
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
49+
key: ${{ secrets.EXPLORER_SSH_KEY }}
50+
port: ${{ secrets.EXPLORER_SSH_PORT }}
51+
source: "local-ai"
52+
overwrite: true
53+
rm: true
54+
target: ./local-ai
55+
- name: restarting
56+
uses: appleboy/[email protected]
57+
with:
58+
host: ${{ secrets.EXPLORER_SSH_HOST }}
59+
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
60+
key: ${{ secrets.EXPLORER_SSH_KEY }}
61+
port: ${{ secrets.EXPLORER_SSH_PORT }}
62+
script: |
63+
sudo cp -rfv local-ai/local-ai /usr/bin/local-ai
64+
sudo systemctl restart local-ai

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ build-minimal:
376376
BUILD_GRPC_FOR_BACKEND_LLAMA=true GRPC_BACKENDS="backend-assets/grpc/llama-cpp-avx2" GO_TAGS=p2p $(MAKE) build
377377

378378
build-api:
379-
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=none $(MAKE) build
379+
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=p2p $(MAKE) build
380380

381381
backend-assets/lib:
382382
mkdir -p backend-assets/lib

core/cli/cli.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ var CLI struct {
1515
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
1616
Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
1717
Util UtilCMD `cmd:"" help:"Utility commands"`
18+
Explorer ExplorerCMD `cmd:"" help:"Run p2p explorer"`
1819
}

core/cli/explorer.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
cliContext "github.com/mudler/LocalAI/core/cli/context"
8+
"github.com/mudler/LocalAI/core/explorer"
9+
"github.com/mudler/LocalAI/core/http"
10+
)
11+
12+
type ExplorerCMD struct {
13+
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
14+
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
15+
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
16+
}
17+
18+
func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
19+
20+
db, err := explorer.NewDatabase(e.PoolDatabase)
21+
if err != nil {
22+
return err
23+
}
24+
25+
dur, err := time.ParseDuration(e.ConnectionTimeout)
26+
if err != nil {
27+
return err
28+
}
29+
ds := explorer.NewDiscoveryServer(db, dur)
30+
31+
go ds.Start(context.Background())
32+
appHTTP := http.Explorer(db, ds)
33+
34+
return appHTTP.Listen(e.Address)
35+
}

core/cli/run.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
121121
}
122122

123123
log.Info().Msg("Starting P2P server discovery...")
124-
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
124+
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
125125
var tunnelAddresses []string
126-
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
126+
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
127127
if v.IsOnline() {
128128
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
129129
} else {

core/cli/worker/worker_p2p.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
6060
p = r.RunnerPort
6161
}
6262

63-
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
63+
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
6464
if err != nil {
6565
return err
6666
}
@@ -100,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
100100
}
101101
}()
102102

103-
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
103+
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
104104
if err != nil {
105105
return err
106106
}

core/explorer/database.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package explorer
2+
3+
// A simple JSON database for storing and retrieving p2p network tokens and a name and description.
4+
5+
import (
6+
"encoding/json"
7+
"os"
8+
"sort"
9+
"sync"
10+
)
11+
12+
// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
13+
type Database struct {
14+
sync.RWMutex
15+
path string
16+
data map[string]TokenData
17+
}
18+
19+
// TokenData is a p2p network token with a name and description.
20+
type TokenData struct {
21+
Name string `json:"name"`
22+
Description string `json:"description"`
23+
}
24+
25+
// NewDatabase creates a new Database with the given path.
26+
func NewDatabase(path string) (*Database, error) {
27+
db := &Database{
28+
data: make(map[string]TokenData),
29+
path: path,
30+
}
31+
return db, db.load()
32+
}
33+
34+
// Get retrieves a Token from the Database by its token.
35+
func (db *Database) Get(token string) (TokenData, bool) {
36+
db.RLock()
37+
defer db.RUnlock()
38+
t, ok := db.data[token]
39+
return t, ok
40+
}
41+
42+
// Set stores a Token in the Database by its token.
43+
func (db *Database) Set(token string, t TokenData) error {
44+
db.Lock()
45+
db.data[token] = t
46+
db.Unlock()
47+
48+
return db.Save()
49+
}
50+
51+
// Delete removes a Token from the Database by its token.
52+
func (db *Database) Delete(token string) error {
53+
db.Lock()
54+
delete(db.data, token)
55+
db.Unlock()
56+
return db.Save()
57+
}
58+
59+
func (db *Database) TokenList() []string {
60+
db.RLock()
61+
defer db.RUnlock()
62+
tokens := []string{}
63+
for k := range db.data {
64+
tokens = append(tokens, k)
65+
}
66+
67+
sort.Slice(tokens, func(i, j int) bool {
68+
// sort by token
69+
return tokens[i] < tokens[j]
70+
})
71+
72+
return tokens
73+
}
74+
75+
// load reads the Database from disk.
76+
func (db *Database) load() error {
77+
db.Lock()
78+
defer db.Unlock()
79+
80+
if _, err := os.Stat(db.path); os.IsNotExist(err) {
81+
return nil
82+
}
83+
84+
// Read the file from disk
85+
// Unmarshal the JSON into db.data
86+
f, err := os.ReadFile(db.path)
87+
if err != nil {
88+
return err
89+
}
90+
return json.Unmarshal(f, &db.data)
91+
}
92+
93+
// Save writes the Database to disk.
94+
func (db *Database) Save() error {
95+
db.RLock()
96+
defer db.RUnlock()
97+
98+
// Marshal db.data into JSON
99+
// Write the JSON to the file
100+
f, err := os.Create(db.path)
101+
if err != nil {
102+
return err
103+
}
104+
defer f.Close()
105+
return json.NewEncoder(f).Encode(db.data)
106+
}

core/explorer/database_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package explorer_test
2+
3+
import (
4+
"os"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
9+
"github.com/mudler/LocalAI/core/explorer"
10+
)
11+
12+
var _ = Describe("Database", func() {
13+
var (
14+
dbPath string
15+
db *explorer.Database
16+
err error
17+
)
18+
19+
BeforeEach(func() {
20+
// Create a temporary file path for the database
21+
dbPath = "test_db.json"
22+
db, err = explorer.NewDatabase(dbPath)
23+
Expect(err).To(BeNil())
24+
})
25+
26+
AfterEach(func() {
27+
// Clean up the temporary database file
28+
os.Remove(dbPath)
29+
})
30+
31+
Context("when managing tokens", func() {
32+
It("should add and retrieve a token", func() {
33+
token := "token123"
34+
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
35+
36+
err = db.Set(token, t)
37+
Expect(err).To(BeNil())
38+
39+
retrievedToken, exists := db.Get(token)
40+
Expect(exists).To(BeTrue())
41+
Expect(retrievedToken).To(Equal(t))
42+
})
43+
44+
It("should delete a token", func() {
45+
token := "token123"
46+
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
47+
48+
err = db.Set(token, t)
49+
Expect(err).To(BeNil())
50+
51+
err = db.Delete(token)
52+
Expect(err).To(BeNil())
53+
54+
_, exists := db.Get(token)
55+
Expect(exists).To(BeFalse())
56+
})
57+
58+
It("should persist data to disk", func() {
59+
token := "token123"
60+
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}
61+
62+
err = db.Set(token, t)
63+
Expect(err).To(BeNil())
64+
65+
// Recreate the database object to simulate reloading from disk
66+
db, err = explorer.NewDatabase(dbPath)
67+
Expect(err).To(BeNil())
68+
69+
retrievedToken, exists := db.Get(token)
70+
Expect(exists).To(BeTrue())
71+
Expect(retrievedToken).To(Equal(t))
72+
73+
// Check the token list
74+
tokenList := db.TokenList()
75+
Expect(tokenList).To(ContainElement(token))
76+
})
77+
})
78+
79+
Context("when loading an empty or non-existent file", func() {
80+
It("should start with an empty database", func() {
81+
dbPath = "empty_db.json"
82+
db, err = explorer.NewDatabase(dbPath)
83+
Expect(err).To(BeNil())
84+
85+
_, exists := db.Get("nonexistent")
86+
Expect(exists).To(BeFalse())
87+
88+
// Clean up
89+
os.Remove(dbPath)
90+
})
91+
})
92+
})

0 commit comments

Comments
 (0)