fermentord/cmd/filer/main.go
2022-03-05 20:42:17 +01:00

126 lines
2.5 KiB
Go

package main
import (
"context"
"errors"
"log"
"sync"
"time"
"git.joco.dk/sng/fermentord/pkg/daemon"
"github.com/getsentry/sentry-go"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/nats-io/nats.go"
)
// serve receives messages from sub until ctx is cancelled. Each message is
// written to the database and acknowledged only after the write succeeded, so
// a message that could not be stored stays unacknowledged.
//
// wg.Done is called when the loop exits. Errors are reported to a clone of the
// current Sentry hub. A receive error that is not handled explicitly below
// (including nats.ErrBadSubscription) terminates the process via log.Fatal.
func serve(ctx context.Context, wg *sync.WaitGroup, sub *nats.Subscription, db *pgxpool.Pool) {
	defer wg.Done()

	hub := sentry.CurrentHub().Clone()

	for {
		m, err := sub.NextMsgWithContext(ctx)
		switch {
		case err == nil:
			// Got a message; handled below.
		case errors.Is(err, nats.ErrInvalidContext), ctx.Err() != nil:
			// A cancelled context is reported as the context's own error
			// (context.Canceled), not as ErrInvalidContext, so the context
			// is checked directly to get a clean shutdown.
			return
		case errors.Is(err, nats.ErrConnectionClosed):
			hub.CaptureException(err)
			// Back off before retrying, but wake up early on shutdown.
			select {
			case <-ctx.Done():
				return
			case <-time.After(10 * time.Second):
			}
			continue
		case errors.Is(err, nats.ErrTimeout):
			hub.CaptureException(err)
			log.Print(err)
			continue
		default:
			// Unrecoverable, e.g. nats.ErrBadSubscription. log.Fatal exits
			// without running deferred calls, so flush the event first.
			hub.CaptureException(err)
			hub.Flush(2 * time.Second)
			log.Fatal(err)
		}

		if err := storeMsg(ctx, db, m); err != nil {
			hub.CaptureException(err)
			log.Print(err)
			continue
		}

		if err := m.AckSync(); err != nil {
			hub.CaptureException(err)
			log.Print(err)
			continue
		}
	}
}

// storeMsg inserts m into the table selected by its subject, inside a single
// transaction. It returns an error for subjects it does not recognise so the
// caller leaves such messages unacknowledged instead of dropping them.
func storeMsg(ctx context.Context, db *pgxpool.Pool, m *nats.Msg) error {
	var q string
	switch subjectKind(m.Subject) {
	case "state":
		q = "INSERT INTO fermentor.state (c_time, state) VALUES ($1, $2)"
	case "temp":
		q = "INSERT INTO fermentor.temperature_reading (c_time, sensor, millis) VALUES ($1, $2, $3)"
	default:
		return errors.New("unhandled subject: " + m.Subject)
	}

	// BeginFunc commits when the callback returns nil and rolls back
	// otherwise; committing again inside the callback would make it report a
	// closed transaction.
	return db.BeginFunc(ctx, func(tx pgx.Tx) error {
		// TODO Unmarshal JSON from m.Data and pass the values as arguments.
		// NOTE(review): until the placeholders are bound this statement is
		// expected to be rejected, leaving the message unacknowledged.
		_, err := tx.Exec(ctx, q)
		return err
	})
}

// subjectKind returns the last dot-separated token of a subject, so that
// "FERMENTOR.state" and a bare "state" both yield "state".
func subjectKind(subject string) string {
	for i := len(subject) - 1; i >= 0; i-- {
		if subject[i] == '.' {
			return subject[i+1:]
		}
	}
	return subject
}
// main initialises Sentry, connects to NATS JetStream and PostgreSQL, starts
// the serve goroutine and, once daemon.WaitForSignalsDefault returns, cancels
// the context and waits for serve to finish before releasing resources.
func main() {
	// Sentry
	// NOTE(review): the DSN is hard-coded; consider reading it from the
	// environment or configuration.
	err := sentry.Init(sentry.ClientOptions{
		Dsn:              "https://7278625538334140991ce433e0ad292f@sentry.joco.dk/24",
		TracesSampleRate: 1.0,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sentry.Flush(10 * time.Second)

	// NATS
	nc, err := nats.Connect("nats://nats.service.consul")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	sub, err := js.SubscribeSync("FERMENTOR.*", nats.Durable("filer"))
	if err != nil {
		log.Fatal(err)
	}

	ctx, shutdown := context.WithCancel(context.Background())
	defer shutdown()

	// PostgreSQL
	// NOTE(review): the connection string is a bare host name; confirm that
	// pgx accepts it (it normally expects a URL or key=value DSN).
	db, err := pgxpool.Connect(ctx, "pgdb1.service.s18.consul")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	log.Print("Ready")
	go serve(ctx, wg, sub, db)

	// Presumably blocks until a termination signal is received.
	daemon.WaitForSignalsDefault()

	log.Print("Shutting down")
	shutdown()
	// Wait for serve to return so an in-flight message is not cut off and the
	// pool is not closed underneath it.
	wg.Wait()
	log.Print("Shutdown complete")
}