Post

CHAPTER 5. Concurrency at Scale

Error Propagation

에러는 다음의 중요한 정보를 포함해야 한다.

  • 무슨 일이 발생했는지
  • 언제, 어디서 발생했는지
  • 사용자에게 제공되는 적절한 메세지
  • 사용자가 더 많은 정보를 얻을 방법

이러한 정보를 포함하지 않은 날것의 에러를 버그라 할 수 있다.

Timeouts and Cancellation

타임아웃을 도입하기 좋은 경우는 다음과 같다고 볼 수 있다.

  • 포화상태인 시스템
    • 큐에 요청이 꽉 찬 경우
    • 요청이 타임아웃이 선언된 뒤 다시 반복되지 않는 경우
    • 요청을 저장해 둘 수 있는 자원이 없는 경우
  • 더 이상 사용 못 하는 데이터
    • 차례가 너무 늦게 돌아와 요청을 처리할 때 이미 데이터는 유효하지 않은 경우
      • context.WithDeadline을 사용해 처리하기 좋다.
      • 데드라인을 미리 알지 못하면 context.WithCancel을 사용해 취소하면 좋다.
  • Deadlock 방지
    • 시스템이 커질수록 모든 흐름을 제어하기 어렵기 때문에 deadlock을 방지하고자 하는 경우
    • Deadlock을 방지한다고 타임아웃을 잘 못 설정하면 livelock이 될 수 있다. 하지만 deadlock이 발생하여 시스템을 재시작하느니 livelock을 고치는 게 낫다.

동시성 작업을 취소하기 위해선 해당 작업은 preemptable 해야 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Preemptable 하지 않은 작업
reallyLongCalculation := func(
	done <-chan interface{},
	value interface{},
) interface{} {
	intermediateResult := longCalculation(value)
	select {
	case <-done:
		return nil
	default:
	}
	return longCaluclation(intermediateResult)
}

// Preemptable 한 작업
reallyLongCalculation := func(
	done <-chan interface{},
	value interface{},
) interface{} {
	intermediateResult := longCalculation(done, value)
	return longCaluclation(done, intermediateResult)
}

또한 데이터베이스와 같은 shared state를 다루는 작업은 쉽게 취소 또는 롤백할 수 있도록 해야 한다.

1
2
3
4
5
6
7
8
9
10
11
// wrong
result := add(1, 2, 3)
writeTallyToState(result)
result = add(result, 4, 5, 6)
writeTallyToState(result)
result = add(result, 7, 8, 9)
writeTallyToState(result)

// good
result := add(1, 2, 3, 4, 5, 6, 7, 8, 9)
writeTallyToState(result)

Heartbeats

외부 관찰자에게 심장박동을 들려주는 것과 같이 동시성 작업에서 외부로 신호를 보내는 걸 heartbeats라 한다.

Heartbeats에는 두 가지 종류가 있다.

  • 일정 시간 간격으로 작동하는 heartbeats
    • 어떤 이벤트가 발생하길 기다리는 경우 유용하다.
  • 어떤 작업의 시작에서 작동하는 heartbeats
    • 테스트에서 타임아웃 대신 사용하기 유용하다.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    
      func DoWork(
      	done <-chan interface{},
      	nums ...int,
      ) (<-chan interface{}, <-chan int) {
      	heartbeat := make(chan interface{}, 1)
      	intStream := make(chan int)
      	go func() {
      		defer close(heartbeat)
      		defer close(intStream)
        
      		time.Sleep(2 * time.Second) 
        
      		for _, n := range nums {
      			select {
      			case heartbeat <- struct{}{}:
      			default:
      			}
        
      			select {
      			case <-done:
      				return
      			case intStream <- n:
      			}
      		}
      	}()
        
      	return heartbeat, intStream
      }
        
      // Bad
      func TestDoWork_GeneratesAllNumbers(t *testing.T) {
      	done := make(chan interface{})
      	defer close(done)
        
      	intSlice := []int{0, 1, 2, 3, 5}
      	_, results := DoWork(done, intSlice...)
        
      	for i, expected := range intSlice {
      		select {
      		case r := <-results:
      			if r != expected {
      				t.Errorf(
      					"index %v: expected %v, but received %v,",
      					i,
      					expected,
      					r,
      				)
      			}
      		case <-time.After(1 * time.Second): // Nondeterministic!
      			t.Fatal("test timed out")
      		}
      	}
      }
        
      // Good
      func TestDoWork_GeneratesAllNumbers(t *testing.T) {
      	done := make(chan interface{})
      	defer close(done)
        
      	intSlice := []int{0, 1, 2, 3, 5}
      	heartbeat, results := DoWork(done, intSlice...)
        
      	<-heartbeat // 작업 시작 신호
        
      	i := 0
      	for r := range results {
      		if expected := intSlice[i]; r != expected {
      			t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
      		}
      		i++
      	}
      }
    

Replicated Requests

응답을 빠르게 받는 게 매우 중요한 어플리케이션에선 요청을 여러 개 만들어 가장 빨리 오는 응답을 바로 보내줄 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
doWork := func(
	done <-chan interface{},
	id int,
	wg *sync.WaitGroup,
	result chan<- int,
) {
	started := time.Now()
	defer wg.Done()

	// Simulate random load
	simulatedLoadTime := time.Duration(1+rand.Intn(5)) * time.Second
	select {
	case <-done:
	case <-time.After(simulatedLoadTime):
	}

	select {
	case <-done:
	case result <- id:
	}

	took := time.Since(started)
	// Display how long handlers would have taken
	if took < simulatedLoadTime {
		took = simulatedLoadTime
	}
	fmt.Printf("%v took %v\n", id, took)
}

done := make(chan interface{})
result := make(chan int)

var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
	go doWork(done, i, &wg, result)
}

firstReturned := <-result
close(done)
wg.Wait()

fmt.Printf("Received an answer from #%v\n", firstReturned)
// 4 took 5s
// 5 took 1s
// 2 took 3s
// 1 took 1s
// 0 took 4s
// 8 took 4s
// 7 took 5s
// 9 took 3s
// 6 took 2s
// 3 took 1s
// Received an answer from #3

만약 모든 요청이 사용하는 자원 등의 이유로 동일한 조건에서 작동하지 못하거나, 너무 비슷해서 차이가 매우 작다면 이 방법은 그다지 유용하지 않다.

Rate Limiting

많은 rate limiting은 token bucket이라는 알고리즘을 사용한다. 이 알고리즘은 bucket에서 token을 빼내서 요청을 보내는 데 사용하는 과정으로 비유될 수 있다.

  • bucket이 d만큼의 깊이를 가지고 있다면 사용자는 최대 d번 요청을 보낼 수 있다.
  • Bucket에 token이 다시 차는 rate를 r이라 하면, 이 값이 곧 rate limit이 된다.
  • Burstiness는 bucket이 가득 찼을 때 보낼 수 있는 요청의 수를 의미한다.

자주 들어오는 요청과 그렇지 않은 요청에 동시에 대응하기 위해 multi-rate limiter를 이용할 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
type RateLimiter interface {
	Wait(context.Context) error
	Limit() rate.Limit
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
	byLimit := func(i, j int) bool {
		return limiters[i].Limit() < limiters[j].Limit()
	}
	sort.Slice(limiters, byLimit)
	return &multiLimiter{limiters: limiters}
}

type multiLimiter struct {
	limiters []RateLimiter
}

func (l *multiLimiter) Wait(ctx context.Context) error {
	for _, l := range l.limiters {
		if err := l.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (l *multiLimiter) Limit() rate.Limit {
	return l.limiters[0].Limit()
}

func Open() *APIConnection {
	secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
	minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
	return &APIConnection{
		rateLimiter: MultiLimiter(secondLimit, minuteLimit),
	}
}

type APIConnection struct {
	rateLimiter RateLimiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	// Pretend we do work here
	return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	// Pretend we do work here
	return nil
}

// 22:46:10 ResolveAddress
// 22:46:10 ReadFile <- 먼저 1 sec token 다 사용
// 22:46:11 ReadFile
// 22:46:11 ReadFile
// 22:46:12 ReadFile
// 22:46:12 ReadFile
// 22:46:13 ReadFile
// 22:46:13 ReadFile
// 22:46:14 ReadFile
// 22:46:14 ReadFile
// 22:46:16 ResolveAddress <- 1 min token과 채워지는 1 sec token 사용
// 22:46:22 ResolveAddress
// 22:46:28 ReadFile
// 22:46:34 ResolveAddress
// 22:46:40 ResolveAddress
// 22:46:46 ResolveAddress
// 22:46:52 ResolveAddress
// 22:46:58 ResolveAddress
// 22:47:04 ResolveAddress
// 22:47:10 ResolveAddress
// 22:47:10 Done.

여러 시스템에 접근하는 상황에서 또한 유용하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func Open() *APIConnection {
	return &APIConnection{
		apiLimit: MultiLimiter(
			rate.NewLimiter(Per(2, time.Second), 2),
			rate.NewLimiter(Per(10, time.Minute), 10),
		),
		diskLimit: MultiLimiter(
			rate.NewLimiter(rate.Limit(1), 1),
		),
		networkLimit: MultiLimiter(
			rate.NewLimiter(Per(3, time.Second), 3),
		),
	}
}

type APIConnection struct {
	networkLimit,
	diskLimit,
	apiLimit RateLimiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx)
	if err != nil {
		return err
	}
	// Pretend we do work here
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx)
	if err != nil {
		return err
	}
	// Pretend we do work here
	return nil
}

Healing Unhealthy Goroutines

Heartbeats를 이용하면 제대로 작동하지 않는 고루틴을 찾을 수 있고, 이를 다시 정상 작동하게 만들 수 있다.

This post is licensed under CC BY 4.0 by the author.