Skip to content

Commit

Permalink
Merge pull request docker-archive#1552 from abronan/fix_lock_ttl_stor…
Browse files Browse the repository at this point in the history
…e_failure

Fix Consul Lock TTL with store failure
  • Loading branch information
vieux committed Dec 17, 2015
2 parents 45c2f79 + 1353a86 commit 75b3875
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var (
}
flLeaderTTL = cli.StringFlag{
Name: "replication-ttl",
Value: "30s",
Value: "15s",
Usage: "Leader lock release time on failure",
}
)
10 changes: 8 additions & 2 deletions cli/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Serve
}

func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Router, replica *api.Replica) {
electedCh, errCh := candidate.RunForElection()
electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for {
select {
case isElected := <-electedCh:
Expand All @@ -181,7 +184,10 @@ func run(candidate *leadership.Candidate, server *api.Server, primary *mux.Route
}

func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
leaderCh, errCh := follower.FollowElection()
leaderCh, errCh, err := follower.FollowElection()
if err != nil {
return
}
for {
select {
case leader := <-leaderCh:
Expand Down
15 changes: 12 additions & 3 deletions leadership/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ if err != nil {
}

underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
electedCh, _ := underwood.RunForElection()
electedCh, _, err := underwood.RunForElection()
if err != nil {
log.Fatal("Cannot run for election, store is probably down")
}

for isElected := range electedCh {
// This loop will run every time there is a change in our leadership
Expand Down Expand Up @@ -46,7 +49,10 @@ It is possible to follow an election in real-time and get notified whenever
there is a change in leadership:
```go
follower := leadership.NewFollower(client, "service/swarm/leader")
leaderCh, _ := follower.FollowElection()
leaderCh, _, err := follower.FollowElection()
if err != nil {
log.Fatal("Cannot follow the election, store is probably down")
}
for leader := <-leaderCh {
// Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader)
Expand Down Expand Up @@ -83,7 +89,10 @@ func participate() {
}

func run(candidate *leadership.Candidate) {
electedCh, errCh := candidate.RunForElection()
electedCh, errCh, err := candidate.RunForElection()
if err != nil {
return
}
for {
select {
case elected := <-electedCh:
Expand Down
29 changes: 19 additions & 10 deletions leadership/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/docker/libkv/store"
)

const (
defaultLockTTL = 15 * time.Second
)

// Candidate runs the leader election algorithm asynchronously
type Candidate struct {
client store.Store
Expand Down Expand Up @@ -47,23 +51,28 @@ func (c *Candidate) IsLeader() bool {
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
func (c *Candidate) RunForElection() (<-chan bool, <-chan error, error) {
c.electedCh = make(chan bool)
c.errCh = make(chan error)

lock, err := c.client.NewLock(c.key, &store.LockOptions{
Value: []byte(c.node),
TTL: c.lockTTL,
RenewLock: make(chan struct{}),
})
lockOpts := &store.LockOptions{
Value: []byte(c.node),
}

if c.lockTTL != defaultLockTTL {
lockOpts.TTL = c.lockTTL
lockOpts.RenewLock = make(chan struct{})
}

lock, err := c.client.NewLock(c.key, lockOpts)

if err != nil {
c.errCh <- err
} else {
go c.campaign(lock)
return nil, nil, err
}

return c.electedCh, c.errCh
go c.campaign(lock)

return c.electedCh, c.errCh, nil
}

// Stop running for election.
Expand Down
3 changes: 2 additions & 1 deletion leadership/candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestCandidate(t *testing.T) {
mockLock.On("Unlock").Return(nil)

candidate := NewCandidate(kv, "test_key", "test_node", 0)
electedCh, _ := candidate.RunForElection()
electedCh, _, err := candidate.RunForElection()
assert.Nil(t, err)

// Should issue a false upon start, no matter what.
assert.False(t, <-electedCh)
Expand Down
10 changes: 5 additions & 5 deletions leadership/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ func (f *Follower) Leader() string {
}

// FollowElection starts monitoring the election.
func (f *Follower) FollowElection() (<-chan string, <-chan error) {
func (f *Follower) FollowElection() (<-chan string, <-chan error, error) {
f.leaderCh = make(chan string)
f.errCh = make(chan error)

ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil {
f.errCh <- err
} else {
go f.follow(ch)
return nil, nil, err
}

return f.leaderCh, f.errCh
go f.follow(ch)

return f.leaderCh, f.errCh, nil
}

// Stop stops monitoring an election.
Expand Down
3 changes: 2 additions & 1 deletion leadership/follower_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func TestFollower(t *testing.T) {
mockStore.On("Watch", "test_key", mock.Anything).Return(mockKVCh, nil)

follower := NewFollower(kv, "test_key")
leaderCh, errCh := follower.FollowElection()
leaderCh, errCh, err := follower.FollowElection()
assert.Nil(t, err)

// Simulate leader updates
go func() {
Expand Down

0 comments on commit 75b3875

Please sign in to comment.