Skip to content
Snippets Groups Projects
Commit 09243f28 authored by Jonas Leder's avatar Jonas Leder
Browse files

remove zeromq

parent fca8d3be
No related branches found
No related tags found
1 merge request!1WIP: write values to a database
Pipeline #54570 passed
!.gitkeep
.env .env
boiler.csv boiler.csv
boiler.txt boiler.txt
\ No newline at end of file queues/
\ No newline at end of file
...@@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE ...@@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY= github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY=
github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA= github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA=
github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
......
...@@ -10,14 +10,13 @@ import ( ...@@ -10,14 +10,13 @@ import (
"jonasled.dev/jonasled/ems-esp-logger/messageworker" "jonasled.dev/jonasled/ems-esp-logger/messageworker"
"jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttclient"
"jonasled.dev/jonasled/ems-esp-logger/mqttserver" "jonasled.dev/jonasled/ems-esp-logger/mqttserver"
"jonasled.dev/jonasled/ems-esp-logger/zeromq" "jonasled.dev/jonasled/ems-esp-logger/queue"
) )
func main() { func main() {
log.Init() log.Init()
zeromq.Init() queue.Init()
defer zeromq.Pusher.Close()
if os.Getenv("OUTPUT_DATABSE") != "" { if os.Getenv("OUTPUT_DATABSE") != "" {
database.Init() database.Init()
......
...@@ -2,26 +2,26 @@ package messageworker ...@@ -2,26 +2,26 @@ package messageworker
import ( import (
"encoding/json" "encoding/json"
"os" "time"
zmq "github.com/pebbe/zmq4"
"jonasled.dev/jonasled/ems-esp-logger/database" "jonasled.dev/jonasled/ems-esp-logger/database"
"jonasled.dev/jonasled/ems-esp-logger/database/tables" "jonasled.dev/jonasled/ems-esp-logger/database/tables"
"jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/helper"
"jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/queue"
"jonasled.dev/jonasled/ems-esp-logger/types" "jonasled.dev/jonasled/ems-esp-logger/types"
"jonasled.dev/jonasled/ems-esp-logger/zeromq"
) )
func Run() { func Run() {
worker, _ := zmq.NewSocket(zmq.PULL)
defer worker.Close()
worker.Connect(os.Getenv("ZEROMQ_WORKER"))
for { for {
taskData, _ := worker.Recv(0) taskData, err := queue.MainQueue.Dequeue()
if err != nil {
time.Sleep(time.Second)
continue
}
log.Log.Debug("Received new task: ", taskData) log.Log.Debug("Received new task: ", taskData)
var task types.Task var task types.Task
err := json.Unmarshal([]byte(taskData), &task) err = json.Unmarshal([]byte(taskData), &task)
if err != nil { if err != nil {
log.Log.Error("Failed decoding task as JSON: ", err.Error()) log.Log.Error("Failed decoding task as JSON: ", err.Error())
continue continue
...@@ -30,7 +30,7 @@ func Run() { ...@@ -30,7 +30,7 @@ func Run() {
err = database.Db.Where("name = ?", task.Instance).First(&instance).Error err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
if err != nil { if err != nil {
log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error()) log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
zeromq.Pusher.Send(taskData, 0) queue.MainQueue.Enqueue(taskData)
continue continue
} }
...@@ -38,7 +38,7 @@ func Run() { ...@@ -38,7 +38,7 @@ func Run() {
err = json.Unmarshal([]byte(task.Data), &jsonData) err = json.Unmarshal([]byte(task.Data), &jsonData)
if err != nil { if err != nil {
log.Log.Error("Failed decoding boiler JSON: ", err.Error()) log.Log.Error("Failed decoding boiler JSON: ", err.Error())
zeromq.Pusher.Send(taskData, 0) queue.MainQueue.Enqueue(taskData)
continue continue
} }
for key, value := range jsonData { for key, value := range jsonData {
......
...@@ -10,8 +10,8 @@ import ( ...@@ -10,8 +10,8 @@ import (
"jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/csv"
"jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/helper"
"jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/queue"
"jonasled.dev/jonasled/ems-esp-logger/types" "jonasled.dev/jonasled/ems-esp-logger/types"
"jonasled.dev/jonasled/ems-esp-logger/zeromq"
) )
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
...@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m ...@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
if err != nil { if err != nil {
log.Log.Error("Failed encoding new data task as JSON: ", err.Error()) log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
} }
zeromq.Pusher.Send(string(taskData), 0) queue.MainQueue.Enqueue(string(taskData))
} }
} }
......
package queue
import (
"fmt"
"os"
"os/signal"
"syscall"
"github.com/wind-c/comqtt/v2/cluster/log"
)
var MainQueue Queue
func Init() {
queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json")
if err != nil {
log.Fatal("Error initializing main queue:", err)
}
// Set up signal handling for clean exit
exitChan := make(chan os.Signal, 1)
signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-exitChan
fmt.Println("\nSaving main queue state before exiting...")
if err := queue.Save(); err != nil {
fmt.Println("Error saving main queue state:", err)
} else {
fmt.Println("Main queue state saved successfully.")
}
os.Exit(0)
}()
MainQueue = *queue
}
package queue
import (
"encoding/json"
"fmt"
"os"
)
type Queue struct {
Elements []string
FilePath string
}
func NewQueue(filePath string) (*Queue, error) {
q := &Queue{
Elements: make([]string, 0),
FilePath: filePath,
}
// Load state from file
if _, err := os.Stat(filePath); err == nil {
data, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("error reading state file: %w", err)
}
if err := json.Unmarshal(data, &q.Elements); err != nil {
return nil, fmt.Errorf("error parsing state file: %w", err)
}
}
return q, nil
}
func (q *Queue) Enqueue(element string) {
q.Elements = append(q.Elements, element)
}
func (q *Queue) Dequeue() (string, error) {
if len(q.Elements) == 0 {
return "", fmt.Errorf("queue is empty")
}
element := q.Elements[0]
q.Elements = q.Elements[1:]
return element, nil
}
func (q *Queue) Save() error {
data, err := json.Marshal(q.Elements)
if err != nil {
return fmt.Errorf("error serializing state: %w", err)
}
if err := os.WriteFile(q.FilePath, data, 0644); err != nil {
return fmt.Errorf("error writing state to file: %w", err)
}
return nil
}
package zeromq
import (
"os"
zmq "github.com/pebbe/zmq4"
"jonasled.dev/jonasled/ems-esp-logger/log"
)
var Pusher *zmq.Socket
func Init() {
var err error
Pusher, err = zmq.NewSocket(zmq.PUSH)
if err != nil {
log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error())
}
Pusher.Bind(os.Getenv("ZEROMQ_BIND"))
log.Log.Info("Successfully initialized Message broker")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment