Skip to content

kinesis-producer-go/kinesis-producer

Repository files navigation

Amazon kinesis producer CI status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official AWS SDK for Go V2 and using the same aggregation format that KPL use.

Useful links

Example

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/kinesis-producer-go/kinesis-producer"
)

func main() {
	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.MaxIdleConns = 20
	transport.MaxIdleConnsPerHost = 20
	httpClient := &http.Client{
		Transport: transport,
	}
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-west-2"), config.WithHTTPClient(httpClient))
	if err != nil {
		log.Fatalf("unable to load SDK config, %v", err)
	}
	client := kinesis.NewFromConfig(cfg)
	pr := producer.New(&producer.Config{
		StreamName:   aws.String("test"),
		BacklogCount: 2000,
		Client:       client,
	})

	pr.Start()

	// Handle failures
	go func() {
		for r := range pr.NotifyFailures() {
			// r contains `Data`, `PartitionKey` and `Error()`
			log.Printf("failure record: %+v\n", r)
		}
	}()

	go func() {
		for i := 0; i < 5000; i++ {
			err := pr.Put([]byte("foo"))
			if err != nil {
				log.Printf("error producing: %+v\n", err)
				time.Sleep(1 * time.Second)
			}
		}
	}()

	time.Sleep(1 * time.Minute)
	pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger handler
    logger := slog.New(
        slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
            Level: slog.LevelError,
        }),
    )
    pr := producer.New(&producer.Config{
        StreamName:   aws.String("test"),
        BacklogCount: 2000,
        Client:       client,
        Logger:       logger,
    })

Using logrus

import (
	"github.com/kinesis-producer-go/kinesis-producer"
	sloglogrus "github.com/samber/slog-logrus/v2"
	"github.com/sirupsen/logrus"
)

    logrusLogger := logrus.New()
    logger := slog.New(sloglogrus.Option{Level: slog.LevelError, Logger: logrusLogger}.NewLogrusHandler())

    pr := producer.New(&producer.Config{
        StreamName:   aws.String("test"),
        BacklogCount: 2000,
        Client:       client,
        Logger:       logger,
    })

License

MIT

About

An aggregated records producer for Amazon Kinesis

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

No packages published