Add NATS message header
This commit is contained in:
parent
609fdfb31a
commit
76b83896ed
3 changed files with 24 additions and 9 deletions
1
go.mod
1
go.mod
|
@ -8,6 +8,7 @@ require (
|
||||||
github.com/d2r2/go-i2c v0.0.0-20191123181816-73a8a799d6bc
|
github.com/d2r2/go-i2c v0.0.0-20191123181816-73a8a799d6bc
|
||||||
github.com/fsnotify/fsnotify v1.5.4
|
github.com/fsnotify/fsnotify v1.5.4
|
||||||
github.com/getsentry/sentry-go v0.13.0
|
github.com/getsentry/sentry-go v0.13.0
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible
|
||||||
github.com/nats-io/nats.go v1.16.0
|
github.com/nats-io/nats.go v1.16.0
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/prometheus/client_golang v1.12.2
|
github.com/prometheus/client_golang v1.12.2
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -97,6 +97,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
|
||||||
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
|
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
|
||||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
|
||||||
|
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||||
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
|
||||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"git.joco.dk/sng/fermentord/pkg/temperature"
|
"git.joco.dk/sng/fermentord/pkg/temperature"
|
||||||
"git.joco.dk/sng/fermentord/pkg/tilt"
|
"git.joco.dk/sng/fermentord/pkg/tilt"
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
"github.com/gofrs/uuid"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,6 +22,7 @@ type DWIngest struct {
|
||||||
chState chan controllers.ChamberState
|
chState chan controllers.ChamberState
|
||||||
|
|
||||||
hub *sentry.Hub
|
hub *sentry.Hub
|
||||||
|
serial int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDWIngest() *DWIngest {
|
func NewDWIngest() *DWIngest {
|
||||||
|
@ -62,16 +64,16 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case reading := <-p.chTemperatureReading:
|
case reading := <-p.chTemperatureReading:
|
||||||
publish(config.NATS.Subject.Temp, js, p.hub, reading)
|
p.publish(config.NATS.Subject.Temp, js, reading)
|
||||||
|
|
||||||
case state := <-p.chState:
|
case state := <-p.chState:
|
||||||
publish(config.NATS.Subject.State, js, p.hub, State{
|
p.publish(config.NATS.Subject.State, js, State{
|
||||||
Time: time.Now().UTC(),
|
Time: time.Now().UTC(),
|
||||||
State: controllers.ChamberStateMap[state],
|
State: controllers.ChamberStateMap[state],
|
||||||
})
|
})
|
||||||
|
|
||||||
case t := <-tilt.C:
|
case t := <-tilt.C:
|
||||||
publish(config.NATS.Subject.Tilt, js, p.hub, Tilt{
|
p.publish(config.NATS.Subject.Tilt, js, Tilt{
|
||||||
Time: time.Now().UTC(),
|
Time: time.Now().UTC(),
|
||||||
Color: string(t.Color()),
|
Color: string(t.Color()),
|
||||||
Gravity: t.Gravity(),
|
Gravity: t.Gravity(),
|
||||||
|
@ -84,19 +86,29 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func publish(subject string, js nats.JetStream, hub *sentry.Hub, reading any) error {
|
func (p *DWIngest) publish(subject string, js nats.JetStream, reading any) error {
|
||||||
b, err := json.Marshal(reading)
|
b, err := json.Marshal(reading)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hub.CaptureException(err)
|
p.hub.CaptureException(err)
|
||||||
log.Printf("Error marshaling JSON: %v", err)
|
log.Printf("Error marshaling JSON: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = js.Publish(subject, b, nats.AckWait(5*time.Second))
|
msgID, err := uuid.NewV7(uuid.MillisecondPrecision)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hub.CaptureException(err)
|
p.hub.CaptureException(err)
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
msg := nats.NewMsg(subject)
|
||||||
|
msg.Header.Add("Nats-Msg-Id", msgID.String())
|
||||||
|
msg.Data = b
|
||||||
|
_, err = js.PublishMsg(msg, nats.AckWait(5*time.Second))
|
||||||
|
if err != nil {
|
||||||
|
p.hub.CaptureException(err)
|
||||||
|
log.Print(err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue