From c75af5dce41f8b96dcf3e518de4b8c06097a5197 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:12:34 +0100
Subject: [PATCH 01/15] write new messages to ZeroMQ broker

---
 .env.example       |  3 ++-
 go.mod             |  1 +
 go.sum             |  2 ++
 main.go            |  4 ++++
 mqttclient/main.go | 37 +++++++++++++++++++++----------------
 zeromq/main.go     | 20 ++++++++++++++++++++
 6 files changed, 50 insertions(+), 17 deletions(-)
 create mode 100644 zeromq/main.go

diff --git a/.env.example b/.env.example
index b7fe173..f092a53 100644
--- a/.env.example
+++ b/.env.example
@@ -7,4 +7,5 @@ MQTT_PASSWORD=password
 MQTT_TOPIC_BOILER=ems-esp/boiler_data
 LOG_LEVEL=INFO
 OUTPUT_FILE_NAME="boiler.csv"
-OUTPUT_FILE_NAME_RAW="boiler.txt"
\ No newline at end of file
+OUTPUT_FILE_NAME_RAW="boiler.txt"
+ZEROMQ_BIND="tcp://127.0.0.1:5557"
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 97a60ef..c441067 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.23.4
 require (
 	github.com/eclipse/paho.mqtt.golang v1.5.0
 	github.com/joho/godotenv v1.5.1
+	github.com/pebbe/zmq4 v1.2.11
 	github.com/sirupsen/logrus v1.9.3
 	github.com/wind-c/comqtt/v2 v2.6.0
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
diff --git a/go.sum b/go.sum
index 04b0286..40a424b 100644
--- a/go.sum
+++ b/go.sum
@@ -9,6 +9,8 @@ github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
 github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
+github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
diff --git a/main.go b/main.go
index 26cb826..edee1a5 100644
--- a/main.go
+++ b/main.go
@@ -8,10 +8,14 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/log"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttclient"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttserver"
+	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
 func main() {
 	log.Init()
+	defer zeromq.Pusher.Close()
+
+	zeromq.Init()
 	if os.Getenv("MQTT_SERVER_ENABLED") == "true" {
 		log.Log.Info("Starting embedded MQTT server")
 		mqttserver.Start()
diff --git a/mqttclient/main.go b/mqttclient/main.go
index f459ffc..5f56dfc 100644
--- a/mqttclient/main.go
+++ b/mqttclient/main.go
@@ -10,6 +10,7 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/csv"
 	"jonasled.dev/jonasled/ems-esp-logger/helper"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
 var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
@@ -20,8 +21,26 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
 	log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic())
 	log.Log.Info("Received new boiler data")
 	csv.JsonToCsv(string(msg.Payload()))
+
+	var jsonData map[string]interface{}
+	err := json.Unmarshal(msg.Payload(), &jsonData)
+	if err != nil {
+		panic(err)
+	}
+
+	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
+	jsonData["date"] = currentTime
+
+	updatedData, err := json.Marshal(jsonData)
+	if err != nil {
+		panic(err)
+	}
+
 	if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" {
-		dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW"))
+		dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW"))
+	}
+	if os.Getenv("OUTPUT_DATABSE") != "" {
+		zeromq.Pusher.Send(string(updatedData), 0)
 	}
 
 }
@@ -53,26 +72,12 @@ func Init() {
 }
 
 func dumpRawData(data string, filename string) {
-	var jsonData map[string]interface{}
-	err := json.Unmarshal([]byte(data), &jsonData)
-	if err != nil {
-		panic(err)
-	}
-
-	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
-	jsonData["date"] = currentTime
-
-	updatedData, err := json.Marshal(jsonData)
-	if err != nil {
-		panic(err)
-	}
-
 	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
 		panic(err)
 	}
 	defer file.Close()
 
-	file.WriteString(string(updatedData))
+	file.WriteString(data)
 	file.WriteString("\n")
 }
diff --git a/zeromq/main.go b/zeromq/main.go
new file mode 100644
index 0000000..d27d819
--- /dev/null
+++ b/zeromq/main.go
@@ -0,0 +1,20 @@
+package zeromq
+
+import (
+	"os"
+
+	zmq "github.com/pebbe/zmq4"
+	"jonasled.dev/jonasled/ems-esp-logger/log"
+)
+
+var Pusher *zmq.Socket
+
+func Init() {
+	var err error
+	Pusher, err = zmq.NewSocket(zmq.PUSH)
+	if err != nil {
+		log.Log.Fatal("Failed starting  zeroMQ socket: ", err.Error())
+	}
+	Pusher.Bind(os.Getenv("ZEROMQ_BIND"))
+	log.Log.Info("Successfully initialized Message broker")
+}
-- 
GitLab


From 1fe5f8d2d4a3c83b0cf6fbdd51175fc14a303e33 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:17:00 +0100
Subject: [PATCH 02/15] initialize dummy worker for processing tasks

---
 main.go               |  4 +++-
 messageworker/main.go | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)
 create mode 100644 messageworker/main.go

diff --git a/main.go b/main.go
index edee1a5..a746003 100644
--- a/main.go
+++ b/main.go
@@ -6,6 +6,7 @@ import (
 
 	_ "github.com/joho/godotenv/autoload"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/messageworker"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttclient"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttserver"
 	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
@@ -21,8 +22,9 @@ func main() {
 		mqttserver.Start()
 	}
 	mqttclient.Init()
+	go messageworker.Run()
 	for {
-		time.Sleep(time.Second)
+		time.Sleep(time.Second) // reduce CPU usage by adding a short sleep here instead of a empty for loop
 	}
 
 }
diff --git a/messageworker/main.go b/messageworker/main.go
new file mode 100644
index 0000000..44334de
--- /dev/null
+++ b/messageworker/main.go
@@ -0,0 +1,18 @@
+package messageworker
+
+import (
+	"fmt"
+	"os"
+
+	zmq "github.com/pebbe/zmq4"
+)
+
+func Run() {
+	worker, _ := zmq.NewSocket(zmq.PULL)
+	defer worker.Close()
+	worker.Connect(os.Getenv("ZEROMQ_WORKER"))
+	for {
+		task, _ := worker.Recv(0)
+		fmt.Println("Processing:", task)
+	}
+}
-- 
GitLab


From 06285f76b435ae8abbbc8bb4c5480a721e75bbe5 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:32:49 +0100
Subject: [PATCH 03/15] initialize database connection with sqlite or MySQL

---
 database/main.go            | 88 +++++++++++++++++++++++++++++++++++++
 database/tables/instance.go |  7 +++
 go.mod                      | 18 ++++++++
 go.sum                      | 42 ++++++++++++++++++
 main.go                     |  9 +++-
 5 files changed, 163 insertions(+), 1 deletion(-)
 create mode 100644 database/main.go
 create mode 100644 database/tables/instance.go

diff --git a/database/main.go b/database/main.go
new file mode 100644
index 0000000..08706f6
--- /dev/null
+++ b/database/main.go
@@ -0,0 +1,88 @@
+package database
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/glebarez/sqlite"
+	gormloggerlogrus "github.com/nekomeowww/gorm-logger-logrus"
+	"github.com/sirupsen/logrus"
+	"gorm.io/driver/mysql"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+	"jonasled.dev/jonasled/ems-esp-logger/database/tables"
+	"jonasled.dev/jonasled/ems-esp-logger/log"
+)
+
+var Db *gorm.DB
+
+func Init() {
+	var err error
+	var dbdriver gorm.Dialector
+	switch os.Getenv("OUTPUT_DATABASE_TYPE") {
+	case "sqlite":
+		dbdriver = sqlite.Open(os.Getenv("OUTPUT_DATABSE"))
+	case "mysql":
+		dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
+			os.Getenv("OUTPUT_DATABASE_USER"),
+			os.Getenv("OUTPUT_DATABASE_PASSWORD"),
+			os.Getenv("OUTPUT_DATABASE_HOST"),
+			os.Getenv("OUTPUT_DATABSE"),
+		)
+		dbdriver = mysql.Open(dsn)
+	default:
+		log.Log.Fatal("Please set OUTPUT_DATABASE_TYPE to a valid value")
+	}
+	Db, err = gorm.Open(dbdriver, &gorm.Config{
+		Logger: gormloggerlogrus.New(gormloggerlogrus.Options{
+			Logger:                    logrus.NewEntry(log.Log),
+			LogLevel:                  logger.Error,
+			IgnoreRecordNotFoundError: false,
+			SlowThreshold:             time.Millisecond * 200,
+			FileWithLineNumField:      "file",
+		}),
+	})
+
+	if err != nil {
+		log.Log.Fatal("Failed initializing database: ", err.Error())
+	}
+
+	if os.Getenv("OUTPUT_DATABASE_EXECUTE_MIGRATIONS") != "false" {
+		log.Log.Info("Executing database migrations")
+		Db.AutoMigrate(&tables.Instance{})
+	}
+}
+
+func CreateInstance() {
+	instanceName := os.Getenv("INSTANCE_NAME")
+	instanceDescription := os.Getenv("INSTANCE_DESCRIPTION")
+	if instanceName == "" || instanceDescription == "" {
+		log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set")
+	}
+	var instance tables.Instance
+	err := Db.Where("name = ?", instanceName).First(&instance).Error
+
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			// Create a new instance if it doesn't exist
+			instance = tables.Instance{
+				Name:        instanceName,
+				Description: instanceDescription,
+			}
+			if err := Db.Create(&instance).Error; err != nil {
+				log.Log.Fatalf("Failed to create instance: %v", err)
+			}
+			log.Log.Infof("Created new instance: %+v\n", instance)
+		} else {
+			log.Log.Fatalf("Failed to query database: %v", err)
+		}
+	} else {
+		// Update the description if the instance exists
+		instance.Description = instanceDescription
+		if err := Db.Save(&instance).Error; err != nil {
+			log.Log.Fatalf("Failed to update instance description: %v", err)
+		}
+		log.Log.Infof("Updated instance description: %+v\n", instance)
+	}
+}
diff --git a/database/tables/instance.go b/database/tables/instance.go
new file mode 100644
index 0000000..1ecfb7f
--- /dev/null
+++ b/database/tables/instance.go
@@ -0,0 +1,7 @@
+package tables
+
+type Instance struct {
+	ID          uint `gorm:"primaryKey"`
+	Name        string
+	Description string
+}
diff --git a/go.mod b/go.mod
index c441067..1bd3fb7 100644
--- a/go.mod
+++ b/go.mod
@@ -4,18 +4,36 @@ go 1.23.4
 
 require (
 	github.com/eclipse/paho.mqtt.golang v1.5.0
+	github.com/glebarez/sqlite v1.11.0
 	github.com/joho/godotenv v1.5.1
+	github.com/nekomeowww/gorm-logger-logrus v1.0.8
 	github.com/pebbe/zmq4 v1.2.11
 	github.com/sirupsen/logrus v1.9.3
 	github.com/wind-c/comqtt/v2 v2.6.0
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
+	gorm.io/driver/mysql v1.5.7
+	gorm.io/gorm v1.25.12
 )
 
 require (
+	filippo.io/edwards25519 v1.1.0 // indirect
+	github.com/dustin/go-humanize v1.0.1 // indirect
+	github.com/glebarez/go-sqlite v1.21.2 // indirect
+	github.com/go-sql-driver/mysql v1.8.1 // indirect
+	github.com/google/uuid v1.3.0 // indirect
 	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/jinzhu/inflection v1.0.0 // indirect
+	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/mattn/go-isatty v0.0.20 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
 	github.com/rs/xid v1.5.0 // indirect
 	golang.org/x/net v0.27.0 // indirect
 	golang.org/x/sync v0.7.0 // indirect
 	golang.org/x/sys v0.22.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	modernc.org/libc v1.22.5 // indirect
+	modernc.org/mathutil v1.5.0 // indirect
+	modernc.org/memory v1.5.0 // indirect
+	modernc.org/sqlite v1.23.1 // indirect
 )
diff --git a/go.sum b/go.sum
index 40a424b..5e87357 100644
--- a/go.sum
+++ b/go.sum
@@ -1,18 +1,44 @@
+filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
+filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
+github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
 github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
 github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
+github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
+github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
+github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw=
+github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
+github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
+github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
+github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
 github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
+github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
+github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY=
+github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA=
 github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
 github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -28,8 +54,11 @@ golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
 golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
 golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
 golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
@@ -37,3 +66,16 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
+gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
+gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
+gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
+gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
+modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
+modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
+modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
+modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
+modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
+modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
+modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=
diff --git a/main.go b/main.go
index a746003..46e6f53 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	_ "github.com/joho/godotenv/autoload"
+	"jonasled.dev/jonasled/ems-esp-logger/database"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
 	"jonasled.dev/jonasled/ems-esp-logger/messageworker"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttclient"
@@ -14,9 +15,15 @@ import (
 
 func main() {
 	log.Init()
-	defer zeromq.Pusher.Close()
 
 	zeromq.Init()
+	defer zeromq.Pusher.Close()
+
+	if os.Getenv("OUTPUT_DATABSE") != "" {
+		database.Init()
+		database.CreateInstance()
+	}
+
 	if os.Getenv("MQTT_SERVER_ENABLED") == "true" {
 		log.Log.Info("Starting embedded MQTT server")
 		mqttserver.Start()
-- 
GitLab


From 7f9414ca00d181864df307e7f92e3358c34ff232 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:39:28 +0100
Subject: [PATCH 04/15] define tables for value types and value

---
 database/main.go              |  2 +-
 database/tables/valueTypes.go |  7 +++++++
 database/tables/values.go     | 13 +++++++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)
 create mode 100644 database/tables/valueTypes.go
 create mode 100644 database/tables/values.go

diff --git a/database/main.go b/database/main.go
index 08706f6..2b62e9d 100644
--- a/database/main.go
+++ b/database/main.go
@@ -50,7 +50,7 @@ func Init() {
 
 	if os.Getenv("OUTPUT_DATABASE_EXECUTE_MIGRATIONS") != "false" {
 		log.Log.Info("Executing database migrations")
-		Db.AutoMigrate(&tables.Instance{})
+		Db.AutoMigrate(&tables.Instance{}, &tables.ValueType{}, &tables.Value{})
 	}
 }
 
diff --git a/database/tables/valueTypes.go b/database/tables/valueTypes.go
new file mode 100644
index 0000000..5e3c14f
--- /dev/null
+++ b/database/tables/valueTypes.go
@@ -0,0 +1,7 @@
+package tables
+
+type ValueType struct {
+	ID          uint `gorm:"primaryKey"`
+	Name        string
+	Description string
+}
diff --git a/database/tables/values.go b/database/tables/values.go
new file mode 100644
index 0000000..51edd6e
--- /dev/null
+++ b/database/tables/values.go
@@ -0,0 +1,13 @@
+package tables
+
+import "time"
+
+type Value struct {
+	ID          uint `gorm:"primaryKey"`
+	Date        time.Time
+	InstanceID  int
+	Instance    Instance
+	ValueTypeID int
+	ValueType   ValueType
+	Value       string
+}
-- 
GitLab


From f3f90042fef456b88d91b5fc80dbaea6f8cc4939 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:46:11 +0100
Subject: [PATCH 05/15] introduce struct for transmitting task

---
 mqttclient/main.go | 45 ++++++++++++++++++++++++++++-----------------
 types/task.go      |  9 +++++++++
 2 files changed, 37 insertions(+), 17 deletions(-)
 create mode 100644 types/task.go

diff --git a/mqttclient/main.go b/mqttclient/main.go
index 5f56dfc..ee1c61e 100644
--- a/mqttclient/main.go
+++ b/mqttclient/main.go
@@ -10,6 +10,7 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/csv"
 	"jonasled.dev/jonasled/ems-esp-logger/helper"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/types"
 	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
@@ -22,25 +23,20 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
 	log.Log.Info("Received new boiler data")
 	csv.JsonToCsv(string(msg.Payload()))
 
-	var jsonData map[string]interface{}
-	err := json.Unmarshal(msg.Payload(), &jsonData)
-	if err != nil {
-		panic(err)
-	}
-
-	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
-	jsonData["date"] = currentTime
-
-	updatedData, err := json.Marshal(jsonData)
-	if err != nil {
-		panic(err)
-	}
-
 	if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" {
-		dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW"))
+		dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW"))
 	}
 	if os.Getenv("OUTPUT_DATABSE") != "" {
-		zeromq.Pusher.Send(string(updatedData), 0)
+		task := types.Task{
+			Date:     time.Now().UTC(),
+			Data:     string(msg.Payload()),
+			Instance: os.Getenv("INSTANCE_NAME"),
+		}
+		taskData, err := json.Marshal(task)
+		if err != nil {
+			log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
+		}
+		zeromq.Pusher.Send(string(taskData), 0)
 	}
 
 }
@@ -72,12 +68,27 @@ func Init() {
 }
 
 func dumpRawData(data string, filename string) {
+
+	var jsonData map[string]interface{}
+	err := json.Unmarshal([]byte(data), &jsonData)
+	if err != nil {
+		panic(err)
+	}
+
+	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
+	jsonData["date"] = currentTime
+
+	updatedData, err := json.Marshal(jsonData)
+	if err != nil {
+		panic(err)
+	}
+
 	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
 		panic(err)
 	}
 	defer file.Close()
 
-	file.WriteString(data)
+	file.WriteString(string(updatedData))
 	file.WriteString("\n")
 }
diff --git a/types/task.go b/types/task.go
new file mode 100644
index 0000000..44e1d04
--- /dev/null
+++ b/types/task.go
@@ -0,0 +1,9 @@
+package types
+
+import "time"
+
+type Task struct {
+	Date     time.Time
+	Instance string
+	Data     string
+}
-- 
GitLab


From 7fd01df016012e5e29b575a1201c9d7606eef1ec Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 13:59:42 +0100
Subject: [PATCH 06/15] never exit with a panic

---
 mqttclient/main.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/mqttclient/main.go b/mqttclient/main.go
index ee1c61e..6207aa5 100644
--- a/mqttclient/main.go
+++ b/mqttclient/main.go
@@ -72,7 +72,8 @@ func dumpRawData(data string, filename string) {
 	var jsonData map[string]interface{}
 	err := json.Unmarshal([]byte(data), &jsonData)
 	if err != nil {
-		panic(err)
+		log.Log.Error(err)
+		return
 	}
 
 	currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
@@ -80,12 +81,14 @@ func dumpRawData(data string, filename string) {
 
 	updatedData, err := json.Marshal(jsonData)
 	if err != nil {
-		panic(err)
+		log.Log.Error(err)
+		return
 	}
 
 	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
-		panic(err)
+		log.Log.Error(err)
+		return
 	}
 	defer file.Close()
 
-- 
GitLab


From b13408912d9c6949a6fae9f9cbcc3c1b400e4bd3 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:00:03 +0100
Subject: [PATCH 07/15] move createInstance to a seperate file

---
 database/createInstance.go | 42 ++++++++++++++++++++++++++++++++++++++
 database/main.go           | 33 ------------------------------
 2 files changed, 42 insertions(+), 33 deletions(-)
 create mode 100644 database/createInstance.go

diff --git a/database/createInstance.go b/database/createInstance.go
new file mode 100644
index 0000000..509ac93
--- /dev/null
+++ b/database/createInstance.go
@@ -0,0 +1,42 @@
+package database
+
+import (
+	"os"
+
+	"gorm.io/gorm"
+	"jonasled.dev/jonasled/ems-esp-logger/database/tables"
+	"jonasled.dev/jonasled/ems-esp-logger/log"
+)
+
+func CreateInstance() {
+	instanceName := os.Getenv("INSTANCE_NAME")
+	instanceDescription := os.Getenv("INSTANCE_DESCRIPTION")
+	if instanceName == "" || instanceDescription == "" {
+		log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set")
+	}
+	var instance tables.Instance
+	err := Db.Where("name = ?", instanceName).First(&instance).Error
+
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			// Create a new instance if it doesn't exist
+			instance = tables.Instance{
+				Name:        instanceName,
+				Description: instanceDescription,
+			}
+			if err := Db.Create(&instance).Error; err != nil {
+				log.Log.Fatalf("Failed to create instance: %v", err)
+			}
+			log.Log.Infof("Created new instance: %+v\n", instance)
+		} else {
+			log.Log.Fatalf("Failed to query database: %v", err)
+		}
+	} else {
+		// Update the description if the instance exists
+		instance.Description = instanceDescription
+		if err := Db.Save(&instance).Error; err != nil {
+			log.Log.Fatalf("Failed to update instance description: %v", err)
+		}
+		log.Log.Infof("Updated instance description: %+v\n", instance)
+	}
+}
diff --git a/database/main.go b/database/main.go
index 2b62e9d..93b6959 100644
--- a/database/main.go
+++ b/database/main.go
@@ -53,36 +53,3 @@ func Init() {
 		Db.AutoMigrate(&tables.Instance{}, &tables.ValueType{}, &tables.Value{})
 	}
 }
-
-func CreateInstance() {
-	instanceName := os.Getenv("INSTANCE_NAME")
-	instanceDescription := os.Getenv("INSTANCE_DESCRIPTION")
-	if instanceName == "" || instanceDescription == "" {
-		log.Log.Fatal("INSTANCE_NAME and INSTANCE_DESCRIPTION must be set")
-	}
-	var instance tables.Instance
-	err := Db.Where("name = ?", instanceName).First(&instance).Error
-
-	if err != nil {
-		if err == gorm.ErrRecordNotFound {
-			// Create a new instance if it doesn't exist
-			instance = tables.Instance{
-				Name:        instanceName,
-				Description: instanceDescription,
-			}
-			if err := Db.Create(&instance).Error; err != nil {
-				log.Log.Fatalf("Failed to create instance: %v", err)
-			}
-			log.Log.Infof("Created new instance: %+v\n", instance)
-		} else {
-			log.Log.Fatalf("Failed to query database: %v", err)
-		}
-	} else {
-		// Update the description if the instance exists
-		instance.Description = instanceDescription
-		if err := Db.Save(&instance).Error; err != nil {
-			log.Log.Fatalf("Failed to update instance description: %v", err)
-		}
-		log.Log.Infof("Updated instance description: %+v\n", instance)
-	}
-}
-- 
GitLab


From b424a1a17315e276732c630712a1b75a923a9535 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:00:23 +0100
Subject: [PATCH 08/15] add function to get a valueType

---
 database/getOrCreateValueType.go | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)
 create mode 100644 database/getOrCreateValueType.go

diff --git a/database/getOrCreateValueType.go b/database/getOrCreateValueType.go
new file mode 100644
index 0000000..c8d85a7
--- /dev/null
+++ b/database/getOrCreateValueType.go
@@ -0,0 +1,30 @@
+package database
+
+import (
+	"gorm.io/gorm"
+	"jonasled.dev/jonasled/ems-esp-logger/database/tables"
+	"jonasled.dev/jonasled/ems-esp-logger/log"
+)
+
+func getOrCreateValueType(name string) tables.ValueType {
+	var valueType tables.ValueType
+	err := Db.Where("name = ?", name).First(&valueType).Error
+
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			// Create a new instance if it doesn't exist
+			valueType = tables.ValueType{
+				Name: name,
+			}
+			if err := Db.Create(&valueType).Error; err != nil {
+				log.Log.Fatalf("Failed to create value type: %v", err)
+			}
+			log.Log.Infof("Created new value type: %+v\n", valueType)
+		} else {
+			log.Log.Errorf("Failed to query database: %v", err)
+		}
+	}
+
+	return valueType
+
+}
-- 
GitLab


From f8915bc46e899f707354c0752c15c552620be394 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:10:18 +0100
Subject: [PATCH 09/15] export function

---
 database/getOrCreateValueType.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/database/getOrCreateValueType.go b/database/getOrCreateValueType.go
index c8d85a7..9e51c1a 100644
--- a/database/getOrCreateValueType.go
+++ b/database/getOrCreateValueType.go
@@ -6,7 +6,7 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/log"
 )
 
-func getOrCreateValueType(name string) tables.ValueType {
+func GetOrCreateValueType(name string) tables.ValueType {
 	var valueType tables.ValueType
 	err := Db.Where("name = ?", name).First(&valueType).Error
 
-- 
GitLab


From fca8d3be475113ad61773c16590e73bae8c348d8 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:16:08 +0100
Subject: [PATCH 10/15] implement writing data to database

---
 messageworker/main.go | 44 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 41 insertions(+), 3 deletions(-)

diff --git a/messageworker/main.go b/messageworker/main.go
index 44334de..8281364 100644
--- a/messageworker/main.go
+++ b/messageworker/main.go
@@ -1,10 +1,16 @@
 package messageworker
 
 import (
-	"fmt"
+	"encoding/json"
 	"os"
 
 	zmq "github.com/pebbe/zmq4"
+	"jonasled.dev/jonasled/ems-esp-logger/database"
+	"jonasled.dev/jonasled/ems-esp-logger/database/tables"
+	"jonasled.dev/jonasled/ems-esp-logger/helper"
+	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/types"
+	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
 func Run() {
@@ -12,7 +18,39 @@ func Run() {
 	defer worker.Close()
 	worker.Connect(os.Getenv("ZEROMQ_WORKER"))
 	for {
-		task, _ := worker.Recv(0)
-		fmt.Println("Processing:", task)
+		taskData, _ := worker.Recv(0)
+		log.Log.Debug("Received new task: ", taskData)
+		var task types.Task
+		err := json.Unmarshal([]byte(taskData), &task)
+		if err != nil {
+			log.Log.Error("Failed decoding task as JSON: ", err.Error())
+			continue
+		}
+		var instance tables.Instance
+		err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
+		if err != nil {
+			log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
+			zeromq.Pusher.Send(taskData, 0)
+			continue
+		}
+
+		var jsonData map[string]interface{}
+		err = json.Unmarshal([]byte(task.Data), &jsonData)
+		if err != nil {
+			log.Log.Error("Failed decoding boiler JSON: ", err.Error())
+			zeromq.Pusher.Send(taskData, 0)
+			continue
+		}
+		for key, value := range jsonData {
+			valueType := database.GetOrCreateValueType(key)
+			dbValue := tables.Value{
+				Date:      task.Date,
+				ValueType: valueType,
+				Value:     helper.AnyToString(value),
+				Instance:  instance,
+			}
+			database.Db.Create(&dbValue)
+		}
+		log.Log.Info("Stored boiler data in database")
 	}
 }
-- 
GitLab


From 09243f2820ca47a09894479149bdb36b0b336349 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:39:02 +0100
Subject: [PATCH 11/15] remove zeromq

---
 .gitignore            |  4 +++-
 go.mod                |  1 -
 go.sum                |  2 --
 main.go               |  5 ++--
 messageworker/main.go | 20 ++++++++--------
 mqttclient/main.go    |  4 ++--
 queue/main.go         | 35 +++++++++++++++++++++++++++
 queue/queue.go        | 55 +++++++++++++++++++++++++++++++++++++++++++
 zeromq/main.go        | 20 ----------------
 9 files changed, 107 insertions(+), 39 deletions(-)
 create mode 100644 queue/main.go
 create mode 100644 queue/queue.go
 delete mode 100644 zeromq/main.go

diff --git a/.gitignore b/.gitignore
index 0400075..68f09f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+!.gitkeep
 .env
 boiler.csv
-boiler.txt
\ No newline at end of file
+boiler.txt
+queues/
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 1bd3fb7..f1f7efa 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,6 @@ require (
 	github.com/glebarez/sqlite v1.11.0
 	github.com/joho/godotenv v1.5.1
 	github.com/nekomeowww/gorm-logger-logrus v1.0.8
-	github.com/pebbe/zmq4 v1.2.11
 	github.com/sirupsen/logrus v1.9.3
 	github.com/wind-c/comqtt/v2 v2.6.0
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1
diff --git a/go.sum b/go.sum
index 5e87357..2713a23 100644
--- a/go.sum
+++ b/go.sum
@@ -32,8 +32,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/nekomeowww/gorm-logger-logrus v1.0.8 h1:/XdgNrSBSrTwiZ7fgf8Y9zW9juW1rVmRoY7OkX/tPnY=
 github.com/nekomeowww/gorm-logger-logrus v1.0.8/go.mod h1:V266BSFFzJF1jnKWVAucFsh4U+ib3pEqJCzQY5zm7iA=
-github.com/pebbe/zmq4 v1.2.11 h1:Ua5mgIaZeabUGnH7tqswkUcjkL7JYGai5e8v4hpEU9Q=
-github.com/pebbe/zmq4 v1.2.11/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
diff --git a/main.go b/main.go
index 46e6f53..1c83f09 100644
--- a/main.go
+++ b/main.go
@@ -10,14 +10,13 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/messageworker"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttclient"
 	"jonasled.dev/jonasled/ems-esp-logger/mqttserver"
-	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
+	"jonasled.dev/jonasled/ems-esp-logger/queue"
 )
 
 func main() {
 	log.Init()
 
-	zeromq.Init()
-	defer zeromq.Pusher.Close()
+	queue.Init()
 
 	if os.Getenv("OUTPUT_DATABSE") != "" {
 		database.Init()
diff --git a/messageworker/main.go b/messageworker/main.go
index 8281364..f7b6a93 100644
--- a/messageworker/main.go
+++ b/messageworker/main.go
@@ -2,26 +2,26 @@ package messageworker
 
 import (
 	"encoding/json"
-	"os"
+	"time"
 
-	zmq "github.com/pebbe/zmq4"
 	"jonasled.dev/jonasled/ems-esp-logger/database"
 	"jonasled.dev/jonasled/ems-esp-logger/database/tables"
 	"jonasled.dev/jonasled/ems-esp-logger/helper"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/queue"
 	"jonasled.dev/jonasled/ems-esp-logger/types"
-	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
 func Run() {
-	worker, _ := zmq.NewSocket(zmq.PULL)
-	defer worker.Close()
-	worker.Connect(os.Getenv("ZEROMQ_WORKER"))
 	for {
-		taskData, _ := worker.Recv(0)
+		taskData, err := queue.MainQueue.Dequeue()
+		if err != nil {
+			time.Sleep(time.Second)
+			continue
+		}
 		log.Log.Debug("Received new task: ", taskData)
 		var task types.Task
-		err := json.Unmarshal([]byte(taskData), &task)
+		err = json.Unmarshal([]byte(taskData), &task)
 		if err != nil {
 			log.Log.Error("Failed decoding task as JSON: ", err.Error())
 			continue
@@ -30,7 +30,7 @@ func Run() {
 		err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
 		if err != nil {
 			log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
-			zeromq.Pusher.Send(taskData, 0)
+			queue.MainQueue.Enqueue(taskData)
 			continue
 		}
 
@@ -38,7 +38,7 @@ func Run() {
 		err = json.Unmarshal([]byte(task.Data), &jsonData)
 		if err != nil {
 			log.Log.Error("Failed decoding boiler JSON: ", err.Error())
-			zeromq.Pusher.Send(taskData, 0)
+			queue.MainQueue.Enqueue(taskData)
 			continue
 		}
 		for key, value := range jsonData {
diff --git a/mqttclient/main.go b/mqttclient/main.go
index 6207aa5..268565e 100644
--- a/mqttclient/main.go
+++ b/mqttclient/main.go
@@ -10,8 +10,8 @@ import (
 	"jonasled.dev/jonasled/ems-esp-logger/csv"
 	"jonasled.dev/jonasled/ems-esp-logger/helper"
 	"jonasled.dev/jonasled/ems-esp-logger/log"
+	"jonasled.dev/jonasled/ems-esp-logger/queue"
 	"jonasled.dev/jonasled/ems-esp-logger/types"
-	"jonasled.dev/jonasled/ems-esp-logger/zeromq"
 )
 
 var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
 		if err != nil {
 			log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
 		}
-		zeromq.Pusher.Send(string(taskData), 0)
+		queue.MainQueue.Enqueue(string(taskData))
 	}
 
 }
diff --git a/queue/main.go b/queue/main.go
new file mode 100644
index 0000000..af4deda
--- /dev/null
+++ b/queue/main.go
@@ -0,0 +1,35 @@
+package queue
+
+import (
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
+	"github.com/wind-c/comqtt/v2/cluster/log"
+)
+
+var MainQueue Queue
+
+func Init() {
+	queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json")
+	if err != nil {
+		log.Fatal("Error initializing main queue:", err)
+	}
+
+	// Set up signal handling for clean exit
+	exitChan := make(chan os.Signal, 1)
+	signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
+
+	go func() {
+		<-exitChan
+		fmt.Println("\nSaving main queue state before exiting...")
+		if err := queue.Save(); err != nil {
+			fmt.Println("Error saving main queue state:", err)
+		} else {
+			fmt.Println("Main queue state saved successfully.")
+		}
+		os.Exit(0)
+	}()
+	MainQueue = *queue
+}
diff --git a/queue/queue.go b/queue/queue.go
new file mode 100644
index 0000000..4bd627c
--- /dev/null
+++ b/queue/queue.go
@@ -0,0 +1,55 @@
+package queue
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+)
+
+type Queue struct {
+	Elements []string
+	FilePath string
+}
+
+func NewQueue(filePath string) (*Queue, error) {
+	q := &Queue{
+		Elements: make([]string, 0),
+		FilePath: filePath,
+	}
+
+	// Load state from file
+	if _, err := os.Stat(filePath); err == nil {
+		data, err := os.ReadFile(filePath)
+		if err != nil {
+			return nil, fmt.Errorf("error reading state file: %w", err)
+		}
+		if err := json.Unmarshal(data, &q.Elements); err != nil {
+			return nil, fmt.Errorf("error parsing state file: %w", err)
+		}
+	}
+	return q, nil
+}
+
+func (q *Queue) Enqueue(element string) {
+	q.Elements = append(q.Elements, element)
+}
+
+func (q *Queue) Dequeue() (string, error) {
+	if len(q.Elements) == 0 {
+		return "", fmt.Errorf("queue is empty")
+	}
+	element := q.Elements[0]
+	q.Elements = q.Elements[1:]
+	return element, nil
+}
+
+func (q *Queue) Save() error {
+	data, err := json.Marshal(q.Elements)
+	if err != nil {
+		return fmt.Errorf("error serializing state: %w", err)
+	}
+	if err := os.WriteFile(q.FilePath, data, 0644); err != nil {
+		return fmt.Errorf("error writing state to file: %w", err)
+	}
+	return nil
+}
diff --git a/zeromq/main.go b/zeromq/main.go
deleted file mode 100644
index d27d819..0000000
--- a/zeromq/main.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package zeromq
-
-import (
-	"os"
-
-	zmq "github.com/pebbe/zmq4"
-	"jonasled.dev/jonasled/ems-esp-logger/log"
-)
-
-var Pusher *zmq.Socket
-
-func Init() {
-	var err error
-	Pusher, err = zmq.NewSocket(zmq.PUSH)
-	if err != nil {
-		log.Log.Fatal("Failed starting  zeroMQ socket: ", err.Error())
-	}
-	Pusher.Bind(os.Getenv("ZEROMQ_BIND"))
-	log.Log.Info("Successfully initialized Message broker")
-}
-- 
GitLab


From a69809c61846275a8ee0b2513bc01fa9b1597fd7 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:39:57 +0100
Subject: [PATCH 12/15] update example config

---
 .env.example | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/.env.example b/.env.example
index f092a53..415106b 100644
--- a/.env.example
+++ b/.env.example
@@ -1,11 +1,24 @@
-MQTT_SERVER_ENABLED=false
+MQTT_SERVER_ENABLED=true
 MQTT_LISTEN=0.0.0.0:1883 # only required for MQTT server
-MQTT_HOST=1.2.3.4
+MQTT_HOST=127.0.0.1
 MQTT_PORT=1883
 MQTT_USERNAME=mqtt
-MQTT_PASSWORD=password
+MQTT_PASSWORD=mqtt
 MQTT_TOPIC_BOILER=ems-esp/boiler_data
+
 LOG_LEVEL=INFO
+
 OUTPUT_FILE_NAME="boiler.csv"
 OUTPUT_FILE_NAME_RAW="boiler.txt"
-ZEROMQ_BIND="tcp://127.0.0.1:5557"
\ No newline at end of file
+
+QUEUE_STATE_FOLDER="./queues"
+
+OUTPUT_DATABSE=ems_esp_logger
+OUTPUT_DATABASE_USER=root
+OUTPUT_DATABASE_PASSWORD=password
+OUTPUT_DATABASE_HOST=localhost
+OUTPUT_DATABASE_TYPE=mysql
+OUTPUT_DATABASE_EXECUTE_MIGRATIONS=true
+
+INSTANCE_NAME=test
+INSTANCE_DESCRIPTION="My test instance"
\ No newline at end of file
-- 
GitLab


From d49cc442a6f0c6a8982af28d94c91d51b7862485 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:49:26 +0100
Subject: [PATCH 13/15] extend the queue, to support elements, to stay a given
 time in the queue

---
 messageworker/main.go |  4 ++--
 mqttclient/main.go    |  2 +-
 queue/queue.go        | 34 +++++++++++++++++++++++++++-------
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/messageworker/main.go b/messageworker/main.go
index f7b6a93..99167c8 100644
--- a/messageworker/main.go
+++ b/messageworker/main.go
@@ -30,7 +30,7 @@ func Run() {
 		err = database.Db.Where("name = ?", task.Instance).First(&instance).Error
 		if err != nil {
 			log.Log.Error("Failed retreiving instance from database, pushing task back to queue: ", err.Error())
-			queue.MainQueue.Enqueue(taskData)
+			queue.MainQueue.Enqueue(taskData, 60)
 			continue
 		}
 
@@ -38,7 +38,7 @@ func Run() {
 		err = json.Unmarshal([]byte(task.Data), &jsonData)
 		if err != nil {
 			log.Log.Error("Failed decoding boiler JSON: ", err.Error())
-			queue.MainQueue.Enqueue(taskData)
+			queue.MainQueue.Enqueue(taskData, 60)
 			continue
 		}
 		for key, value := range jsonData {
diff --git a/mqttclient/main.go b/mqttclient/main.go
index 268565e..b349ef5 100644
--- a/mqttclient/main.go
+++ b/mqttclient/main.go
@@ -36,7 +36,7 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
 		if err != nil {
 			log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
 		}
-		queue.MainQueue.Enqueue(string(taskData))
+		queue.MainQueue.Enqueue(string(taskData), 0)
 	}
 
 }
diff --git a/queue/queue.go b/queue/queue.go
index 4bd627c..fbc0bf4 100644
--- a/queue/queue.go
+++ b/queue/queue.go
@@ -4,16 +4,23 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"time"
 )
 
+type QueueElement struct {
+	Value       string    `json:"value"`
+	EnqueueTime time.Time `json:"enqueue_time"`
+	MinDuration int       `json:"min_duration"` // Minimum duration in seconds
+}
+
 type Queue struct {
-	Elements []string
+	Elements []QueueElement
 	FilePath string
 }
 
 func NewQueue(filePath string) (*Queue, error) {
 	q := &Queue{
-		Elements: make([]string, 0),
+		Elements: make([]QueueElement, 0),
 		FilePath: filePath,
 	}
 
@@ -30,17 +37,30 @@ func NewQueue(filePath string) (*Queue, error) {
 	return q, nil
 }
 
-func (q *Queue) Enqueue(element string) {
-	q.Elements = append(q.Elements, element)
+func (q *Queue) Enqueue(element string, minDuration int) {
+	q.Elements = append(q.Elements, QueueElement{
+		Value:       element,
+		EnqueueTime: time.Now(),
+		MinDuration: minDuration,
+	})
 }
 
 func (q *Queue) Dequeue() (string, error) {
 	if len(q.Elements) == 0 {
 		return "", fmt.Errorf("queue is empty")
 	}
-	element := q.Elements[0]
-	q.Elements = q.Elements[1:]
-	return element, nil
+
+	currentTime := time.Now()
+	for i, elem := range q.Elements {
+		// Check if the element has satisfied its minimum duration in the queue
+		if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) {
+			// Remove the element from the queue
+			q.Elements = append(q.Elements[:i], q.Elements[i+1:]...)
+			return elem.Value, nil
+		}
+	}
+
+	return "", fmt.Errorf("no elements are ready for dequeuing")
 }
 
 func (q *Queue) Save() error {
-- 
GitLab


From 58b58eb2b19adbf95a6142160c39ed99d1cb1b44 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:51:14 +0100
Subject: [PATCH 14/15] add missing /

---
 queue/main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/queue/main.go b/queue/main.go
index af4deda..f59aa3b 100644
--- a/queue/main.go
+++ b/queue/main.go
@@ -12,7 +12,7 @@ import (
 var MainQueue Queue
 
 func Init() {
-	queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "main.json")
+	queue, err := NewQueue(os.Getenv("QUEUE_STATE_FOLDER") + "/main.json")
 	if err != nil {
 		log.Fatal("Error initializing main queue:", err)
 	}
-- 
GitLab


From 5fe39d6832eb0edd7dd0f57ae3bed161b4f31ccb Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 14:56:53 +0100
Subject: [PATCH 15/15] fix storing queue data in json file

---
 queue/main.go  | 5 +++--
 queue/queue.go | 7 +++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/queue/main.go b/queue/main.go
index f59aa3b..c802f1d 100644
--- a/queue/main.go
+++ b/queue/main.go
@@ -21,15 +21,16 @@ func Init() {
 	exitChan := make(chan os.Signal, 1)
 	signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
 
+	MainQueue = *queue
 	go func() {
 		<-exitChan
 		fmt.Println("\nSaving main queue state before exiting...")
-		if err := queue.Save(); err != nil {
+		fmt.Println("\nCurrent queue size: ", MainQueue.GetCurrentSize())
+		if err := MainQueue.Save(); err != nil {
 			fmt.Println("Error saving main queue state:", err)
 		} else {
 			fmt.Println("Main queue state saved successfully.")
 		}
 		os.Exit(0)
 	}()
-	MainQueue = *queue
 }
diff --git a/queue/queue.go b/queue/queue.go
index fbc0bf4..8f00ac0 100644
--- a/queue/queue.go
+++ b/queue/queue.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"os"
 	"time"
+
+	"jonasled.dev/jonasled/ems-esp-logger/log"
 )
 
 type QueueElement struct {
@@ -33,6 +35,7 @@ func NewQueue(filePath string) (*Queue, error) {
 		if err := json.Unmarshal(data, &q.Elements); err != nil {
 			return nil, fmt.Errorf("error parsing state file: %w", err)
 		}
+		log.Log.Info("Initialized queue, current size: ", q.GetCurrentSize())
 	}
 	return q, nil
 }
@@ -63,6 +66,10 @@ func (q *Queue) Dequeue() (string, error) {
 	return "", fmt.Errorf("no elements are ready for dequeuing")
 }
 
+func (q *Queue) GetCurrentSize() int {
+	return len(q.Elements)
+}
+
 func (q *Queue) Save() error {
 	data, err := json.Marshal(q.Elements)
 	if err != nil {
-- 
GitLab