Newer
Older
"jonasled.dev/jonasled/ems-esp-logger/csv"
"jonasled.dev/jonasled/ems-esp-logger/helper"
"jonasled.dev/jonasled/ems-esp-logger/log"
"jonasled.dev/jonasled/ems-esp-logger/types"
)
// messagePubHandler is installed in Init as the client's default publish
// handler. It only writes the received payload and its topic to the debug log.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, topic := m.Payload(), m.Topic()
	log.Log.Debugf("Received message: %s from topic: %s\n", payload, topic)
}
var messagePubHandlerBoiler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
log.Log.Debugf("Received message for boiler: %s from topic: %s\n", msg.Payload(), msg.Topic())
log.Log.Info("Received new boiler data")
if os.Getenv("OUTPUT_FILE_NAME_RAW") != "" {
dumpRawData(string(msg.Payload()), os.Getenv("OUTPUT_FILE_NAME_RAW"))
if os.Getenv("OUTPUT_DATABSE") != "" || os.Getenv("CLIENT_USE_SERVER") == "true" {
task := types.Task{
Date: time.Now().UTC(),
Data: string(msg.Payload()),
Instance: os.Getenv("INSTANCE_NAME"),
}
taskData, err := json.Marshal(task)
if err != nil {
log.Log.Error("Failed encoding new data task as JSON: ", err.Error())
}
queue.MainQueue.Enqueue(string(taskData), 0)
}
// connectHandler is assigned to the client options' OnConnect hook in Init
// and records a successful broker connection at info level.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	log.Log.Info("Successfully connected to MQTT broker")
}
// connectLostHandler is assigned to the client options' OnConnectionLost hook
// in Init and logs the error that was reported for the lost connection.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	reason := cause.Error()
	log.Log.Error("Connection to MQTT broker lost: ", reason)
}
func Init() {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%s", os.Getenv("MQTT_HOST"), os.Getenv("MQTT_PORT")))
opts.SetClientID("ems-esp-logger_" + helper.GenerateRandomString(8))
if os.Getenv("MQTT_USERNAME") != "" {
opts.SetUsername(os.Getenv("MQTT_USERNAME"))
opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
}
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Log.Fatal("Failed connecting to MQTT server: ", token.Error())
}
client.Subscribe(os.Getenv("MQTT_TOPIC_BOILER"), 1, messagePubHandlerBoiler)
func dumpRawData(data string, filename string) {
var jsonData map[string]interface{}
err := json.Unmarshal([]byte(data), &jsonData)
if err != nil {
}
currentTime := time.Now().Local().Format("2006-01-02 15:04:05")
jsonData["date"] = currentTime
updatedData, err := json.Marshal(jsonData)
if err != nil {
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
file.WriteString(string(updatedData))