diff --git a/cmd/fermentord/loop.go b/cmd/fermentord/loop.go index 272f499..2d95bb3 100644 --- a/cmd/fermentord/loop.go +++ b/cmd/fermentord/loop.go @@ -13,7 +13,6 @@ import ( "git.joco.dk/sng/fermentord/internal/lcd" "git.joco.dk/sng/fermentord/pkg/temperature" "git.joco.dk/sng/fermentord/pkg/tilt" - "github.com/fsnotify/fsnotify" "github.com/getsentry/sentry-go" "github.com/nats-io/nats.go" "github.com/spf13/viper" @@ -36,24 +35,18 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) { // Controller config := configuration.Global() ctrl := controllers.NewChamberController(config) + chn := configuration.NewChangeNotifier() + viper.OnConfigChange(chn.OnConfigChange) - // Configuration reload - loadConfiguration := func() { - cfg := configuration.Global() - temperature.ConfigUpdate <- cfg - ctrl.ConfigUpdates <- cfg - //display.SetSetpointTemp(config.FermentationTemperature) - } - - viper.OnConfigChange(func(in fsnotify.Event) { - log.Print("Reloading configuration") - loadConfiguration() - }) - loadConfiguration() + // Configuration updates + chn.Subscribe(temperature.ConfigUpdate) + chn.Subscribe(ctrl.ConfigUpdates) + //chn.Subscribe(display.ConfiguUpdate) + chn.Notify(configuration.Global()) viper.WatchConfig() // NATS - ingest := dwingest.NewDWIngest(js) + ingest := dwingest.NewDWIngest(js, config) gpio, err := hw.NewGpio() if err != nil { @@ -65,7 +58,7 @@ func mainLoop(ctx context.Context, wg *sync.WaitGroup, js nats.JetStream) { wg.Add(5) go display.Run(ctx, wg) go ctrl.Run(ctx, wg) - go ingest.Run(ctx, wg, config) + go ingest.Run(ctx, wg) go temperature.PollSensors(ctx, wg, 1*time.Second, config.Sensors.Weight) go tilt.PollSensors(ctx, wg, 1*time.Minute, 20*time.Second) diff --git a/internal/configuration/observer.go b/internal/configuration/observer.go new file mode 100644 index 0000000..00d0b6d --- /dev/null +++ b/internal/configuration/observer.go @@ -0,0 +1,36 @@ +package configuration + +import ( + "log" + + "github.com/fsnotify/fsnotify" +) + +type ChangeNotifier struct { + channels []chan<- Configuration +} + +func NewChangeNotifier() *ChangeNotifier { + return &ChangeNotifier{ + channels: make([]chan<- Configuration, 0), + } +} + +func (p *ChangeNotifier) Subscribe(ch chan<- Configuration) { + p.channels = append(p.channels, ch) +} + +func (p *ChangeNotifier) Notify(config Configuration) { + for _, ch := range p.channels { + select { + case ch <- config: + default: + } + } +} + +func (p *ChangeNotifier) OnConfigChange(in fsnotify.Event) { + log.Print("Reloading configuration") + config := Global() + p.Notify(config) +} diff --git a/internal/dwingest/nats.go b/internal/dwingest/nats.go index 3c86947..78be56a 100644 --- a/internal/dwingest/nats.go +++ b/internal/dwingest/nats.go @@ -18,20 +18,25 @@ import ( ) type DWIngest struct { + ConfigUpdates chan configuration.Configuration + chTemperatureReading chan temperature.TemperatureReading chState chan controllers.ChamberState js nats.JetStream + config configuration.Configuration hub *sentry.Hub serial int64 } -func NewDWIngest(js nats.JetStream) *DWIngest { +func NewDWIngest(js nats.JetStream, config configuration.Configuration) *DWIngest { return &DWIngest{ chTemperatureReading: make(chan temperature.TemperatureReading, 3600), chState: make(chan controllers.ChamberState, 100), hub: sentry.CurrentHub().Clone(), js: js, + config: config, + ConfigUpdates: make(chan configuration.Configuration, 1), } } @@ -59,29 +64,32 @@ func (p *DWIngest) AddState(state controllers.ChamberState) { } } -func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup, config configuration.Configuration) { +func (p *DWIngest) Run(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() defer p.hub.Flush(10 * time.Second) for { select { case reading := <-p.chTemperatureReading: - p.Publish(config.NATS.Subject.Temp, reading) + p.Publish(p.config.NATS.Subject.Temp, reading) case state := <-p.chState: - p.Publish(config.NATS.Subject.State, State{ + p.Publish(p.config.NATS.Subject.State, State{ Time: time.Now().UTC(), State: controllers.ChamberStateMap[state], }) case t := <-tilt.C: - p.Publish(config.NATS.Subject.Tilt, Tilt{ + p.Publish(p.config.NATS.Subject.Tilt, Tilt{ Time: time.Now().UTC(), Color: string(t.Color()), Gravity: t.Gravity(), Temperature: t.Celsius(), }) + case config := <-p.ConfigUpdates: + p.config = config + case <-ctx.Done(): return }