Open-source ETL for CometBFT logs. Drop in log files, get a normalized database with core metrics. A lightweight plugin system enables advanced analyses as separate OSS or premium plugins without changing the core.
Note: This project is under active development. Interfaces, plugin names, and collection schemas may evolve. Check release notes and examples before pinning to a specific API.
- Requirements: Go 1.21+ and a local MongoDB.
- Example logs are included under
example-logs/.
Run with defaults (example logs + demo simulation id):
make run # uses DIR=example-logs/normal SIM=demo-simOr run directly:
go run . -dir example-logs/normal -simulation demo-simFlags:
-dir: Directory containing CometBFT.logfiles (required)-simulation: Simulation ID for DB naming (required)-mongo-uri: MongoDB URI (default:mongodb://localhost:27017)-config: Optional YAML config path (plugins enablement)
- Parses log lines into typed raw events
- Converts raw events into normalized events
- Stores events and computed results in MongoDB
- All processing is plugin-driven. Each built-in processor ships as a separate OSS plugin so contributors can add/replace processors easily.
- Dispatches each event to enabled plugins via the plugin SDK.
Plugins are dynamically registered and enabled via config. Core provides only the SDK, loader, and OSS processor plugins.
Config example (config.yaml):
plugins:
- name: "vote-latency"
enabled: true
- name: "block-parts"
enabled: true
- name: "p2p-messages"
enabled: true
- name: "consensus-steps"
enabled: true
- name: "consensus-timing"
enabled: true
- name: "validator-participation"
enabled: true
- name: "network-latency"
enabled: true
- name: "timeout-analysis"
enabled: true
- name: "peer-participation" # example placeholder; not included
enabled: false
- name: "anomaly-detection" # Premium-only, ignored in OSS unless provided
enabled: falseRun with config:
go run . -dir example-logs/normal -simulation demo-sim -config config.yamlEnvironment flag for premium:
CV_PRO_ENABLED=true– if set but named premium plugins aren’t present, a warning is logged and the app runs normally. Note: if no config is provided, a default set of core processor plugins is auto-enabled so the pipeline works out of the box.
- OSS includes:
- Core ETL: parsing, normalization, DB storage (Mongo)
- Plugin SDK (
pkg/pluginsdk): interfaces and context - Plugin Loader (
pkg/pluginloader): registry + lifecycle - Example OSS plugin(s) under
ossplugins/
- Premium/Custom:
- Never included in this repo
- Built in private repos; register via
pluginloader.Register()ininit() - Referenced at build time in private distributions
internal/app,internal/parser,internal/converter,internal/storage,internal/config– core ETL code used by the binarypkg/pluginsdk– public Plugin SDK (interfaces, Context, PluginConfig)pkg/pluginloader– public Loader (registry + lifecycle + dispatching)ossplugins/– Open-source plugins (each processor owns its logic under its plugin directory)premiumplugins/– README + stubs only (no code)
Interface:
type Plugin interface {
Name() string
Init(ctx Context) error
Process(event types.RawEvent) error
Finalize() error
}Context gives access to logger, storage, metrics, and app config:
type Context struct {
Ctx context.Context
Logger Logger
Storage Storage
Metrics Metrics
Config AppConfig
}Storage interface (backed by Mongo in OSS):
type Storage interface {
StoreResults(ctx context.Context, results []interface{}, collectionName string) error
}Each built-in processor is provided as a standalone plugin (logic + registration co-located):
ossplugins/vote-latencyossplugins/block-partsossplugins/p2p-messagesossplugins/consensus-stepsossplugins/consensus-timingossplugins/validator-participationossplugins/network-latencyossplugins/timeout-analysis
- Add a new package (OSS) under
ossplugins/<your-plugin>in this repo, or create a premium plugin in a private repo. - Implement
pluginsdk.Pluginand callpluginloader.Register("your-name", factory)ininit(). - Enable it in
config.yaml.
HelloWorld example:
type HelloWorld struct { ctx pluginsdk.Context }
func (p *HelloWorld) Name() string { return "hello-world" }
func (p *HelloWorld) Init(ctx pluginsdk.Context) error { p.ctx = ctx; return nil }
func (p *HelloWorld) Process(event interface{}) error { p.ctx.Logger.Printf("saw: %T", event); return nil }
func (p *HelloWorld) Finalize() error { return nil }
func init() { pluginloader.Register("hello-world", func() pluginsdk.Plugin { return &HelloWorld{} }) }- Plugin lifecycle test at
pkg/pluginloader/loader_test.goverifies init/process/finalize and storage calls. - Example logs under
example-logs/support quick manual runs.
- For core changes: keep updates minimal and focused. Avoid coupling core to plugins.
- For plugins: prefer storing results via the provided
Storageinterface. Avoid importing internals beyond the SDK. - PRs adding more OSS plugins and hooks are welcome.
This project is licensed under the Apache License 2.0.
See the LICENSE file for details.