Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Remove the options column from the contractlisteners table.
-- BEGIN/COMMIT wrap the schema change in a single transaction.
BEGIN;
ALTER TABLE contractlisteners DROP COLUMN options;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add a TEXT column named options to the contractlisteners table.
-- The column is declared without a NOT NULL constraint or a default value.
-- BEGIN/COMMIT wrap the schema change in a single transaction.
BEGIN;
ALTER TABLE contractlisteners ADD COLUMN options TEXT;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Remove the options column from the contractlisteners table.
ALTER TABLE contractlisteners DROP COLUMN options;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Add a TEXT column named options to the contractlisteners table.
-- The column is declared without a NOT NULL constraint or a default value.
ALTER TABLE contractlisteners ADD COLUMN options TEXT;
20 changes: 20 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,11 @@ paths:
type: string
namespace:
type: string
options:
properties:
firstEvent:
type: string
type: object
protocolId:
type: string
type: object
Expand Down Expand Up @@ -2337,6 +2342,11 @@ paths:
type: string
namespace:
type: string
options:
properties:
firstEvent:
type: string
type: object
protocolId:
type: string
type: object
Expand Down Expand Up @@ -2378,6 +2388,11 @@ paths:
type: string
namespace:
type: string
options:
properties:
firstEvent:
type: string
type: object
protocolId:
type: string
type: object
Expand Down Expand Up @@ -2474,6 +2489,11 @@ paths:
type: string
namespace:
type: string
options:
properties:
firstEvent:
type: string
type: object
protocolId:
type: string
type: object
Expand Down
14 changes: 7 additions & 7 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,26 +616,26 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc
return &ethLocation, nil
}

func (e *Ethereum) AddSubscription(ctx context.Context, subscription *fftypes.ContractListenerInput) error {
location, err := parseContractLocation(ctx, subscription.Location)
func (e *Ethereum) AddContractListener(ctx context.Context, listener *fftypes.ContractListenerInput) error {
location, err := parseContractLocation(ctx, listener.Location)
if err != nil {
return err
}
abi, err := e.FFIEventDefinitionToABI(ctx, &subscription.Event.FFIEventDefinition)
abi, err := e.FFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, i18n.MsgContractParamInvalid)
}

subName := fmt.Sprintf("ff-sub-%s", subscription.ID)
result, err := e.streams.createSubscription(ctx, location, e.initInfo.stream.ID, subName, abi)
subName := fmt.Sprintf("ff-sub-%s", listener.ID)
result, err := e.streams.createSubscription(ctx, location, e.initInfo.stream.ID, subName, listener.Options.FirstEvent, abi)
if err != nil {
return err
}
subscription.ProtocolID = result.ID
listener.ProtocolID = result.ID
return nil
}

func (e *Ethereum) DeleteSubscription(ctx context.Context, subscription *fftypes.ContractListener) error {
// DeleteContractListener asks the stream manager to delete the subscription
// that backs the given contract listener. The subscription is identified by
// the listener's ProtocolID; any error from the stream manager is returned
// unchanged.
func (e *Ethereum) DeleteContractListener(ctx context.Context, subscription *fftypes.ContractListener) error {
	protocolID := subscription.ProtocolID
	return e.streams.deleteSubscription(ctx, protocolID)
}

Expand Down
18 changes: 12 additions & 6 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,13 +1254,16 @@ func TestAddSubscription(t *testing.T) {
},
},
},
Options: &fftypes.ContractListenerOptions{
FirstEvent: string(fftypes.SubOptsFirstEventNewest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.NoError(t, err)
}
Expand Down Expand Up @@ -1299,7 +1302,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10311", err)
}
Expand All @@ -1324,7 +1327,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
},
}

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10310", err)
}
Expand All @@ -1348,13 +1351,16 @@ func TestAddSubscriptionFail(t *testing.T) {
"address": "0x123",
}.String()),
Event: &fftypes.FFISerializedEvent{},
Options: &fftypes.ContractListenerOptions{
FirstEvent: string(fftypes.SubOptsFirstEventNewest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewStringResponder(500, "pop"))

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand All @@ -1380,7 +1386,7 @@ func TestDeleteSubscription(t *testing.T) {
httpmock.RegisterResponder("DELETE", `http://localhost:12345/subscriptions/sb-1`,
httpmock.NewStringResponder(204, ""))

err := e.DeleteSubscription(context.Background(), sub)
err := e.DeleteContractListener(context.Background(), sub)

assert.NoError(t, err)
}
Expand All @@ -1405,7 +1411,7 @@ func TestDeleteSubscriptionFail(t *testing.T) {
httpmock.RegisterResponder("DELETE", `http://localhost:12345/subscriptions/sb-1`,
httpmock.NewStringResponder(500, ""))

err := e.DeleteSubscription(context.Background(), sub)
err := e.DeleteContractListener(context.Background(), sub)

assert.Regexp(t, "FF10111", err)
}
Expand Down
14 changes: 11 additions & 3 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/pkg/fftypes"
)

type streamManager struct {
Expand Down Expand Up @@ -133,11 +134,18 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript
return subs, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName string, abi ABIElementMarshaling) (*subscription, error) {
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, fromBlock string, abi ABIElementMarshaling) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch fromBlock {
case string(fftypes.SubOptsFirstEventOldest):
fromBlock = "0"
case string(fftypes.SubOptsFirstEventNewest):
fromBlock = "latest"
}
sub := subscription{
Name: subName,
Stream: stream,
FromBlock: "0",
FromBlock: fromBlock,
Address: location.Address,
Event: abi,
}
Expand Down Expand Up @@ -190,7 +198,7 @@ func (s *streamManager) ensureSubscription(ctx context.Context, instancePath, st
}

if sub == nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, abi); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, string(fftypes.SubOptsFirstEventOldest), abi); err != nil {
return nil, err
}
}
Expand Down
11 changes: 8 additions & 3 deletions internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hyperledger/firefly/internal/i18n"
"github.com/hyperledger/firefly/internal/log"
"github.com/hyperledger/firefly/internal/restclient"
"github.com/hyperledger/firefly/pkg/fftypes"
)

type streamManager struct {
Expand Down Expand Up @@ -112,7 +113,11 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript
return subs, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event string) (*subscription, error) {
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, fromBlock string) (*subscription, error) {
// Map FireFly "firstEvent" values to Fabric "fromBlock" values
if fromBlock == string(fftypes.SubOptsFirstEventOldest) {
fromBlock = "0"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth processing "Newest" here for consistency, even if it's a no-op?

sub := subscription{
Name: name,
Channel: location.Channel,
Expand All @@ -122,7 +127,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
ChaincodeID: location.Chaincode,
EventFilter: event,
},
FromBlock: "0",
FromBlock: fromBlock,
}
res, err := s.client.R().
SetContext(ctx).
Expand Down Expand Up @@ -159,7 +164,7 @@ func (s *streamManager) ensureSubscription(ctx context.Context, location *Locati
}

if sub == nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, event); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, subName, event, string(fftypes.SubOptsFirstEventOldest)); err != nil {
return nil, err
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,20 +690,20 @@ func parseContractLocation(ctx context.Context, location *fftypes.JSONAny) (*Loc
return &fabricLocation, nil
}

func (f *Fabric) AddSubscription(ctx context.Context, subscription *fftypes.ContractListenerInput) error {
location, err := parseContractLocation(ctx, subscription.Location)
func (f *Fabric) AddContractListener(ctx context.Context, listener *fftypes.ContractListenerInput) error {
location, err := parseContractLocation(ctx, listener.Location)
if err != nil {
return err
}
result, err := f.streams.createSubscription(ctx, location, f.initInfo.stream.ID, "", subscription.Event.Name)
result, err := f.streams.createSubscription(ctx, location, f.initInfo.stream.ID, "", listener.Event.Name, listener.Options.FirstEvent)
if err != nil {
return err
}
subscription.ProtocolID = result.ID
listener.ProtocolID = result.ID
return nil
}

func (f *Fabric) DeleteSubscription(ctx context.Context, subscription *fftypes.ContractListener) error {
// DeleteContractListener asks the stream manager to delete the subscription
// that backs the given contract listener. The subscription is identified by
// the listener's ProtocolID; any error from the stream manager is returned
// unchanged.
func (f *Fabric) DeleteContractListener(ctx context.Context, subscription *fftypes.ContractListener) error {
	protocolID := subscription.ProtocolID
	return f.streams.deleteSubscription(ctx, protocolID)
}

Expand Down
23 changes: 17 additions & 6 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,13 +1092,21 @@ func TestAddSubscription(t *testing.T) {
"chaincode": "mycode",
}.String()),
Event: &fftypes.FFISerializedEvent{},
Options: &fftypes.ContractListenerOptions{
FirstEvent: string(fftypes.SubOptsFirstEventOldest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))
func(req *http.Request) (*http.Response, error) {
var body map[string]interface{}
json.NewDecoder(req.Body).Decode(&body)
assert.Equal(t, "0", body["fromBlock"])
return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req)
})

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.NoError(t, err)
}
Expand All @@ -1123,7 +1131,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
},
}

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10310", err)
}
Expand All @@ -1148,13 +1156,16 @@ func TestAddSubscriptionFail(t *testing.T) {
"chaincode": "mycode",
}.String()),
Event: &fftypes.FFISerializedEvent{},
Options: &fftypes.ContractListenerOptions{
FirstEvent: string(fftypes.SubOptsFirstEventNewest),
},
},
}

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewStringResponder(500, "pop"))

err := e.AddSubscription(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub)

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand All @@ -1180,7 +1191,7 @@ func TestDeleteSubscription(t *testing.T) {
httpmock.RegisterResponder("DELETE", `http://localhost:12345/subscriptions/sb-1`,
httpmock.NewStringResponder(204, ""))

err := e.DeleteSubscription(context.Background(), sub)
err := e.DeleteContractListener(context.Background(), sub)

assert.NoError(t, err)
}
Expand All @@ -1205,7 +1216,7 @@ func TestDeleteSubscriptionFail(t *testing.T) {
httpmock.RegisterResponder("DELETE", `http://localhost:12345/subscriptions/sb-1`,
httpmock.NewStringResponder(500, "pop"))

err := e.DeleteSubscription(context.Background(), sub)
err := e.DeleteContractListener(context.Background(), sub)

assert.Regexp(t, "FF10284", err)
assert.Regexp(t, "pop", err)
Expand Down
16 changes: 14 additions & 2 deletions internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,12 @@ func (cm *contractManager) AddContractListener(ctx context.Context, ns string, l
return nil, err
}

if listener.Options == nil {
listener.Options = cm.getDefaultContractListenerOptions()
} else if listener.Options.FirstEvent == "" {
listener.Options.FirstEvent = cm.getDefaultContractListenerOptions().FirstEvent
}

err = cm.database.RunAsGroup(ctx, func(ctx context.Context) (err error) {
if listener.Name != "" {
if err := fftypes.ValidateFFNameField(ctx, listener.Name, "name"); err != nil {
Expand Down Expand Up @@ -513,7 +519,7 @@ func (cm *contractManager) AddContractListener(ctx context.Context, ns string, l
if err := cm.validateFFIEvent(ctx, &listener.Event.FFIEventDefinition); err != nil {
return nil, err
}
if err = cm.blockchain.AddSubscription(ctx, listener); err != nil {
if err = cm.blockchain.AddContractListener(ctx, listener); err != nil {
return nil, err
}
if listener.Name == "" {
Expand Down Expand Up @@ -554,7 +560,7 @@ func (cm *contractManager) DeleteContractListenerByNameOrID(ctx context.Context,
if err != nil {
return err
}
if err = cm.blockchain.DeleteSubscription(ctx, listener); err != nil {
if err = cm.blockchain.DeleteContractListener(ctx, listener); err != nil {
return err
}
return cm.database.DeleteContractListenerByID(ctx, listener.ID)
Expand Down Expand Up @@ -582,3 +588,9 @@ func (cm *contractManager) GenerateFFI(ctx context.Context, ns string, generatio
generationRequest.Namespace = ns
return cm.blockchain.GenerateFFI(ctx, generationRequest)
}

// getDefaultContractListenerOptions returns a freshly allocated options value
// whose FirstEvent is set to SubOptsFirstEventNewest. AddContractListener
// uses it to fill in Options, or just FirstEvent, when the caller leaves
// them unset.
func (cm *contractManager) getDefaultContractListenerOptions() *fftypes.ContractListenerOptions {
	defaults := new(fftypes.ContractListenerOptions)
	defaults.FirstEvent = string(fftypes.SubOptsFirstEventNewest)
	return defaults
}
Loading