2017년 11월 5일 일요일

Tensorflow에서 Multi GPU 구동시키기

Introduction

Machine Learning 분야가 크게 발전하면서 점점 더 많은 dataset이 확보되고, 동시에 Deep Neural Network의 구조도 복잡해지며 깊어지고 있습니다. 그로 인하여 천문학적으로 많아진 계산량을 감당하기 위하여 고가 장비의 필요성이 두드러지고 있습니다. 최근에는 구글 및 아마존 등의 클라우드 서비스를 통하여 시간 당 이용요금을 지불하고 서버컴퓨터를 사용하는 방식도 제시되고 있습니다.

우리 연구실에서는 예산의 여유가 생겨 4장의 GTX-1080 GPU를 보유한 워크스테이션 컴퓨터를 구매하였습니다. 하지만 Tensorflow에서는 default로 첫번째로 장착된 GPU만 사용하게 되어있기 때문에 추가적으로 나머지 GPU도 동시에 일을 시키는 코드가 필요합니다.

국내에서는 아무래도 GPU를 여러 장 구비한 연구팀이 많지 않고, 있다 하더라도 팀 내부적으로 Python 밑 Tensorflow에 능통한 연구원이 있는지 쉽게 참고할만한 Multi GPU tutorial을 찾기 힘들었습니다. 저를 포함하여 김성훈 교수님의 모두를 위한 딥러닝 강좌를 통하여 Tensorflow를 입문한 분들이 많을텐데, 해당 강의 수준을 넘어선 제대로된 중급자를 위한 강의가 없는 실정이다보니 일반인들에게는 그 이상의 무엇인가를 능동적으로 하기가 어려운 상황입니다.

Tensorflow 측에서 제공하는 CIFAR-10 multi-gpu tutorial을 이리저리 살펴보면서 필자와 같이 기본적인 코드밖에 다루지 못하는 초심자들을 위하여 multi-GPU code를 어떻게 구성해야 하는지 살펴보고자 합니다. 이 글은 제대로 정리된 guide라기보다는, Tensorflow CIFAR-10 multigpu tutorial을 제 업무에 맞게 적용시키면서 겪은 시행착오를 정리한 글입니다. 글을 보시는 분께서는 먼저 해당 튜토리얼을 본인 업무에 적용시켜보면서 막히는 부분이 생길 때 이 글을 참고하면 좋을거라 생각됩니다.

공유 변수

GPU간의 변수 공유는 상대적으로 느리다고 잘 알려져있습니다. 추가적으로 각 GPU의 memory는 CPU에 비하여 그 용량이 상당히 제한적입니다(GTX-1080 기준 8GB). 최근 대부분의 optimize 알고리즘은 Stochastic Gradient Descent(SGD)를 바탕으로 하여 momentum 밑 변수마다 step size를 다르게 적용시키는 RMSProp와 Adaptive Moment Estimation(Adam) 등이 많이 쓰이고 있습니다. 이 때 batch size는 GPU 메모리가 허용하는 선에서 최대한 크게 잡는게 주로 쓰이는 방법이지만, optimal한 batch size는 empirical하게 잡을 수 밖에 없는 실정입니다.

결론적으로 우리는 CPU에 모든 변수를 초기화시키고 각 step마다 모든 GPU에 같은 parameter들을 복사시켜 줍니다. 여기서 중요한점은 입문시에 배운 tf.Variable 명령어만을 사용한다면 CPU에 변수를 초기화시킨다 하더라도 GPU에 변수를 공유시키는데 어려움이 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def _variable_on_cpu(name, shape, initializer=tf.truncated_normal_initializer(stddev=0.01)):
    """Helper to create a Variable stored on CPU memory.

    Args:
      name : name of the variable
      shape : list of ints
      initializer: initializer for Variable

    Returns:
      Variable Tensor"""

    with tf.device('/cpu:0'):
        if shape is None:
            var = tf.get_variable(name, initializer=initializer)
        else:
            var = tf.get_variable(name, shape, initializer=initializer)

    return var

따라서 우리는 _variable_on_cpu의 lines 12-16 와 같이 tf.get_variable() 명령어를 통하여 CPU에 변수를 초기화 시켜준 뒤,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def loss_and_grad(self):
    # Calculate the gradients and losses for each model tower.
    loss_tmp = []
    grad_tmp = []
    mb_per_gpu = int(self.batch_size / self.num_gpu)
    with tf.variable_scope(tf.get_variable_scope()):
        for d in range(self.num_gpu):
            with tf.device('/gpu:' + str(d)):
                # Calculate the loss for one tower of the model. This function
                # constructs the entire model but shares the variables across all towers.
                mb_start = mb_per_gpu * d
                mb_end = mb_per_gpu * (d + 1)
                label_mb = self.Y[mb_start:mb_end]
                image_mb = self.inference(input_=self.X[mb_start:mb_end])
                loss = tf.nn.l2_loss(label_mb - image_mb)) / mb_per_gpu

                # Reuse variables for the next tower.
                tf.get_variable_scope().reuse_variables()

                # Calculate the gradients for the batch of data on this tower.
                grad=self.optimizer.compute_gradients(loss)

                # Keep track of the gradients and loss across all towers.
                loss_tmp.append(loss)
                grad_tmp.append(grad)
    return loss_tmp, grad_tmp

loss_and_grad의 7-18 lines 와 같이각 GPU에 같은 변수를 공유시킵니다. 하지만 위의 코드만으로는 변수가 어떠한 과정을 통하여 공유되었는지 알 수 없습다.  14 line에 있는 inference를 살펴보면,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def inference(self, input_):
    h_=self.hidden_num
    input_=tf.reshape(input_, [-1, self.width, self.height, 1])

    conv1, pool1=conv_conv_pool(input_, [h_, h_], self.activation,
                                    self.w_bn, self.is_train, name = "1")

    conv2, pool2=conv_conv_pool(pool1, [2 * h_, 2 * h_], self.activation,
                                    self.w_bn, self.is_train,  name = "2")

    conv3=conv_conv_pool(pool2, [4 * h_, 2 * h_], self.activation,
                            self.w_bn, self.is_train, name="3", pool=False)

    up4 = tf.concat([self.upsampling(conv3), conv2], 3)
    conv4 = conv_conv_pool(up4, [4 * h_, 2 * h_], self.activation,
                            self.w_bn, self.is_train, name="4", pool=False,)

    up5 = tf.concat([self.upsampling(conv4), conv1], 3)
    conv5 = conv_conv_pool(up5, [2 * h_, h_], self.activation,
                            self.w_bn, self.is_train, name="5", pool=False)

    conv6 = conv2d(conv5, 1, k_h=1, k_w=1, name='layer_fc')

    return tf.reshape(conv6, [-1, self.width, self.height])

conv_conv_pool과 conv2d 라는 operator들을 통하여 연산이 진행되고, 각각은 다음과 같이 정의 해 두었습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def conv_conv_pool(input_, n_filters, activation, w_bn, is_train, name, pool=True):
    net = input_

    with tf.variable_scope("layer" + name):
        for i, F in enumerate(n_filters):
            net = conv2d(net, F, name="conv" + str(i + 1), activation=activation,
                         w_bn=w_bn, is_train=is_train)

        if not pool:
            return net
        else:
            return net, maxpool(net)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def conv2d(input_, output_channels, k_h=3, k_w=3, d_h=1, d_w=1, stddev=0.01, name="conv_layer",
           activation=None, initializer=tf.truncated_normal_initializer,
           w_bias=True, w_bn=False, is_train=True):
    with tf.variable_scope(name):
        shape = [k_h, k_w, input_.get_shape()[-1], output_channels]
        kernel = _variable_on_cpu(name="kernel", shape=shape,
                                  initializer=initializer(stddev=stddev))
        bias = _variable_on_cpu(name="bias", shape=[output_channels],
                                initializer=tf.zeros_initializer)
        if w_bias:
            conv = tf.nn.conv2d(input_, kernel, strides=[
                                1, d_h, d_w, 1], padding="SAME") + bias
        else:
            conv = tf.nn.conv2d(input_, kernel, strides=[
                                1, d_h, d_w, 1], padding="SAME")

        if w_bn:
            conv = tf.layers.batch_normalization(
                conv, training=is_train, name="BN")

    if activation is None:
        return conv
    else:
        return activation(conv)

conv2d의 6-8 lines 을 통하여 각각의 변수들은 '/cpu:0'에 저장되어 있지만, loss_and_grad line 8 'with tf.device('/gpu:'+str(d)):'를 통하여 각각의 '/gpu:d'에 복사된 후 해당 GPU에서 모든 연산을 처리하도록 명령되었음을 확인할 수 있습니다.

tf.variable_scope와 tf.get_variable()에 관한 자세한 내용은 여기서 확인하실 수 있습니다.

Averaged Gradients

기존방법

이 글을 보고 계시다면 아마도 당신의 tensorflow에서의 optimization 코드는 다음과 유사할 것이라 생각됩니다.

train_op = tf.train.RMSPropOptimizer(0.001).minimize(cost)

 해당 코드를 분석해보면, "RMSProp" algorithm에서 learning rate=0.001를 사용하여 "cost"를 minimize하는 그래프 "train_op"를 설정하고 있습니다. 한가지 이상한점은, 우리는 gradient descent method를 공부할 때 gradient 값을 계산한 후 learning rate를 곱하여 기존 parameter에서 뺀 값으로 업데이트 시키도록 배웠습니다. 하지만 위의 방법은 gradient 계산과 parameter 업데이트가 동시에 진행되도록 설정되어 있습니다.

병렬방법

Introduction에서 언급했듯이, 기존에는 batch size를 GPU 메모리가 감당하는 선에서 최대한으로 늘려서 사용하였습니다. 그렇다면 GPU를 여러개 사용하면 어떤 일이 가능할까요? 첫째로는 최대로 사용할 수 있는 batch size를 늘릴 수 있거나, 둘째로는 같은 batch size를 그만큼 빠르게 연산할 수 있습니다.

예를 들어, 기존에는 1장의 GPU만 사용하고 $256^2$ 크기의 이미지를 입력 데이터로 사용한다고 생각해보겠습니다. 이러한 경우에 9층의 convolution layer를 거거치는 3 scale U-Net 구조를 사용할 경우 한번에 최대 약 8개의 이미지를 연산할 수 있습니다.

만약 GPU 4장을 사용한다면, 첫번째 접근방법은 각 GPU에서 8개씩의 이미지를 연산하여 총 32개의 이미지를 한 mini-batch에서 연산하는 방법이 있습니다. 두번째 방법으로는 각 GPU에 2개씩의 이미지를 연산하여 전체적으로 8개의 이미지를 한 mini-batch에서 연산할 수 있습니다. 각각의 GPU가 병렬적으로 계산하기 때문에 같은 양의 이미지를 근사적으로 4배에 가깝에 연산할 수 있습니다.

두 방법 모두 계산상에서 필요점은, 각 GPU에서 연산한 Gradient들을 서로 연관시켜줘야 합니다. 예를들어 '/gpu:i'에서의 Gradient가 $w_i$라 한다면, $\frac{(w_1+w_2+w_3+w_4)}{4}$ 를 gradient로 고려해서 parameter를 업데이트 시켜줘야 합니다.

loss_and_grad의 21 line 을 통하여 각 GPU에서의 gradient를 계산한 후 "grad_tmp"에 확장시켜서 최종적으로 "grad_tmp=[grad_0, grad_1, grad_2, grad_3]" 를 얻을 수 있습니다.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def average_gradients(tower_grads):
    """Calculate the average gradient for each shared variable across all towers.

    Note that this function provides a synchronization point across all towers.

    Args:
      tower_grads: List of lists of (gradient, variable) tuples. The outer list
        is over individual gradients. The inner list is over the gradient
        calculation for each tower.
    Returns:
      List of pairs of (gradient, variable) where the gradient has been averaged
      across all towers."""
    average_grads = []
    for grad_and_vars in zip(*tower_grads):
      # Note that each grad_and_vars looks like the following:
      # ( (grad0_gpu0, var0_gpu0), ..., (grad0_gpuN, var0_gpuN) )
        grads = []
        for g, _ in grad_and_vars:
            # Add 0 dimension to the gradients to represent the tower.
            expanded_g = tf.expand_dims(g, 0)

            # Append on a 'tower' dimension which we will average over below.
            grads.append(expanded_g)

        # Average over the 'tower' dimension.
        grad = tf.concat(axis=0, values=grads)
        grad = tf.reduce_mean(grad, 0)

        # Keep in mind that the Variables are redundant because they are shared across tower.
        # So .. we will just return the first tower's pointer to the Variable.
        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        average_grads.append(grad_and_var)
    return average_grads


grad = average_gradients(grad_tmp)
train_op = optimizer.apply_gradients(grad)

CIFAR-10 Tutorial에서 제공하는 average_gradients 를 통하여 평균값인 "grad"를 계산하고 위와같이 이 값을 적용시킬 수 있습니다.

Conclustion

간단하게 튜토리얼에서 중요시 여겨지는 공유변수와 Gradient를 병렬적으로 계산 후 평균을 내는 방법을 살펴보았습니다. 사실 병렬계산을 위해서는 데이터를 집어넣어주는 방법인 파이프라인을 구성하는것도 굉장히 중요하지만 이 글에서는 다루지 않고, 기회가 되면 다음에 다뤄보겠습니다.