From 09243f2820ca47a09894479149bdb36b0b336349 Mon Sep 17 00:00:00 2001 From: Jonas Leder <jonas@jonasled.de> Date: Sun, 26 Jan 2025 14:39:02 +0100 Subject: [PATCH] remove zeromq --- .gitignore | 4 +++- go.mod | 1 - go.sum | 2 -- main.go | 5 ++-- messageworker/main.go | 20 ++++++++-------- mqttclient/main.go | 4 ++-- queue/main.go | 35 +++++++++++++++++++++++++++ queue/queue.go | 55 +++++++++++++++++++++++++++++++++++++++++++ zeromq/main.go | 20 ---------------- 9 files changed, 107 insertions(+), 39 deletions(-) create mode 100644 queue/main.go create mode 100644 queue/queue.go delete mode 100644 zeromq/main.go diff --git a/.gitignore b/.gitignore index 0400075..68f09f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +!.gitkeep .env boiler.csv -boiler.txt \ No newline at end of file +boiler.txt +queues/ \ No newline at end of file diff --git a/go.mod b/go.mod index 1bd3fb7..f1f7efa 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/glebarez/sqlite v1.11.0 github.com/joho/godotenv v1.5.1 github.com/nekomeowww/gorm-logger-logrus v1.0.8 - github.com/pebbe/zmq4 v1.2.11 github.com/sirupsen/logrus v1.9.3 github.com/wind-c/comqtt/v2 v2.6.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 5e87357..2713a23 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY= github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA= -github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q= -github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= diff --git a/main.go b/main.go index 46e6f53..1c83f09 100644 --- a/main.go +++ b/main.go @@ -10,14 +10,13 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttserver" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" + "jonasled.dev/jonasled/ems-esp-logger/queue" ) func main() { log.Init() - zeromq.Init() - defer zeromq.Pusher.Close() + queue.Init() if os.Getenv("OUTPUT_DATABSE") != "" { database.Init() diff --git a/messageworker/main.go b/messageworker/main.go index 8281364..f7b6a93 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -2,26 +2,26 @@ package messageworker import ( "encoding/json" - "os" + "time" - zmq "github.com/pebbe/zmq4" "jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) func Run() { - worker, _ := zmq.NewSocket(zmq.PULL) - defer worker.Close() - worker.Connect(os.Getenv("ZEROMQ_WORKER")) for { - taskData, _ := worker.Recv(0) + taskData, err := queue.MainQueue.Dequeue() + if err != nil { + time.Sleep(time.Second) + continue + } log.Log.Debug("Received new task: ", taskData) var task types.Task - err := json.Unmarshal([]byte(taskData), &task) + err = json.Unmarshal([]byte(taskData), &task) if err != nil { log.Log.Error("Failed decoding task as JSON: ", err.Error()) continue @@ -30,7 +30,7 @@ func Run() { err = database.Db.Where("name = ?", task.Instance).First(&instance).Error if err != nil { log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } @@ -38,7 +38,7 @@ func Run() { err = json.Unmarshal([]byte(task.Data), &jsonData) if err != nil { log.Log.Error("Failed decoding boiler JSON: ", err.Error()) - zeromq.Pusher.Send(taskData, 0) + queue.MainQueue.Enqueue(taskData) continue } for key, value := range jsonData { diff --git a/mqttclient/main.go b/mqttclient/main.go index 6207aa5..268565e 100644 --- a/mqttclient/main.go +++ b/mqttclient/main.go @@ -10,8 +10,8 @@ import ( "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" + "jonasled.dev/jonasled/ems-esp-logger/queue" "jonasled.dev/jonasled/ems-esp-logger/types" - "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { @@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m if err != nil { log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) } - zeromq.Pusher.Send(string(taskData), 0) + queue.MainQueue.Enqueue(string(taskData)) } } diff --git a/queue/main.go b/queue/main.go new file mode 100644 index 0000000..af4deda --- /dev/null +++ b/queue/main.go @@ -0,0 +1,35 @@ +package queue + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/wind-c/comqtt/v2/cluster/log" +) + +var MainQueue Queue + +func Init() { + queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json") + if err != nil { + log.Fatal("Error initializing main queue:", err) + } + + // Set up signal handling for clean exit + exitChan := make(chan os.Signal, 1) + signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-exitChan + fmt.Println("\nSaving main queue state before exiting...") + if err := queue.Save(); err != nil { + fmt.Println("Error saving main queue state:", err) + } else { + fmt.Println("Main queue state saved successfully.") + } + os.Exit(0) + }() + MainQueue = *queue +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..4bd627c --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,55 @@ +package queue + +import ( + "encoding/json" + "fmt" + "os" +) + +type Queue struct { + Elements []string + FilePath string +} + +func NewQueue(filePath string) (*Queue, error) { + q := &Queue{ + Elements: make([]string, 0), + FilePath: filePath, + } + + // Load state from file + if _, err := os.Stat(filePath); err == nil { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("error reading state file: %w", err) + } + if err := json.Unmarshal(data, &q.Elements); err != nil { + return nil, fmt.Errorf("error parsing state file: %w", err) + } + } + return q, nil +} + +func (q *Queue) Enqueue(element string) { + q.Elements = append(q.Elements, element) +} + +func (q *Queue) Dequeue() (string, error) { + if len(q.Elements) == 0 { + return "", fmt.Errorf("queue is empty") + } + element := q.Elements[0] + q.Elements = q.Elements[1:] + return element, nil +} + +func (q *Queue) Save() error { + data, err := json.Marshal(q.Elements) + if err != nil { + return fmt.Errorf("error serializing state: %w", err) + } + if err := os.WriteFile(q.FilePath, data, 0644); err != nil { + return fmt.Errorf("error writing state to file: %w", err) + } + return nil +} diff --git a/zeromq/main.go b/zeromq/main.go deleted file mode 100644 index d27d819..0000000 --- a/zeromq/main.go +++ /dev/null @@ -1,20 +0,0 @@ -package zeromq - -import ( - "os" - - zmq "github.com/pebbe/zmq4" - "jonasled.dev/jonasled/ems-esp-logger/log" -) - -var Pusher *zmq.Socket - -func Init() { - var err error - Pusher, err = zmq.NewSocket(zmq.PUSH) - if err != nil { - log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error()) - } - Pusher.Bind(os.Getenv("ZEROMQ_BIND")) - log.Log.Info("Successfully initialized Message broker") -} -- GitLab