Skip to content

Commit 57c621d

Browse files
authored
Merge dev into master (anthdm#45)
* Stop the ticker when calling stop on SendRepeater
* Replaced GGQ with RingBuffer implementation
1 parent 011a4bc commit 57c621d

File tree

10 files changed

+240
-275
lines changed

10 files changed

+240
-275
lines changed

actor/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func (sr SendRepeater) start() {
157157
case <-ticker.C:
158158
sr.engine.SendWithSender(sr.target, sr.msg, sr.self)
159159
case <-sr.cancelch:
160+
ticker.Stop()
160161
return
161162
}
162163
}

actor/engine_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,8 @@ func TestSendMsgRaceCon(t *testing.T) {
135135

136136
for i := 0; i < 100; i++ {
137137
wg.Add(1)
138-
go func() {
139-
e.Send(pid, []byte("f"))
140-
wg.Done()
141-
}()
138+
e.Send(pid, []byte("f"))
139+
wg.Done()
142140
}
143141
wg.Wait()
144142
}
@@ -231,8 +229,7 @@ func TestRequestResponse(t *testing.T) {
231229
// 56 ns/op
232230
func BenchmarkSendMessageLocal(b *testing.B) {
233231
e := NewEngine()
234-
p := NewTestProducer(nil, func(_ *testing.T, _ *Context) {})
235-
pid := e.Spawn(p, "bench", WithInboxSize(1024*8))
232+
pid := e.SpawnFunc(func(_ *Context) {}, "bench", WithInboxSize(128))
236233

237234
b.ResetTimer()
238235
b.Run("send_message_local", func(b *testing.B) {

actor/inbox.go

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,36 @@ package actor
22

33
import (
44
"runtime"
5+
"sync/atomic"
56

6-
"github.com/anthdm/hollywood/ggq"
7-
"github.com/anthdm/hollywood/log"
7+
"github.com/anthdm/hollywood/ringbuffer"
88
)
99

10-
var LOCK_OS_THREAD = true
10+
const defaultThroughput = 300
11+
12+
const (
13+
idle int32 = iota
14+
running
15+
)
16+
17+
type Scheduler interface {
18+
Schedule(fn func())
19+
Throughput() int
20+
}
21+
22+
type goscheduler int
23+
24+
func (goscheduler) Schedule(fn func()) {
25+
go fn()
26+
}
27+
28+
func (sched goscheduler) Throughput() int {
29+
return int(sched)
30+
}
31+
32+
func NewScheduler(throughput int) Scheduler {
33+
return goscheduler(throughput)
34+
}
1135

1236
type Inboxer interface {
1337
Send(Envelope)
@@ -16,43 +40,56 @@ type Inboxer interface {
1640
}
1741

1842
type Inbox struct {
19-
ggq *ggq.GGQ[Envelope]
20-
proc Processer
43+
rb *ringbuffer.RingBuffer[Envelope]
44+
proc Processer
45+
scheduler Scheduler
46+
procStatus int32
2147
}
2248

2349
func NewInbox(size int) *Inbox {
24-
in := &Inbox{}
25-
in.ggq = ggq.New[Envelope](uint32(size), in)
26-
return in
50+
return &Inbox{
51+
rb: ringbuffer.New[Envelope](int64(size)),
52+
scheduler: NewScheduler(defaultThroughput),
53+
}
2754
}
2855

29-
func (in *Inbox) Consume(msgs []Envelope) {
30-
in.proc.Invoke(msgs)
56+
func (in *Inbox) Send(msg Envelope) {
57+
in.rb.Push(msg)
58+
in.schedule()
3159
}
3260

33-
func (in *Inbox) Start(proc Processer) {
34-
in.proc = proc
35-
var lockOSThread bool
36-
// prevent race condition here be reassigning before go routine.
37-
if LOCK_OS_THREAD {
38-
lockOSThread = true
61+
func (in *Inbox) schedule() {
62+
if atomic.CompareAndSwapInt32(&in.procStatus, idle, running) {
63+
in.scheduler.Schedule(in.process)
3964
}
40-
go func() {
41-
if lockOSThread {
42-
runtime.LockOSThread()
65+
}
66+
67+
func (in *Inbox) process() {
68+
in.run()
69+
atomic.StoreInt32(&in.procStatus, idle)
70+
}
71+
72+
func (in *Inbox) run() {
73+
i, t := 0, in.scheduler.Throughput()
74+
for {
75+
if i > t {
76+
i = 0
77+
runtime.Gosched()
4378
}
44-
in.ggq.ReadN()
45-
}()
46-
log.Tracew("[INBOX] started", log.M{"pid": proc.PID()})
79+
i++
80+
81+
if msg, ok := in.rb.Pop(); ok {
82+
in.proc.Invoke([]Envelope{msg})
83+
} else {
84+
return
85+
}
86+
}
4787
}
4888

49-
func (in *Inbox) Stop() error {
50-
in.ggq.Close()
51-
log.Tracew("[INBOX] closed", log.M{"pid": in.proc.PID()})
52-
return nil
89+
func (in *Inbox) Start(proc Processer) {
90+
in.proc = proc
5391
}
5492

55-
func (in *Inbox) Send(msg Envelope) {
56-
in.ggq.Awake()
57-
in.ggq.Write(msg)
93+
func (in *Inbox) Stop() error {
94+
return nil
5895
}

examples/helloworld/main.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package main
22

33
import (
44
"fmt"
5-
"time"
5+
"sync"
66

77
"github.com/anthdm/hollywood/actor"
88
)
@@ -20,17 +20,21 @@ func newFoo() actor.Receiver {
2020
func (f *foo) Receive(ctx *actor.Context) {
2121
switch msg := ctx.Message().(type) {
2222
case actor.Started:
23-
fmt.Println("foo started")
23+
fmt.Println("actor started")
24+
case actor.Stopped:
25+
fmt.Println("actor stopped")
2426
case *message:
25-
fmt.Println("foo has received", msg.data)
27+
fmt.Println("actor has received", msg.data)
2628
}
2729
}
2830

2931
func main() {
3032
engine := actor.NewEngine()
31-
pid := engine.Spawn(newFoo, "foo")
32-
for i := 0; i < 99; i++ {
33+
pid := engine.Spawn(newFoo, "my_actor")
34+
for i := 0; i < 100; i++ {
3335
engine.Send(pid, &message{data: "hello world!"})
3436
}
35-
time.Sleep(time.Second * 1)
37+
wg := sync.WaitGroup{}
38+
engine.Poison(pid, &wg)
39+
wg.Wait()
3640
}

examples/ttt/main.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/anthdm/hollywood/actor"
9+
)
10+
11+
func main() {
12+
e := actor.NewEngine()
13+
pid := e.SpawnFunc(func(c *actor.Context) {
14+
switch msg := c.Message().(type) {
15+
case actor.Started:
16+
fmt.Println("started")
17+
case actor.Stopped:
18+
fmt.Println("stopped")
19+
default:
20+
_ = msg
21+
}
22+
}, "foobarbas")
23+
24+
wg := sync.WaitGroup{}
25+
e.Poison(pid, &wg)
26+
wg.Wait()
27+
time.Sleep(time.Second * 2)
28+
}

ggq/ggq.go

Lines changed: 0 additions & 171 deletions
This file was deleted.

0 commit comments

Comments
 (0)