diff --git a/go.mod b/go.mod index f1f7efa8326aeda20193ef01614ad175bf2918b4..b350c1c53c491c9541b083ed77d1db58a5b5ca5c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.4 require ( github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/glebarez/sqlite v1.11.0 + github.com/google/uuid v1.3.0 github.com/joho/godotenv v1.5.1 github.com/nekomeowww/gorm-logger-logrus v1.0.8 github.com/sirupsen/logrus v1.9.3 @@ -19,7 +20,6 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/messageworker/main.go b/messageworker/main.go index c12f71c54c29c0e1420bc4b7da220f9337ea72a5..3b0cebb7be6a24da9449a41cda5e56ff58b23402 100644 --- a/messageworker/main.go +++ b/messageworker/main.go @@ -14,7 +14,7 @@ import ( func Run() { for { - taskData, err := queue.MainQueue.Dequeue() + taskData, taskId, err := queue.MainQueue.Dequeue() if err != nil { time.Sleep(time.Second) continue @@ -53,6 +53,10 @@ func Run() { valuesToInsert = append(valuesToInsert, dbValue) } database.Db.Create(&valuesToInsert) + err = queue.MainQueue.MarkDone(taskId) + if err != nil { + log.Log.Error("Failed marking element in queue as done: ", err.Error()) + } log.Log.Info("Stored boiler data in database") } } diff --git a/queue/queue.go b/queue/queue.go index 8f00ac087da1abc1b1bd64a17e0e4e9e26a15a46..9042c2c711e16e43c4cc505f605a232d31d009fc 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -6,13 +6,16 @@ import ( "os" "time" + "github.com/google/uuid" "jonasled.dev/jonasled/ems-esp-logger/log" ) type QueueElement struct { - Value string `json:"value"` - EnqueueTime time.Time `json:"enqueue_time"` + ID string `json:"id"` // Unique identifier for the job + Value string `json:"value"` // Job data + EnqueueTime time.Time `json:"enqueue_time"` // Time the job was enqueued MinDuration int `json:"min_duration"` // Minimum duration in seconds + Done bool `json:"done"` // Job completion status } type Queue struct { @@ -35,35 +38,59 @@ func NewQueue(filePath string) (*Queue, error) { if err := json.Unmarshal(data, &q.Elements); err != nil { return nil, fmt.Errorf("error parsing state file: %w", err) } + + // Set all jobs' Done status to false when loading the queue + for i := range q.Elements { + q.Elements[i].Done = false + } + log.Log.Info("Initialized queue, current size: ", q.GetCurrentSize()) } return q, nil } func (q *Queue) Enqueue(element string, minDuration int) { + // Generate a unique ID for the new job + id := uuid.New().String() + q.Elements = append(q.Elements, QueueElement{ + ID: id, Value: element, EnqueueTime: time.Now(), MinDuration: minDuration, + Done: false, // Job is not done initially }) } -func (q *Queue) Dequeue() (string, error) { +func (q *Queue) Dequeue() (string, string, error) { if len(q.Elements) == 0 { - return "", fmt.Errorf("queue is empty") + return "", "", fmt.Errorf("queue is empty") } currentTime := time.Now() for i, elem := range q.Elements { - // Check if the element has satisfied its minimum duration in the queue - if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) { - // Remove the element from the queue - q.Elements = append(q.Elements[:i], q.Elements[i+1:]...) - return elem.Value, nil + // Check if the element has satisfied its minimum duration and is not marked as done + if currentTime.Sub(elem.EnqueueTime).Seconds() >= float64(elem.MinDuration) && !elem.Done { + // Lock the job for this task runner by marking it as "done" + q.Elements[i].Done = true + // Return job value and ID to the task runner + return elem.Value, elem.ID, nil } } - return "", fmt.Errorf("no elements are ready for dequeuing") + return "", "", fmt.Errorf("no elements are ready for dequeuing") +} + +func (q *Queue) MarkDone(id string) error { + // Find the job in the queue by ID and mark it as done + for i, elem := range q.Elements { + if elem.ID == id && elem.Done { + // Once marked as done, remove the job from the queue + q.Elements = append(q.Elements[:i], q.Elements[i+1:]...) + return nil + } + } + return fmt.Errorf("job with ID %s not found or already marked as done", id) } func (q *Queue) GetCurrentSize() int {