diff --git a/.gitignore b/.gitignore index 0400075052efd799519ba894abd902d40b3cec96..68f09f4e59c1daf80444590c9f0eab52ab3c6eec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +!.gitkeep .env boiler.csv -boiler.txt \ No newline at end of file +boiler.txt +queues/ \ No newline at end of file diff --git a/go.mod b/go.mod index 1bd3fb7df57d4d64773415ffa70b0279a800ea12..f1f7efa8326aeda20193ef01614ad175bf2918b4 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/glebarez/sqlite v1.11.0 github.com/joho/godotenv v1.5.1 github.com/nekomeowww/gorm-logger-logrus v1.0.8 - github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 5e8735709c87685a3114d07a1483917336db330c..2713a237a4659fe702ee56a9d20f7b7dc0f5f425 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY= github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA= -github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= -github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/main.go b/main.go index 46e6f5310a561a2524a1de2e4bca44550b8b312b..1c83f0939155683b2d5c3cda3abd891478598577 100644 --- a/main.go +++ b/main.go @@ -10,14 +10,13 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" + "jonasled.dev/jonasled/ems-esp-logger/queue" ) func main() { log.Init() - zeromq.Init() - defer zeromq.Pusher.Close() + queue.Init() if os.Getenv("OUTPUT_DATABSE") != "" { database.Init() diff --git a/messageworker/main.go b/messageworker/main.go index 828136435223360f55d9cec9e3894105da2afa63..f7b6a93ffb9f31dc07a5ffecc6d673666ef5788e 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -2,26 +2,26 @@ package messageworker import ( "encoding/json" - "os" + "time" - zmq "github.com/pebbe/zmq4" "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func Run() { - worker, _ := zmq.NewSocket(zmq.PULL) - defer worker.Close() - worker.Connect(os.Getenv("ZEROMQ_WORKER")) for { - taskData, _ := worker.Recv(0) + taskData, err := queue.MainQueue.Dequeue() + if err != nil { + time.Sleep(time.Second) + continue + } log.Log.Debug("Received new task: ", taskData) var task types.Task - err := json.Unmarshal([]byte(taskData), &task) + err = json.Unmarshal([]byte(taskData), &task) if err != nil { log.Log.Error("Failed decoding task as JSON: ", err.Error()) continue @@ -30,7 +30,7 @@ func Run() { err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } @@ -38,7 +38,7 @@ func Run() { err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } for key, value := range jsonData { diff --git a/mqttclient/main.go b/mqttclient/main.go index 6207aa50d20f117d82af96d217211457a1a7ec71..268565e940fc826fe03f31bcfa7d560a64e3ea17 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,8 +10,8 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { @@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m if err != nil { log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) } - zeromq.Pusher.Send(string(taskData), 0) + queue.MainQueue.Enqueue(string(taskData)) } } diff --git a/queue/main.go b/queue/main.go new file mode 100644 index 0000000000000000000000000000000000000000..af4deda4f2727d500d85c5946c36dbc4f88a9fbd --- /dev/null +++ b/queue/main.go @@ -0,0 +1,35 @@ +package queue + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/wind-c/comqtt/v2/cluster/log" +) + +var MainQueue Queue + +func Init() { + queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json") + if err != nil { + log.Fatal("Error initializing main queue:", err) + } + + // Set up signal handling for clean exit + exitChan := make(chan os.Signal, 1) + signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-exitChan + fmt.Println("\nSaving main queue state before exiting...") + if err := queue.Save(); err != nil { + fmt.Println("Error saving main queue state:", err) + } else { + fmt.Println("Main queue state saved successfully.") + } + os.Exit(0) + }() + MainQueue = *queue +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000000000000000000000000000000000000..4bd627ce52db7ead5ab9f64a9e0aaa8263b3d073 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +import ( + "encoding/json" + "fmt" + "os" +) + +type Queue struct { + Elements []string + FilePath string +} + +func NewQueue(filePath string) (*Queue, error) { + q := &Queue{ + Elements: make([]string, 0), + FilePath: filePath, + } + + // Load state from file + if _, err := os.Stat(filePath); err == nil { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("error reading state file: %w", err) + } + if err := json.Unmarshal(data, &q.Elements); err != nil { + return nil, fmt.Errorf("error parsing state file: %w", err) + } + } + return q, nil +} + +func (q *Queue) Enqueue(element string) { + q.Elements = append(q.Elements, element) +} + +func (q *Queue) Dequeue() (string, error) { + if len(q.Elements) == 0 { + return "", fmt.Errorf("queue is empty") + } + element := q.Elements[0] + q.Elements = q.Elements[1:] + return element, nil +} + +func (q *Queue) Save() error { + data, err := json.Marshal(q.Elements) + if err != nil { + return fmt.Errorf("error serializing state: %w", err) + } + if err := os.WriteFile(q.FilePath, data, 0644); err != nil { + return fmt.Errorf("error writing state to file: %w", err) + } + return nil +} diff --git a/zeromq/main.go b/zeromq/main.go deleted file mode 100644 index d27d819f88324b4f886f248852a7836b9df7b3ce..0000000000000000000000000000000000000000 --- a/zeromq/main.go +++ /dev/null @@ -1,20 +0,0 @@ -package zeromq - -import ( - "os" - - zmq "github.com/pebbe/zmq4" - "jonasled.dev/jonasled/ems-esp-logger/log" -) - -var Pusher *zmq.Socket - -func Init() { - var err error - Pusher, err = zmq.NewSocket(zmq.PUSH) - if err != nil { - log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error()) - } - Pusher.Bind(os.Getenv("ZEROMQ_BIND")) - log.Log.Info("Successfully initialized Message broker") -}