Skip to content
Snippets Groups Projects
Commit c75af5dc authored by Jonas Leder's avatar Jonas Leder
Browse files

write new messages to ZeroMQ broker

parent 547b4033
Branches
No related tags found
1 merge request!1WIP: write values to a database
Pipeline #54559 failed
...@@ -8,3 +8,4 @@ MQTT_TOPIC_BOILER=ems-esp/boiler_data ...@@ -8,3 +8,4 @@ MQTT_TOPIC_BOILER=ems-esp/boiler_data
LOG_LEVEL=INFO LOG_LEVEL=INFO
OUTPUT_FILE_NAME="boiler.csv" OUTPUT_FILE_NAME="boiler.csv"
OUTPUT_FILE_NAME_RAW="boiler.txt" OUTPUT_FILE_NAME_RAW="boiler.txt"
ZEROMQ_BIND="tcp://127.0.0.1:5557"
\ No newline at end of file
...@@ -8,10 +8,14 @@ import ( ...@@ -8,10 +8,14 @@ import (
"jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/mqttclient" "jonasled.dev/jonasled/ems-esp-logger/mqttclient"
"jonasled.dev/jonasled/ems-esp-logger/mqttserver" "jonasled.dev/jonasled/ems-esp-logger/mqttserver"
"jonasled.dev/jonasled/ems-esp-logger/zeromq"
) )
func main() { func main() {
log.Init() log.Init()
defer zeromq.Pusher.Close()
zeromq.Init()
if os.Getenv("MQTT_SERVER_ENABLED") == "true" { if os.Getenv("MQTT_SERVER_ENABLED") == "true" {
log.Log.Info("Starting embedded MQTT server") log.Log.Info("Starting embedded MQTT server")
mqttserver.Start() mqttserver.Start()
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/csv"
"jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/helper"
"jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/zeromq"
) )
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
...@@ -20,8 +21,26 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m ...@@ -20,8 +21,26 @@ var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg m
log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic()) log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic())
log.Log.Info("Received new boiler data") log.Log.Info("Received new boiler data")
csv.JsonToCsv(string(msg.Payload())) csv.JsonToCsv(string(msg.Payload()))
var jsonData map[string]interface{}
err := json.Unmarshal(msg.Payload(), &jsonData)
if err != nil {
panic(err)
}
currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
jsonData["date"] = currentTime
updatedData, err := json.Marshal(jsonData)
if err != nil {
panic(err)
}
if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" { if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" {
dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW")) dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW"))
}
if os.Getenv("OUTPUT_DATABSE") != "" {
zeromq.Pusher.Send(string(updatedData), 0)
} }
} }
...@@ -53,26 +72,12 @@ func Init() { ...@@ -53,26 +72,12 @@ func Init() {
} }
func dumpRawData(data string, filename string) { func dumpRawData(data string, filename string) {
var jsonData map[string]interface{}
err := json.Unmarshal([]byte(data), &jsonData)
if err != nil {
panic(err)
}
currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
jsonData["date"] = currentTime
updatedData, err := json.Marshal(jsonData)
if err != nil {
panic(err)
}
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer file.Close() defer file.Close()
file.WriteString(string(updatedData)) file.WriteString(data)
file.WriteString("\n") file.WriteString("\n")
} }
package zeromq
import (
"os"
zmq "github.com/pebbe/zmq4"
"jonasled.dev/jonasled/ems-esp-logger/log"
)
var Pusher *zmq.Socket
func Init() {
var err error
Pusher, err = zmq.NewSocket(zmq.PUSH)
if err != nil {
log.Log.Fatal("Failed starting zeroMQ socket: ", err.Error())
}
Pusher.Bind(os.Getenv("ZEROMQ_BIND"))
log.Log.Info("Successfully initialized Message broker")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment