SDK Reference

The NeuroSim Go SDK provides everything you need to build Kafka-based simulation plugins. This reference covers the complete public API surface.

Installation

Add the SDK to your Go project:

go get github.com/NeurosimIO/neurosim-sdk-go

Import it in your plugin:

import (
    sdk "github.com/NeurosimIO/neurosim-sdk-go"
)

Plugin Interface

All plugins must implement the Plugin interface with these five methods:

type Plugin interface {
    // GetRegistration returns the plugin's registration information
    GetRegistration() PluginRegistration

    // OnScenarioInitialize is called when a scenario starts
    OnScenarioInitialize(init ScenarioInitialize, scenario Scenario) error

    // OnCommand handles control commands for this scenario
    OnCommand(scenarioID string, command string) string

    // OnScenarioMessage processes incoming messages
    // Returns: (handled bool, replies []ScenarioMessage)
    OnScenarioMessage(msg ScenarioMessage, scenario Scenario) (bool, []ScenarioMessage)

    // OnStop is called during graceful scenario shutdown
    OnStop(stopMsgId string, stop ScenarioStop) error
}

GetRegistration

Returns the plugin's metadata and capabilities:

func (p *MyPlugin) GetRegistration() sdk.PluginRegistration {
    return sdk.PluginRegistration{
        ID:              "my-plugin",
        Name:            "My Plugin",
        ProtocolVersion: "1.0",
        Description:     "A sample plugin",
        Configuration: sdk.SchemaReference{
            SchemaURL: "https://example.com/schema.json",
        },
        SupportedMessages: []sdk.SchemaReference{
            {SchemaURL: "https://example.com/message-schema.json"},
        },
        Capabilities:     []string{"message-processing", "state-management"},
        Metadata:         map[string]string{"version": "1.0.0"},
        ControlTopicName: "my-plugin-control",
    }
}

OnScenarioInitialize

Called when a scenario starts. Initialize your plugin's state here:

func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
    scenarioID := scenario.GetScenarioID()
    scenarioName := scenario.GetScenarioName()
    config := scenario.GetConfiguration()

    // Initialize plugin state for this scenario
    p.state[scenarioID] = &PluginState{
        Config: config,
        Name:   scenarioName,
    }

    return nil
}

OnCommand

Handles control commands for a specific scenario:

func (p *MyPlugin) OnCommand(scenarioID string, command string) string {
    switch command {
    case "status":
        return fmt.Sprintf("Plugin running for scenario: %s", scenarioID)
    case "reset":
        p.resetState(scenarioID)
        return "State reset"
    default:
        return fmt.Sprintf("Unknown command: %s", command)
    }
}

OnScenarioMessage

Processes incoming messages. Return true if handled, and optionally reply:

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    // Check if we can handle this message type
    if msg.MessageType != "request" {
        return false, nil
    }

    // Process the message
    result := p.processRequest(msg.Payload)

    // Send reply
    reply := sdk.ScenarioMessage{
        MessageType: "response",
        Payload:     result,
        Headers:     map[string]string{"status": "success"},
    }

    return true, []sdk.ScenarioMessage{reply}
}

OnStop

Called during graceful shutdown. Clean up resources:

func (p *MyPlugin) OnStop(stopMsgId string, stop sdk.ScenarioStop) error {
    scenarioID := stop.ScenarioID

    // Clean up scenario state
    delete(p.state, scenarioID)

    // Close any connections
    if conn, exists := p.connections[scenarioID]; exists {
        conn.Close()
        delete(p.connections, scenarioID)
    }

    return nil
}

Core Types

PluginRegistration

Describes your plugin's metadata and capabilities:

type PluginRegistration struct {
    // ID is the unique plugin identifier
    ID string

    // Name is the human-readable plugin name
    Name string

    // ProtocolVersion indicates the plugin protocol version
    ProtocolVersion string

    // Description explains what the plugin does
    Description string

    // Configuration defines the plugin's config schema
    Configuration SchemaReference

    // SupportedMessages lists message types this plugin handles
    SupportedMessages []SchemaReference

    // Capabilities lists plugin capabilities
    Capabilities []string

    // Metadata contains additional key-value metadata
    Metadata map[string]string

    // ControlTopicName is the Kafka topic for control messages
    ControlTopicName string
}

Scenario

Provides access to scenario information and messaging:

type Scenario interface {
    // SendMessage sends a message to other plugins
    SendMessage(msg ScenarioMessage) error

    // SendAck sends an acknowledgment
    SendAck(originalMessage ScenarioMessage) error

    // Stopped notifies the system this scenario has stopped
    Stopped(stopMsgId string, message string) error

    // GetScenarioID returns the scenario identifier
    GetScenarioID() string

    // GetScenarioName returns the scenario name
    GetScenarioName() string

    // GetConfiguration returns scenario-specific config
    GetConfiguration() map[string]any
}

ScenarioMessage

Represents messages exchanged between plugins:

type ScenarioMessage struct {
    // MessageID is the unique message identifier
    MessageID string

    // MessageType identifies the message type
    MessageType string

    // Payload contains the message data
    Payload []byte

    // Headers contains additional metadata
    Headers map[string]string

    // Timestamp is when the message was created
    Timestamp time.Time

    // ScenarioID links the message to a scenario
    ScenarioID string

    // SourcePlugin is the sending plugin
    SourcePlugin string

    // TargetPlugin is the destination (optional)
    TargetPlugin string

    // CorrelationID links related messages
    CorrelationID string
}

PluginConfig

Configuration for the plugin runner:

type PluginConfig struct {
    // KafkaBrokers is the list of Kafka broker addresses
    KafkaBrokers []string

    // CoreAPIURL is the NeuroSim Core API endpoint
    CoreAPIURL string

    // PluginInstanceID overrides the auto-generated instance ID
    PluginInstanceID string

    // LogLevel sets the logging level
    LogLevel string

    // ConsumerGroupID sets the Kafka consumer group
    ConsumerGroupID string

    // EnableMetrics enables Prometheus metrics
    EnableMetrics bool

    // MetricsPort sets the metrics server port
    MetricsPort int
}

PluginRunner

The PluginRunner manages the plugin lifecycle, Kafka connectivity, and message routing.

Creating a Runner

func main() {
    plugin := &MyPlugin{}

    // Get default configuration
    config := sdk.DefaultConfig()
    config.KafkaBrokers = []string{"localhost:9092"}
    config.CoreAPIURL = "http://localhost:8080"

    // Create the runner
    runner, err := sdk.NewPluginRunner(plugin, config)
    if err != nil {
        log.Fatal(err)
    }

    // Run the plugin (blocks until shutdown)
    ctx := context.Background()
    if err := runner.Run(ctx); err != nil {
        log.Fatal(err)
    }
}

DefaultConfig

Provides sensible defaults:

config := sdk.DefaultConfig()
// Returns:
// - KafkaBrokers: ["localhost:9092"]
// - CoreAPIURL: "http://localhost:8080"
// - LogLevel: "info"
// - EnableMetrics: true
// - MetricsPort: 9090

Graceful Shutdown

The runner handles shutdown signals automatically:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle interrupt signals
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-sigCh
        cancel()
    }()

    runner, _ := sdk.NewPluginRunner(plugin, config)
    runner.Run(ctx)
}

Sending Messages and Replying

Sending Messages

Use the Scenario interface to send messages:

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    // Create a new message
    outgoingMsg := sdk.ScenarioMessage{
        MessageType:  "status-update",
        TargetPlugin: "other-plugin",
        Payload:      []byte(`{"status": "processing"}`),
        Headers: map[string]string{
            "priority": "high",
        },
    }

    // Send it
    if err := scenario.SendMessage(outgoingMsg); err != nil {
        // Handle error
        return true, nil
    }

    return true, nil
}

Replying to Messages

Return reply messages from OnScenarioMessage:

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    // Process the message
    result := p.process(msg.Payload)

    // Create reply
    reply := sdk.ScenarioMessage{
        MessageType:   "response",
        Payload:       result,
        CorrelationID: msg.MessageID, // Link to original
    }

    // Return reply
    return true, []sdk.ScenarioMessage{reply}
}

Sending Acknowledgments

Quickly acknowledge message receipt:

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    // Send acknowledgment
    if err := scenario.SendAck(msg); err != nil {
        // Handle error
    }

    // Process asynchronously
    go p.processAsync(msg)

    return true, nil
}

Configuration Schema

Define your plugin's configuration using JSON Schema. The schema validates configuration at runtime.

Basic Schema

func (p *MyPlugin) GetRegistration() sdk.PluginRegistration {
    return sdk.PluginRegistration{
        ID:   "my-plugin",
        Name: "My Plugin",
        Configuration: sdk.SchemaReference{
            Schema: map[string]interface{}{
                "type": "object",
                "properties": map[string]interface{}{
                    "endpoint": map[string]interface{}{
                        "type":        "string",
                        "format":      "uri",
                        "description": "Target system endpoint",
                    },
                    "timeout": map[string]interface{}{
                        "type":        "integer",
                        "minimum":     1,
                        "maximum":     300,
                        "default":     30,
                        "description": "Request timeout in seconds",
                    },
                },
                "required": []string{"endpoint"},
            },
        },
    }
}

Complex Schema

Configuration: sdk.SchemaReference{
    Schema: map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "database": map[string]interface{}{
                "type": "object",
                "properties": map[string]interface{}{
                    "host": map[string]interface{}{
                        "type":        "string",
                        "description": "Database host",
                    },
                    "port": map[string]interface{}{
                        "type":    "integer",
                        "minimum": 1,
                        "maximum": 65535,
                        "default": 5432,
                    },
                    "ssl": map[string]interface{}{
                        "type":    "boolean",
                        "default": true,
                    },
                },
                "required": []string{"host"},
            },
            "features": map[string]interface{}{
                "type": "array",
                "items": map[string]interface{}{
                    "type": "string",
                    "enum": []string{"logging", "metrics", "tracing"},
                },
                "uniqueItems": true,
                "description": "Enabled features",
            },
            "mode": map[string]interface{}{
                "type": "string",
                "enum": []string{"development", "staging", "production"},
                "default": "development",
            },
        },
        "required": []string{"database"},
    },
}

Using External Schema

Reference an external JSON Schema file:

Configuration: sdk.SchemaReference{
    SchemaURL: "https://example.com/schemas/my-plugin-config-v1.json",
}

Accessing Configuration

Read configuration in your plugin:

func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
    config := scenario.GetConfiguration()

    // Type-safe access
    endpoint, ok := config["endpoint"].(string)
    if !ok {
        return errors.New("endpoint not configured")
    }

    timeout := 30 // default
    if t, ok := config["timeout"].(float64); ok {
        timeout = int(t)
    }

    p.client = NewClient(endpoint, timeout)
    return nil
}

Error Handling

The SDK provides structured error handling patterns.

Standard Error Handling

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    result, err := p.processMessage(msg)
    if err != nil {
        // Log the error
        log.Printf("Failed to process message %s: %v", msg.MessageID, err)

        // Send error response
        errorMsg := sdk.ScenarioMessage{
            MessageType: "error",
            Payload:     []byte(fmt.Sprintf(`{"error": "%s"}`, err.Error())),
            CorrelationID: msg.MessageID,
        }
        return true, []sdk.ScenarioMessage{errorMsg}
    }

    return true, []sdk.ScenarioMessage{result}
}

Initialization Errors

Return errors from OnScenarioInitialize to prevent scenario start:

func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
    config := scenario.GetConfiguration()

    endpoint, ok := config["endpoint"].(string)
    if !ok || endpoint == "" {
        return fmt.Errorf("endpoint is required")
    }

    if err := p.validateConnection(endpoint); err != nil {
        return fmt.Errorf("connection validation failed: %w", err)
    }

    return nil
}

Panic Recovery

The SDK recovers from panics in plugin code:

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    // Even if this panics, the runner will recover
    result := p.riskyOperation(msg)
    return true, []sdk.ScenarioMessage{result}
}

Logging

The SDK includes structured logging capabilities.

Basic Logging

import "log"

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    log.Printf("Processing message: type=%s, id=%s", msg.MessageType, msg.MessageID)

    result := p.process(msg)

    log.Printf("Message processed successfully: id=%s", msg.MessageID)

    return true, []sdk.ScenarioMessage{result}
}

Structured Logging

Use structured logging for better observability:

import (
    "log/slog"
)

func (p *MyPlugin) OnScenarioInitialize(init sdk.ScenarioInitialize, scenario sdk.Scenario) error {
    p.logger = slog.New(slog.NewJSONHandler(os.Stdout, nil))

    p.logger.Info("Scenario initialized",
        "scenario_id", scenario.GetScenarioID(),
        "scenario_name", scenario.GetScenarioName(),
        "plugin", "my-plugin",
    )

    return nil
}

func (p *MyPlugin) OnScenarioMessage(msg sdk.ScenarioMessage, scenario sdk.Scenario) (bool, []sdk.ScenarioMessage) {
    p.logger.Debug("Message received",
        "message_id", msg.MessageID,
        "message_type", msg.MessageType,
        "source", msg.SourcePlugin,
    )

    result, err := p.process(msg)
    if err != nil {
        p.logger.Error("Processing failed",
            "message_id", msg.MessageID,
            "error", err,
        )
        return false, nil
    }

    p.logger.Info("Message processed",
        "message_id", msg.MessageID,
        "response_type", result.MessageType,
    )

    return true, []sdk.ScenarioMessage{result}
}

Log Levels

Configure logging via the plugin config:

config := sdk.DefaultConfig()
config.LogLevel = "debug" // trace, debug, info, warn, error

Testing Utilities

The SDK provides testing utilities to help test plugins without Kafka infrastructure.

Test Harness

package myplugin_test

import (
    "testing"

    sdk "github.com/NeurosimIO/neurosim-sdk-go"
    "github.com/NeurosimIO/neurosim-sdk-go/testing"
)

func TestMyPlugin(t *testing.T) {
    plugin := &MyPlugin{}

    // Create test harness
    harness := testing.NewHarness(plugin)

    // Initialize scenario
    err := harness.Initialize(testing.ScenarioInit{
        ScenarioID:   "test-scenario",
        ScenarioName: "Test Scenario",
        Config: map[string]interface{}{
            "endpoint": "http://test.example.com",
            "timeout":  10,
        },
    })
    if err != nil {
        t.Fatalf("Initialize failed: %v", err)
    }

    // Test message handling
    handled, replies := harness.SendMessage(sdk.ScenarioMessage{
        MessageID:   "test-msg-1",
        MessageType: "request",
        Payload:     []byte(`{"action": "test"}`),
    })

    if !handled {
        t.Error("Message not handled")
    }

    if len(replies) != 1 {
        t.Fatalf("Expected 1 reply, got %d", len(replies))
    }

    if replies[0].MessageType != "response" {
        t.Errorf("Expected response, got %s", replies[0].MessageType)
    }
}

Testing Message Flow

func TestMessageFlow(t *testing.T) {
    plugin := &MyPlugin{}
    harness := testing.NewHarness(plugin)

    harness.Initialize(testing.ScenarioInit{
        ScenarioID: "test",
        Config:     map[string]interface{}{"endpoint": "http://test"},
    })

    // Send multiple messages
    messages := []sdk.ScenarioMessage{
        {MessageType: "init", Payload: []byte(`{"step": 1}`)},
        {MessageType: "process", Payload: []byte(`{"step": 2}`)},
        {MessageType: "finalize", Payload: []byte(`{"step": 3}`)},
    }

    for i, msg := range messages {
        handled, replies := harness.SendMessage(msg)

        if !handled {
            t.Errorf("Message %d not handled", i)
        }

        if len(replies) == 0 {
            t.Errorf("Message %d produced no replies", i)
        }
    }
}

Testing Configuration Validation

func TestConfigValidation(t *testing.T) {
    plugin := &MyPlugin{}
    harness := testing.NewHarness(plugin)

    // Test invalid config
    err := harness.Initialize(testing.ScenarioInit{
        ScenarioID: "test",
        Config:     map[string]interface{}{}, // missing required endpoint
    })

    if err == nil {
        t.Error("Expected error for invalid config")
    }

    // Test valid config
    err = harness.Initialize(testing.ScenarioInit{
        ScenarioID: "test",
        Config: map[string]interface{}{
            "endpoint": "http://valid.example.com",
        },
    })

    if err != nil {
        t.Errorf("Valid config failed: %v", err)
    }
}

Mocking Scenario

Create a mock scenario for unit testing:

type MockScenario struct {
    id       string
    name     string
    config   map[string]any
    messages []sdk.ScenarioMessage
}

func (m *MockScenario) GetScenarioID() string {
    return m.id
}

func (m *MockScenario) GetScenarioName() string {
    return m.name
}

func (m *MockScenario) GetConfiguration() map[string]any {
    return m.config
}

func (m *MockScenario) SendMessage(msg sdk.ScenarioMessage) error {
    m.messages = append(m.messages, msg)
    return nil
}

func (m *MockScenario) SendAck(msg sdk.ScenarioMessage) error {
    return nil
}

func (m *MockScenario) Stopped(stopMsgId string, message string) error {
    return nil
}

func TestWithMock(t *testing.T) {
    plugin := &MyPlugin{}
    mock := &MockScenario{
        id:     "test",
        name:   "Test",
        config: map[string]any{"endpoint": "http://test"},
    }

    err := plugin.OnScenarioInitialize(sdk.ScenarioInitialize{}, mock)
    if err != nil {
        t.Fatalf("Initialize failed: %v", err)
    }

    handled, _ := plugin.OnScenarioMessage(sdk.ScenarioMessage{
        MessageType: "test",
        Payload:     []byte(`{}`),
    }, mock)

    if !handled {
        t.Error("Message not handled")
    }

    if len(mock.messages) == 0 {
        t.Error("No messages sent")
    }
}

Next Steps