Refactor dwingest

This commit is contained in:
Søren Rasmussen 2022-08-02 07:04:47 +02:00
parent 2c8db8ff0b
commit 613f7b6455
2 changed files with 13 additions and 11 deletions

View file

@ -51,7 +51,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
viper.WatchConfig() viper.WatchConfig()
// NATS // NATS
ingest := dwingest.NewDWIngest() ingest := dwingest.NewDWIngest(js)
gpio, err := hw.NewGpio() gpio, err := hw.NewGpio()
if err != nil { if err != nil {
@ -63,7 +63,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
wg.Add(5) wg.Add(5)
go display.Run(ctx, wg) go display.Run(ctx, wg)
go ctrl.Run(ctx, wg) go ctrl.Run(ctx, wg)
go ingest.Run(ctx, wg, js, config) go ingest.Run(ctx, wg, config)
go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)
go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second) go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second)
@ -89,7 +89,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
Time: time.Now().UTC(), Time: time.Now().UTC(),
Event: "ONEWIRE_RESET", Event: "ONEWIRE_RESET",
} }
if err := ingest.Publish(config.NATS.Subject.Event, js, e); err != nil { if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil {
hub.CaptureException(err) hub.CaptureException(err)
log.Print(err) log.Print(err)
} }
@ -124,7 +124,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
Time: time.Now().UTC(), Time: time.Now().UTC(),
Event: evs, Event: evs,
} }
if err := ingest.Publish(config.NATS.Subject.Event, js, e); err != nil { if err := ingest.Publish(config.NATS.Subject.Event, e); err != nil {
hub.CaptureException(err) hub.CaptureException(err)
log.Print(err) log.Print(err)
} }

View file

@ -21,15 +21,17 @@ type DWIngest struct {
chTemperatureReading chan temperature.TemperatureReading chTemperatureReading chan temperature.TemperatureReading
chState chan controllers.ChamberState chState chan controllers.ChamberState
js nats.JetStream
hub *sentry.Hub hub *sentry.Hub
serial int64 serial int64
} }
func NewDWIngest() *DWIngest { func NewDWIngest(js nats.JetStream) *DWIngest {
return &DWIngest{ return &DWIngest{
chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
chState: make(chan controllers.ChamberState, 100), chState: make(chan controllers.ChamberState, 100),
hub: sentry.CurrentHub().Clone(), hub: sentry.CurrentHub().Clone(),
js: js,
} }
} }
@ -57,23 +59,23 @@ func (p *DWIngest) AddState(state controllers.ChamberState) {
} }
} }
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.Configuration) { func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, config *configuration.Configuration) {
defer wg.Done() defer wg.Done()
defer p.hub.Flush(10 * time.Second) defer p.hub.Flush(10 * time.Second)
for { for {
select { select {
case reading := <-p.chTemperatureReading: case reading := <-p.chTemperatureReading:
p.Publish(config.NATS.Subject.Temp, js, reading) p.Publish(config.NATS.Subject.Temp, reading)
case state := <-p.chState: case state := <-p.chState:
p.Publish(config.NATS.Subject.State, js, State{ p.Publish(config.NATS.Subject.State, State{
Time: time.Now().UTC(), Time: time.Now().UTC(),
State: controllers.ChamberStateMap[state], State: controllers.ChamberStateMap[state],
}) })
case t := <-tilt.C: case t := <-tilt.C:
p.Publish(config.NATS.Subject.Tilt, js, Tilt{ p.Publish(config.NATS.Subject.Tilt, Tilt{
Time: time.Now().UTC(), Time: time.Now().UTC(),
Color: string(t.Color()), Color: string(t.Color()),
Gravity: t.Gravity(), Gravity: t.Gravity(),
@ -86,7 +88,7 @@ func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStrea
} }
} }
func (p *DWIngest) Publish(subject string, js nats.JetStream, reading any) error { func (p *DWIngest) Publish(subject string, reading any) error {
b, err := json.Marshal(reading) b, err := json.Marshal(reading)
if err != nil { if err != nil {
p.hub.CaptureException(err) p.hub.CaptureException(err)
@ -103,7 +105,7 @@ func (p *DWIngest) Publish(subject string, js nats.JetStream, reading any) error
msg := nats.NewMsg(subject) msg := nats.NewMsg(subject)
msg.Header.Add("Nats-Msg-Id", msgID.String()) msg.Header.Add("Nats-Msg-Id", msgID.String())
msg.Data = b msg.Data = b
_, err = js.PublishMsg(msg, nats.AckWait(5*time.Second)) _, err = p.js.PublishMsg(msg, nats.AckWait(5*time.Second))
if err != nil { if err != nil {
p.hub.CaptureException(err) p.hub.CaptureException(err)
log.Print(err) log.Print(err)