Processor SDK

footprint-processor-sdk is sdk to handle message from footprint-ingestor server for the Footprint SDK for the Go programming language. This Developer Preview is provided to receive feedback from other teams on SDK changes prior to the final release. As such users should expect the SDK to release minor version releases that break backwards compatability. The release notes for the breaking change will include information about the breaking change, and how you can migrate to the latest version.

Check out the [Issues] and [Projects] for design and updates being made to the SDK. The SDK requires a minimum version of Go 1.14.

We’ll be expanding out the [Issues] and [Projects] sections with additional changes to the SDK based on your feedback, and SDK’s core’s improvements. Check the the SDK’s [CHANGELOG] for information about the latest updates to the SDK.

Jump To:

Project Status

The SDK is in preview state as we work to design and implement potentially breaking changes to the SDK as we update the SDK’s layout and usage patterns based on your feedback. You can also expect periodic service API model updates as well.

We are actively seeking team feedback for several changes to the SDK. Please review our [design] page on issues that are currently pending team feedback.

Users should expect significant changes that could affect the following (non-exhaustive) areas:

  • Service Footprint-ingestor

Getting started

The best way to get started working with the SDK is to use go get to add the SDK to your Go Workspace or application using Go modules.

git config --global url."git@git.teko.vn:".insteadOf  "https://git.teko.vn/"

export GOSUMDB=off
go get -u -v https://git.teko.vn/data/footprint/footprint-sdk/golang

Hello Footprint

This example shows how you can use the SDK to handle request using the SDK’s [Footprint-processor]. It makes use of the protobuf defined in the example in the previous section.

With builtin stage and custom stage

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"golang.org/x/xerrors"

	"go.tekoapis.com/footprint/golang/common/logger"
	"go.tekoapis.com/footprint/golang/processor/config"
	"go.tekoapis.com/footprint/golang/processor/pipeline"
	"go.tekoapis.com/footprint/golang/processor/pipeline/builder"
	"go.tekoapis.com/footprint/golang/processor/pipeline/builtinstage/stage"
	"go.tekoapis.com/footprint/golang/processor/pipeline/builtinstage/stage/tracking"
	"go.tekoapis.com/footprint/golang/processor/pipeline/configparser"
	"go.tekoapis.com/footprint/golang/processor/server"
	"go.tekoapis.com/footprint/golang/sdk"
)

func main() {
	cfg := config.LoadAppConfig()
	if _, err := logger.InitLogger(cfg.LogConfig, logger.LoggerBackendZap); err != nil {
		panic(fmt.Sprintf("error when init new logger: %v", err))
	}

	logger.Infof("load yaml config completed")

	globalCtx := context.Background()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Start server http.
	server.Serve(globalCtx, server.Config{HTTPPort: 3000})

	processorsdk, err := sdk.NewProcessorSDK(sdk.ProcessorSDKConfig{
		LogConfig:       cfg.LogConfig,
		ConsumerConfig:  cfg.ConsumerConfig,
		BuiltinConfig: sdk.BuiltinConfig{
			ProducerConfig:       cfg.ProducerConfig,
			SchemaRegistryConfig: cfg.SchemaRegistryConfig,
		},
	})

    if err != nil {
		panic(fmt.Sprintf("error when create Processor: %v", err))
	}

	// Add more custom stage before apply pipelineconfig
    err = addCustomStage(cfg, processorsdk)
	if err != nil {
		panic(fmt.Sprintf("error when add custom stage: %v", err))
	}

	err = processorsdk.ApplyPipelineConfig(cfg.YamlFilePath)
	if err != nil {
		panic(fmt.Sprintf("error when apply pipeline config: %v", err))
	}

	err = processorsdk.Start(globalCtx)
	if err != nil {
		panic(fmt.Sprintf("error when apply pipeline config: %v", err))
	}

	// nolint:S1000
	for {
		select {
		case <-signals:
			closeCtx, closeFn := context.WithTimeout(context.Background(), 5*time.Second)
			if err := processorsdk.Close(closeCtx); err != nil {
				logger.Errorf("error when close log entry processor: %+v", err)
			}
			closeFn()
			return
		}
	}
}

// add tracking custom stage
func addCustomStage(cfg config.AppConfig, processorSDK *sdk.ProcessorSDK) error {
	// stage tracking json enrichment
	processorSDK.AddCustomStage(builder.StageProvider{
		Type: stage.ConvertProtobufToTrackingJsonStageType,
		ProvideFn: func(eventType string, options configparser.Options) (pipeline.Stage, error) {
			// TODO: Do something ...
			return nil, nil
		},
	})
	return nil
}

Without builtin stage and custom stage

Instead of

processorsdk, err := sdk.NewProcessorSDK(sdk.ProcessorSDKConfig{
    LogConfig:       cfg.LogConfig,
    ConsumerConfig:  cfg.ConsumerConfig,
    BuiltinConfig: sdk.BuiltinConfig{
        ProducerConfig:       cfg.ProducerConfig,
        SchemaRegistryConfig: cfg.SchemaRegistryConfig,
        HTTPPort:             cfg.HTTPPort,
    },
})

Change to

processorsdk, err := sdk.NewPureProcessorSDK(sdk.PureProcessorSDKConfig{
    LogConfig:       cfg.LogConfig,
    ConsumerConfig:  cfg.ConsumerConfig,
})

Pipeline define

This example shows how you can config pipelines to process

templates:
  tracking-pipeline-template:
    stages:
      - id: "1"
        type: builtin/convert_log_entry_to_protobuf
        source_id: __ROOT_KAFKA__
      - id: "2"
        type: tracking/convert_protobuf_to_json
        source_id: "1"
      - id: "3"
        type: builtin/convert_data_to_byte_array
        source_id: "2"
      - id: "4"
        type: builtin/write_to_kafka
        options:
          kafka:
            topic: footprint
        source_id: "3"
    dead_letter_topic: deadletter_tracking_{{ .eventType }}

pipelines:
  tracking.v1.web_exception_event:
    include: tracking-pipeline-template
    args:
      eventType: web_exception_event
    stages: # Overwrite stage 4
      - id: "4"
        type: builtin/write_to_kafka
        options:
          kafka:
            topic: footprint-exception
        source_id: "3"
  tracking.v1.web_alert_event:
    include: tracking-pipeline-template
    args:
      eventType: web_alert_event
List builtin stage:
- builtin/convert_log_entry_to_protobuf: Marshaler to protobuf
    + input: byte[]
    + output: protobuf
- builtin/convert_protobuf_to_avro:
    + input: protobuf
    + output: avro
- builtin/convert_protobuf_to_json
    + input: protobuf
    + output: json
- builtin/convert_data_to_byte_array
    + input: any
    + output: byte[]
- builtin/validate_protobuf
    + input: protobuf
    + output: protobuf
- builtin/write_to_elasticsearch
    + input: json
    + output: nil
- builtin/write_to_kafka
    + input: byte[]
    + output: nil
- builtin/write_to_scylladb
    + input: byte[]
    + output: nil

Getting Help

Please contact to my team for supporting . We use the GitLab issues for tracking bugs and feature requests.

Opening Issues

If you encounter a bug with the Footprint SDK for Go we would like to hear about it. Search the [existing issues][issues] and see if others are also experiencing the issue before opening a new issue. Please include the version of Footprint SDK for Go, Go language, and OS you’re using. Please also include reproduction case when appropriate.

The GitLab issues are intended for bug reports and feature requests. For help and questions with using Footprint SDK for Go.