Skip to content

Commit 632ecc5

Browse files
committed
Swap to using iterators for uploading to avoid storing all chunks in memory
1 parent 08598f4 commit 632ecc5

File tree

4 files changed

+58
-32
lines changed

4 files changed

+58
-32
lines changed

client/client_test.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2829,11 +2829,24 @@ func testMultipleUsers(t *testing.T, tester shellTester) {
28292829
func createSyntheticImportEntries(t testing.TB, numSyntheticEntries int) {
28302830
homedir, err := os.UserHomeDir()
28312831
require.NoError(t, err)
2832-
f, err := os.OpenFile(path.Join(homedir, ".bash_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
2832+
filenames := []string{".bash_history", ".zsh_history", ".zhistory"}
2833+
numFiles := len(filenames) + 1 // The +1 accounts for the fish history file
2834+
for _, filename := range filenames {
2835+
f, err := os.OpenFile(path.Join(homedir, filename), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
2836+
require.NoError(t, err)
2837+
defer f.Close()
2838+
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
2839+
_, err := f.WriteString(fmt.Sprintf("echo command-%s-%d\n", filename, i))
2840+
require.NoError(t, err)
2841+
}
2842+
require.NoError(t, f.Close())
2843+
}
2844+
// Write the file for fish too, in the special fish format
2845+
f, err := os.OpenFile(path.Join(homedir, ".local/share/fish/fish_history"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
28332846
require.NoError(t, err)
28342847
defer f.Close()
2835-
for i := 1; i <= numSyntheticEntries; i++ {
2836-
_, err := f.WriteString(fmt.Sprintf("echo command-%d\n", i))
2848+
for i := 1; i <= numSyntheticEntries/numFiles; i++ {
2849+
_, err := f.WriteString(fmt.Sprintf("- cmd: echo command-fish-%d\n", i))
28372850
require.NoError(t, err)
28382851
}
28392852
require.NoError(t, f.Close())
@@ -2868,10 +2881,26 @@ func TestImportHistory(t *testing.T) {
28682881
testutils.CompareGoldens(t, out, "TestImportHistory-export")
28692882
}
28702883

2884+
// TestAugmentedIsOfflineError checks that the verdict of lib.IsOfflineError for
// an error it does not otherwise recognize depends on whether
// lib.CanReachHishtoryServer reports the server as reachable: false while the
// server is reachable, true once a network error is simulated.
func TestAugmentedIsOfflineError(t *testing.T) {
2885+
// NOTE(review): presumably snapshots hishtory state and restores it when the
// returned func runs at test exit - confirm in testutils.
defer testutils.BackupAndRestore(t)()
2886+
installHishtory(t, zshTester{}, "")
2887+
// NOTE(review): presumably restores the prior value of the env var that is
// overwritten via os.Setenv below - confirm in testutils.
defer testutils.BackupAndRestoreEnv("HISHTORY_SIMULATE_NETWORK_ERROR")()
2888+
ctx := hctx.MakeContext()
2889+
2890+
// By default, when the hishtory server is up, then IsOfflineError checks the error msg
2891+
require.True(t, lib.CanReachHishtoryServer(ctx))
2892+
require.False(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
2893+
2894+
// When the hishtory server is down, then all error messages are treated as being due to offline errors
2895+
// The env var is the switch this test uses to make CanReachHishtoryServer
// report failure; the same generic error is then expected to count as offline.
os.Setenv("HISHTORY_SIMULATE_NETWORK_ERROR", "1")
2896+
require.False(t, lib.CanReachHishtoryServer(ctx))
2897+
require.True(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
2898+
}
2899+
28712900
func BenchmarkImport(b *testing.B) {
28722901
b.StopTimer()
28732902
// Setup
2874-
tester := bashTester{}
2903+
tester := zshTester{}
28752904
defer testutils.BackupAndRestore(b)()
28762905

28772906
// Benchmark it
@@ -2881,7 +2910,7 @@ func BenchmarkImport(b *testing.B) {
28812910
installHishtory(b, tester, "")
28822911

28832912
// Create a large synthetic history (spread across the bash, zsh, and fish history files) that we will pre-import
2884-
numSyntheticEntries := 100_000
2913+
numSyntheticEntries := 1_000_000
28852914
createSyntheticImportEntries(b, numSyntheticEntries)
28862915

28872916
// Benchmarked code:
@@ -2894,20 +2923,4 @@ func BenchmarkImport(b *testing.B) {
28942923
}
28952924
}
28962925

2897-
func TestAugmentedIsOfflineError(t *testing.T) {
2898-
defer testutils.BackupAndRestore(t)()
2899-
installHishtory(t, zshTester{}, "")
2900-
defer testutils.BackupAndRestoreEnv("HISHTORY_SIMULATE_NETWORK_ERROR")()
2901-
ctx := hctx.MakeContext()
2902-
2903-
// By default, when the hishtory server is up, then IsOfflineError checks the error msg
2904-
require.True(t, lib.CanReachHishtoryServer(ctx))
2905-
require.False(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
2906-
2907-
// When the hishtory server is down, then all error messages are treated as being due to offline errors
2908-
os.Setenv("HISHTORY_SIMULATE_NETWORK_ERROR", "1")
2909-
require.False(t, lib.CanReachHishtoryServer(ctx))
2910-
require.True(t, lib.IsOfflineError(ctx, fmt.Errorf("unchecked error type")))
2911-
}
2912-
29132926
// TODO: somehow test/confirm that hishtory works even if only bash/only zsh is installed

client/lib/lib.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func countLinesInFiles(filenames ...string) (int, error) {
246246
if err != nil {
247247
return 0, err
248248
}
249+
hctx.GetLogger().Infof("Importing history entries, file=%#v contains %d lines", f, l)
249250
total += l
250251
}
251252
return total, nil
@@ -680,7 +681,7 @@ func Reupload(ctx context.Context) error {
680681
defer bar.Finish()
681682
}
682683
chunkSize := 500
683-
chunks := shared.Chunks(entries, chunkSize)
684+
chunks := shared.ChunksIter(entries, chunkSize)
684685
return shared.ForEach(chunks, 10, func(chunk []*data.HistoryEntry) error {
685686
jsonValue, err := EncryptAndMarshal(config, chunk)
686687
if err != nil {

shared/data.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,20 @@ func Chunks[k any](slice []k, chunkSize int) [][]k {
122122
}
123123
return chunks
124124
}
125+
126+
// Seq1 is a push-style iterator over values of type K.
//
// The iterator calls yield once per value, in order, and stops as soon as
// yield returns false. Its own boolean result reports whether iteration ran
// to completion (true) or was cut short by yield (false).
type Seq1[K any] func(yield func(K) bool) bool

// ChunksIter returns an iterator over consecutive sub-slices of slice, each
// holding at most chunkSize elements. Every chunk except possibly the last
// has exactly chunkSize elements; an empty slice produces no chunks.
//
// Chunks are views into slice rather than copies, so no per-chunk storage is
// allocated. Each chunk's capacity is clipped to its length so that appending
// to one chunk can never overwrite the elements of the chunk that follows it,
// which matters when chunks are handed to concurrent workers.
//
// ChunksIter panics if chunkSize is less than 1, mirroring the standard
// library's slices.Chunk; a zero size would otherwise iterate forever.
func ChunksIter[k any](slice []k, chunkSize int) Seq1[[]k] {
	if chunkSize < 1 {
		panic("shared.ChunksIter: chunkSize must be at least 1")
	}
	return func(yield func([]k) bool) bool {
		for start := 0; start < len(slice); {
			// Clamp the final chunk to whatever remains of the slice.
			end := len(slice)
			if chunkSize < end-start {
				end = start + chunkSize
			}
			// Three-index slice: cap == len, so the chunk cannot grow into
			// its neighbour's backing storage.
			if !yield(slice[start:end:end]) {
				return false
			}
			start = end
		}
		return true
	}
}

shared/utils.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ package shared
22

33
import "sync"
44

5-
func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
5+
func ForEach[T any](iter Seq1[T], numThreads int, fn func(T) error) error {
66
wg := &sync.WaitGroup{}
7-
wg.Add(len(arr))
8-
97
limiter := make(chan bool, numThreads)
10-
118
var errors []error
12-
for _, item := range arr {
9+
iter(func(item T) bool {
10+
wg.Add(1)
1311
limiter <- true
1412
go func(x T) {
1513
defer wg.Done()
@@ -19,11 +17,8 @@ func ForEach[T any](arr []T, numThreads int, fn func(T) error) error {
1917
}
2018
<-limiter
2119
}(item)
22-
if len(errors) > 0 {
23-
return errors[0]
24-
}
25-
}
26-
20+
return true
21+
})
2722
wg.Wait()
2823
if len(errors) > 0 {
2924
return errors[0]

0 commit comments

Comments
 (0)