텐서플로를 자세히 들여다보고 저수준 파이썬 API를 살펴본다. 자신만의 손실 함수, 지표, 층, 모델, 초기화, 규제, 가중치 규제 등을 만들어 세부적으로 제어하고 싶을 때 필요하다. 그레이디언트에 특별한 변환이나 규제를 적용하거나 네트워크의 부분마다 다른 옵티마이저를 사용하는 등 훈련 반복문 자체를 완전히 제어하고 싶을 수도 있다.
12.1 텐서플로 훑어보기
텐서플로는 강력한 수치 계산용 라이브러리, 특히 대규모 머신러닝에 잘 맞도록 튜닝되어 있다.
텐서플로의 제공 내용
- 핵심 구조는 넘파이와 비슷하지만 GPU를 지원
- 분산 컴퓨팅 지원
- 속도를 높이고 메모리 사용량을 줄이기 위해 계산을 최적화, 파이썬 함수에서 계산 그래프를 추출한 다음 최적화
가장 저수준의 텐서플로 연산은 C++ 코드로 구현되어 있다. 많은 연산은 커널 kernel 이라 부르는 여러 구현을 가진다. 각 커널은 CPU, GPU, TPU 와 같은 특정 장치에 맞추어 만들어진다.
GPU는 계산을 작은 단위로 나누어 여러 GPU 스레드에서 병렬로 실행하므로 속도 향상
TPU는 딥러닝 연산을 위해 특별하게 설계된 ASIC 칩
윈도우, 리눅스, 맥OS 뿐만 아니라 IOS와 안드로이드, C++, 자바, Go, 스위프트 API를 사용할 수 있다. 자바 스크립트 구현도 존재
12.2 넘파이처럼 텐서플로 사용하기
텐서플로 API는 텐서 tensor를 순환시킨다. 텐서는 한 연산에서 다른 연산으로 흐른다. 그래서 텐서플로라고 한다. 텐서는 넘파이 ndarray와 매우 비슷하다. 즉 텐서는 일반적으로 다차원 배열이다. 하지만 스칼라 값도 가질 수 있다.
사용자 정의 손실 함수, 사용자 정의 지표, 사용자 정의 층 등을 만들 때 텐서가 중요하다.
12.2.1 텐서와 연산
tf.constant() 함수로 텐서를 만들 수 있다. 다음은 두 개의 행과 세 개의 열을 가진 실수 행렬을 나타내는 텐서이다.
tf.constant([[1., 2., 3.], [4., 5., 6.]]) # 행렬
>>>
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
tf.constant(42) # 스칼라
>>>
<tf.Tensor: shape=(), dtype=int32, numpy=42>
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t
>>>
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
t.shape
>>>
TensorShape([2, 3])
t.dtype
>>>
tf.float32
tf.Tensor는 크기와 데이터 타입을 가진다. 인덱스 참조도 넘파이와 매우 비슷하게 작동,
t[:, 1:]
>>>
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
[5., 6.]], dtype=float32)>
t[..., 1, tf.newaxis]
>>>
<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
[5.]], dtype=float32)>
가장 중요한 것은 모든 종류의텐서 연산이 가능하다는 것이다.
t + 10
>>>
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
[14., 15., 16.]], dtype=float32)>
tf.square(t)
>>>
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)>
t @ tf.transpose(t)
>>>
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
[32., 77.]], dtype=float32)>
@ 연산은 행렬 곱셈의 연산, tf.matmul() 함수와 동일하다.
케라스의 저수준 API
케라스 API는 keras.backend 에 자체적인 저수준 API를 가지고 있다. tf.keras에서 이런 함수들은 보통 상응하는 텐서플로 연산을 호출하는 것이 전부이다. 다른 케라스 구현에 적용할 수 있는 코드를 작성하고 싶다면 이런 케라스 함수를 사용해야 한다. 하지만 케라스 저수준 함수는 텐서플로에서 제공하는 함수의 일부만 지원한다.
다음은 keras.backend를 사용하는 간단한 예, 보통 별칭 K를 사용한다.
from tensorflow import keras
K = keras.backend
K.square(K.transpose(t)) + 10
>>>
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
[14., 35.],
[19., 46.]], dtype=float32)>
12.2.2 텐서와 넘파이
텐서는 넘파이와 함께 사용하기 편리리하다. 넘파이 배열로 텐서를 만들 수 있고, 그 반대도 가능하다. 넘파이 배열에 텐서플로 연산을 할 수 있고 텐서에 넘파이 연산을 적용할 수도 있다.
a = np.array([2., 4., 5.])
tf.constant(a)
>>>
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>
t.numpy()
>>>
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
np.array(t)
>>>
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
tf.square(a)
>>>
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>
np.square(t)
>>>
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)
넘파이 배열로 텐서를 만들 때 dtype=tf.float32로 지정해야 한다. 텐서플로는 32비트 정밀도를 사용하기 때문, 넘파이는 기본으로 64비트 정밀도를 사용한다.
12.2.3 타입 변환
타입 변환은 성능을 크게 감소시킬 수 있다. 이를 사용자가 인지하기 위해 텐서플로는 어떤 타입 변환도 자동으로 수행하지 않는다. 호환되지 않는 타입의 텐서로 연산을 실행하면 예외가 발생한다.
#에러 발생
tf.constant(2.) + tf.constant(40)
t2 = tf.constant(40., dype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)
>>>
<tf.Tensor: shape=(), dtype=float32, numpy=42.0>
타입 변환이 필요할 때 tf.cast() 함수를 사용할 수 있다.
12.2.4 변수
지금까지 본 tf.Tensor는 변경이 불가능한 객체, 텐서의 내용을 바꿀 수 없다. 때문에 일반적인 텐서로는 역전파로 변경되어야 하는 신경망의 가중치를 구현할 수 없다. 또한 시간에 따라 변경되어야 하는 다른 파라미터도 있다. 이것이 tf.Variable이 필요한 이유
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v
>>>
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
tf.Variable 은 tf.Tensor 와 비슷하게 동작한다. 동일한 연산을 수행할 수 있고 넘파이와도 잘 호환된다.
assign() 메서드를 사용하여 변숫값을 바꿀 수 있다.
v.assign(2 * v)
>>>
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 4., 6.],
[ 8., 10., 12.]], dtype=float32)>
v[0, 1].assign(42)
>>>
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 6.],
[ 8., 10., 12.]], dtype=float32)>
v[:, 2].assign([0., 1.])
>>>
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42., 0.],
[ 8., 10., 1.]], dtype=float32)>
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
>>>
<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100., 42., 0.],
[ 8., 10., 200.]], dtype=float32)>
12.2.5 다른 데이터 구조
- 희소 텐서 sparse tensor : 대부분 0으로 채워진 텐서를 효율적으로 나타낸다. tf.SparseTensor
- 텐서 배열 tensor array : 텐서의 리스트, 기본적으로 고정된 길이를 가지지만 동적으로 바꿀 수 있다. 리스트에 포함된 모든 텐서는 크기와 데이터 타입이 동일해야 한다. tf.TensorArray
- 래그드 텐서 ragged tensor : 리스트의 리스트를 나타낸다. 텐서에 포함된 값은 동일한 데이터 타입을 가져야 하지만 리스트의 길이는 다를 수 있다. tf.ragged
- 문자열 텐서 string tensor : tf.string 타입의 텐서이다. 바이트 문자열을 나타낸다.
- 집합 set : 집합은 일반적인 텐서로 나타낸다. 예를 들면 tf.constant([[1,2],[3,4]])는 두 개의 집합을 나타낸다. 일반적으로 각 집합은 텐서의 마지막 축에 있는 벡터에 의해 표현된다.
- 큐 queue : 큐는 단계별로 텐서를 저장한다. 텐서플로는 여러 종류의 큐를 제공한다. tf.queue
12.3 사용자 정의 모델과 훈련 알고리즘
12.3.1 사용자 정의 손실 함수
평균 제곱 오차는 큰 오차에 너무 과한 벌칙을 가하기 때문에 정확하지 않은 모델이 만들어진다.
평균 절댓값 오차는 이상치에 관대하여 훈련이 수렴되기까지 시간이 걸린다. 모델이 정밀하게 훈련되지 않음
이럴 경우 후버 손실을 사용, keras에서 지원하지는 않음, 실제로 구현, 레이블과 예측을 매개변수로 받는 함수를 만들고 텐서플로 연산을 사용해 샘플의 손실을 계산하면 된다.
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < 1
squared_loss = tf.square(error) / 2
linear_loss = tf.abs(error) - 0.5
return tf.where(is_small_error, squared_loss, linear_loss)
성능을 위해서는 이 예시처럼 벡터화하여 구현해야 한다. 또한 텐서플로 그래프의 장점을 활용하기 위해 텐서플로 연산만 사용한다.
전체 손실의 평균이 아니라 샘플마다 하나의 손실을 담은 텐서를 반환하는 것이 좋다. 이렇게 해야 필요할 때 케라스가 클래스 가중치나 샘플 가중치를 적용할 수 있다.
이제 이 손실을 사용해 케라스 모델의 컴파일 메서드를 호출하고 모델을 훈련할 수 있다.
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train)
훈련하는 동안 배치마다 케라스는 huber_fn() 함수를 호출하여 손실을 계산하고 이를 사용해 경사 하강법을 수행한다.
하지만 모델을 저장할 때 사용자 정의 손실에 어떤 문제가 생길까?
12.3.2 사용자 정의 요소를 가진 모델을 저장하고 로드하기
케라스가 함수 이름을 저장하므로 사용자 정의 손실 함수를 사용하는 모델은 아무 이상 없이 저장된다. 모델을 로드 할 때는 함수 이름과 실제 함수를 매핑한 딕셔너리를 전달해야 한다. 좀 더 일반적으로 사용자 정의 객체를 포함한 모델을 로드할 때는 그 이름과 객체를 매핑해야 한다.
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
custom_objects={"huber_fn": huber_fn})
앞서 구현한 함수는 -1과 1 사이의 오차는 작은 것으로 간주한다. 다른 기준이 필요할 때는 어떻게 해야 할까?
한 가지 방법은 매개변수를 받을 수 있는 함수를 만드는 것이다.
def create_huber(threshold=1.0):
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < threshold
squared_loss = tf.square(error) / 2
linear_loss = threshold * tf.abs(error) - threshold**2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
return huber_fn
모델을 저장할 때 이 threshold 값은 저장되지 않는다. 따라서 모델을 로드할 대 threshold 값을 지정해야 한다.
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
custom_objects={"huber_fn": create_huber(2.0)})
새로 저장한 함수 이름이 아니라 저장한 케라스 모델에서 사용했던 함수 이름인 huber_fn을 사용한다.
이 문제는 keras.losses.Loss 클래스를 상혹하고 get_config() 메서드를 구현하여 해결할 수 있다.
class HuberLoss(keras.losses.Loss):
def __init__(self, threshold=1.0, **kwargs):
self.threshold = threshold
super().__init__(**kwargs)
def call(self, y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < self.threshold
squared_loss = tf.square(error) / 2
linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
return tf.where(is_small_error, squared_loss, linear_loss)
def get_config(self):
base_config = super().get_config()
return {**base_config, "threshold": self.threshold}
- 생성자는 기본적인 하이퍼파라미터를 **kwargs로 받은 매개변수 값을 부모 클래스의 생성자에게 전달한다. 손실 함수의 name과 개별 샘플의 손실을 모으기 위해 사용할 reduction 알고리즘, 기본값은 sum_ove_batch_size로 샘플 손실에 가중치를 곱하여 더하고 배치 크기로 나눈다. 샘플 가중치가 없다면 1.0으로 간주한다. 다른 값으로는 sum과 none이 있다.
- call() 메서드는 레이블과 예측을 받고 모든 샘플의 손실을 계산하여 반환한다.
- get_config() 메서드는 하이퍼파라미터 이름과 같이 매핑된 딕셔너리를 반환한다. 먼저 부모 클래스의 get_config() 메서드를 호출하고 그 다음 반환된 딕셔너리에 새로운 하이퍼파라미터를 추가한다.
그다음 모델을 컴파일할 때 이 클래스의 인스턴스를 사용할 수 있다.
model.compile(loss=HuberLoss(2.), optimizer="nadam")
이 모델을 저장할 때 임곗값도 함께 저장된다. 모델을 로드할 때 클래스 이름과 클래스 자체를 매핑해줘야 한다.
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
custom_objects={"HuberLoss":HuberLoss})
모델을 저장할 때 케라스는 손실 객체의 get_config() 메서드를 호출하여 반환된 설정을 HDF5 파일에 JSON 형태로 저장한다. 모델을 로드하면 HuberLoss 클래스의 from_config() 클래스 메서드를 호출한다. 이 메서드는 기본 손실 클래스에 구현되어 있고 생성자에게 **config 매개변수를 전달해 클래스의 인스턴스를 만든다.
12.3.3 활성화 함수, 초기화, 규제, 제한을 커스터마이징하기
손실, 규제, 제한, 초기화, 지표, 활성화 함수, 층, 모델과 같은 대부분의 케라스 기능은 유사한 방법으로 커스터마이징할 수 있다. 대부분의 경우 입력과 출력을 가진 간단한 함수를 작성하면 된다.
다음은 사용자 정의 활성화 함수, 사용자 정의 글로럿 초기화, 사용자 정의 l1 규제, 양수인 가중치만 남기는 사용자 정의 제한에 대한 예이다.
def my_softplus(z):
return tf.math.log(tf.exp(z) + 1.0)
def my_glorot_initializer(shape, dtype=tf.float32):
stddev = tf.sqrt(2. / (shape[0] + shape[1])):
return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights):
return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights):
return tf.where(weights < 0. tf.zeros_like(weights), weights)
여기서 볼 수 있듯이 매개변수는 사용자 정의하려는 함수의 종류에 따라 다르다. 만들어진 사용자 정의 함수는 보통의 함수와 동일하게 사용할 수 있다.
layers = keras.layers.Dense(30, activation=my_softplus,
kernel_initializer=my_glorot_initializer,
kernel_regularizer=my_l1_regularizer,
kernel_constraint=my_positive_weight)
이 활성화 함수는 Dense 층의 출력에 적용되고 다음 층에 그 결과가 전달된다. 층의 가중치는 초기화 함수에서 반환된 값으로 초기화된다. 훈련 스텝 마다 가중치가 규제 함수에 전달되어 규제 손실을 계산하고 전체 손실에 추가되어 훈련을 위한 최종 손실을 만든다. 마지막으로 제한 함수가 훈련 스텝마다 호출되어 층의 가중치를 제한한 가중치 값으로 바뀐다.
함수가 모델과 함께 저장해야 할 하이퍼파라미터를 가지고 있다면 적절한 클래스를 상속한다. 다음은ㅇ factor 하이퍼파라미터를 저장하는 l1 규제를 위한 간단한 클래스의 예
class MyL1Regularizer(keras.regularizers.Regularizer):
def __init__(self, factor):
self.factor = factor
def __call__(self, weights):
return tf.reduce_sum(tf.abs(self.factor * weights))
def get_config(self):
return {"factor": self.factor}
손실, 층, 모델의 경우 call() 메서드를 구현해야 한다. 규제, 초기화, 제한의 경우 __call__() 메서드를 구현해야 한다.
12.3.4 사용자 정의 지표
손실과 지표는 개념적으로 다른 것은 아니다. 손실은 모델을 훈련하기 위해 경사 하강법에서 사용하므로 미분가능해야 하고 그레이디언트가 모든 곳에서 0이 아니어야 한다.
반대로 지표는 모델을 평가할 때 사용한다. 미분 가능하지 않거나 그레이디언트가 0이어도 괜찮다.
대부분의 경우 사용자 지표 함수를 만드는 것은 사용자 손실 함수를 만드는 것과 동일하다.
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[create_huber(2.0)])
훈련하는 동안 각 배치에 대해 케라스는 지표를 계산하고 에포크가 시작할 때부터 평균을 기록한다.
keras.metrics.Precision 클래스를 통해 두 배치에 대한 모델의 진짜 정밀도 값을 얻을 수 있다.
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])
>>>
<tf.Tensor: shape=(), dtype=float32, numpy=0.8>
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])
>>>
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>
이 예에서 Precision 클래스 객체를 만들고 이를 함수처럼 사용하여 첫 번째 배치와 두 번째 배치의 레이블과 예측을 각각 첫 번째 매개변수와 두 번째 매개변수로 전달햇다.
배치마다 점진적으로 업데이트되기 때문에 이를 스트리밍 지표 streaming metrics 라고 부른다.
이런 스트리밍 지표를 만들고 싶다면 keras.metrics.Metrics 클래스를 상속한다.
class HuberMetric(keras.metrics.Metric):
def __init__(self, threshold=1.0, **kwargs):
super().__init__(**kwargs) # 기본 매개변수 처리 (예를 들면, dtype)
self.threshold = threshold
self.huber_fn = create_huber(threshold)
self.total = self.add_weight("total", initializer="zeros")
self.count = self.add_weight("count", initializer="zeros")
def update_state(self, y_true, y_pred, sample_weight=None):
metric = self.huber_fn(y_true, y_pred)
self.total.assign_add(tf.reduce_sum(metric))
self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
def result(self):
return self.total / self.count
def get_config(self):
base_config = super().get_config()
return {**base_config, "threshold": self.threshold}
- 생서자는 add_weight() 메서드를 사용하여 여러 배치에 걸쳐 지표의 상태를 기록하기 위한 변수를 만든다. 이 예에서는 후버 손실의 합과 지금까지 처리한 샘플 수를 기록한다.
- update_state() 메서드는 이 클래스를 함수처럼 사용할 때 호출된다.
- result() 메서드는 최종 결과를 계산하고 반환한다.
- get_config() 메서드를 구현하여 threshold 변수를 모델과 함께 저장한다.
지표를 간단한 함수로 정의하면 케라스가 배치마다 자동으로 이 함수를 호출하고 에포크 동안 평균을 기록한다.
12.3.5 사용자 정의 층
텐서플로에 없는 특이한 층을 가진 네트워크를 만들어야 할 때가 있다. 이런 경우 사용자 정의 층을 만든다. 동일한 층 블럭이 여러 번 반복되는 네트워크를 만들 경우 각각의 층 블럭을 하나의 층으로 다루는 것이 편리하다.
예를 들어 모델이 층 A, B, C, A, B, C, A, B, C 순서대로 구성되어 있다면 A, B, C를 사용자 정의 층 D로 정의하고 D,D,D 로 구성된 모델을 만들 수 있다.
먼저 keras.layers.Flatten 나 keras.layers.ReLU 와 같은 층은 가중치가 없다. 가중치가 필요 없는 사용자 정의 층을 만들기 위한 간단한 방법은 파이썬 함수를 만든 후 keras.layers.Lambda 층으로 감싸는 것이다.
예를 들면 입력에 지수 함수를 적용하는 층이다.
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))
이 사용자 정의 층을 시퀀셜 API나 함수형 API, 서브클래싱 API에서 보통의 층과 동일하게 사용할 수 있다. 또는 활성화 함수로 사용할 수 도 있다.
지수 함수는 이따금 회귀 모델에서 예측값의 스케일이 매우 다를 때 출력 층에 사용된다.
상태가 있는 층, 즉 가중치가 가진 층을 만들려면 keras.layers.Layer를 상속해야 한다. 예를 들면 다음 클래스는 Dense 층의 간소화 버전을 구현한 것이다.
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, batch_input_shape):
self.kernel = self.add_weight(
name="kernel", shape=[batch_input_shape[-1], self.units],
initializer="glorot_normal")
self.bias = self.add_weight(
name="bias", shape=[self.units], initializer="zeros")
super().build(batch_input_shape) # must be at the end
def call(self, X):
return self.activation(X @ self.kernel + self.bias)
def compute_output_shape(self, batch_input_shape):
return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
def get_config(self):
base_config = super().get_config()
return {**base_config, "units": self.units,
"activation": keras.activations.serialize(self.activation)}
- 생성자는 모든 하이퍼파라미터를 매개변수로 받는다. (units 와 activatoin). **kwargs 매개변수를 추가하는 것도 중요하다. 부모 생성자를 호출하면서 kwargs 를 전달한다. 이를 통해 input_shape, trainable, name 와 같은 기본 매개변수들을 처리할 수 있다. 그다음 하이퍼파라미터를 속성으로 저장하고 activation 매개변수를 keras.activations.get() 함수를 사용해 적절한 활성화 함수로 바꾼다.
- build() 메서드의 역할은 가중치마다 add_weight() 메서드를 호출하여 층의 변수를 만드는 것이다. build() 메서드는 층이 처음 사용될 때 호출된다. build() 메서드의 입력으로 크기를 전달한다. 가중치를 만들 때 크기가 꼭 필요한 경우가 종종 있다. 예를 들어 연결 가중치를 만들려면 이전 층의 뉴런 개수를 알아야 한다. 이 크기는 입력의 마지막 차원 크기에 해당한다. build() 메서드 끝에서 반드시 부모의 build() 메서드를 호출해야 한다. 이를 통해 층이 만들어졌다는 것을 케라스가 인식한다.
- call() 메서드는 이 층에서 필요한 연산을 수행한다. 이 경우 입력 X와 층의 커널을 행렬 곱셈하고 편향을 더한다. 그다음 결과에 활성화 함수를 적용한다. 이 값이 층의 출력이다.
- compute_output_shape() 메서드는 이 층의 출력 크기를 반환한다. 이 예에서는 마지막 차원을 제외하고 입력과 크기가 같다. 마지막 차원은 이 층의 뉴런 개수이다. tf.keras 에서 크기는 tf.TensorShape 클래스의 객체이다. 이 객체는 as_list() 메서드를 사용해 파이썬 리스트로 바꿀 수 있다.
- get_config() 메서드는 앞서 보았던 것과 같다. keras.activatoins.serialize() 를 사용하여 활성화 함수의 전체 설정을 저장한다.
훈련과 테스트에서 다르게 동작하는 층이 필요하다면(Dropout, BatchNormalization) call() 메서드에 training 매개변수를 추가하여 훈련인지 테스트인지를 결정해야 한다.
예를 들어 훈련하는 동안 규제 목적으로 가우스 잡음을 추가하고 테스트 시에는 아무것도 하지 않는 층을 만들어 본다. 케라스에서의 keras.layers.GaussianNoise
class AddGaussianNoise(keras.layers.Layer):
def __init__(self, stddev, **kwargs):
super().__init__(**kwargs)
self.stddev = stddev
def call(self, X, training=None):
if training:
noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
return X + noise
else:
return X
def compute_output_shape(self, batch_input_shape):
return batch_input_shape
12.3.6 사용자 정의 모델
사용자 정의 모델 클래스를 만들기 위해 keras.Model 클래스를 상속하여 생성자에서 층과 변수를 만들고 모델이 해야 할 작업을 call() 메서드에 구현한다. 다음과 같은 모델을 만들어야 한다고 가정
입력이 첫 번째 완전 연결 층을 통과하여 두 개의 완전 연결 층과 스킵 연결로 구성된 잔차 블록 residual block 으로 전달된다. 그다음 동일한 잔차 블록을 세 번 더 통과시킨다. 그다음 두 번째 잔차 블록을 지나 마지막 출력이 완전 연결된 출력 층에 전달된다. 이 모델을 구현하려면 동일한 블록을 여러 개 만들어야 하므로 먼저ResidualBlock 층을 만든다.
class ResidualBlock(keras.layers.Layer):
def __init__(self, n_layers, n_neurons, **kwargs):
super().__init__(**kwargs)
self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
kernel_initializer="he_normal")
for _ in range(n_layers)]
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
return inputs + Z
이 층은 다른 층을 포함하고 있기 때문에 조금 특별하다 케라스가 알아서 추적해야 할 객체가 담긴 hidden 속성을 감지하고 필요한 변수를 자동으로 이 층의 변수 리스트에 추가한다. 이 클래스의 나머지는 그 자체로 이해할 수 있다. 그다음 서브 클래싱 API를 사용해 이 모델을 정의한다.
class ResidualRegressor(keras.models.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(30, activation="elu",
kernel_initializer="he_normal")
self.block1 = ResidualBlock(2, 30)
self.block2 = ResidualBlock(2, 30)
self.out = keras.layers.Dense(output_dim)
def call(self, inputs):
Z = self.hidden1(inputs)
for _ in range(1 + 3):
Z = self.block1(Z)
Z = self.block2(Z)
return self.out(Z)
생성자에서 층을 만들고 call() 메서드에서 이를 사용한다. 이 모델을 다른 모델처럼 사용할 수 있다.
12.3.7 모델 구성 요소에 기반한 손실과 지표
앞서 정의한 사용자 손실과 지표는 모두 레이블과 예측을 기반으로 한다. 은닉층의 가중치나 활성화 함수 등과 같이 모델의 구성 요소에 기반한 손실을 정의해야할 때가 있다. 이런 손실은 규제나 모델의 내부 상황을 모니터링할 때 유용하다.
모델 구성 요소에 기반한 손실을 정의하고 계산하여 add_loss() 메서드에 그 결과를 전달한다.
예를 들어 다섯 개의 은닉층과 출력층으로 구성된 회귀용 MLP 모델을 만들어 본다. 이 모델은 맨 위의 은닉층에 보조 출력을 가진다. 이 보조 출력에 연결된 손실을 재구성 손실 reconstruction loss 라고 부른다. 즉 재구성과 입력 사이의 평균 제곱 오차이다. 재구성 손실을 주 손실에 더하여 회귀 작업에 직접적으로 도움이 되지 않은 정보라도 은닉층을 통과하면서 가능한 많은 정보를 유지하도록 유도한다.
사실 이런 손실이 일반화 성능을 향상시킨다.
다음은 사용자 정의 재구성 손실을 가지는 모델을 만드는 코드이다.
class ReconstructingRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal") for _ in range(5)]
self.out = keras.layers.Dense(output_dim)
self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")
def build(self, batch_input_shape):
n_inputs = batch_input_shape[-1]
self.reconstruct = keras.layers.Dense(n_inputs)
#super().build(batch_input_shape)
def call(self, inputs, training=None):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
reconstruction = self.reconstruct(Z)
self.recon_loss = 0.05 * tf.reduce_mean(tf.square(reconstruction - inputs))
if training:
result = self.reconstruction_mean(recon_loss)
self.add_metric(result)
return self.out(Z)
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x)
loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss])
gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return {m.name: m.result() for m in self.metrics}
- 생성자가 다섯 개의 은닉층과 하나의 출력층으로 구성된 심층 신경망을 만든다.
- build() 메서드에서 완전 연결 층을 하나 더 추가하여 모델의 입력을 재구성하는데 사용한다.
- call() 메서드에서 입력이 다섯 개의 은닉층에 모두 통과한다. 그다음 결괏값을 재구성 층에 전달하여 재구성을 만든다.
- call() 메서드에서 재구성 손실을 계산하고 add_loss() 메서드를 사용해 모델의 손실 리스트에 추가한다. 재구성 손실이 주 손실을 압도하지 않도록 0.05를 곱하여 크기를 줄였다.
- call() 메서드 마지막에서 은닉층의 출력을 출력층에 전달하여 얻은 출력값을 반환한다.
model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)
>>>
Epoch 1/2
363/363 [==============================] - 4s 6ms/step - loss: 0.7886 - reconstruction_error: 0.0000e+00
Epoch 2/2
363/363 [==============================] - 2s 6ms/step - loss: 0.4134 - reconstruction_error: 0.0000e+00
12.3.8 자동 미분을 사용하여 그레이디언트 계산하기
자동 미분을 사용하여 그레이디언트를 자동으로 계산하는 방법을 이해하기 위해 간단한 함수의 예를 살펴본다.
def f(w1, w2):
return 3 * w1 ** 2 + 2 * w1 * w2
w1에 대한 이 함수의 도함수가 6*w1+2*w2 라는 것을 구할 수 있다. 또한 w2에 대한 도함수는 2*w1 이다. 예를 들어 포인트 (w1,w2)=(5,3) 에서 이 도함수의 값은 각각 36과 10이다. 따라서 이 포인트의 그레이디언트 벡터는 (36, 10)이다.
신경망은 보통 수만 개의 파라미터를 가진 매우 복잡한 함수이다. 손으로 직접 도함수를 구하는 것은 거의 불가능한 작업, 한 가지 대안은 각 파라미터가 바뀔 때마다 함수의 출력이 얼마나 변하는지 측정하여 도함수의 근삿값을 계산하는 것이다.
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps
>>>
36.000003007075065
(f(w1, w2 + eps) - f(w1, w2)) / eps
>>>
10.000000003174137
이 방법은 잘 동작하고 구현하기도 쉽다. 하지만 근삿값이고 무엇보다도 파라미터마다 적어도 한 번씩은 함수 f()를 호출해야 한다. 파라미터마다 적어도 한번씩 f()를 호출하므로 대규모 신경망에서는 적용하기 어려운 방법이다. 대신 자동 미분을 사용해본다.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
gradients
>>>
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
먼저 두 변수 w1과 w2를 정의하고 tf.GradientTape 블럭을 만들어 이 변수와 관련된 모든 연산을 자동으로 기록한다. 마지막으로 이 테이프에 두 변수 [w1, w2] 에 대한 z의 그레이디언트를 요청한다.
결과가 정확하고 변수가 얼마나 많아도 gradient() 메서드는 기록된 계산을 한 번 만에 통과했다.
gradient() 메서드가 호출된 후에는 자동으로 테이프가 즉시 지워진다. 따라서 gaadient() 메서드를 두 번 호출하면 예외가 발생한다.
gradient() 메서드를 한 번 이상 호출해야 한다면 지속 가능한 테이프를 만들고 사용이 끝난 후 테이프를 삭제하여 리소스를 해제해야 한다.
with tf.GradientTape(persistent=True) as tape:
z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2) # works now!
del tape
기본적으로 테이프는 변수가 포함된 연산만을 기록한다. 만약 변수가 아닌 다른 객체에 대한 z의 그레이디언트를 계산하려면 None 이 반환된다.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients
>>>
[None, None]
하지만 필요한 어떤 텐서라도 감시하여 관련된 모든 연산을 기록하도록 강제할 수 있다. 그다음 변수처럼 이런 텐서에 대해 그레이디언트를 계산할 수 있다.
with tf.GradientTape() as tape:
tape.watch(c1)
tape.watch(c2)
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
gradients
>>>
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
이것이 유용한 경우가 있다 예를 들면 입력이 작을 때 변동 폭이 큰 활성화 함수에 대한 규제 손실을 구현하는 경우다. 이 손실은 입력에 대한 활성화 함수의 그레이디언트를 기반으로 할 것이다. 입력은 변수가 아니므로 테이프에 기록을 명시적으로 알려주어야 한다.
대부분의 경우 그레이디언트 테이프는 여러 값에 대한 한 값의 그레이디언트를 계산하는 데 사용된다.
이런 경우 후진 모드 자동 미분 reverse-mode autodiff 가 적합하다. 한 번의 정방향 계산과 역방향 계산으로 모든 그레이디언트를 동시에 계산할 수 있기 때문이다.
여러 손실이 포함된 벡터의 그레이디언트를 계산하면 텐서플로는 벡터의 합의 그레이디언트를 계산할 것이다. 만약 개별 그레이디언트를 계산하고 싶다면 테이프의 jacobian() 메서드를 호출해야 한다. 이 메서드는 벡터에 있는 각 손실마다 후진 자동 미분을 수행한다. 심지어 이계도함수 second-order partial derivative를 계산할 수도 있다.
어떤 경우에는 신경망의 일부분에 그레이디언트가 역전파되지 않도록 막을 필요가 있다. 이렇게 하려면 tf.stop_gradient() 함수를 사용해야 한다.이 함수는 정방향 계산을 할 때 입력을 반환한다. 하지만 역전파 시에는 그레이디언트를 전파하지 않는다.
def f(w1, w2):
return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)
with tf.GradientTape() as tape:
z = f(w1, w2)
tape.gradient(z, [w1, w2])
>>>
[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]
마지막으로 이따금 그레이디언트를 계산할 때 수치적인 이슈가 발생할 수 있다. 예를 들면 큰 입력에 대한 my_softplus() 함수의 그레이디언트를 계산하면 NaN이 반환된다.
x = tf.Variable(100.)
with tf.GradientTape() as tape:
z = my_softplus(x)
tape.gradient(z, [x])
>>>
[<tf.Tensor: shape=(), dtype=float32, numpy=nan>]
자동 미분을 사용하여 이 함수의 그레이디언트를 계산하는 것이 수치적으로 불안정하기 때문이다. 즉 부동소수점 정밀도 오류로 인해 자동 미분이 무한 나누기 무한을 계산하게 된다.
다행히 수치적으로 안전한 소프트플러스의 도함수 1/(1+1 / exp(x)) 를 해석적으로 구할 수 있다. 그다음 @tf.custom_gradient 데코레이터를 사용하고 일반 출력과 도함수를 계산하는 함수를 반환하여 텐서플로가 my_softplus() 함수의 그레이디언트를 계산할 때 안전한 함수를 사용하도록 만들 수 있다.
@tf.custom_gradient
def my_better_softplus(z):
exp = tf.exp(z)
def my_softplus_gradients(grad):
return grad / (1 + 1 / exp)
return tf.math.log(exp + 1), my_softplus_gradients
이제 큰 입력 값에서도 my_better_softplus() 함수의 그레이디언트를 올바르게 계산할 수 있다.
12.3.9 사용자 정의 훈련 반복
드물게 fit() 메서드의 유연성이 원하는 만큼 충분하지 않을 수 있다. 두 개의 다른 옵티마이저를 사용하는 모델의 경우 fit() 메서드는 하나의 옵티마이저만 사용하므로 이 논문을 구현하려면 훈련 반복을 직접 구현해야 한다.
혹은 의도한 대로 잘 동작하는지 확신을 갖기 위해 사용자 정의 훈련 반복을 쓸 수도 있다.
하지만 사용자 훈련 반복을 만들면 길고, 버그가 발생하기 쉽고, 유지 보수하기 어려운 코드가 만들어진다.
먼저, 간단한 모델을 만든다.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
kernel_regularizer=l2_reg),
keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
그다음 훈련 세트에서 샘플 배치를 랜덤하게 추출하는 작은 함수를 만든다.
def random_batch(X, y, batch_size=32):
idx = np.random.randint(len(X), size=batch_size)
return X[idx], y[idx]
현재 스텝 수, 전체 스텝 횟수, 에포크 시작부터 평균 손실, 그 외 다른 지표를 포함하여 훈련 상태를 출력하는 함수도 만든다.
def print_status_bar(iteration, total, loss, metrics=None):
metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
for m in [loss] + (metrics or [])])
end = "" if iteration < total else "\n"
print("\r{}/{} - ".format(iteration, total) + metrics,
end=end)
이제 실제로 적용한다. 먼저 몇 개의 하이퍼파라미터를 정의하고 옵티마이저, 손실 함수, 지표를 선택한다.
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
for epoch in range(1, n_epochs + 1):
print("Epoch {}/{}".format(epoch, n_epochs))
for step in range(1, n_steps + 1):
X_batch, y_batch = random_batch(X_train_scaled, y_train)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
mean_loss(loss)
for metric in metrics:
metric(y_batch, y_pred)
print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
for metric in [mean_loss] + metrics:
metric.reset_states()
- 두 개의 반복문을 중첩했다. 하나는 에포크를 위해서, 다른 하나는 에포크 안의 배치를 위해
- 그다음 훈련 세트에서 배치를 랜덤하게 샘플링한다.
- tf.GradientTape() 블럭 안에서 배치 하나를 위한 예측을 만들고 손실을 계산한다.
- 그다음 테이프를 사용해 훈련 가능한 각 변수에 대한 손실의 그레이디언트를 계산한다. 이를 옵티마이저에 적용하여 경사 하강법을 수행한다.
- 다음 평균 손실과 지표를 업데이트하고 상태 막대를 출력한다.
- 매 에포크 끝에서 상태 막대를 다시 출력하여 완료를 나타내고 줄바꿈을 수행한다. 마지막으로 평균 손실과 지푯값을 초기화한다.
'책 > Hands-On Machine Learning' 카테고리의 다른 글
15. RNN과 CNN을 사용해 시퀀스 처리하기 (0) | 2022.10.10 |
---|---|
14. 합성곱 신경망을 사용한 컴퓨터 비전 (0) | 2022.10.09 |
11.심층 신경망 훈련하기 (0) | 2022.10.08 |
10. 케라스를 사용한 인공 신경망 소개 (1) | 2022.10.08 |
9. 비지도 학습 (1) | 2022.10.08 |