Skip to content

Commit 725f274

Browse files
authored
Cluster improvements (anthdm#182)
* Unique actors cluster wide * cluster improvements * better tests
1 parent 91ef524 commit 725f274

File tree

4 files changed

+150
-5
lines changed

4 files changed

+150
-5
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ large number of concurrent users and complex interactions.
2929

3030
## Features
3131

32+
Compiles to WASM! Both GOOS=js and GOOS=wasip1 (with GOARCH=wasm)
33+
3234
- Guaranteed message delivery on actor failure (buffer mechanism)
3335
- Fire & forget or request & response messaging, or both
3436
- High performance dRPC as the transport layer

cluster/agent.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cluster
33
import (
44
"log/slog"
55
"reflect"
6+
"strings"
67

78
"github.com/anthdm/hollywood/actor"
89
"golang.org/x/exp/maps"
@@ -16,7 +17,10 @@ type (
1617
getMembers struct{}
1718
getKinds struct{}
1819
deactivate struct{ pid *actor.PID }
19-
getActive struct{ id string }
20+
getActive struct {
21+
id string
22+
kind string
23+
}
2024
)
2125

2226
// Agent is an actor/receiver that is responsible for managing the state
@@ -79,9 +83,29 @@ func (a *Agent) Receive(c *actor.Context) {
7983
}
8084
c.Respond(kinds)
8185
case getActive:
86+
a.handleGetActive(c, msg)
87+
}
88+
}
89+
90+
func (a *Agent) handleGetActive(c *actor.Context, msg getActive) {
91+
if len(msg.id) > 0 {
8292
pid := a.activated[msg.id]
8393
c.Respond(pid)
8494
}
95+
if len(msg.kind) > 0 {
96+
pids := make([]*actor.PID, 0)
97+
for id, pid := range a.activated {
98+
parts := strings.Split(id, "/")
99+
if len(parts) != 2 {
100+
break
101+
}
102+
kind := parts[0]
103+
if msg.kind == kind {
104+
pids = append(pids, pid)
105+
}
106+
}
107+
c.Respond(pids)
108+
}
85109
}
86110

87111
func (a *Agent) handleActorTopology(msg *ActorTopology) {
@@ -107,6 +131,7 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
107131
slog.Error("received activation request but kind not registered locally on this node", "kind", msg.Kind)
108132
return &ActivationResponse{Success: false}
109133
}
134+
110135
kind := a.localKinds[msg.Kind]
111136
pid := a.cluster.engine.Spawn(kind.producer, msg.Kind, actor.WithID(msg.ID))
112137
resp := &ActivationResponse{
@@ -117,6 +142,12 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
117142
}
118143

119144
func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID {
145+
// Make sure actors are unique across the whole cluster.
146+
id := kind + "/" + config.id // the id part of the PID
147+
if _, ok := a.activated[id]; ok {
148+
slog.Warn("activation failed", "err", "duplicated actor id across the cluster", "id", id)
149+
return nil
150+
}
120151
members := a.members.FilterByKind(kind)
121152
if len(members) == 0 {
122153
slog.Warn("could not find any members with kind", "kind", kind)

cluster/cluster.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,27 @@ func (c *Cluster) HasKind(name string) bool {
228228
return false
229229
}
230230

231-
// TODO: Weird
232-
func (c *Cluster) GetActivated(id string) *actor.PID {
231+
// GetActiveByKind returns all the actor PIDS that are active across the cluster
232+
// by the given kind.
233+
//
234+
// playerPids := c.GetActiveByKind("player")
235+
// // [127.0.0.1:34364/player/1 127.0.0.1:34365/player/2]
236+
func (c *Cluster) GetActiveByKind(kind string) []*actor.PID {
237+
resp, err := c.engine.Request(c.agentPID, getActive{kind: kind}, c.config.requestTimeout).Result()
238+
if err != nil {
239+
return []*actor.PID{}
240+
}
241+
if res, ok := resp.([]*actor.PID); ok {
242+
return res
243+
}
244+
return []*actor.PID{}
245+
}
246+
247+
// GetActiveByID returns the full PID by the given ID.
248+
//
249+
// playerPid := c.GetActiveByID("player/1")
250+
// // 127.0.0.1:34364/player/1
251+
func (c *Cluster) GetActiveByID(id string) *actor.PID {
233252
resp, err := c.engine.Request(c.agentPID, getActive{id: id}, c.config.requestTimeout).Result()
234253
if err != nil {
235254
return nil

cluster/cluster_test.go

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func TestActivate(t *testing.T) {
201201
wg.Wait()
202202
assert.Equal(t, len(c1.Members()), 2)
203203
assert.True(t, c1.HasKind("player"))
204-
assert.True(t, c1.GetActivated("player/1").Equals(expectedPID))
204+
assert.True(t, c1.GetActiveByID("player/1").Equals(expectedPID))
205205

206206
c1.Stop()
207207
c2.Stop()
@@ -237,7 +237,7 @@ func TestDeactivate(t *testing.T) {
237237

238238
assert.Equal(t, len(c1.Members()), 2)
239239
assert.True(t, c1.HasKind("player"))
240-
assert.Nil(t, c1.GetActivated("player/1"))
240+
assert.Nil(t, c1.GetActiveByID("player/1"))
241241

242242
c1.Stop()
243243
c2.Stop()
@@ -317,6 +317,99 @@ func TestMembersExcept(t *testing.T) {
317317
assert.Equal(t, am[0].ID, "C")
318318
}
319319

320+
func TestGetActiveByID(t *testing.T) {
321+
c1Addr := getRandomLocalhostAddr()
322+
c2Addr := getRandomLocalhostAddr()
323+
324+
c1 := makeCluster(t, c1Addr, "A", "eu")
325+
c1.RegisterKind("player", NewPlayer, NewKindConfig())
326+
c1.Start()
327+
328+
c2 := makeCluster(t, c2Addr, "B", "eu")
329+
c2.RegisterKind("player", NewPlayer, NewKindConfig())
330+
c2.Start()
331+
332+
pid1 := c1.Activate("player", NewActivationConfig().WithID("1"))
333+
pid2 := c2.Activate("player", NewActivationConfig().WithID("2"))
334+
time.Sleep(time.Millisecond * 10)
335+
336+
pid := c1.GetActiveByID("player/1")
337+
assert.NotNil(t, pid)
338+
assert.Equal(t, pid.ID, pid1.ID)
339+
340+
pid = c1.GetActiveByID("player/2")
341+
assert.NotNil(t, pid)
342+
assert.Equal(t, pid.ID, pid2.ID)
343+
344+
pid = c1.GetActiveByID("player/3")
345+
assert.Nil(t, pid)
346+
347+
c1.Stop()
348+
c2.Stop()
349+
}
350+
351+
func TestGetActiveByKind(t *testing.T) {
352+
c1Addr := getRandomLocalhostAddr()
353+
c2Addr := getRandomLocalhostAddr()
354+
355+
c1 := makeCluster(t, c1Addr, "A", "eu")
356+
c1.RegisterKind("player", NewPlayer, NewKindConfig())
357+
c1.Start()
358+
359+
c2 := makeCluster(t, c2Addr, "B", "eu")
360+
c2.RegisterKind("player", NewPlayer, NewKindConfig())
361+
c2.Start()
362+
363+
pid1 := c1.Activate("player", NewActivationConfig().WithID("1"))
364+
pid2 := c2.Activate("player", NewActivationConfig().WithID("2"))
365+
c1.Activate("foo", NewActivationConfig().WithID("2"))
366+
c1.Activate("bar", NewActivationConfig().WithID("2"))
367+
time.Sleep(time.Millisecond * 10)
368+
369+
pids := c1.GetActiveByKind("player")
370+
assert.Len(t, pids, 2)
371+
pidsStr := make([]string, 2)
372+
pidsStr[0] = pids[0].String()
373+
pidsStr[1] = pids[1].String()
374+
assert.Contains(t, pidsStr, pid1.String())
375+
assert.Contains(t, pidsStr, pid2.String())
376+
377+
c1.Stop()
378+
c2.Stop()
379+
}
380+
381+
func TestCannotDuplicateActor(t *testing.T) {
382+
c1Addr := getRandomLocalhostAddr()
383+
c2Addr := getRandomLocalhostAddr()
384+
385+
c1 := makeCluster(t, c1Addr, "A", "eu")
386+
c1.RegisterKind("player", NewPlayer, NewKindConfig())
387+
c1.Start()
388+
389+
c2 := makeCluster(t, c2Addr, "B", "eu")
390+
c2.RegisterKind("player", NewPlayer, NewKindConfig())
391+
c2.Start()
392+
393+
pid := c1.Activate("player", NewActivationConfig().WithID("1"))
394+
time.Sleep(10 * time.Millisecond)
395+
// Let's make sure we spawn the actor on "our" node. Why?
396+
// Because if the other node happened to be randomly selected to spawn
397+
// the actor with the same id, the test would pass anyway.
398+
// Local registry will prevent duplicated actor IDs from the get go.
399+
pid2 := c2.Activate("player", NewActivationConfig().WithID("1").WithSelectMemberFunc(func(_ ActivationDetails) *Member {
400+
return c2.Member()
401+
}))
402+
fmt.Println(pid2)
403+
time.Sleep(time.Millisecond * 10)
404+
405+
pids := c1.GetActiveByKind("player")
406+
assert.Len(t, pids, 1)
407+
assert.Equal(t, pids[0].String(), pid.String())
408+
409+
c1.Stop()
410+
c2.Stop()
411+
}
412+
320413
func makeCluster(t *testing.T, addr, id, region string) *Cluster {
321414
config := NewConfig().
322415
WithID(id).

0 commit comments

Comments
 (0)