package mqttclient import ( "encoding/json" "fmt" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "jonasled.dev/jonasled/ems-esp-logger/csv" "jonasled.dev/jonasled/ems-esp-logger/helper" "jonasled.dev/jonasled/ems-esp-logger/log" "jonasled.dev/jonasled/ems-esp-logger/zeromq" ) var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { log.Log.Debugf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) } var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic()) log.Log.Info("Received new boiler data") csv.JsonToCsv(string(msg.Payload())) var jsonData map[string]interface{} err := json.Unmarshal(msg.Payload(), &jsonData) if err != nil { panic(err) } currentTime := time.Now().Local().Format("2006-01-02 15:04:05") jsonData["date"] = currentTime updatedData, err := json.Marshal(jsonData) if err != nil { panic(err) } if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" { dumpRawData(string(updatedData), os.Getenv("OUTPUT_FILE_NAME_RAW")) } if os.Getenv("OUTPUT_DATABSE") != "" { zeromq.Pusher.Send(string(updatedData), 0) } } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { log.Log.Info("Successfully connected to MQTT broker") } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { log.Log.Error("Connection to MQTT broker lost: ", err.Error()) } func Init() { opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%s", os.Getenv("MQTT_HOST"), os.Getenv("MQTT_PORT"))) opts.SetClientID("ems-esp-logger_" + helper.GenerateRandomString(8)) if os.Getenv("MQTT_USERNAME") != "" { opts.SetUsername(os.Getenv("MQTT_USERNAME")) opts.SetPassword(os.Getenv("MQTT_PASSWORD")) } opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Log.Fatal("Failed connecting to MQTT server: ", token.Error()) } client.Subscribe(os.Getenv("MQTT_TOPIC_BOILER"), 1, messagePubHandlerBoiler) } func dumpRawData(data string, filename string) { file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { panic(err) } defer file.Close() file.WriteString(data) file.WriteString("\n") }