mirror of
https://github.com/argoproj/argo-cd.git
synced 2026-02-25 03:58:48 +01:00
274 lines
6.3 KiB
Go
274 lines
6.3 KiB
Go
// Package consistent is an implementation of Consistent Hashing and
// Consistent Hashing With Bounded Loads.
//
// https://en.wikipedia.org/wiki/Consistent_hashing
//
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
package consistent
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/google/btree"
|
|
|
|
blake2b "github.com/minio/blake2b-simd"
|
|
)
|
|
|
|
// OptimalExtraCapacityFactor is the extra capacity factor (1 + ε) applied on
// top of the average load. It is the ideal balance between keeping the shards
// uniform and keeping consistency when the number of shards changes.
const OptimalExtraCapacityFactor = 1.25
|
|
|
|
// ErrNoHosts is returned by Get and GetLeast when the ring has no hosts in it.
var ErrNoHosts = errors.New("no hosts added")
|
|
|
|
// Host is a server registered in the ring together with its current load.
type Host struct {
	// Name is the server name the host was added under.
	Name string
	// Load is the load currently attributed to the host.
	Load int64
}
|
|
|
|
type Consistent struct {
|
|
servers map[uint64]string
|
|
clients *btree.BTree
|
|
loadMap map[string]*Host
|
|
totalLoad int64
|
|
replicationFactor int
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// item is the btree element stored in the ring: the hash of a virtual node.
type item struct {
	value uint64
}
|
|
|
|
func (i item) Less(than btree.Item) bool {
|
|
return i.value < than.(item).value
|
|
}
|
|
|
|
func New() *Consistent {
|
|
return &Consistent{
|
|
servers: map[uint64]string{},
|
|
clients: btree.New(2),
|
|
loadMap: map[string]*Host{},
|
|
replicationFactor: 1000,
|
|
}
|
|
}
|
|
|
|
func NewWithReplicationFactor(replicationFactor int) *Consistent {
|
|
return &Consistent{
|
|
servers: map[uint64]string{},
|
|
clients: btree.New(2),
|
|
loadMap: map[string]*Host{},
|
|
replicationFactor: replicationFactor,
|
|
}
|
|
}
|
|
|
|
func (c *Consistent) Add(server string) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.loadMap[server]; ok {
|
|
return
|
|
}
|
|
|
|
c.loadMap[server] = &Host{Name: server, Load: 0}
|
|
for i := 0; i < c.replicationFactor; i++ {
|
|
h := c.hash(fmt.Sprintf("%s%d", server, i))
|
|
c.servers[h] = server
|
|
c.clients.ReplaceOrInsert(item{h})
|
|
}
|
|
}
|
|
|
|
// Get returns the server that owns the given client.
|
|
// As described in https://en.wikipedia.org/wiki/Consistent_hashing
|
|
// It returns ErrNoHosts if the ring has no servers in it.
|
|
func (c *Consistent) Get(client string) (string, error) {
|
|
c.lock.RLock()
|
|
defer c.lock.RUnlock()
|
|
|
|
if c.clients.Len() == 0 {
|
|
return "", ErrNoHosts
|
|
}
|
|
|
|
h := c.hash(client)
|
|
var foundItem btree.Item
|
|
c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool {
|
|
foundItem = i
|
|
return false // stop the iteration
|
|
})
|
|
|
|
if foundItem == nil {
|
|
// If no host found, wrap around to the first one.
|
|
foundItem = c.clients.Min()
|
|
}
|
|
|
|
host := c.servers[foundItem.(item).value]
|
|
|
|
return host, nil
|
|
}
|
|
|
|
// GetLeast returns the least loaded host that can serve the key.
|
|
// It uses Consistent Hashing With Bounded loads.
|
|
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
|
|
// It returns ErrNoHosts if the ring has no hosts in it.
|
|
func (c *Consistent) GetLeast(client string) (string, error) {
|
|
c.lock.RLock()
|
|
defer c.lock.RUnlock()
|
|
|
|
if c.clients.Len() == 0 {
|
|
return "", ErrNoHosts
|
|
}
|
|
h := c.hash(client)
|
|
for {
|
|
var foundItem btree.Item
|
|
c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool {
|
|
if h != bItem.(item).value {
|
|
foundItem = bItem
|
|
return false // stop the iteration
|
|
}
|
|
return true
|
|
})
|
|
|
|
if foundItem == nil {
|
|
// If no host found, wrap around to the first one.
|
|
foundItem = c.clients.Min()
|
|
}
|
|
key := c.clients.Get(foundItem)
|
|
if key == nil {
|
|
return client, nil
|
|
}
|
|
host := c.servers[key.(item).value]
|
|
if c.loadOK(host) {
|
|
return host, nil
|
|
}
|
|
h = key.(item).value
|
|
}
|
|
}
|
|
|
|
// Sets the load of `server` to the given `load`
|
|
func (c *Consistent) UpdateLoad(server string, load int64) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.loadMap[server]; !ok {
|
|
return
|
|
}
|
|
c.totalLoad -= c.loadMap[server].Load
|
|
c.loadMap[server].Load = load
|
|
c.totalLoad += load
|
|
}
|
|
|
|
// Increments the load of host by 1
|
|
//
|
|
// should only be used with if you obtained a host with GetLeast
|
|
func (c *Consistent) Inc(server string) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.loadMap[server]; !ok {
|
|
return
|
|
}
|
|
atomic.AddInt64(&c.loadMap[server].Load, 1)
|
|
atomic.AddInt64(&c.totalLoad, 1)
|
|
}
|
|
|
|
// Decrements the load of host by 1
|
|
//
|
|
// should only be used with if you obtained a host with GetLeast
|
|
func (c *Consistent) Done(server string) {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
if _, ok := c.loadMap[server]; !ok {
|
|
return
|
|
}
|
|
atomic.AddInt64(&c.loadMap[server].Load, -1)
|
|
atomic.AddInt64(&c.totalLoad, -1)
|
|
}
|
|
|
|
// Deletes host from the ring
|
|
func (c *Consistent) Remove(server string) bool {
|
|
c.lock.Lock()
|
|
defer c.lock.Unlock()
|
|
|
|
for i := 0; i < c.replicationFactor; i++ {
|
|
h := c.hash(fmt.Sprintf("%s%d", server, i))
|
|
delete(c.servers, h)
|
|
c.delSlice(h)
|
|
}
|
|
delete(c.loadMap, server)
|
|
return true
|
|
}
|
|
|
|
// Return the list of servers in the ring
|
|
func (c *Consistent) Servers() (servers []string) {
|
|
c.lock.RLock()
|
|
defer c.lock.RUnlock()
|
|
for k := range c.loadMap {
|
|
servers = append(servers, k)
|
|
}
|
|
return servers
|
|
}
|
|
|
|
// Returns the loads of all the hosts
|
|
func (c *Consistent) GetLoads() map[string]int64 {
|
|
loads := map[string]int64{}
|
|
|
|
for k, v := range c.loadMap {
|
|
loads[k] = v.Load
|
|
}
|
|
return loads
|
|
}
|
|
|
|
// Returns the maximum load of the single host
|
|
// which is:
|
|
// (total_load/number_of_hosts)*1.25
|
|
// total_load = is the total number of active requests served by hosts
|
|
// for more info:
|
|
// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
|
|
func (c *Consistent) MaxLoad() int64 {
|
|
if c.totalLoad == 0 {
|
|
c.totalLoad = 1
|
|
}
|
|
var avgLoadPerNode float64
|
|
avgLoadPerNode = float64(c.totalLoad / int64(len(c.loadMap)))
|
|
if avgLoadPerNode == 0 {
|
|
avgLoadPerNode = 1
|
|
}
|
|
avgLoadPerNode = math.Ceil(avgLoadPerNode * OptimalExtraCapacityFactor)
|
|
return int64(avgLoadPerNode)
|
|
}
|
|
|
|
func (c *Consistent) loadOK(server string) bool {
|
|
// a safety check if someone performed c.Done more than needed
|
|
if c.totalLoad < 0 {
|
|
c.totalLoad = 0
|
|
}
|
|
|
|
var avgLoadPerNode float64
|
|
avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.loadMap)))
|
|
if avgLoadPerNode == 0 {
|
|
avgLoadPerNode = 1
|
|
}
|
|
avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25)
|
|
|
|
bserver, ok := c.loadMap[server]
|
|
if !ok {
|
|
panic(fmt.Sprintf("given host(%s) not in loadsMap", bserver.Name))
|
|
}
|
|
|
|
return float64(bserver.Load) < avgLoadPerNode
|
|
}
|
|
|
|
func (c *Consistent) delSlice(val uint64) {
|
|
c.clients.Delete(item{val})
|
|
}
|
|
|
|
func (c *Consistent) hash(key string) uint64 {
|
|
out := blake2b.Sum512([]byte(key))
|
|
return binary.LittleEndian.Uint64(out[:])
|
|
}
|