127 lines
2.5 KiB
Go
127 lines
2.5 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"log"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"git.joco.dk/sng/fermentord/pkg/daemon"
|
||
|
"github.com/getsentry/sentry-go"
|
||
|
"github.com/jackc/pgx/v4"
|
||
|
"github.com/jackc/pgx/v4/pgxpool"
|
||
|
"github.com/nats-io/nats.go"
|
||
|
)
|
||
|
|
||
|
func serve(ctx context.Context, wg *sync.WaitGroup, sub *nats.Subscription, db *pgxpool.Pool) {
|
||
|
hub := sentry.CurrentHub().Clone()
|
||
|
defer wg.Done()
|
||
|
|
||
|
for {
|
||
|
m, err := sub.NextMsgWithContext(ctx)
|
||
|
if errors.Is(err, nats.ErrInvalidContext) {
|
||
|
return
|
||
|
}
|
||
|
if errors.Is(err, nats.ErrBadSubscription) {
|
||
|
hub.CaptureException(err)
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
if errors.Is(err, nats.ErrConnectionClosed) {
|
||
|
hub.CaptureException(err)
|
||
|
time.Sleep(10 * time.Second)
|
||
|
continue
|
||
|
}
|
||
|
if errors.Is(err, nats.ErrTimeout) {
|
||
|
hub.CaptureException(err)
|
||
|
log.Print(err)
|
||
|
continue
|
||
|
}
|
||
|
if err != nil {
|
||
|
hub.CaptureException(err)
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
err = db.BeginFunc(ctx, func(tx pgx.Tx) error {
|
||
|
switch m.Subject {
|
||
|
case "state":
|
||
|
q := "INSERT INTO fermentor.state (c_time, state) VALUES ($1, $2)"
|
||
|
// TODO Unmarshal JSON.
|
||
|
if _, err := tx.Query(ctx, q, ); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
case "temp":
|
||
|
q := "INSERT INTO fermentor.temperature_reading (c_time, sensor, millis) VALUES ($1, $2, $3)"
|
||
|
// TODO Unmarshal JSON.
|
||
|
if _, err := tx.Query(ctx, q); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := tx.Commit(ctx); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
if err != nil {
|
||
|
hub.CaptureException(err)
|
||
|
log.Print(err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if err := m.AckSync(); err != nil {
|
||
|
hub.CaptureException(err)
|
||
|
log.Print(err)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func main() {
|
||
|
// Sentry
|
||
|
err := sentry.Init(sentry.ClientOptions{
|
||
|
Dsn: "https://7278625538334140991ce433e0ad292f@sentry.joco.dk/24",
|
||
|
TracesSampleRate: 1.0,
|
||
|
})
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
defer sentry.Flush(10 * time.Second)
|
||
|
|
||
|
// NATS
|
||
|
nc, err := nats.Connect("nats://nats.service.consul")
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
sub, err := js.SubscribeSync("FERMENTOR.*", nats.Durable("filer"))
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
ctx, shutdown := context.WithCancel(context.Background())
|
||
|
wg := &sync.WaitGroup{}
|
||
|
|
||
|
db, err := pgxpool.Connect(ctx, "pgdb1.service.s18.consul")
|
||
|
if err != nil {
|
||
|
log.Fatal(err)
|
||
|
}
|
||
|
|
||
|
wg.Add(1)
|
||
|
log.Print("Ready")
|
||
|
go serve(ctx, wg, sub, db)
|
||
|
daemon.WaitForSignalsDefault()
|
||
|
|
||
|
log.Print("Shutting down")
|
||
|
shutdown()
|
||
|
log.Print("Shutdown complete")
|
||
|
|
||
|
}
|