Use reloaded configuration in dwingest

This commit is contained in:
Søren Rasmussen 2022-08-02 16:04:06 +02:00
parent a5e506b0a5
commit 52b1108798
3 changed files with 58 additions and 21 deletions

View file

@ -13,7 +13,6 @@ import (
"git.joco.dk/sng/fermentord/internal/lcd"
"git.joco.dk/sng/fermentord/pkg/temperature"
"git.joco.dk/sng/fermentord/pkg/tilt"
"github.com/fsnotify/fsnotify"
"github.com/getsentry/sentry-go"
"github.com/nats-io/nats.go"
"github.com/spf13/viper"
@ -36,24 +35,18 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) {
// Controller
config := configuration.Global()
ctrl := controllers.NewChamberController(config)
chn := configuration.NewChangeNotifier()
viper.OnConfigChange(chn.OnConfigChange)
// Configuration reload
loadConfiguration := func() {
cfg := configuration.Global()
temperature.ConfigUpdate <- cfg
ctrl.ConfigUpdates <- cfg
//display.SetSetpointTemp(config.FermentationTemperature)
}
viper.OnConfigChange(func(in fsnotify.Event) {
log.Print("Reloading configuration")
loadConfiguration()
})
loadConfiguration()
// Configuration updates
chn.Subscribe(temperature.ConfigUpdate)
chn.Subscribe(ctrl.ConfigUpdates)
//chn.Subscribe(display.ConfiguUpdate)
chn.Notify(configuration.Global())
viper.WatchConfig()
// NATS
ingest := dwingest.NewDWIngest(js)
ingest := dwingest.NewDWIngest(js, config)
gpio, err := hw.NewGpio()
if err != nil {
@ -65,7 +58,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) {
wg.Add(5)
go display.Run(ctx, wg)
go ctrl.Run(ctx, wg)
go ingest.Run(ctx, wg, config)
go ingest.Run(ctx, wg)
go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight)
go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second)

View file

@ -0,0 +1,36 @@
package configuration
import (
"log"
"github.com/fsnotify/fsnotify"
)
type ChangeNotifier struct {
channels []chan<- Configuration
}
func NewChangeNotifier() *ChangeNotifier {
return &ChangeNotifier{
channels: make([]chan<- Configuration, 0),
}
}
func (p *ChangeNotifier) Subscribe(ch chan<- Configuration) {
p.channels = append(p.channels, ch)
}
func (p *ChangeNotifier) Notify(config Configuration) {
for _, ch := range p.channels {
select {
case ch <- config:
default:
}
}
}
func (p *ChangeNotifier) OnConfigChange(in fsnotify.Event) {
log.Print("Reloading configuration")
config := Global()
p.Notify(config)
}

View file

@ -18,20 +18,25 @@ import (
)
type DWIngest struct {
ConfigUpdates chan configuration.Configuration
chTemperatureReading chan temperature.TemperatureReading
chState chan controllers.ChamberState
js nats.JetStream
config configuration.Configuration
hub *sentry.Hub
serial int64
}
func NewDWIngest(js nats.JetStream) *DWIngest {
func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest {
return &DWIngest{
chTemperatureReading: make(chan temperature.TemperatureReading, 3600),
chState: make(chan controllers.ChamberState, 100),
hub: sentry.CurrentHub().Clone(),
js: js,
config: config,
ConfigUpdates: make(chan configuration.Configuration, 1),
}
}
@ -59,29 +64,32 @@ func (p *DWIngest) AddState(state controllers.ChamberState) {
}
}
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, config configuration.Configuration) {
func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
defer p.hub.Flush(10 * time.Second)
for {
select {
case reading := <-p.chTemperatureReading:
p.Publish(config.NATS.Subject.Temp, reading)
p.Publish(p.config.NATS.Subject.Temp, reading)
case state := <-p.chState:
p.Publish(config.NATS.Subject.State, State{
p.Publish(p.config.NATS.Subject.State, State{
Time: time.Now().UTC(),
State: controllers.ChamberStateMap[state],
})
case t := <-tilt.C:
p.Publish(config.NATS.Subject.Tilt, Tilt{
p.Publish(p.config.NATS.Subject.Tilt, Tilt{
Time: time.Now().UTC(),
Color: string(t.Color()),
Gravity: t.Gravity(),
Temperature: t.Celsius(),
})
case config := <-p.ConfigUpdates:
p.config = config
case <-ctx.Done():
return
}