-
Notifications
You must be signed in to change notification settings - Fork 71
Custom Plugin Development
Osmany Montero edited this page Jan 16, 2026
·
1 revision
The EventProcessor is designed for high extensibility through a micro-plugin architecture. By implementing custom plugins, you can extend the system's capabilities in log ingestion, parsing, analysis, and notifications.
- Communication: gRPC over Unix Domain Sockets.
-
Location: Sockets are automatically managed in
{{WorkDir}}/sockets/. - Language: While gRPC supports many languages, we highly recommend Go and our official go-sdk.
| Type | SDK Helper | Interaction Pattern |
|---|---|---|
| Parsing | InitParsingPlugin |
Transforms logs within a pipeline (unary RPC). |
| Analysis | InitAnalysisPlugin |
Evaluates events to emit alerts (server streaming). |
| Input | SendLogsFromChannel |
Ingests data from external sources and pushes to the engine. |
| Correlation | InitCorrelationPlugin |
Processes alerts to find high-level patterns. |
| Notification | InitNotificationPlugin |
Delivers messages to external endpoints (Email, Slack, etc). |
mkdir utmstack-plugin-custom && cd utmstack-plugin-custom
go mod init github.com/user/utmstack-plugin-custom
go get github.com/threatwinds/go-sdkParsing plugins are used for specialized logic that cannot be achieved with standard steps (Grok, JSON, etc.).
package main
import (
"context"
"github.com/threatwinds/go-sdk/plugins"
"github.com/tidwall/sjson"
)
func main() {
// "my-enricher" results in socket: {{WorkDir}}/sockets/my-enricher_parsing.sock
plugins.InitParsingPlugin("my-enricher", parseLog)
}
func parseLog(ctx context.Context, tr *plugins.Transform) (*plugins.Draft, error) {
// Add custom field to the log
newLog, _ := sjson.Set(tr.Draft.Log, "log.custom_status", "processed_by_plugin")
tr.Draft.Log = newLog
return tr.Draft, nil
}Input plugins act as producers. They don't wait for the engine to call them; they push data to the engine.
package main
import (
"github.com/google/uuid"
"github.com/threatwinds/go-sdk/plugins"
"time"
)
func main() {
pluginName := "my-data-source"
// Start the background sender
go plugins.SendLogsFromChannel(pluginName)
for {
// Simulate fetching data
plugins.EnqueueLog(&plugins.Log{
Id: uuid.NewString(),
DataType: "custom_data",
DataSource: "external_api",
Timestamp: time.Now().Format(time.RFC3339),
Raw: `{"message": "hello world"}`,
}, pluginName)
time.Sleep(10 * time.Second)
}
}-
Build:
go build -o plugin-binary -
Naming: The plugin name passed to
InitParsingPlugin("name")must match the configuration in your YAML. -
YAML Integration:
pipeline: - dataTypes: [custom_data] steps: - dynamic: plugin: my-enricher params: { key: value }
-
Use the Catcher: Always use
github.com/threatwinds/go-sdk/catcherfor error reporting to ensure logs are standardized. -
Reserved Fields: Ensure your parsing logic maps data to the common schema (
origin.*,target.*,action). Avoid creating top-level fields outside oflog.*unless they are part of the core schema. -
Graceful Shutdown: The SDK's
Init...functions handle SIGINT and SIGTERM automatically. -
Config Management: Use
plugins.PluginCfg("plugin_name", true)to fetch configuration variables from the UTMStack environment.