Skip to content

Commit fa5c4cf

Browse files
committed
chore: retry summary insertion in case of duplicate key errors (see #877)
1 parent 24b193e commit fa5c4cf

File tree

7 files changed

+828
-795
lines changed

7 files changed

+828
-795
lines changed

coverage/coverage.out

Lines changed: 793 additions & 787 deletions
Large diffs are not rendered by default.

main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ func main() {
141141
// Connect to database
142142
var err error
143143
slog.Info("starting with database", "dialect", config.Db.Dialect)
144-
db, err = gorm.Open(config.Db.GetDialector(), &gorm.Config{Logger: gormLogger}, conf.GetWakapiDBOpts(&config.Db))
144+
db, err = gorm.Open(
145+
config.Db.GetDialector(),
146+
&gorm.Config{Logger: gormLogger, TranslateError: true},
147+
conf.GetWakapiDBOpts(&config.Db),
148+
)
145149
if err != nil {
146150
conf.Log().Fatal("could not connect to database", "error", err)
147151
}

mocks/summary_repository.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package mocks
22

33
import (
4+
"time"
5+
46
"github.com/muety/wakapi/models"
57
"github.com/stretchr/testify/mock"
6-
"time"
78
)
89

910
type SummaryRepositoryMock struct {
@@ -16,6 +17,11 @@ func (m *SummaryRepositoryMock) Insert(s *models.Summary) error {
1617
return args.Error(0)
1718
}
1819

20+
func (m *SummaryRepositoryMock) InsertWithRetry(s *models.Summary) error {
21+
args := m.Called(s)
22+
return args.Error(0)
23+
}
24+
1925
func (m *SummaryRepositoryMock) GetAll() ([]*models.Summary, error) {
2026
args := m.Called()
2127
return args.Get(0).([]*models.Summary), args.Error(1)

repositories/repositories.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type IProjectLabelRepository interface {
106106
type ISummaryRepository interface {
107107
IBaseRepository
108108
Insert(*models.Summary) error
109+
InsertWithRetry(*models.Summary) error
109110
GetAll() ([]*models.Summary, error)
110111
GetByUserWithin(*models.User, time.Time, time.Time) ([]*models.Summary, error)
111112
GetLastByUser() ([]*models.TimeByUser, error)

repositories/summary.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package repositories
22

33
import (
4+
"errors"
5+
"log/slog"
46
"time"
57

68
"github.com/duke-git/lancet/v2/slice"
@@ -34,8 +36,21 @@ func (r *SummaryRepository) GetAll() ([]*models.Summary, error) {
3436
return summaries, nil
3537
}
3638

37-
func (r *SummaryRepository) Insert(summary *models.Summary) error {
39+
func (r *SummaryRepository) InsertWithRetry(summary *models.Summary) (err error) {
40+
// in case of duplicate key error, retry inserting up to three times
41+
// https://github.com/muety/wakapi/issues/877
42+
for i := 0; i < 3; i++ {
43+
err = r.Insert(summary)
44+
if err == nil || !errors.Is(err, gorm.ErrDuplicatedKey) {
45+
return err
46+
}
47+
slog.Warn("retrying to insert summary", "userId", summary.UserID, "fromTime", summary.FromTime.T(), "toTime", summary.ToTime.T(), "error", err)
48+
time.Sleep(1 * time.Second)
49+
}
50+
return err
51+
}
3852

53+
func (r *SummaryRepository) Insert(summary *models.Summary) error {
3954
if err := r.db.Transaction(func(tx *gorm.DB) error {
4055
if err := tx.Create(summary).Error; err != nil {
4156
return err

services/aggregation.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package services
22

33
import (
44
"errors"
5-
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
6-
"github.com/muety/artifex/v2"
7-
"github.com/muety/wakapi/config"
85
"log/slog"
96
"sync"
107
"time"
118

9+
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
10+
"github.com/muety/artifex/v2"
11+
"github.com/muety/wakapi/config"
12+
1213
"github.com/muety/wakapi/models"
1314
)
1415

@@ -192,7 +193,7 @@ func (srv *AggregationService) process(job AggregationJob) {
192193
} else {
193194
slog.Info("successfully generated summary", "from", job.From, "to", job.To, "userID", job.User.ID)
194195
if err := srv.summaryService.Insert(summary); err != nil {
195-
config.Log().Error("failed to save summary", "userID", summary.UserID, "fromTime", summary.FromTime, "toTime", summary.ToTime, "error", err)
196+
config.Log().Error("failed to save summary", "userID", summary.UserID, "fromTime", summary.FromTime.T(), "toTime", summary.ToTime.T(), "error", err)
196197
}
197198
}
198199
}

services/summary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (srv *SummaryService) DeleteByUserBefore(userId string, t time.Time) error
255255

256256
func (srv *SummaryService) Insert(summary *models.Summary) error {
257257
srv.invalidateUserCache(summary.UserID)
258-
return srv.repository.Insert(summary)
258+
return srv.repository.InsertWithRetry(summary)
259259
}
260260

261261
// Private summary generation and utility methods

0 commit comments

Comments
 (0)