footprint-processor-sdk is an SDK for handling messages from the footprint-ingestor server. It is part of the Footprint SDK for the
Go programming language.
This Developer Preview is provided to receive feedback from other teams
on SDK changes prior to the final release.
As such, users should expect minor version releases
that break backwards compatibility.
The release notes for each breaking change will describe the change
and how you can migrate to the latest version.
Check out the [Issues] and [Projects] for design and updates being made to the SDK. The SDK requires a minimum version of Go 1.14.
We’ll be expanding the [Issues] and [Projects] sections with additional changes to the SDK based on your feedback and on improvements to the SDK’s core. Check the SDK’s [CHANGELOG] for information about the latest updates to the SDK.
Project Status
The SDK is in a preview state while we design and implement potentially breaking changes to its layout and usage patterns based on your feedback. You can also expect periodic service API model updates.
We are actively seeking team feedback for several changes to the SDK. Please review our [design] page on issues that are currently pending team feedback.
Users should expect significant changes that could affect the following (non-exhaustive) areas:
- Service Footprint-ingestor
Getting started
The best way to get started with the SDK is to use go get to add it to your Go workspace or to an application that uses Go modules.
git config --global url."git@git.teko.vn:".insteadOf "https://git.teko.vn/"
export GOSUMDB=off
go get -u -v https://git.teko.vn/data/footprint/footprint-sdk/golang
Hello Footprint
This example shows how you can use the SDK’s [Footprint-processor] to handle requests. It makes use of the protobuf defined in the example in the previous section.
With builtin stage and custom stage
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"go.tekoapis.com/footprint/golang/common/logger"
	"go.tekoapis.com/footprint/golang/processor/config"
	"go.tekoapis.com/footprint/golang/processor/pipeline"
	"go.tekoapis.com/footprint/golang/processor/pipeline/builder"
	"go.tekoapis.com/footprint/golang/processor/pipeline/builtinstage/stage"
	"go.tekoapis.com/footprint/golang/processor/pipeline/configparser"
	"go.tekoapis.com/footprint/golang/processor/server"
	"go.tekoapis.com/footprint/golang/sdk"
)

func main() {
	cfg := config.LoadAppConfig()
	if _, err := logger.InitLogger(cfg.LogConfig, logger.LoggerBackendZap); err != nil {
		panic(fmt.Sprintf("error when init new logger: %v", err))
	}
	logger.Infof("load yaml config completed")

	globalCtx := context.Background()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Start the HTTP server.
	server.Serve(globalCtx, server.Config{HTTPPort: 3000})

	processorsdk, err := sdk.NewProcessorSDK(sdk.ProcessorSDKConfig{
		LogConfig:      cfg.LogConfig,
		ConsumerConfig: cfg.ConsumerConfig,
		BuiltinConfig: sdk.BuiltinConfig{
			ProducerConfig:       cfg.ProducerConfig,
			SchemaRegistryConfig: cfg.SchemaRegistryConfig,
		},
	})
	if err != nil {
		panic(fmt.Sprintf("error when create Processor: %v", err))
	}

	// Register custom stages before applying the pipeline config.
	err = addCustomStage(cfg, processorsdk)
	if err != nil {
		panic(fmt.Sprintf("error when add custom stage: %v", err))
	}

	err = processorsdk.ApplyPipelineConfig(cfg.YamlFilePath)
	if err != nil {
		panic(fmt.Sprintf("error when apply pipeline config: %v", err))
	}

	err = processorsdk.Start(globalCtx)
	if err != nil {
		panic(fmt.Sprintf("error when start processor: %v", err))
	}

	// Block until SIGINT arrives, then give the processor up to 5 seconds to close.
	<-signals
	closeCtx, closeFn := context.WithTimeout(context.Background(), 5*time.Second)
	defer closeFn()
	if err := processorsdk.Close(closeCtx); err != nil {
		logger.Errorf("error when close log entry processor: %+v", err)
	}
}

// addCustomStage registers the tracking custom stage on the processor.
func addCustomStage(cfg config.AppConfig, processorSDK *sdk.ProcessorSDK) error {
	// Stage for tracking JSON enrichment.
	processorSDK.AddCustomStage(builder.StageProvider{
		Type: stage.ConvertProtobufToTrackingJsonStageType,
		ProvideFn: func(eventType string, options configparser.Options) (pipeline.Stage, error) {
			// TODO: Do something ...
			return nil, nil
		},
	})
	return nil
}
Without builtin stage and custom stage
Instead of
processorsdk, err := sdk.NewProcessorSDK(sdk.ProcessorSDKConfig{
	LogConfig:      cfg.LogConfig,
	ConsumerConfig: cfg.ConsumerConfig,
	BuiltinConfig: sdk.BuiltinConfig{
		ProducerConfig:       cfg.ProducerConfig,
		SchemaRegistryConfig: cfg.SchemaRegistryConfig,
		HTTPPort:             cfg.HTTPPort,
	},
})
Change to
processorsdk, err := sdk.NewPureProcessorSDK(sdk.PureProcessorSDKConfig{
	LogConfig:      cfg.LogConfig,
	ConsumerConfig: cfg.ConsumerConfig,
})
Pipeline definition
This example shows how you can configure processing pipelines:
templates:
  tracking-pipeline-template:
    stages:
      - id: "1"
        type: builtin/convert_log_entry_to_protobuf
        source_id: __ROOT_KAFKA__
      - id: "2"
        type: tracking/convert_protobuf_to_json
        source_id: "1"
      - id: "3"
        type: builtin/convert_data_to_byte_array
        source_id: "2"
      - id: "4"
        type: builtin/write_to_kafka
        options:
          kafka:
            topic: footprint
        source_id: "3"
    dead_letter_topic: deadletter_tracking_{{ .eventType }}
pipelines:
  tracking.v1.web_exception_event:
    include: tracking-pipeline-template
    args:
      eventType: web_exception_event
    stages: # Overwrite stage 4
      - id: "4"
        type: builtin/write_to_kafka
        options:
          kafka:
            topic: footprint-exception
        source_id: "3"
  tracking.v1.web_alert_event:
    include: tracking-pipeline-template
    args:
      eventType: web_alert_event
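The `{{ .eventType }}` placeholder and the "Overwrite stage 4" comment suggest two mechanics: template fields are filled in from a pipeline's args, and a pipeline's own stages replace template stages with the same id. The following is a minimal sketch of both ideas, assuming the placeholder is expanded with Go's text/template (the syntax matches, but the source does not confirm the mechanism). The Stage type and the expandArgs and overrideStages helpers are hypothetical and are not part of the SDK; appending overrides with unknown ids is also an assumption.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// Stage is a hypothetical, trimmed-down view of one stage entry in the YAML.
type Stage struct {
	ID       string
	Type     string
	SourceID string
	Topic    string // only meaningful for write_to_kafka stages in this sketch
}

// expandArgs renders a templated field such as
// "deadletter_tracking_{{ .eventType }}" using the pipeline's args.
// A placeholder without a matching arg is reported as an error.
func expandArgs(text string, args map[string]string) (string, error) {
	tmpl, err := template.New("field").Option("missingkey=error").Parse(text)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, args); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// overrideStages returns a copy of the template stages in which every stage
// whose ID also appears in overrides is replaced. Overrides with an ID that
// is not in the template are appended (an assumption of this sketch).
func overrideStages(base, overrides []Stage) []Stage {
	merged := make([]Stage, len(base))
	copy(merged, base)
	for _, o := range overrides {
		replaced := false
		for i := range merged {
			if merged[i].ID == o.ID {
				merged[i] = o
				replaced = true
				break
			}
		}
		if !replaced {
			merged = append(merged, o)
		}
	}
	return merged
}

func main() {
	args := map[string]string{"eventType": "web_exception_event"}
	topic, err := expandArgs("deadletter_tracking_{{ .eventType }}", args)
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // deadletter_tracking_web_exception_event

	base := []Stage{
		{ID: "3", Type: "builtin/convert_data_to_byte_array", SourceID: "2"},
		{ID: "4", Type: "builtin/write_to_kafka", SourceID: "3", Topic: "footprint"},
	}
	overrides := []Stage{
		{ID: "4", Type: "builtin/write_to_kafka", SourceID: "3", Topic: "footprint-exception"},
	}
	merged := overrideStages(base, overrides)
	fmt.Println(merged[1].Topic) // footprint-exception
}
```

With this reading, tracking.v1.web_alert_event keeps the template's footprint topic, while tracking.v1.web_exception_event writes to footprint-exception.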
List of built-in stages:
- builtin/convert_log_entry_to_protobuf: converts a log entry to protobuf
  + input: byte[]
  + output: protobuf
- builtin/convert_protobuf_to_avro
  + input: protobuf
  + output: avro
- builtin/convert_protobuf_to_json
  + input: protobuf
  + output: json
- builtin/convert_data_to_byte_array
  + input: any
  + output: byte[]
- builtin/validate_protobuf
  + input: protobuf
  + output: protobuf
- builtin/write_to_elasticsearch
  + input: json
  + output: nil
- builtin/write_to_kafka
  + input: byte[]
  + output: nil
- builtin/write_to_scylladb
  + input: byte[]
  + output: nil
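Because each stage declares an input and an output type, a pipeline only makes sense when every stage's input matches the output of the stage named in its source_id. The sketch below checks that rule for a list of stages. It is an illustration only: the stageRef type and the checkPipeline helper are hypothetical, the SDK may or may not perform such a check itself, and the sketch assumes that __ROOT_KAFKA__ produces byte[] and that stages are listed after their sources.

```go
package main

import "fmt"

// ioTypes records the input and output type of a built-in stage,
// copied from the list above.
type ioTypes struct{ in, out string }

var builtinStages = map[string]ioTypes{
	"builtin/convert_log_entry_to_protobuf": {"byte[]", "protobuf"},
	"builtin/convert_protobuf_to_avro":      {"protobuf", "avro"},
	"builtin/convert_protobuf_to_json":      {"protobuf", "json"},
	"builtin/convert_data_to_byte_array":    {"any", "byte[]"},
	"builtin/validate_protobuf":             {"protobuf", "protobuf"},
	"builtin/write_to_elasticsearch":        {"json", "nil"},
	"builtin/write_to_kafka":                {"byte[]", "nil"},
	"builtin/write_to_scylladb":             {"byte[]", "nil"},
}

// stageRef mirrors the id, type and source_id fields of a stage in the YAML.
type stageRef struct{ ID, Type, SourceID string }

// rootKafka is the source_id of the first stage in the example config.
const rootKafka = "__ROOT_KAFKA__"

// checkPipeline verifies that each stage accepts the output type of the
// stage it reads from. Stages must appear after their sources.
func checkPipeline(stages []stageRef) error {
	// Assumption: the Kafka root hands raw bytes to the first stage.
	outputs := map[string]string{rootKafka: "byte[]"}
	for _, s := range stages {
		types, ok := builtinStages[s.Type]
		if !ok {
			return fmt.Errorf("stage %s: unknown type %q", s.ID, s.Type)
		}
		srcOut, ok := outputs[s.SourceID]
		if !ok {
			return fmt.Errorf("stage %s: unknown source_id %q", s.ID, s.SourceID)
		}
		if types.in != "any" && types.in != srcOut {
			return fmt.Errorf("stage %s: %s expects %s but source %s produces %s",
				s.ID, s.Type, types.in, s.SourceID, srcOut)
		}
		outputs[s.ID] = types.out
	}
	return nil
}

func main() {
	valid := []stageRef{
		{"1", "builtin/convert_log_entry_to_protobuf", rootKafka},
		{"2", "builtin/convert_protobuf_to_json", "1"},
		{"3", "builtin/convert_data_to_byte_array", "2"},
		{"4", "builtin/write_to_kafka", "3"},
	}
	fmt.Println(checkPipeline(valid) == nil) // true

	// write_to_elasticsearch expects json, but stage 1 produces protobuf.
	invalid := []stageRef{
		{"1", "builtin/convert_log_entry_to_protobuf", rootKafka},
		{"2", "builtin/write_to_elasticsearch", "1"},
	}
	fmt.Println(checkPipeline(invalid) != nil) // true
}
```

Custom stages such as tracking/convert_protobuf_to_json are not in the table, so a real check would also need their input and output types.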
Getting Help
Please contact our team for support. We use GitLab issues for tracking bugs and feature requests.
Opening Issues
If you encounter a bug with the Footprint SDK for Go, we would like to hear about it. Search the [existing issues][issues] to see whether others are experiencing the same problem before opening a new issue. Please include the versions of the Footprint SDK for Go, the Go language, and the OS you’re using. Please also include a reproduction case when appropriate.
The GitLab issues are intended for bug reports and feature requests. For help and questions about using the Footprint SDK for Go, please contact our team.