From 36305cc1c4ee0af29e85474aad06258df3925af9 Mon Sep 17 00:00:00 2001
From: Jonas Leder <jonas@jonasled.de>
Date: Sun, 26 Jan 2025 15:59:41 +0100
Subject: [PATCH] fix not requeing partial done tasks
---
go.mod | 2 +-
messageworker/main.go | 6 +++++-
queue/queue.go | 47 ++++++++++++++++++++++++++++++++++---------
3 files changed, 43 insertions(+), 12 deletions(-)
diff --git a/go.mod b/go.mod
index f1f7efa..b350c1c 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.23.4
require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/glebarez/sqlite v1.11.0
+ github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.5.1
github.com/nekomeowww/gorm-logger-logrus v1.0.8
github.com/sirupsen/logrus v1.9.3
@@ -19,7 +20,6 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/glebarez/go-sqlite v1.21.2 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
- github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
diff --git a/messageworker/main.go b/messageworker/main.go
index c12f71c..3b0cebb 100644
--- a/messageworker/main.go
+++ b/messageworker/main.go
@@ -14,7 +14,7 @@ import (
func Run() {
for {
- taskData, err := queue.MainQueue.Dequeue()
+ taskData, taskId, err := queue.MainQueue.Dequeue()
if err != nil {
time.Sleep(time.Second)
continue
@@ -53,6 +53,10 @@ func Run() {
valuesToInsert = append(valuesToInsert, dbValue)
}
database.Db.Create(&valuesToInsert)
+ err = queue.MainQueue.MarkDone(taskId)
+ if err != nil {
+ log.Log.Error("Failed marking element in queue as done: ", err.Error())
+ }
log.Log.Info("Stored boiler data in database")
}
}
diff --git a/queue/queue.go b/queue/queue.go
index 8f00ac0..9042c2c 100644
--- a/queue/queue.go
+++ b/queue/queue.go
@@ -6,13 +6,16 @@ import (
"os"
"time"
+ "github.com/google/uuid"
"jonasled.dev/jonasled/ems-esp-logger/log"
)
type QueueElement struct {
- Value string `json:"value"`
- EnqueueTime time.Time `json:"enqueue_time"`
+ ID string `json:"id"` // Unique identifier for the job
+ Value string `json:"value"` // Job data
+ EnqueueTime time.Time `json:"enqueue_time"` // Time the job was enqueued
MinDuration int `json:"min_duration"` // Minimum duration in seconds
+ Done bool `json:"done"` // Job completion status
}
type Queue struct {
@@ -35,35 +38,59 @@ func NewQueue(filePath string) (*Queue, error) {
if err := json.Unmarshal(data, &q.Elements); err != nil {
return nil, fmt.Errorf("error parsing state file: %w", err)
}
+
+ // Set all jobs' Done status to false when loading the queue
+ for i := range q.Elements {
+ q.Elements[i].Done = false
+ }
+
log.Log.Info("Initialized queue, current size: ", q.GetCurrentSize())
}
return q, nil
}
func (q *Queue) Enqueue(element string, minDuration int) {
+ // Generate a unique ID for the new job
+ id := uuid.New().String()
+
q.Elements = append(q.Elements, QueueElement{
+ ID: id,
Value: element,
EnqueueTime: time.Now(),
MinDuration: minDuration,
+ Done: false, // Job is not done initially
})
}
-func (q *Queue) Dequeue() (string, error) {
+func (q *Queue) Dequeue() (string, string, error) {
if len(q.Elements) == 0 {
- return "", fmt.Errorf("queue is empty")
+ return "", "", fmt.Errorf("queue is empty")
}
currentTime := time.Now()
for i, elem := range q.Elements {
- // Check if the element has satisfied its minimum duration in the queue
- if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) {
- // Remove the element from the queue
- q.Elements = append(q.Elements[:i], q.Elements[i+1:]...)
- return elem.Value, nil
+ // Check if the element has satisfied its minimum duration and is not marked as done
+ if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) && !elem.Done {
+ // Lock the job for this task runner by marking it as "done"
+ q.Elements[i].Done = true
+ // Return job value and ID to the task runner
+ return elem.Value, elem.ID, nil
}
}
- return "", fmt.Errorf("no elements are ready for dequeuing")
+ return "", "", fmt.Errorf("no elements are ready for dequeuing")
+}
+
+func (q *Queue) MarkDone(id string) error {
+ // Find the job in the queue by ID and mark it as done
+ for i, elem := range q.Elements {
+ if elem.ID == id && elem.Done {
+ // Once marked as done, remove the job from the queue
+ q.Elements = append(q.Elements[:i], q.Elements[i+1:]...)
+ return nil
+ }
+ }
+ return fmt.Errorf("job with ID %s not found or already marked as done", id)
}
func (q *Queue) GetCurrentSize() int {
--
GitLab