Skip to content

Commit 05bb617

Browse files
authored
feat: pruner (#13)
* implement pruner and change state dir location * update gitignore
1 parent 0a92243 commit 05bb617

11 files changed

Lines changed: 638 additions & 42 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ go.work.sum
3131
.idea/
3232
# .vscode/
3333
/walship
34+
/.cache
35+
/.gocache

cmd/walship/main.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ func main() {
5656
cfg := agent.DefaultConfig()
5757
var cfgPath string
5858

59+
log := agent.Logger()
60+
5961
root := &cobra.Command{
6062
Use: "walship",
6163
Short: "Stream your node's consensus feed to apphash.io without slowing your validator",
@@ -103,7 +105,7 @@ func main() {
103105
if len(logCfg.AuthKey) > 0 {
104106
logCfg.AuthKey = "*****"
105107
}
106-
fmt.Fprintf(os.Stderr, "Configuration: %+v\n", logCfg)
108+
log.Info().Interface("config", logCfg).Msg("configuration")
107109

108110
if err := agent.Run(context.Background(), cfg); err != nil {
109111
return err
@@ -119,7 +121,7 @@ func main() {
119121

120122
root.Flags().StringVar(&cfg.ServiceURL, "service-url", cfg.ServiceURL, fmt.Sprintf("base service URL (defaults to %s; override only for internal testing)", agent.DefaultServiceURL))
121123
if err := root.Flags().MarkHidden("service-url"); err != nil {
122-
fmt.Fprintf(os.Stderr, "warning: failed to hide service-url flag: %v\n", err)
124+
log.Info().Err(err).Msg("failed to hide service-url flag")
123125
}
124126
root.Flags().StringVar(&cfg.AuthKey, "auth-key", cfg.AuthKey, "API key for authentication")
125127

@@ -135,15 +137,15 @@ func main() {
135137

136138
root.Flags().StringVar(&cfg.StateDir, "state-dir", cfg.StateDir, "state directory for status.json (defaults to wal-dir)")
137139
if err := root.Flags().MarkHidden("state-dir"); err != nil {
138-
fmt.Fprintf(os.Stderr, "warning: failed to hide state-dir flag: %v\n", err)
140+
log.Info().Err(err).Msg("failed to hide state-dir flag")
139141
}
140142
root.Flags().DurationVar(&cfg.HTTPTimeout, "timeout", cfg.HTTPTimeout, "HTTP timeout")
141143
root.Flags().BoolVar(&cfg.Verify, "verify", cfg.Verify, "verify CRC/line counts while reading (debug)")
142144
root.Flags().BoolVar(&cfg.Meta, "meta", cfg.Meta, "print frame metadata to stderr (debug)")
143145
root.Flags().BoolVar(&cfg.Once, "once", cfg.Once, "process available frames and exit")
144146

145147
if err := root.Execute(); err != nil {
146-
fmt.Fprintf(os.Stderr, "walship: %v\n", err)
148+
log.Error().Err(err).Msg("walship")
147149
os.Exit(1)
148150
}
149151
}

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ module github.com/bft-labs/walship
33
go 1.22
44

55
require (
6+
github.com/fsnotify/fsnotify v1.9.0
67
github.com/pelletier/go-toml/v2 v2.2.2
78
github.com/spf13/cobra v1.8.0
89
github.com/spf13/pflag v1.0.5
910
)
1011

1112
require (
12-
github.com/fsnotify/fsnotify v1.9.0 // indirect
1313
github.com/inconshreveable/mousetrap v1.1.0 // indirect
14+
github.com/mattn/go-colorable v0.1.13 // indirect
15+
github.com/mattn/go-isatty v0.0.19 // indirect
16+
github.com/rs/zerolog v1.33.0 // indirect
1417
golang.org/x/sys v0.13.0 // indirect
1518
)

go.sum

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
1+
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
12
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
23
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
34
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
45
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
56
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
67
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
8+
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
79
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
810
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
11+
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
12+
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
13+
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
14+
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
15+
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
916
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
1017
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
18+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
1119
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1220
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
21+
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
22+
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
23+
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
1324
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
1425
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
1526
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
@@ -24,6 +35,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
2435
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
2536
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
2637
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
38+
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
39+
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
40+
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2741
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
2842
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
2943
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

internal/agent/agent.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ type batchFrame struct {
2727
}
2828

2929
func Run(ctx context.Context, cfg Config) error {
30+
ctx, cancel := context.WithCancel(ctx)
31+
defer cancel()
32+
3033
if cfg.ServiceURL == "" {
3134
return fmt.Errorf("service-url is required")
3235
}
@@ -38,6 +41,7 @@ func Run(ctx context.Context, cfg Config) error {
3841
cfgPtr := &cfg
3942
watcher := NewConfigWatcher(cfgPtr)
4043
go watcher.Run(ctx)
44+
go walCleanupLoop(ctx, cfg.WALDir, cfg.StateDir)
4145

4246
// Load prior state; if none, start from the oldest index (first logs)
4347
st, _ := loadState(cfg.StateDir)
@@ -137,7 +141,13 @@ func Run(ctx context.Context, cfg Config) error {
137141
st.CurGz = fm.File
138142
}
139143
if cfg.Meta {
140-
fmt.Fprintf(os.Stderr, "frame file=%s seq=%d off=%d len=%d recs=%d\n", fm.File, fm.Frame, fm.Off, fm.Len, fm.Recs)
144+
logger.Info().
145+
Str("file", fm.File).
146+
Uint64("frame", fm.Frame).
147+
Uint64("off", fm.Off).
148+
Uint64("len", fm.Len).
149+
Uint32("recs", fm.Recs).
150+
Msg("frame metadata")
141151
}
142152
// Read compressed bytes for this frame
143153
b, rerr := preadSection(gz, int64(fm.Off), int64(fm.Len))
@@ -197,37 +207,37 @@ func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchByte
197207

198208
manifestJSON, err := json.Marshal(manifest)
199209
if err != nil {
200-
fmt.Fprintf(os.Stderr, "Error marshaling manifest: %v\n", err)
210+
logger.Error().Err(err).Msg("marshal manifest")
201211
back.Sleep()
202212
return
203213
}
204214
manifestPart, err := writer.CreateFormField("manifest")
205215
if err != nil {
206-
fmt.Fprintf(os.Stderr, "Error creating manifest field: %v\n", err)
216+
logger.Error().Err(err).Msg("create manifest field")
207217
back.Sleep()
208218
return
209219
}
210220
if _, err := manifestPart.Write(manifestJSON); err != nil {
211-
fmt.Fprintf(os.Stderr, "Error writing manifest field: %v\n", err)
221+
logger.Error().Err(err).Msg("write manifest field")
212222
back.Sleep()
213223
return
214224
}
215225

216226
framesPart, err := writer.CreateFormFile("frames", curIdxBase)
217227
if err != nil {
218-
fmt.Fprintf(os.Stderr, "Error creating frames field: %v\n", err)
228+
logger.Error().Err(err).Msg("create frames field")
219229
back.Sleep()
220230
return
221231
}
222232
for _, fr := range *batch {
223233
if _, err := framesPart.Write(fr.Compressed); err != nil {
224-
fmt.Fprintf(os.Stderr, "Error writing frames payload: %v\n", err)
234+
logger.Error().Err(err).Msg("write frames payload")
225235
back.Sleep()
226236
return
227237
}
228238
}
229239
if err := writer.Close(); err != nil {
230-
fmt.Fprintf(os.Stderr, "Error finalizing multipart payload: %v\n", err)
240+
logger.Error().Err(err).Msg("finalize multipart payload")
231241
back.Sleep()
232242
return
233243
}
@@ -245,19 +255,25 @@ func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchByte
245255

246256
resp, err := httpClient.Do(req)
247257
if err != nil {
248-
fmt.Fprintf(os.Stderr, "Error sending batch: %v\n", err)
258+
logger.Error().Err(err).Msg("send batch")
249259
back.Sleep()
250260
return
251261
}
252262
defer resp.Body.Close()
253263
if resp.StatusCode/100 != 2 {
254264
body, _ := io.ReadAll(resp.Body)
255-
fmt.Fprintf(os.Stderr, "Server returned error: status=%d body=%s\n", resp.StatusCode, string(body))
265+
logger.Error().
266+
Int("status", resp.StatusCode).
267+
Str("body", string(body)).
268+
Msg("server returned error")
256269
back.Sleep()
257270
return
258271
}
259272

260-
fmt.Fprintf(os.Stderr, "Successfully sent batch of %d frames (%d bytes)\n", len(*batch), *batchBytes)
273+
logger.Info().
274+
Int("frames", len(*batch)).
275+
Int("bytes", *batchBytes).
276+
Msg("sent batch")
261277

262278
// Success: commit idx offset
263279
st.IdxOffset += advance

internal/agent/agent_test.go

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"errors"
9-
"fmt"
109
"io"
10+
"mime"
11+
"mime/multipart"
1112
"net/http"
1213
"net/http/httptest"
1314
"os"
1415
"path/filepath"
16+
"strings"
1517
"testing"
1618
"time"
1719
)
@@ -31,9 +33,43 @@ func TestTrySend(t *testing.T) {
3133
}
3234

3335
// Verify body
34-
body, _ := io.ReadAll(r.Body)
35-
if string(body) != "compressed-data" {
36-
t.Errorf("Body = %v, want compressed-data", string(body))
36+
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
37+
if err != nil {
38+
t.Fatalf("parse content-type: %v", err)
39+
}
40+
if !strings.HasPrefix(mediaType, "multipart/") {
41+
t.Fatalf("expected multipart content type, got %s", mediaType)
42+
}
43+
mr := multipart.NewReader(r.Body, params["boundary"])
44+
var framesPayload []byte
45+
var hasManifest bool
46+
for {
47+
part, err := mr.NextPart()
48+
if errors.Is(err, io.EOF) {
49+
break
50+
}
51+
if err != nil {
52+
t.Fatalf("multipart read: %v", err)
53+
}
54+
data, err := io.ReadAll(part)
55+
if err != nil {
56+
t.Fatalf("read part: %v", err)
57+
}
58+
switch part.FormName() {
59+
case "manifest":
60+
hasManifest = len(data) > 0
61+
case "frames":
62+
framesPayload = data
63+
}
64+
}
65+
if len(framesPayload) == 0 {
66+
t.Fatalf("frames payload missing")
67+
}
68+
if string(framesPayload) != "compressed-data" {
69+
t.Errorf("Body = %v, want compressed-data", string(framesPayload))
70+
}
71+
if !hasManifest {
72+
t.Fatalf("manifest payload missing")
3773
}
3874

3975
w.WriteHeader(http.StatusOK)
@@ -42,9 +78,9 @@ func TestTrySend(t *testing.T) {
4278

4379
cfg := Config{
4480
ServiceURL: ts.URL,
45-
ChainID: "test-chain",
46-
NodeID: "test-node",
47-
AuthKey: "secret",
81+
ChainID: "test-chain",
82+
NodeID: "test-node",
83+
AuthKey: "secret",
4884
}
4985

5086
batch := []batchFrame{
@@ -121,7 +157,7 @@ func TestRun_Startup(t *testing.T) {
121157
cfg := Config{
122158
NodeHome: tmpDir,
123159
WALDir: walDir,
124-
ServiceURL: "http://localhost:8080",
160+
ServiceURL: "http://localhost:8080",
125161
PollInterval: time.Millisecond,
126162
StateDir: filepath.Join(tmpDir, ".walship"),
127163
}
@@ -179,7 +215,7 @@ func TestTrySend_Timeout(t *testing.T) {
179215
defer ts.Close()
180216

181217
cfg := Config{
182-
ServiceURL: ts.URL,
218+
ServiceURL: ts.URL,
183219
HTTPTimeout: 10 * time.Millisecond,
184220
}
185221
httpClient := &http.Client{Timeout: cfg.HTTPTimeout}
@@ -204,14 +240,14 @@ func TestRun_MissingWALDir(t *testing.T) {
204240
// Test that Run returns error when WALDir is empty/invalid
205241
cfg := Config{
206242
ServiceURL: "http://test",
207-
StateDir: "/tmp",
243+
StateDir: "/tmp",
208244
// WALDir is empty - should fail in oldestIndex
209245
}
210246
err := Run(context.Background(), cfg)
211247
if err == nil {
212248
t.Error("Run() expected error for missing/invalid WALDir")
213249
}
214-
fmt.Fprintf(os.Stdout, "Run() error = %v\n", err)
250+
t.Logf("Run() error = %v", err)
215251
}
216252

217253
func TestTrySend_StateVerification(t *testing.T) {
@@ -224,7 +260,7 @@ func TestTrySend_StateVerification(t *testing.T) {
224260
tmpDir := t.TempDir()
225261
cfg := Config{
226262
ServiceURL: ts.URL,
227-
StateDir: tmpDir,
263+
StateDir: tmpDir,
228264
}
229265

230266
batch := []batchFrame{
@@ -286,7 +322,7 @@ func TestRun_OnceMode(t *testing.T) {
286322
}
287323

288324
cfg := Config{
289-
ServiceURL: "http://localhost:9999",
325+
ServiceURL: "http://localhost:9999",
290326
StateDir: filepath.Join(tmpDir, ".state"),
291327
WALDir: walDir,
292328
Once: true,
@@ -312,7 +348,7 @@ func TestTrySend_LargeFrame(t *testing.T) {
312348
defer ts.Close()
313349

314350
cfg := Config{
315-
ServiceURL: ts.URL,
351+
ServiceURL: ts.URL,
316352
MaxBatchBytes: 100, // Small limit
317353
}
318354

@@ -352,7 +388,7 @@ func TestTrySend_BatchOverflow(t *testing.T) {
352388
defer ts.Close()
353389

354390
cfg := Config{
355-
ServiceURL: ts.URL,
391+
ServiceURL: ts.URL,
356392
MaxBatchBytes: 100,
357393
}
358394

@@ -390,8 +426,8 @@ func TestTrySend_URLConstruction(t *testing.T) {
390426

391427
cfg := Config{
392428
ServiceURL: ts.URL, // Base URL only, no /v1/ingest/wal-frames
393-
ChainID: "test-chain",
394-
NodeID: "test-node",
429+
ChainID: "test-chain",
430+
NodeID: "test-node",
395431
}
396432

397433
batch := []batchFrame{

0 commit comments

Comments
 (0)