SDK Reference
The NeuroSim Go SDK provides everything you need to build Kafka-based simulation plugins. This reference covers the complete public API surface.
Installation
Add the SDK to your Go project:
go get github.com/NeurosimIO/neurosim-sdk-go
Import it in your plugin:
import (
sdk "github.com/NeurosimIO/neurosim-sdk-go"
)
Plugin Interface
All plugins must implement the Plugin interface with these five methods:
// Plugin is the interface that every NeuroSim plugin must implement.
type Plugin interface {
// GetRegistration returns the plugin's registration information
// (metadata and capabilities).
GetRegistration() PluginRegistration
// OnScenarioInitialize is called when a scenario starts. Returning a
// non-nil error prevents the scenario from starting.
OnScenarioInitialize(init ScenarioInitialize, scenario Scenario) error
// OnCommand handles a control command for the scenario identified by
// scenarioID and returns a textual result.
OnCommand(scenarioID string, command string) string
// OnScenarioMessage processes an incoming message.
// Returns: (handled bool, replies []ScenarioMessage). handled reports
// whether this plugin processed the message; replies may be nil.
OnScenarioMessage(msg ScenarioMessage, scenario Scenario) (bool, []ScenarioMessage)
// OnStop is called during graceful scenario shutdown; release any
// per-scenario resources here.
OnStop(stopMsgId string, stop ScenarioStop) error
}
GetRegistration
Returns the plugin's metadata and capabilities:
// GetRegistration builds the registration record for the sample plugin:
// its identity, the schemas for its configuration and supported messages,
// its capabilities, free-form metadata, and its control topic.
func (p *MyPlugin) GetRegistration() sdk.PluginRegistration {
	configSchema := sdk.SchemaReference{SchemaURL: "https://example.com/schema.json"}
	messageSchemas := []sdk.SchemaReference{
		{SchemaURL: "https://example.com/message-schema.json"},
	}

	var reg sdk.PluginRegistration
	reg.ID = "my-plugin"
	reg.Name = "My Plugin"
	reg.ProtocolVersion = "1.0"
	reg.Description = "A sample plugin"
	reg.Configuration = configSchema
	reg.SupportedMessages = messageSchemas
	reg.Capabilities = []string{"message-processing", "state-management"}
	reg.Metadata = map[string]string{"version": "1.0.0"}
	reg.ControlTopicName = "my-plugin-control"
	return reg
}
OnScenarioInitialize
Called when a scenario starts. Initialize your plugin's state here:
// OnScenarioInitialize records per-scenario state when a scenario starts.
//
// The state map is created on first use, so a plugin constructed as
// &MyPlugin{} does not panic with "assignment to entry in nil map".
//
// NOTE(review): if the runner can invoke callbacks for different scenarios
// concurrently, p.state needs a mutex — confirm against the runner.
func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
	if p.state == nil {
		p.state = make(map[string]*PluginState)
	}

	// Initialize plugin state for this scenario.
	p.state[scenario.GetScenarioID()] = &PluginState{
		Config: scenario.GetConfiguration(),
		Name:   scenario.GetScenarioName(),
	}
	return nil
}
OnCommand
Handles control commands for a specific scenario:
// OnCommand executes a control command for one scenario and returns a
// human-readable result. "status" reports that the plugin is running,
// "reset" clears the scenario's state, and any other command is echoed
// back as unknown.
func (p *MyPlugin) OnCommand(scenarioID string, command string) string {
	if command == "status" {
		return fmt.Sprintf("Plugin running for scenario: %s", scenarioID)
	}
	if command == "reset" {
		p.resetState(scenarioID)
		return "State reset"
	}
	return fmt.Sprintf("Unknown command: %s", command)
}
OnScenarioMessage
Processes incoming messages. Return true if the plugin handled the message, together with any reply messages to send (or nil when there are none):
// OnScenarioMessage answers "request" messages with a single "response"
// carrying the processed payload; every other message type is reported as
// not handled.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	// Only "request" messages are ours to handle.
	if msg.MessageType != "request" {
		return false, nil
	}

	replies := []sdk.ScenarioMessage{{
		MessageType: "response",
		Payload:     p.processRequest(msg.Payload),
		Headers:     map[string]string{"status": "success"},
	}}
	return true, replies
}
OnStop
Called during graceful shutdown. Clean up resources:
// OnStop releases everything the plugin holds for the stopping scenario:
// its state entry and, if one is open, its connection.
func (p *MyPlugin) OnStop(stopMsgId string, stop sdk.ScenarioStop) error {
	id := stop.ScenarioID

	// Drop the scenario's state first.
	delete(p.state, id)

	// Then close and forget its connection, if any.
	conn, open := p.connections[id]
	if open {
		conn.Close()
		delete(p.connections, id)
	}
	return nil
}
Core Types
PluginRegistration
Describes your plugin's metadata and capabilities:
// PluginRegistration describes a plugin's metadata and capabilities. It is
// the value returned by Plugin.GetRegistration.
type PluginRegistration struct {
// ID is the unique plugin identifier
ID string
// Name is the human-readable plugin name
Name string
// ProtocolVersion indicates the plugin protocol version
ProtocolVersion string
// Description explains what the plugin does
Description string
// Configuration defines the plugin's config schema
Configuration SchemaReference
// SupportedMessages lists the schemas of the message types this plugin handles
SupportedMessages []SchemaReference
// Capabilities lists plugin capabilities
Capabilities []string
// Metadata contains additional key-value metadata
Metadata map[string]string
// ControlTopicName is the Kafka topic for control messages
ControlTopicName string
}
Scenario
Provides access to scenario information and messaging:
// Scenario provides access to scenario information and messaging. An
// implementation is passed to the plugin's OnScenarioInitialize and
// OnScenarioMessage callbacks.
type Scenario interface {
// SendMessage sends a message to other plugins
SendMessage(msg ScenarioMessage) error
// SendAck sends an acknowledgment for originalMessage
SendAck(originalMessage ScenarioMessage) error
// Stopped notifies the system this scenario has stopped
Stopped(stopMsgId string, message string) error
// GetScenarioID returns the scenario identifier
GetScenarioID() string
// GetScenarioName returns the scenario name
GetScenarioName() string
// GetConfiguration returns scenario-specific config
GetConfiguration() map[string]any
}
ScenarioMessage
Represents messages exchanged between plugins:
// ScenarioMessage represents a message exchanged between plugins.
type ScenarioMessage struct {
// MessageID is the unique message identifier
MessageID string
// MessageType identifies the message type
MessageType string
// Payload contains the message data
Payload []byte
// Headers contains additional metadata
Headers map[string]string
// Timestamp is when the message was created
Timestamp time.Time
// ScenarioID links the message to a scenario
ScenarioID string
// SourcePlugin is the sending plugin
SourcePlugin string
// TargetPlugin is the destination (optional)
TargetPlugin string
// CorrelationID links related messages, e.g. a reply to the MessageID
// of the message it answers
CorrelationID string
}
PluginConfig
Configuration for the plugin runner:
// PluginConfig holds the configuration for the plugin runner. Obtain a
// pre-populated value from DefaultConfig and override fields as needed.
type PluginConfig struct {
// KafkaBrokers is the list of Kafka broker addresses
// (DefaultConfig: ["localhost:9092"])
KafkaBrokers []string
// CoreAPIURL is the NeuroSim Core API endpoint
// (DefaultConfig: "http://localhost:8080")
CoreAPIURL string
// PluginInstanceID overrides the auto-generated instance ID
PluginInstanceID string
// LogLevel sets the logging level (DefaultConfig: "info")
LogLevel string
// ConsumerGroupID sets the Kafka consumer group
ConsumerGroupID string
// EnableMetrics enables Prometheus metrics (DefaultConfig: true)
EnableMetrics bool
// MetricsPort sets the metrics server port (DefaultConfig: 9090)
MetricsPort int
}
PluginRunner
The PluginRunner manages the plugin lifecycle, Kafka connectivity, and message routing.
Creating a Runner
func main() {
plugin := &MyPlugin{}
// Get default configuration
config := sdk.DefaultConfig()
config.KafkaBrokers = []string{"localhost:9092"}
config.CoreAPIURL = "http://localhost:8080"
// Create the runner
runner, err := sdk.NewPluginRunner(plugin, config)
if err != nil {
log.Fatal(err)
}
// Run the plugin (blocks until shutdown)
ctx := context.Background()
if err := runner.Run(ctx); err != nil {
log.Fatal(err)
}
}
DefaultConfig
Provides sensible defaults:
config := sdk.DefaultConfig()
// Returns:
// - KafkaBrokers: ["localhost:9092"]
// - CoreAPIURL: "http://localhost:8080"
// - LogLevel: "info"
// - EnableMetrics: true
// - MetricsPort: 9090
Graceful Shutdown
The runner shuts down when the context passed to Run is cancelled. Cancel that context when the process receives an interrupt or termination signal:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle interrupt signals
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
cancel()
}()
runner, _ := sdk.NewPluginRunner(plugin, config)
runner.Run(ctx)
}
Sending Messages and Replying
Sending Messages
Use the Scenario interface to send messages:
// OnScenarioMessage pushes a high-priority "status-update" to other-plugin
// through the Scenario and reports the incoming message as handled.
//
// A send failure is logged rather than silently dropped; the message is
// still reported as handled and no replies are returned.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	// Create a new message.
	outgoingMsg := sdk.ScenarioMessage{
		MessageType:  "status-update",
		TargetPlugin: "other-plugin",
		Payload:      []byte(`{"status": "processing"}`),
		Headers: map[string]string{
			"priority": "high",
		},
	}

	// Send it.
	if err := scenario.SendMessage(outgoingMsg); err != nil {
		log.Printf("sending %s to %s: %v", outgoingMsg.MessageType, outgoingMsg.TargetPlugin, err)
	}
	return true, nil
}
Replying to Messages
Return reply messages from OnScenarioMessage:
// OnScenarioMessage processes the incoming payload and returns a single
// "response" whose CorrelationID is the MessageID of the message it
// answers.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	var reply sdk.ScenarioMessage
	reply.MessageType = "response"
	reply.Payload = p.process(msg.Payload)
	reply.CorrelationID = msg.MessageID // ties the reply to the original

	return true, []sdk.ScenarioMessage{reply}
}
Sending Acknowledgments
Quickly acknowledge message receipt:
// OnScenarioMessage acknowledges the message immediately and hands the
// actual work to a background goroutine.
//
// A failed acknowledgment is logged instead of being silently ignored;
// processing still proceeds.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	// Send acknowledgment.
	if err := scenario.SendAck(msg); err != nil {
		log.Printf("acknowledging message %s: %v", msg.MessageID, err)
	}

	// Process asynchronously. This goroutine runs outside the callback, so
	// processAsync must recover its own panics and must stop using
	// scenario resources once OnStop has run.
	go p.processAsync(msg)
	return true, nil
}
Configuration Schema
Define your plugin's configuration using JSON Schema. The schema validates configuration at runtime.
Basic Schema
// GetRegistration registers the plugin with an inline JSON Schema for its
// configuration: a required "endpoint" URI and an optional "timeout" in
// seconds (1-300, default 30).
func (p *MyPlugin) GetRegistration() sdk.PluginRegistration {
	endpointProp := map[string]any{
		"type":        "string",
		"format":      "uri",
		"description": "Target system endpoint",
	}
	timeoutProp := map[string]any{
		"type":        "integer",
		"minimum":     1,
		"maximum":     300,
		"default":     30,
		"description": "Request timeout in seconds",
	}
	schema := map[string]any{
		"type": "object",
		"properties": map[string]any{
			"endpoint": endpointProp,
			"timeout":  timeoutProp,
		},
		"required": []string{"endpoint"},
	}

	return sdk.PluginRegistration{
		ID:            "my-plugin",
		Name:          "My Plugin",
		Configuration: sdk.SchemaReference{Schema: schema},
	}
}
Complex Schema
Configuration: sdk.SchemaReference{
Schema: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"database": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"host": map[string]interface{}{
"type": "string",
"description": "Database host",
},
"port": map[string]interface{}{
"type": "integer",
"minimum": 1,
"maximum": 65535,
"default": 5432,
},
"ssl": map[string]interface{}{
"type": "boolean",
"default": true,
},
},
"required": []string{"host"},
},
"features": map[string]interface{}{
"type": "array",
"items": map[string]interface{}{
"type": "string",
"enum": []string{"logging", "metrics", "tracing"},
},
"uniqueItems": true,
"description": "Enabled features",
},
"mode": map[string]interface{}{
"type": "string",
"enum": []string{"development", "staging", "production"},
"default": "development",
},
},
"required": []string{"database"},
},
}
Using External Schema
Reference an external JSON Schema file:
Configuration: sdk.SchemaReference{
SchemaURL: "https://example.com/schemas/my-plugin-config-v1.json",
}
Accessing Configuration
Read configuration in your plugin:
// OnScenarioInitialize reads "endpoint" and "timeout" from the scenario
// configuration and builds the plugin's client.
//
// It returns an error when endpoint is missing, not a string, or empty.
// timeout is optional and defaults to 30 seconds.
func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
	config := scenario.GetConfiguration()

	// Checked type assertion: a missing key or a non-string value yields ok == false.
	endpoint, ok := config["endpoint"].(string)
	if !ok || endpoint == "" {
		return errors.New("endpoint not configured")
	}

	timeout := 30 // default
	// Numbers decoded from JSON arrive as float64, but configuration built
	// in Go code (e.g. in tests) usually carries an int. Accept both so the
	// setting is never silently ignored.
	switch t := config["timeout"].(type) {
	case float64:
		timeout = int(t)
	case int:
		timeout = t
	case int64:
		timeout = int(t)
	}

	p.client = NewClient(endpoint, timeout)
	return nil
}
Error Handling
The SDK provides structured error handling patterns.
Standard Error Handling
// OnScenarioMessage processes the message and returns its result, or an
// "error" message correlated to the original when processing fails.
//
// The error payload is produced with encoding/json so that quotes,
// backslashes, and control characters in the error text are escaped;
// formatting it with Sprintf could emit invalid JSON.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	result, err := p.processMessage(msg)
	if err != nil {
		// Log the error.
		log.Printf("Failed to process message %s: %v", msg.MessageID, err)

		payload, marshalErr := json.Marshal(map[string]string{"error": err.Error()})
		if marshalErr != nil {
			// Not expected for a map of strings; keep the reply well-formed anyway.
			payload = []byte(`{"error": "internal error"}`)
		}

		// Send error response.
		errorMsg := sdk.ScenarioMessage{
			MessageType:   "error",
			Payload:       payload,
			CorrelationID: msg.MessageID,
		}
		return true, []sdk.ScenarioMessage{errorMsg}
	}
	return true, []sdk.ScenarioMessage{result}
}
Initialization Errors
Return an error from OnScenarioInitialize to prevent the scenario from starting:
// OnScenarioInitialize refuses to start the scenario unless a non-empty
// string "endpoint" is configured and a connection to it validates.
func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
	// A missing or non-string value leaves endpoint as "", so one check
	// covers both the absent and the empty case.
	endpoint, _ := scenario.GetConfiguration()["endpoint"].(string)
	if endpoint == "" {
		return fmt.Errorf("endpoint is required")
	}

	err := p.validateConnection(endpoint)
	if err != nil {
		return fmt.Errorf("connection validation failed: %w", err)
	}
	return nil
}
Panic Recovery
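The SDK recovers from panics raised inside plugin callbacks. Recovery only covers the goroutine running the callback, so any goroutine your plugin starts itself must recover its own panics: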
// OnScenarioMessage returns the outcome of riskyOperation as the only
// reply. The call is made directly in the callback, where the runner's
// panic recovery applies.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	// Even if this panics, the runner will recover.
	replies := []sdk.ScenarioMessage{p.riskyOperation(msg)}
	return true, replies
}
Logging
The SDK includes structured logging capabilities.
Basic Logging
import "log"
// OnScenarioMessage logs a line before and after processing the message
// and returns the processing result as the only reply.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	id := msg.MessageID

	log.Printf("Processing message: type=%s, id=%s", msg.MessageType, id)
	reply := p.process(msg)
	log.Printf("Message processed successfully: id=%s", id)

	return true, []sdk.ScenarioMessage{reply}
}
Structured Logging
Use structured logging for better observability:
import (
"log/slog"
)
// OnScenarioInitialize installs a JSON slog logger writing to stdout and
// records which scenario was initialized.
func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
	handler := slog.NewJSONHandler(os.Stdout, nil)
	p.logger = slog.New(handler)

	id, name := scenario.GetScenarioID(), scenario.GetScenarioName()
	p.logger.Info("Scenario initialized",
		"scenario_id", id,
		"scenario_name", name,
		"plugin", "my-plugin",
	)
	return nil
}
// OnScenarioMessage processes the message with structured logging at each
// step: receipt at debug level, failure at error level (the message is
// then reported as not handled), and success at info level.
func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
	id := msg.MessageID

	p.logger.Debug("Message received",
		"message_id", id,
		"message_type", msg.MessageType,
		"source", msg.SourcePlugin,
	)

	reply, err := p.process(msg)
	if err != nil {
		p.logger.Error("Processing failed", "message_id", id, "error", err)
		return false, nil
	}

	p.logger.Info("Message processed", "message_id", id, "response_type", reply.MessageType)
	return true, []sdk.ScenarioMessage{reply}
}
Log Levels
Configure logging via the plugin config:
config := sdk.DefaultConfig()
config.LogLevel = "debug" // trace, debug, info, warn, error
Testing Utilities
The SDK provides testing utilities to help test plugins without Kafka infrastructure.
Test Harness
// In-package test: MyPlugin is referenced unqualified below, so the test
// file must live in the plugin's own package rather than an external
// _test package.
package myplugin

import (
	"testing"

	sdk "github.com/NeurosimIO/neurosim-sdk-go"
	// The SDK's test package is also named "testing"; import it under an
	// alias so it does not collide with the standard library package.
	sdktesting "github.com/NeurosimIO/neurosim-sdk-go/testing"
)

// TestMyPlugin initializes a scenario through the test harness and checks
// that a "request" message is handled with exactly one "response" reply.
func TestMyPlugin(t *testing.T) {
	plugin := &MyPlugin{}

	// Create test harness.
	harness := sdktesting.NewHarness(plugin)

	// Initialize scenario.
	err := harness.Initialize(sdktesting.ScenarioInit{
		ScenarioID:   "test-scenario",
		ScenarioName: "Test Scenario",
		Config: map[string]any{
			"endpoint": "http://test.example.com",
			// float64 mirrors how a JSON-decoded number reaches the plugin.
			"timeout": float64(10),
		},
	})
	if err != nil {
		t.Fatalf("Initialize failed: %v", err)
	}

	// Test message handling.
	handled, replies := harness.SendMessage(sdk.ScenarioMessage{
		MessageID:   "test-msg-1",
		MessageType: "request",
		Payload:     []byte(`{"action": "test"}`),
	})
	if !handled {
		t.Error("Message not handled")
	}
	if len(replies) != 1 {
		t.Fatalf("Expected 1 reply, got %d", len(replies))
	}
	if replies[0].MessageType != "response" {
		t.Errorf("Expected response, got %s", replies[0].MessageType)
	}
}
Testing Message Flow
// TestMessageFlow sends a three-step message sequence through the harness
// and expects every step to be handled and to produce at least one reply.
//
// sdktesting is the SDK's test package imported under an alias, e.g.
//
//	sdktesting "github.com/NeurosimIO/neurosim-sdk-go/testing"
//
// because its package name, "testing", collides with the standard library.
func TestMessageFlow(t *testing.T) {
	plugin := &MyPlugin{}
	harness := sdktesting.NewHarness(plugin)

	// A failed initialization would make every check below meaningless.
	err := harness.Initialize(sdktesting.ScenarioInit{
		ScenarioID: "test",
		Config:     map[string]any{"endpoint": "http://test"},
	})
	if err != nil {
		t.Fatalf("Initialize failed: %v", err)
	}

	// Send multiple messages.
	messages := []sdk.ScenarioMessage{
		{MessageType: "init", Payload: []byte(`{"step": 1}`)},
		{MessageType: "process", Payload: []byte(`{"step": 2}`)},
		{MessageType: "finalize", Payload: []byte(`{"step": 3}`)},
	}
	for i, msg := range messages {
		handled, replies := harness.SendMessage(msg)
		if !handled {
			t.Errorf("Message %d not handled", i)
		}
		if len(replies) == 0 {
			t.Errorf("Message %d produced no replies", i)
		}
	}
}
Testing Configuration Validation
// TestConfigValidation checks that initialization fails without the
// required endpoint and succeeds once it is supplied.
//
// sdktesting is the SDK's test package imported under an alias, e.g.
//
//	sdktesting "github.com/NeurosimIO/neurosim-sdk-go/testing"
//
// because its package name, "testing", collides with the standard library.
func TestConfigValidation(t *testing.T) {
	plugin := &MyPlugin{}
	harness := sdktesting.NewHarness(plugin)

	// Test invalid config.
	err := harness.Initialize(sdktesting.ScenarioInit{
		ScenarioID: "test",
		Config:     map[string]any{}, // missing required endpoint
	})
	if err == nil {
		t.Error("Expected error for invalid config")
	}

	// Test valid config.
	err = harness.Initialize(sdktesting.ScenarioInit{
		ScenarioID: "test",
		Config: map[string]any{
			"endpoint": "http://valid.example.com",
		},
	})
	if err != nil {
		t.Errorf("Valid config failed: %v", err)
	}
}
Mocking Scenario
Create a mock scenario for unit testing:
// MockScenario is an in-memory sdk.Scenario for unit tests. It serves fixed
// identity and configuration values and records every message passed to
// SendMessage.
type MockScenario struct {
	id       string
	name     string
	config   map[string]any
	messages []sdk.ScenarioMessage // messages captured by SendMessage, in order
}

// Compile-time check that the mock keeps satisfying the SDK interface.
var _ sdk.Scenario = (*MockScenario)(nil)

// GetScenarioID returns the configured scenario identifier.
func (m *MockScenario) GetScenarioID() string {
	return m.id
}

// GetScenarioName returns the configured scenario name.
func (m *MockScenario) GetScenarioName() string {
	return m.name
}

// GetConfiguration returns the configured scenario settings.
func (m *MockScenario) GetConfiguration() map[string]any {
	return m.config
}

// SendMessage records msg for later inspection and always succeeds.
func (m *MockScenario) SendMessage(msg sdk.ScenarioMessage) error {
	m.messages = append(m.messages, msg)
	return nil
}

// SendAck is a no-op that always succeeds.
func (m *MockScenario) SendAck(msg sdk.ScenarioMessage) error {
	return nil
}

// Stopped is a no-op that always succeeds.
func (m *MockScenario) Stopped(stopMsgId string, message string) error {
	return nil
}
func TestWithMock(t *testing.T) {
plugin := &MyPlugin{}
mock := &MockScenario{
id: "test",
name: "Test",
config: map[string]any{"endpoint": "http://test"},
}
err := plugin.OnScenarioInitialize(sdk.ScenarioInitialize{}, mock)
if err != nil {
t.Fatalf("Initialize failed: %v", err)
}
handled, _ := plugin.OnScenarioMessage(sdk.ScenarioMessage{
MessageType: "test",
Payload: []byte(`{}`),
}, mock)
if !handled {
t.Error("Message not handled")
}
if len(mock.messages) == 0 {
t.Error("No messages sent")
}
}
Next Steps
- Build your first plugin - Follow the getting started guide
- Understand the architecture - Learn how NeuroSim works
- Explore tutorials - See complete plugin examples