Add support for dwingest

This commit is contained in:
Søren Rasmussen 2022-07-19 11:11:50 +02:00
parent cee6149f70
commit 13f6e361c1
2 changed files with 81 additions and 22 deletions

View file

@ -2,7 +2,6 @@ package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
@ -12,6 +11,7 @@ import (
"git.joco.dk/sng/fermentord/internal/api"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/internal/dwingest"
"git.joco.dk/sng/fermentord/internal/hw"
"git.joco.dk/sng/fermentord/internal/metrics"
"git.joco.dk/sng/fermentord/pkg/daemon"
@ -33,6 +33,9 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
defer close(chCtrlTemp)
ctrl := controllers.NewChamberController("Chamber 1", *config, chCtrlTemp)
// NATS
ingest := dwingest.NewDWIngest()
// Configuration reload
viper.OnConfigChange(func(in fsnotify.Event) {
controllers.ReloadConfiguration(config, ctrl)
@ -47,28 +50,16 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
}
defer gpio.Close()
wg.Add(2)
wg.Add(3)
go ctrl.Run(ctx, wg)
go ingest.Run(ctx, wg, js, config)
go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)
for {
select {
case reading := <-temperature.C:
chCtrlTemp <- reading
b, err := json.Marshal(reading)
if err != nil {
hub.CaptureException(err)
log.Printf("Error marshaling JSON: %v", err)
break
}
// Publish to NATS
_, err = js.Publish(config.NATS.Subject.Temp, b)
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
ingest.AddReading(reading)
case state := <-ctrl.C:
switch state {
@ -91,12 +82,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config
gpio.StartHeater()
}
// Publish to NATS
_, err = js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state]))
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
ingest.AddState(state)
case <-ctx.Done():
return

73
internal/dwingest/nats.go Normal file
View file

@ -0,0 +1,73 @@
package dwingest
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"git.joco.dk/sng/fermentord/internal/configuration"
"git.joco.dk/sng/fermentord/internal/controllers"
"git.joco.dk/sng/fermentord/pkg/temperature"
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
)
type DWIngest struct {
chTemperatureReading chan temperature.TemperatureReading
chState chan controllers.ChamberState
}
func NewDWIngest() *DWIngest {
return &DWIngest{
chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
chState: make(chan controllers.ChamberState, 100),
}
}
func (p *DWIngest) AddReading(reading temperature.TemperatureReading) {
p.chTemperatureReading <- reading
}
func (p *DWIngest) AddState(state controllers.ChamberState) {
p.chState <- state
}
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream, config *configuration.ControllerConfig) {
hub := sentry.CurrentHub().Clone()
defer hub.Flush(10 * time.Second)
defer wg.Done()
for {
select {
case reading := <-p.chTemperatureReading:
b, err := json.Marshal(reading)
if err != nil {
hub.CaptureException(err)
log.Printf("Error marshaling JSON: %v", err)
break
}
_, err = js.Publish(config.NATS.Subject.Temp, b)
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
case state := <-p.chState:
_, err := js.Publish(config.NATS.Subject.State, []byte(controllers.ChamberStateMap[state]))
if err != nil {
hub.CaptureException(err)
log.Printf("Error publishing to NATS: %v", err)
}
case <-ctx.Done():
close(p.chTemperatureReading)
close(p.chState)
return
}
}
}