From 76b83896ed2c795667958abdd1d51987e622a841 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Rasmussen?= Date: Fri, 29 Jul 2022 23:53:55 +0200 Subject: [PATCH] Add NATS message header --- go.mod | 1 + go.sum | 2 ++ internal/dwingest/nats.go | 30 +++++++++++++++++++++--------- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index eca30d4..8419a3d 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/d2r2/go-i2c v0.0.0-20191123181816-73a8a799d6bc github.com/fsnotify/fsnotify v1.5.4 github.com/getsentry/sentry-go v0.13.0 + github.com/gofrs/uuid v4.2.0+incompatible github.com/nats-io/nats.go v1.16.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 diff --git a/go.sum b/go.sum index 8ea888e..26b64a4 100644 --- a/go.sum +++ b/go.sum @@ -97,6 +97,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= +github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go index af78d34..532714c 100644 --- a/internal/dwingest/nats.go +++ b/internal/dwingest/nats.go @@ -13,6 +13,7 @@ import ( "git.joco.dk/sng/fermentord/pkg/temperature" "git.joco.dk/sng/fermentord/pkg/tilt" "github.com/getsentry/sentry-go" + "github.com/gofrs/uuid" "github.com/nats-io/nats.go" ) @@ -20,7 +21,8 @@ type DWIngest struct { chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState - hub *sentry.Hub + hub *sentry.Hub + serial int64 } func NewDWIngest() *DWIngest { @@ -62,16 +64,16 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea for { select { case reading := <-p.chTemperatureReading: - publish(config.NATS.Subject.Temp, js, p.hub, reading) + p.publish(config.NATS.Subject.Temp, js, reading) case state := <-p.chState: - publish(config.NATS.Subject.State, js, p.hub, State{ + p.publish(config.NATS.Subject.State, js, State{ Time: time.Now().UTC(), State: controllers.ChamberStateMap[state], }) case t := <-tilt.C: - publish(config.NATS.Subject.Tilt, js, p.hub, Tilt{ + p.publish(config.NATS.Subject.Tilt, js, Tilt{ Time: time.Now().UTC(), Color: string(t.Color()), Gravity: t.Gravity(), @@ -84,19 +86,29 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea } } -func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error { +func (p *DWIngest) publish(subject string, js nats.JetStream, reading any) error { b, err := json.Marshal(reading) if err != nil { - hub.CaptureException(err) + p.hub.CaptureException(err) log.Printf("Error marshaling JSON: %v", err) return err } - _, err = js.Publish(subject, b, nats.AckWait(5*time.Second)) + msgID, err := uuid.NewV7(uuid.MillisecondPrecision) if err != nil { - hub.CaptureException(err) + p.hub.CaptureException(err) log.Print(err) + return err + } + msg := nats.NewMsg(subject) + msg.Header.Add("Nats-Msg-Id", msgID.String()) + msg.Data = b + _, err = js.PublishMsg(msg, nats.AckWait(5*time.Second)) + if err != nil { + p.hub.CaptureException(err) + log.Print(err) + return err } - return err + return nil }