fermentord/internal/dwingest/nats.go
Søren Rasmussen e25905c20f
All checks were successful
continuous-integration/drone/push Build is passing
Reset 1-wire bus when temperature readings fail 60 consecutive times
2022-08-01 20:40:32 +02:00

114 lines
2.5 KiB
Go

package dwingest
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/pkg/temperature"
"git.joco.dk/sng/fermentord/pkg/tilt"
"github.com/getsentry/sentry-go"
"github.com/gofrs/uuid"
"github.com/nats-io/nats.go"
)
// DWIngest buffers temperature readings and chamber state changes in channels
// so that its Run loop can publish them to NATS JetStream.
type DWIngest struct {
	// chTemperatureReading holds readings passed to AddReading until Run
	// receives and publishes them.
	chTemperatureReading chan temperature.TemperatureReading

	// chState holds chamber states passed to AddState until Run receives and
	// publishes them.
	chState chan controllers.ChamberState

	// hub is the Sentry hub that errors raised by this type are captured on.
	hub *sentry.Hub

	// serial is not read or written by any of the methods reviewed here.
	// NOTE(review): confirm whether this field is still needed.
	serial int64
}
// NewDWIngest returns a DWIngest with a 3600-entry temperature reading buffer,
// a 100-entry chamber state buffer and its own clone of the current Sentry hub.
func NewDWIngest() *DWIngest {
	ingest := new(DWIngest)

	ingest.chTemperatureReading = make(chan temperature.TemperatureReading, 3600)
	ingest.chState = make(chan controllers.ChamberState, 100)
	ingest.hub = sentry.CurrentHub().Clone()

	return ingest
}
// AddReading queues reading for publication by Run without blocking.
//
// When the temperature channel is full the reading is dropped, and the
// overflow is captured on the Sentry hub and written to the log.
func (p *DWIngest) AddReading(reading temperature.TemperatureReading) {
	select {
	case p.chTemperatureReading <- reading:
		// Queued; nothing more to do.
		return
	default:
	}

	// The buffer had no free slot: report the overflow instead of blocking
	// the caller.
	overflow := fmt.Errorf("channel overflow on dwingest temperature channel")
	p.hub.CaptureException(overflow)
	log.Print(overflow)
}
// AddState queues state for publication by Run without blocking.
//
// When the state channel is full the state is dropped, and the overflow is
// captured on the Sentry hub and written to the log.
func (p *DWIngest) AddState(state controllers.ChamberState) {
	select {
	case p.chState <- state:
		// Queued; nothing more to do.
		return
	default:
	}

	// The buffer had no free slot: report the overflow instead of blocking
	// the caller.
	overflow := fmt.Errorf("channel overflow on dwingest state channel")
	p.hub.CaptureException(overflow)
	log.Print(overflow)
}
// Run forwards queued temperature readings, chamber state changes and Tilt
// measurements to JetStream until ctx is cancelled.
//
// Each item is published on the subject configured for its kind in
// config.NATS.Subject. State changes and Tilt measurements are stamped with
// the current UTC time at the moment they are received here. On return the
// Sentry hub is flushed (waiting up to ten seconds) and then wg.Done is called.
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) {
	defer wg.Done()
	defer p.hub.Flush(10 * time.Second)

	for {
		select {
		case <-ctx.Done():
			return

		case reading := <-p.chTemperatureReading:
			// Publish captures and logs its own failures, so the returned
			// error is intentionally discarded here and below.
			_ = p.Publish(config.NATS.Subject.Temp, js, reading)

		case state := <-p.chState:
			payload := State{
				Time:  time.Now().UTC(),
				State: controllers.ChamberStateMap[state],
			}
			_ = p.Publish(config.NATS.Subject.State, js, payload)

		case t := <-tilt.C:
			payload := Tilt{
				Time:        time.Now().UTC(),
				Color:       string(t.Color()),
				Gravity:     t.Gravity(),
				Temperature: t.Celsius(),
			}
			_ = p.Publish(config.NATS.Subject.Tilt, js, payload)
		}
	}
}
// Publish encodes reading as JSON and publishes it on subject through js,
// waiting up to five seconds for the JetStream acknowledgement.
//
// Every message carries a newly generated UUIDv7 in its "Nats-Msg-Id" header.
// Any failure is captured on the Sentry hub, written to the log and returned.
// ID-generation and publish errors are wrapped with the failing step and the
// subject; the underlying error stays reachable through errors.Is/errors.As.
func (p *DWIngest) Publish(subject string, js nats.JetStream, reading any) error {
	b, err := json.Marshal(reading)
	if err != nil {
		p.hub.CaptureException(err)
		log.Printf("Error marshaling JSON: %v", err)
		return err
	}

	msgID, err := uuid.NewV7(uuid.MillisecondPrecision)
	if err != nil {
		// Add the step and subject so the log line and Sentry event can be
		// traced back to the message that failed.
		err = fmt.Errorf("generating message ID for subject %q: %w", subject, err)
		p.hub.CaptureException(err)
		log.Print(err)
		return err
	}

	msg := nats.NewMsg(subject)
	msg.Header.Add("Nats-Msg-Id", msgID.String())
	msg.Data = b

	if _, err = js.PublishMsg(msg, nats.AckWait(5*time.Second)); err != nil {
		err = fmt.Errorf("publishing to subject %q: %w", subject, err)
		p.hub.CaptureException(err)
		log.Print(err)
		return err
	}

	return nil
}