断路器是系统的基本,基本但至关重要的部分,尤其是当您要处理微服务架构时。最近,我的一位同事想知道这些代码像的代理人做了什么,在向他解释时,我想知道,尽管我有点知道断路器在高度上是什么级别,我真的不知道引擎盖下发生了什么。因此,我决定更深入地分享我发现的东西。在本文中,我们将在高级别上了解断路器,以及浏览GO中的断路器模式的基本实现。
介绍
想象服务正在尝试通过RPC与服务B (作为服务器)联系。从服务的角度来看,它看起来像这样:
package service_A
func callServiceB(req Request) {
resp, err := serviceB.SomeMethod(context.Background(), req)
if err != nil {
log.Printf("got error: %s", err.Error())
return
}
return log.Printf("got response: %s", resp)
}
好吧,这可能是拨打另一个服务的最基本方法,但是有很多事情可以(遗嘱)出错。可以说,我们没有收到服务B 的任何回应,这是什么原因?其中之一(这不一定是我们目前的关注)是,由于我们主要处理不可靠的网络连接,因此服务b 从未处理过该请求。在这种情况下,我们将仅依靠TCP重新传播,实际上,大多数RPC框架都提供了自己的重试机制,并且它将主要在应用程序层中处理。但是,服务器没有响应的事实并不总是意味着存在网络问题,也许服务B 。
。通常,服务(甚至最可靠的服务)往往会不时变得缓慢或无法使用。这可能会导致响应某些(或全部)来电的错误。如果服务真的忙于处理很多请求,我们可能想将我们的请求率限制为该服务,甚至停止发送任何请求,让它屏住呼吸,并在和平中做任何事情。 P>
在另一种情况下,让我们说这项服务暂时无法使用,我们以前失败的请求(让我们说100个请求)正在接一个地重述。如果我们不将未来请求的数量限制为该服务,并提出100个请求,当服务最终恢复时,它将同时面对200个请求,甚至可能导致该服务立即崩溃一次再次。
尽管在现实世界中,大多数RPC都是由特定的超时制成的,并且之后不会重新审议,但在遇到问题时不要用请求淹没服务的好习惯。
对我们来说,这种限制机制的另一个积极要点是,我们不费心(一段时间)向该服务提出任何要求,因此没有时间消耗I/O。这比仅等待一段时间,然后接收失败的结果(例如5xx状态代码),这是更快的。
。断路器
简单地说,两个服务之间的RPC就像在两者之间绘制2条直线。一条线将服务A的请求发送到服务B,另一行将响应从服务B返回到服务A。但是,为了实施限制政策,我们需要拥有某种中间人决定是否将请求引向目的地。
这个中间人,代理(不是网络代理)或包装器要么要让两个服务之间的电路(或连接) - 召唤对方,因此打开
断路器背后的主要思想如下:
默认情况下,电路处于封闭模式,您可以自由地拨打到目的地。在目的地发生了一定量的失败响应之后(阈值,让我们说5),它将阻止您在电路为一段时间(退缩时间,例如30秒)中提出任何进一步的请求被认为开放。 **在此间隔结束后,它进入了一个**半开的状态。如果下一个请求要确定我们是要陷入封闭的staet还是恢复开放状态。如果成功,电路将关闭,但是如果请求失败,我们将重新处于空旷状态并被迫等待另一个后退间隔。
让我们看看如何在GO中实现简单的断路器。
执行
从使用断路器来向另一个服务请求的服务的立场,使RPC有点不同:
您在下面看到的所有代码也可以在this github repo中找到。
package service_A
func callServiceB(cb CircuitBreaker, req Request) {
r, err := cb.Execute(func() (interface{}, error) {
return serviceB.SomeMethod(req)
})
if err != nil {
log.Printf("got error: %s", err.Error())
return
} else {
resp, ok := r.(serviceB.Response)
if !ok {
log.Println("type assertion failed")
return
}
log.Printf("got response: %s", resp)
}
}
让我们看一下CB.ectute的作用:
package circuitbreaker
func (cb *circuitbreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
err := cb.doPreRequest()
if err != nil {
return nil, err
}
res, err := req()
err = cb.doPostRequest(err)
if err != nil {
return nil, err
}
return res, nil
}
实际请求将在第8行,res,err:= req()上发生,在此之前应该有一个beforeRequest功能:
func (cb *circuitbreaker) doPreRequest() error {
if cb.state == open {
return ErrRefuse
}
return nil
}
这检查了当前状态是否打开。如果电路打开,则简单返回错误,表明连接被拒绝。让我们看看到目前为止的断路器的结构:
type State string
const (
open State = "open"
closed State = "closed"
halfOpen State = "half-open"
)
type circuitbreaker struct {
state State
}
嗯,终点呢?
func (cb *circuitbreaker) doPostRequest(err error) error {
cb.mutex.Lock()
defer cb.mutex.Unlock()
if err == nil {
if cb.policy == MaxConsecutiveFails {
cb.fails = 0
}
cb.state = closed
return nil
}
if cb.state == halfOpen {
cb.state = open
cb.openChannel <- struct{}{}
return err
}
cb.fails++
if cb.failsExcceededThreshod() {
cb.state = open
cb.openChannel <- struct{}{}
}
return err
}
此功能引入了许多新字段和方法,让我们描述此功能背后的逻辑,然后扩展我们的断路器结构。
-
mutex用于确保以安全的方式进行读取 - - 修饰周期,以防我们同时尝试修改断路器状态
-
err是返回的 *目标 *的实际错误。如果零,基本上是重置状态并继续。失败是当前关闭状态中失败请求的数量。 MaxConsecteedFails是一项策略,这意味着断路器经历了连续失败的数量后必须打开电路。
-
如果有错误,并且我们处于半开的状态,请打开电路并继续。 cb.openchannel <-struct {} {}触发等待间隔
-
如果有错误,而我们不处于半开状态,则只需递增当前状态下的失败attemps的数量。
-
检查失败尝试的数量是否为您的策略划分了阈值。如果是这样,请打开电路并触发等待间隔
让我们看一下断路器的完整结构:
type Policy int
type State string
const (
// MaxFails specifies the maximum non-consecutive fails which are allowed
// in the "Closed" state before the state is changd to "Open".
MaxFails Policy = iota
// MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
// in the "Closed" state before the state is changed to "Open".
MaxConsecutiveFails
)
const (
open State = "open"
closed State = "closed"
halfOpen State = "half-open"
)
type circuitbreaker struct {
policy Policy
maxFails uint64
maxConsecutiveFails uint64
openInterval time.Duration
// fails is the number of failed requets for the current "Closed" state,
// resets after a successful transition from half-open to closed.
fails uint64
// current state of the circuit
state State
// openChannel handles the event transfer mechanism for the open state
openChannel chan struct{}
mutex sync.Mutex
}
And the helper functions we’ve seen so far:
func (cb *circuitbreaker) failsExcceededThreshod() bool {
switch cb.policy {
case MaxConsecutiveFails:
return cb.fails >= cb.maxConsecutiveFails
case MaxFails:
return cb.fails >= cb.maxFails
default:
return false
}
}
func (cb *circuitbreaker) openWatcher() {
for range cb.openChannel {
time.Sleep(cb.openInterval)
cb.mutex.Lock()
cb.state = halfOpen
cb.fails = 0
cb.mutex.Unlock()
}
}
OpenWatcher只是在OpenChannel上听,然后在接收到OpenInterval的Goroutine上,然后重置数字,然后将状态更改为半开。然后循环重复。但是,何时打开openwatcher?就在我们初始化断路器的初始化时:
type ExtraOptions struct {
// Policy determines how the fails should be incremented
Policy Policy
// MaxFails specifies the maximum non-consecutive fails which are allowed
// in the "Closed" state before the state is changd to "Open".
MaxFails *uint64
// MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
// in the "Closed" state before the state is changed to "Open".
MaxConsecutiveFails *uint64
OpenInterval *time.Duration
}
func New(opts ...ExtraOptions) Circuitbreaker {
var opt ExtraOptions
if len(opts) > 0 {
opt = opts[0]
}
if opt.MaxFails == nil {
opt.MaxFails = literal.ToPointer(uint64(5))
}
if opt.MaxConsecutiveFails == nil {
opt.MaxConsecutiveFails = literal.ToPointer(uint64(5))
}
if opt.OpenInterval == nil {
opt.OpenInterval = literal.ToPointer(5 * time.Second)
}
cb := &circuitbreaker{
policy: opt.Policy,
maxFails: *opt.MaxFails,
maxConsecutiveFails: *opt.MaxConsecutiveFails,
openInterval: *opt.OpenInterval,
openChannel: make(chan struct{}),
}
go cb.openWatcher()
return cb
}
让我们以最简单的方式尝试我们写的内容:
package main
func main() {
cbOpts := circuitbreaker.ExtraOptions{
Policy: circuitbreaker.MaxFails,
MaxFails: literal.ToPointer(uint64(5)),
MaxConsecutiveFails: literal.ToPointer(uint64(5)),
OpenInterval: literal.ToPointer(160 * time.Millisecond),
}
cb := circuitbreaker.New(cbOpts)
wg := &sync.WaitGroup{}
for i := 1; i < 100; i += 1 {
wg.Add(1)
go makeServiceCall(i, cb, wg)
time.Sleep(10 * time.Millisecond)
}
log.Println("sent all the requests")
wg.Wait()
log.Println("got all the responses, exiting.")
}
func serviceMethod(id int) (string, error) {
if val := rand.Float64(); val <= 0.3 {
return "", errors.New("failed")
}
return fmt.Sprintf("[id: %d] done.", id), nil
}
func makeServiceCall(id int, cb circuitbreaker.Circuitbreaker, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := cb.Execute(func() (interface{}, error) {
return serviceMethod(id)
})
if err != nil {
log.Printf("[id %d] got err: %s", id, err.Error())
} else {
log.Printf("[id %d] success: %s", id, resp)
}
}
可以随时尝试使用不同的策略,maxfails,maxconsecteepthe和OpenInterval进行试验。
结论
到目前为止,我们看到断路器在可靠性和性能中如何发挥至关重要的作用。如果没有断路器,客户可能会浪费大量时间尝试连接到不可用的服务并每次等待整个超时期。另一方面,目标服务可能会在从停机时间或灾难中恢复过来后立即受到数十个要求的轰炸,从而导致另一次不幸的中断。我们还浏览了GO中断路器模式的简单实现。不用说,我很想知道您对此主题的看法以及您可能会有的任何其他评论或建议。通过您的评论以及您在评论中提供的其他信息,我学到了很多阅读。