Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ func (psql *Postgres) Features() sqlcommon.SQLFeatures {
return features
}

func (psql *Postgres) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) {
return insert.Suffix(" RETURNING seq"), true
// ApplyInsertQueryCustomizations appends the PostgreSQL-specific suffix to an INSERT so the
// statement returns the "seq" column. When requestConflictEmptyResult is set, an
// ON CONFLICT DO NOTHING clause is placed ahead of the RETURNING clause, so that a
// conflicting insert yields an empty result set rather than an error.
// The second return value is always true for this provider.
func (psql *Postgres) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) {
	const returningSeq = " RETURNING seq"
	if !requestConflictEmptyResult {
		return insert.Suffix(returningSeq), true
	}
	// The caller asked for conflicts to be swallowed: the conflict clause must precede RETURNING.
	return insert.Suffix(" ON CONFLICT DO NOTHING" + returningSeq), true
}

func (psql *Postgres) Open(url string) (*sql.DB, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/database/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestPostgresProvider(t *testing.T) {
assert.Equal(t, `LOCK TABLE "events" IN EXCLUSIVE MODE;`, psql.Features().ExclusiveTableLockSQL("events"))

insert := sq.Insert("test").Columns("col1").Values("val1")
insert, query := psql.UpdateInsertForSequenceReturn(insert)
insert, query := psql.ApplyInsertQueryCustomizations(insert, true)
sql, _, err := insert.ToSql()
assert.NoError(t, err)
assert.Equal(t, "INSERT INTO test (col1) VALUES (?) RETURNING seq", sql)
assert.Equal(t, "INSERT INTO test (col1) VALUES (?) ON CONFLICT DO NOTHING RETURNING seq", sql)
assert.True(t, query)
}
111 changes: 67 additions & 44 deletions internal/database/sqlcommon/group_sql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -41,16 +41,27 @@ var (
}
)

func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, allowExisting bool) (err error) {
func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, optimization database.UpsertOptimization) (err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)

// We use an upsert optimization here for performance, but also to account for the situation where two threads
// try to perform an insert concurrently and ensure a non-failure outcome.
optimized := false
if optimization == database.UpsertOptimizationNew {
opErr := s.attemptGroupInsert(ctx, tx, group, true /* we want a failure here we can progress past */)
optimized = opErr == nil
} else if optimization == database.UpsertOptimizationExisting {
rowsAffected, opErr := s.attemptGroupUpdate(ctx, tx, group)
optimized = opErr == nil && rowsAffected == 1
}

existing := false
if allowExisting {
// Do a select within the transaction to detemine if the UUID already exists
if !optimized {
// Do a select within the transaction to determine if the UUID already exists
groupRows, _, err := s.queryTx(ctx, tx,
sq.Select("hash").
From("groups").
Expand All @@ -61,55 +72,67 @@ func (s *SQLCommon) UpsertGroup(ctx context.Context, group *fftypes.Group, allow
}
existing = groupRows.Next()
groupRows.Close()
}

if existing {

// Update the group
if _, err = s.updateTx(ctx, tx,
sq.Update("groups").
Set("message_id", group.Message).
Set("namespace", group.Namespace).
Set("name", group.Name).
Set("ledger", group.Ledger).
Set("hash", group.Hash).
Set("created", group.Created).
Where(sq.Eq{"hash": group.Hash}),
func() {
s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeUpdated, group.Namespace, group.Hash)
},
); err != nil {
return err
}
} else {
_, err := s.insertTx(ctx, tx,
sq.Insert("groups").
Columns(groupColumns...).
Values(
group.Message,
group.Namespace,
group.Name,
group.Ledger,
group.Hash,
group.Created,
),
func() {
s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeCreated, group.Namespace, group.Hash)
},
)
if err != nil {
return err
if existing {
if _, err = s.attemptGroupUpdate(ctx, tx, group); err != nil {
return err
}
} else {
if err = s.attemptGroupInsert(ctx, tx, group, false); err != nil {
return err
}
}

}

if err = s.updateMembers(ctx, tx, group, existing); err != nil {
return err
// Note the member list is not allowed to change, as it is part of the hash.
// So the optimization above relies on the fact these are in a transaction, so the
// whole group (with members) will have been inserted
if (optimized && optimization == database.UpsertOptimizationNew) || (!optimized && !existing) {
if err = s.updateMembers(ctx, tx, group, false); err != nil {
return err
}
}

return s.commitTx(ctx, tx, autoCommit)
}

// attemptGroupUpdate issues an UPDATE on the "groups" table for the row whose hash equals
// group.Hash, rewriting every group column from the supplied group. It returns whatever
// updateTx returns; UpsertGroup treats the int64 as the number of rows affected.
func (s *SQLCommon) attemptGroupUpdate(ctx context.Context, tx *txWrapper, group *fftypes.Group) (int64, error) {
	updateQuery := sq.Update("groups").
		Set("message_id", group.Message).
		Set("namespace", group.Namespace).
		Set("name", group.Name).
		Set("ledger", group.Ledger).
		Set("hash", group.Hash).
		Set("created", group.Created).
		Where(sq.Eq{"hash": group.Hash})

	// Callback handed to updateTx to emit the "updated" change event for this group.
	// NOTE(review): presumably run post-commit, like insertTx's postCommit - confirm in updateTx.
	notifyUpdated := func() {
		s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeUpdated, group.Namespace, group.Hash)
	}

	return s.updateTx(ctx, tx, updateQuery, notifyUpdated)
}

// attemptGroupInsert issues an INSERT of the group into the "groups" table through insertTxExt.
// requestConflictEmptyResult is forwarded unchanged to insertTxExt. Only the error is
// reported to the caller; the numeric result of insertTxExt is discarded.
func (s *SQLCommon) attemptGroupInsert(ctx context.Context, tx *txWrapper, group *fftypes.Group, requestConflictEmptyResult bool) error {
	insertQuery := sq.Insert("groups").
		Columns(groupColumns...).
		Values(
			group.Message,
			group.Namespace,
			group.Name,
			group.Ledger,
			group.Hash,
			group.Created,
		)

	// Passed as insertTxExt's postCommit callback to emit the "created" change event.
	notifyCreated := func() {
		s.callbacks.HashCollectionNSEvent(database.CollectionGroups, fftypes.ChangeEventTypeCreated, group.Namespace, group.Hash)
	}

	if _, err := s.insertTxExt(ctx, tx, insertQuery, notifyCreated, requestConflictEmptyResult); err != nil {
		return err
	}
	return nil
}

func (s *SQLCommon) updateMembers(ctx context.Context, tx *txWrapper, group *fftypes.Group, existing bool) error {

if existing {
Expand Down
57 changes: 38 additions & 19 deletions internal/database/sqlcommon/group_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sqlcommon

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"testing"
Expand Down Expand Up @@ -56,7 +55,7 @@ func TestUpsertGroupE2EWithDB(t *testing.T) {
s.callbacks.On("HashCollectionNSEvent", database.CollectionGroups, fftypes.ChangeEventTypeCreated, "ns1", groupHash, mock.Anything).Return()
s.callbacks.On("HashCollectionNSEvent", database.CollectionGroups, fftypes.ChangeEventTypeUpdated, "ns1", groupHash, mock.Anything).Return()

err := s.UpsertGroup(ctx, group, true)
err := s.UpsertGroup(ctx, group, database.UpsertOptimizationNew)
assert.NoError(t, err)

// Check we get the exact same group back
Expand All @@ -72,18 +71,15 @@ func TestUpsertGroupE2EWithDB(t *testing.T) {
GroupIdentity: fftypes.GroupIdentity{
Name: "group1",
Namespace: "ns1",
Members: fftypes.Members{
{Identity: "0x12345", Node: fftypes.NewUUID()},
group.Members[0],
},
Ledger: fftypes.NewUUID(),
Members: group.Members,
Ledger: fftypes.NewUUID(),
},
Created: fftypes.Now(),
Message: fftypes.NewUUID(),
Hash: groupHash,
}

err = s.UpsertGroup(context.Background(), groupUpdated, true)
err = s.UpsertGroup(context.Background(), groupUpdated, database.UpsertOptimizationExisting)
assert.NoError(t, err)

// Check we get the exact same group back - note the removal of one of the data elements
Expand Down Expand Up @@ -139,7 +135,7 @@ func TestUpsertGroupE2EWithDB(t *testing.T) {
func TestUpsertGroupFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.UpsertGroup(context.Background(), &fftypes.Group{}, true)
err := s.UpsertGroup(context.Background(), &fftypes.Group{}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -150,7 +146,7 @@ func TestUpsertGroupFailSelect(t *testing.T) {
mock.ExpectQuery("SELECT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
groupID := fftypes.NewRandB32()
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true)
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10115", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -162,7 +158,7 @@ func TestUpsertGroupFailInsert(t *testing.T) {
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
groupID := fftypes.NewRandB32()
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true)
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -174,21 +170,28 @@ func TestUpsertGroupFailUpdate(t *testing.T) {
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow(groupID.String()))
mock.ExpectExec("UPDATE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true)
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10117", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertGroupFailUpdateMembers(t *testing.T) {
func TestUpsertGroupFailMembers(t *testing.T) {
s, mock := newMockProvider().init()
groupID := fftypes.NewRandB32()
mock.ExpectBegin()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}).AddRow(groupID.String()))
mock.ExpectExec("UPDATE .*").WillReturnResult(driver.ResultNoRows)
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}))
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true)
assert.Regexp(t, "FF10118", err)
err := s.UpsertGroup(context.Background(), &fftypes.Group{
Hash: groupID,
GroupIdentity: fftypes.GroupIdentity{
Members: fftypes.Members{
{Identity: "org1", Node: fftypes.NewUUID()},
},
},
}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

Expand All @@ -199,11 +202,27 @@ func TestUpsertGroupFailCommit(t *testing.T) {
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"hash"}))
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, true)
err := s.UpsertGroup(context.Background(), &fftypes.Group{Hash: groupID}, database.UpsertOptimizationSkip)
assert.Regexp(t, "FF10119", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

// TestUpdateMembersRecreateFail checks that updateMembers, called with existing=true,
// reports an FF10118 error when the DELETE statement the mock expects fails.
func TestUpdateMembersRecreateFail(t *testing.T) {
	s, mock := newMockProvider().init()
	groupID := fftypes.NewRandB32()
	mock.ExpectBegin()
	mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))

	// Fail fast if the transaction cannot be opened: continuing would hand a nil
	// transaction to updateMembers and obscure the real cause of the failure.
	tx, err := s.db.Begin()
	if err != nil {
		t.Fatalf("begin transaction: %v", err)
	}

	err = s.updateMembers(context.Background(), &txWrapper{sqlTX: tx}, &fftypes.Group{
		Hash: groupID,
		GroupIdentity: fftypes.GroupIdentity{
			Members: fftypes.Members{{Node: fftypes.NewUUID()}},
		},
	}, true)
	assert.Regexp(t, "FF10118", err)
	assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpdateMembersMissingOrg(t *testing.T) {
s, mock := newMockProvider().init()
groupID := fftypes.NewRandB32()
Expand Down
4 changes: 2 additions & 2 deletions internal/database/sqlcommon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ type Provider interface {
// Features returns database specific configuration switches
Features() SQLFeatures

// UpdateInsertForSequenceReturn updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field
UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (updatedInsert sq.InsertBuilder, runAsQuery bool)
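// ApplyInsertQueryCustomizations updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field.
// When requestConflictEmptyResult is true, the caller is asking for an insert conflict to produce an empty result set rather than an error, where the provider supports it.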
ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (updatedInsert sq.InsertBuilder, runAsQuery bool)
}
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/provider_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (psql *mockProvider) Features() SQLFeatures {
return features
}

func (mp *mockProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) {
func (mp *mockProvider) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) {
if mp.fakePSQLInsert {
return insert.Suffix(" RETURNING seq"), true
}
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/provider_sqlitego_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (psql *sqliteGoTestProvider) Features() SQLFeatures {
return features
}

func (tp *sqliteGoTestProvider) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) {
// ApplyInsertQueryCustomizations hands the INSERT back untouched and reports that it does
// not need to be run as a query. requestConflictEmptyResult is not used by this provider.
func (tp *sqliteGoTestProvider) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) {
	// Nothing required - QL supports the query for returning the generated ID, and we use that for the sequence
	const runAsQuery = false
	return insert, runAsQuery
}
Expand Down
13 changes: 11 additions & 2 deletions internal/database/sqlcommon/sqlcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/pkg/database"
"github.com/hyperledger/firefly/pkg/fftypes"
"github.com/sirupsen/logrus"

// Import migrate file source
_ "github.com/golang-migrate/migrate/v4/source/file"
Expand Down Expand Up @@ -236,8 +237,12 @@ func (s *SQLCommon) queryRes(ctx context.Context, tx *txWrapper, tableName strin
}

// insertTx runs the INSERT through insertTxExt with requestConflictEmptyResult set to false,
// and returns insertTxExt's result and error unchanged.
func (s *SQLCommon) insertTx(ctx context.Context, tx *txWrapper, q sq.InsertBuilder, postCommit func()) (int64, error) {
	const requestConflictEmptyResult = false
	return s.insertTxExt(ctx, tx, q, postCommit, requestConflictEmptyResult)
}

func (s *SQLCommon) insertTxExt(ctx context.Context, tx *txWrapper, q sq.InsertBuilder, postCommit func(), requestConflictEmptyResult bool) (int64, error) {
l := log.L(ctx)
q, useQuery := s.provider.UpdateInsertForSequenceReturn(q)
q, useQuery := s.provider.ApplyInsertQueryCustomizations(q, requestConflictEmptyResult)

sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql()
if err != nil {
Expand All @@ -249,7 +254,11 @@ func (s *SQLCommon) insertTx(ctx context.Context, tx *txWrapper, q sq.InsertBuil
if useQuery {
err := tx.sqlTX.QueryRowContext(ctx, sqlQuery, args...).Scan(&sequence)
if err != nil {
l.Errorf(`SQL insert failed: %s sql=[ %s ]: %s`, err, sqlQuery, err)
level := logrus.DebugLevel
if !requestConflictEmptyResult {
level = logrus.ErrorLevel
}
l.Logf(level, `SQL insert failed (conflictEmptyRequested=%t): %s sql=[ %s ]: %s`, requestConflictEmptyResult, err, sqlQuery, err)
return -1, i18n.WrapError(ctx, err, i18n.MsgDBInsertFailed)
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlite3/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (sqlite *SQLite3) Features() sqlcommon.SQLFeatures {
return features
}

func (sqlite *SQLite3) UpdateInsertForSequenceReturn(insert sq.InsertBuilder) (sq.InsertBuilder, bool) {
// ApplyInsertQueryCustomizations returns the INSERT unchanged and reports that it does not
// need to be run as a query. requestConflictEmptyResult is not used by this provider.
func (sqlite *SQLite3) ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (sq.InsertBuilder, bool) {
	const runAsQuery = false
	return insert, runAsQuery
}

Expand Down
3 changes: 2 additions & 1 deletion internal/database/sqlite3/sqlite3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build cgo
// +build cgo

package sqlite3
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestSQLite3GoProvider(t *testing.T) {
assert.Equal(t, sq.Dollar, sqlite.Features().PlaceholderFormat)

insert := sq.Insert("test").Columns("col1").Values("val1")
insert, query := sqlite.UpdateInsertForSequenceReturn(insert)
insert, query := sqlite.ApplyInsertQueryCustomizations(insert, false)
sql, _, err := insert.ToSql()
assert.NoError(t, err)
assert.Equal(t, "INSERT INTO test (col1) VALUES (?)", sql)
Expand Down
Loading