Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions NOTICES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,26 @@ https://github.com/rogpeppe/fastuuid.git
License: BSD 3-clause (https://github.com/google/uuid/LICENSE)
Copyright © 2014, Roger Peppe All rights reserved.


golang.org/x/sync/singleflight
https://golang.org/x/sync/singleflight
License: BSD 3-clause (https://golang.org/x/sync/LICENSE)
Copyright (c) 2009 The Go Authors. All rights reserved.


github.com/rubyist/circuitbreaker
https://github.com/rubyist/circuitbreaker
License: MIT (https://github.com/rubyist/circuitbreaker/blob/master/LICENSE)
Copyright (c) 2014 Scott Barron


github.com/facebookgo/clock
https://github.com/facebookgo/clock
License: MIT (https://github.com/facebookgo/clock/blob/master/LICENSE)
Copyright (c) 2014 Ben Johnson


github.com/cenk/backoff
https://github.com/cenkalti/backoff
License: MIT (https://github.com/cenkalti/backoff/blob/master/LICENSE)
Copyright (c) 2014 Cenk Altı
120 changes: 120 additions & 0 deletions cb/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package cb

import (
	"log"
	"sort"
	"strings"
	"time"

	circuit "github.com/rubyist/circuitbreaker"
)

// Monitor is a circuit breaker monitor. It maintains one circuit
// breaker per target address and converts breaker state changes
// into routing table fragments.
//
// A new fragment is produced when a breaker changes state and when
// the periodic check finds a tripped breaker that is ready to be
// retried. With the defaults a breaker is triggered after three
// consecutive failures.
type Monitor struct {
	// UpdateInterval is the period of the recurring check for
	// tripped breakers which are ready to be retried.
	UpdateInterval time.Duration

	// ConsecFailures is the consecutive failure threshold used
	// for newly created breakers.
	ConsecFailures int

	// routes delivers the generated routing table fragments.
	routes chan string

	// fail and success queue the addresses reported through
	// FailHost and SuccessHost.
	fail, success chan string

	// done is closed by Stop to end the loop in Start.
	done chan struct{}
}

// NewMonitor returns a Monitor with the default settings: an update
// interval of 15 seconds and a threshold of three consecutive
// failures. The exported fields may be adjusted before Start is called.
func NewMonitor() *Monitor {
	// capacity of the fail and success event queues
	const eventBacklog = 100

	m := new(Monitor)
	m.UpdateInterval = 15 * time.Second
	m.ConsecFailures = 3
	m.routes = make(chan string, 1)
	m.fail = make(chan string, eventBacklog)
	m.success = make(chan string, eventBacklog)
	m.done = make(chan struct{})
	return m
}

// Stop ends the loop run by Start by closing the done channel.
// It must not be called more than once since closing an already
// closed channel panics.
func (m *Monitor) Stop() {
	done := m.done
	close(done)
}

// Start runs the monitor loop and blocks until Stop is called.
// It is meant to be run in its own goroutine.
//
// The loop owns all circuit breakers, so no locking is required.
// It handles three kinds of events:
//
//   - a tick every UpdateInterval: if at least one tripped breaker is
//     ready to be retried a new routing table fragment is published
//   - a failure reported via FailHost: if it trips the breaker
//     a new fragment is published
//   - a success reported via SuccessHost: if the breaker recovers
//     a new fragment is published
//
// UpdateInterval must be positive since time.NewTicker panics otherwise.
func (m *Monitor) Start() {
	cbs := make(map[string]*circuit.Breaker)

	// getcb returns the breaker for addr and creates it on first use.
	getcb := func(addr string) *circuit.Breaker {
		cb := cbs[addr]
		if cb == nil {
			cb = circuit.NewConsecutiveBreaker(int64(m.ConsecFailures))
			cbs[addr] = cb
		}
		return cb
	}

	// publish sends the current routing table fragment to the routes
	// channel. It gives up and returns false when the monitor is
	// stopped while waiting for the consumer. Otherwise, a slow or
	// missing consumer would block the loop forever and Stop could
	// not terminate it.
	publish := func() bool {
		select {
		case m.routes <- m.update(cbs):
			return true
		case <-m.done:
			return false
		}
	}

	ticker := time.NewTicker(m.UpdateInterval)
	// release the ticker resources when the loop exits
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ready := 0
			for addr, cb := range cbs {
				if cb.Tripped() && cb.Ready() {
					ready++
					log.Printf("[INFO] breaker: retrying routes for %s", addr)
				}
			}
			if ready > 0 && !publish() {
				return
			}

		case addr := <-m.fail:
			cb := getcb(addr)
			wasready := cb.Ready()
			cb.Fail()
			if wasready && cb.Tripped() {
				log.Printf("[WARN] breaker: breaker for %s tripped", addr)
				if !publish() {
					return
				}
			}

		case addr := <-m.success:
			cb := getcb(addr)
			wasready := cb.Ready()
			cb.Success()
			if !wasready && !cb.Tripped() {
				log.Printf("[INFO] breaker: breaker for %s recovered", addr)
				if !publish() {
					return
				}
			}

		case <-m.done:
			return
		}
	}
}

// SuccessHost reports a successful request to the target with the
// given address. The event is queued without blocking the caller
// and is dropped when the queue cannot accept it.
func (m *Monitor) SuccessHost(addr string) {
	select {
	default:
		// queue cannot take the event: drop it instead of blocking
	case m.success <- addr:
	}
}

// FailHost reports a failed request to the target with the given
// address. The event is queued without blocking the caller and is
// dropped when the queue cannot accept it.
func (m *Monitor) FailHost(addr string) {
	select {
	default:
		// queue cannot take the event: drop it instead of blocking
	case m.fail <- addr:
	}
}

// Routes returns the receive-only channel on which the monitor
// publishes routing table fragments.
func (m *Monitor) Routes() <-chan string {
	var out <-chan string = m.routes
	return out
}

// update builds the routing table fragment for the given breakers.
// It emits one "route del" command per address whose breaker is
// tripped or not ready and joins them with newlines. The result is
// empty when no breaker matches.
//
// The commands are sorted so that the same breaker state always
// produces the same string. Map iteration order is random and
// consumers which compare fragments would otherwise see changes
// where there are none.
func (m *Monitor) update(cbs map[string]*circuit.Breaker) string {
	var lines []string
	for addr, cb := range cbs {
		if cb.Tripped() || !cb.Ready() {
			lines = append(lines, "route del * * http://"+addr)
		}
	}
	sort.Strings(lines)
	return strings.Join(lines, "\n")
}
17 changes: 12 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/fabiolb/fabio/admin"
"github.com/fabiolb/fabio/cb"
"github.com/fabiolb/fabio/cert"
"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/exit"
Expand Down Expand Up @@ -97,12 +98,16 @@ func main() {
log.Printf("[INFO] Profile path %q", cfg.ProfilePath)
}

cbmon := cb.NewMonitor()
go cbmon.Start()

exit.Listen(func(s os.Signal) {
atomic.StoreInt32(&shuttingDown, 1)
proxy.Shutdown(cfg.Proxy.ShutdownWait)
if prof != nil {
prof.Stop()
}
cbmon.Stop()
if registry.Default == nil {
return
}
Expand All @@ -119,12 +124,12 @@ func main() {
go watchNoRouteHTML(cfg)

first := make(chan bool)
go watchBackend(cfg, first)
go watchBackend(cfg, first, cbmon)
log.Print("[INFO] Waiting for first routing table")
<-first

// create proxies after metrics since they use the metrics registry.
startServers(cfg)
startServers(cfg, cbmon)
exit.Wait()
log.Print("[INFO] Down")
}
Expand Down Expand Up @@ -241,7 +246,7 @@ func startAdmin(cfg *config.Config) {
}()
}

func startServers(cfg *config.Config) {
func startServers(cfg *config.Config, cbmon *cb.Monitor) {
for _, l := range cfg.Listen {
l := l // capture loop var for go routines below
tlscfg, err := makeTLSConfig(l)
Expand Down Expand Up @@ -371,11 +376,12 @@ func initBackend(cfg *config.Config) {
}
}

func watchBackend(cfg *config.Config, first chan bool) {
func watchBackend(cfg *config.Config, first chan bool, cbmon *cb.Monitor) {
var (
last string
svccfg string
mancfg string
cbcfg string

once sync.Once
)
Expand All @@ -387,11 +393,12 @@ func watchBackend(cfg *config.Config, first chan bool) {
select {
case svccfg = <-svc:
case mancfg = <-man:
case cbcfg = <-cbmon.Routes():
}

// manual config overrides service config
// order matters
next := svccfg + "\n" + mancfg
next := svccfg + "\n" + mancfg + "\n" + cbcfg
if next == last {
continue
}
Expand Down
76 changes: 76 additions & 0 deletions proxy/http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -15,9 +16,11 @@ import (
"regexp"
"sort"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/fabiolb/fabio/cb"
"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/logger"
"github.com/fabiolb/fabio/noroute"
Expand Down Expand Up @@ -118,6 +121,7 @@ func TestProxyStripsPath(t *testing.T) {
w.WriteHeader(404)
}
}))
defer server.Close()

proxy := httptest.NewServer(&HTTPProxy{
Transport: http.DefaultTransport,
Expand All @@ -137,6 +141,78 @@ func TestProxyStripsPath(t *testing.T) {
}
}

// TestProxyTripsBreaker runs the proxy against a routing table with
// one reachable and one unreachable target and feeds the routing table
// updates of a circuit breaker monitor back into the table used for
// the lookup.
//
// NOTE(review): the status and body assertions are still commented out,
// so the test currently only logs the responses.
func TestProxyTripsBreaker(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "OK")
	}))
	defer server.Close()

	// create a transport with a short dial timeout to speedup tests
	// otherwise, we have to wait 30 sec for every request to time out
	tr := &http.Transport{
		Dial: (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial,
	}

	// create a routing table with one good and one bad target
	routes := "route add mock / http://127.0.0.99:12345\n"
	routes += "route add mock / " + server.URL

	// create a circuit breaker monitor which will generate routing
	// table updates
	cbmon := cb.NewMonitor()
	cbmon.UpdateInterval = time.Second // check every second for recovered CBs
	go cbmon.Start()
	defer cbmon.Stop()

	// create sync value to contain the routing table
	var syncTbl atomic.Value
	tbl, err := route.NewTable(routes)
	if err != nil {
		t.Fatal(err)
	}
	syncTbl.Store(tbl)

	// apply the routing table updates from the monitor
	go func() {
		for cbroutes := range cbmon.Routes() {
			src := routes + "\n" + cbroutes
			tbl, err := route.NewTable(src)
			if err != nil {
				// t.Fatal must only be called from the goroutine
				// running the test. Record the failure and keep
				// draining the channel.
				t.Error(err)
				continue
			}
			syncTbl.Store(tbl)
			log.Println("new routing table:\n" + tbl.String())
		}
	}()

	proxy := httptest.NewServer(&HTTPProxy{
		Transport: tr,
		Lookup: func(r *http.Request) *route.Target {
			tbl := syncTbl.Load().(route.Table)
			return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"])
		},
		CBMon: cbmon,
	})
	defer proxy.Close()

	// call issues a request to the proxy and logs the response.
	call := func(wantStatus int, wantBody string) {
		t.Helper()
		resp, body := mustGet(proxy.URL + "/")
		t.Logf("GET %s: status: %d body: %q", proxy.URL, resp.StatusCode, string(body))
		// if got, want := resp.StatusCode, wantStatus; got != want {
		// 	t.Fatalf("got status %d want %d", got, want)
		// }
		// if got, want := string(body), wantBody; got != want {
		// 	t.Fatalf("got body %q want %q", got, want)
		// }
	}

	for i := 0; i < 100; i++ {
		call(502, "")
		time.Sleep(100 * time.Millisecond)
		// call(200, "OK")
	}
}

func TestProxyHost(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, r.Host)
Expand Down
38 changes: 38 additions & 0 deletions proxy/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/fabiolb/fabio/cb"
"github.com/fabiolb/fabio/config"
"github.com/fabiolb/fabio/logger"
"github.com/fabiolb/fabio/metrics"
Expand Down Expand Up @@ -55,6 +56,10 @@ type HTTPProxy struct {
// UUID returns a unique id in uuid format.
// If UUID is nil, uuid.NewUUID() is used.
UUID func() string

// CBMon is a circuit breaker monitor which manages failure
// and success events and generates routing table updates from them.
CBMon *cb.Monitor
}

func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -167,11 +172,24 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
timeNow = time.Now
}

// wrap ResponseWriter to capture the status code
w = &statusRW{w, http.StatusOK}

start := timeNow()
h.ServeHTTP(w, r)
end := timeNow()
dur := end.Sub(start)

if p.CBMon != nil {
addr := targetURL.Host
switch w.(*statusRW).code {
case 502: // BadGateway is returned on i/o timeout
p.CBMon.FailHost(addr)
default:
p.CBMon.SuccessHost(addr)
}
}

if p.Requests != nil {
p.Requests.Update(dur)
}
Expand Down Expand Up @@ -213,3 +231,23 @@ func key(code int) string {
b = strconv.AppendInt(b, int64(code), 10)
return string(b)
}

// statusRW decorates a http.ResponseWriter and remembers
// the status code passed to WriteHeader.
//
// Only the three http.ResponseWriter methods are forwarded. Optional
// interfaces of the wrapped writer (e.g. http.Flusher) are not
// exposed through this type.
type statusRW struct {
	w    http.ResponseWriter
	code int
}

// Header returns the header map of the wrapped ResponseWriter.
func (s *statusRW) Header() http.Header {
	hdr := s.w.Header()
	return hdr
}

// Write forwards p to the wrapped ResponseWriter.
// The recorded status code is left untouched.
func (s *statusRW) Write(p []byte) (int, error) {
	n, err := s.w.Write(p)
	return n, err
}

// WriteHeader records code and then forwards it
// to the wrapped ResponseWriter.
func (s *statusRW) WriteHeader(code int) {
	s.code = code
	s.w.WriteHeader(code)
}
Loading