본문 바로가기

케라스 창시자에게 배우는 딥러닝(DACOS)

7. 케라스 완전 정복

 

 

7-1) 다양한 워크플로

7-2) 케라스 모델을 만드는 여러 방법

7-3) 내장된 훈련 루프와 평가 루프 사용하기

7-4) 사용자 정의 훈련, 평가 루프 만들기

 

 

 

 

7-1) 다양한 워크플로

 

- 케라스 api 설계는 복잡성의 단계적 공개 원칙을 따름

- 시작은 쉽게, 필요할 때 단계마다 점진적으로 학습하여 아주 복잡한 경우도 처리 가능

- 모든 워크플로는 동일한 API (Layer, Model 등) 기반으로 하기 때문에 한 워크플로의 구성 요소를 다른 워크플로에서 사용할 수 있음

 

 

7-2) 케라스 모델을 만드는 여러 방법

 

모델 생성 API 종류

1) Sequential 모델

2) 함수형 API

3) Model 서브클래싱

 

 

 

1) Sequential 모델

 

- Sequential 클래스를 사용하여 모델 만듬

model = keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(10, activation="softmax")
])

 

- add() 메서드를 통해 점진적으로 모델 만듬

model = keras.Sequential()
model.add(layers.Dense(64, activation="relu"))
model.add(layers.Dense(10, activation="softmax"))

 

* 층의 가중치 크기는 입력 크기에 따라 달라짐 > 입력 크기를 알기 전까지는 가중치를 만들 수 없음

 

가중치를 생성 방법 2가지

1) 입력의 크기를 미리 지정함

2) build() 메서드를 호출해야 함

model.build(input_shape=(None, 3))
model.weights

 

모델 구조 확인

model.summary()

Model: "sequential_1" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= dense_2 (Dense) (None, 64) 256 dense_3 (Dense) (None, 10) 650 ================================================================= Total params: 906 Trainable params: 906 Non-trainable params: 0 _________________________________________________________________

 

**중요: 모델의 build() 메소드를 호출하기 전까지는 층이 생성되지 않았기 때문에 summary() 메소드를 호출할 수 없음! **

 

 

 

2) 함수형 API

 

- Sequential() 모델은 사용하기 쉽지만 적용할 수 있는 곳이 제한적임 (하나의 입력과 하나의 출력을 가지고 순서대로 층을 쌓은 모델만 구현 가능)

- 다중 입력, 다중 출력, 비선형적 구조의 모델을 가진 문제가 현실에서는 대다수임

- 함수형 API는 이런 경우에 사용 가능, 가장 흔하게 만날 수 있는 케라스 모델

 

 

앞선 모델과 동일한 모델 해당 코드로 구현 가능

inputs = keras.Input(shape=(3,), name="my_input")
features = layers.Dense(64, activation="relu")(inputs)
outputs = layers.Dense(10, activation="softmax")(features)
model = keras.Model(inputs=inputs, outputs=outputs)

 

심볼릭 텐서

 

- 실제 데이터를 가지고 있진 않지만 사용할 때 모델이 보게 될 텐서의 사양이 인코딩되어 있음

- 심볼릭 텐서 호출 시 크기와 데이터타입 정보가 업데이트된 새로운 심볼릭 텐서 반환

* 모든 케라스 층은 실제 데이터 또는 심볼릭 텐서로 호출할 수 있음

 

- inputs: 입력 클래스 객체를 정의, 데이터 크기+ 타입에 대한 정보 담김

- features: 층을 만들어 이 입력으로 호출

- outputs: 최종 출력을 얻음

- model: 입력과 출력을 전달받아 모델 객체를 생성

 

 

다중 입력/ 다중 출력 모델

- 입력이 여러 개/ 출력이 여러 개 > 모델이 리스트가 아닌 그래프를 닮음

 

예시 : 입력 3개(이슈 티켓 제목, 이슈 티켓의 텍스트 본문, 사용자가 추가한 태그), 출력 2개(이슈 티켓의 우선순위 점수, 이슈 티켓을 처리해야 할 부서) 가지는 모델 생성

 

vocabulary_size = 10000
num_tags = 100
num_departments = 4

title = keras.Input(shape=(vocabulary_size,), name="title")
text_body = keras.Input(shape=(vocabulary_size,), name="text_body")      // 각각 3개의 입력 중 1
tags = keras.Input(shape=(num_tags,), name="tags")

features = layers.Concatenate()([title, text_body, tags])        // 3가지의 입력 데이터 합침
features = layers.Dense(64, activation="relu")(features)         // 입력을 층으로 호출

priority = layers.Dense(1, activation="sigmoid", name="priority")(features)   // 출력1: 우선순위 점수
department = layers.Dense(
    num_departments, activation="softmax", name="department")(features)    // 출력2: 담당해야 할 부서

model = keras.Model(inputs=[title, text_body, tags], outputs=[priority, department])    // 모델 생성

 

* 위 코드는 정해진 꼴의 입력과 출력에 대한 심볼릭 텐서

* 레고 블록같은 층으로 어떤 그래프도 정의할 수 있는 유연한 방법

 

 

심볼릭 텐서에 실제 데이터 적용하여 모델 훈련

import numpy as np

num_samples = 1280

title_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))
text_body_data = np.random.randint(0, 2, size=(num_samples, vocabulary_size))  // 랜덤한 데이터 생성
tags_data = np.random.randint(0, 2, size=(num_samples, num_tags))

priority_data = np.random.random(size=(num_samples, 1))
department_data = np.random.randint(0, 2, size=(num_samples, num_departments))  // 랜덤하게 레이블 생성

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],      // 각각의 출력에 대해 손실 계산법 따로 지정
              metrics=[["mean_absolute_error"], ["accuracy"]])                // 각각의 출력에 대해 서로 다른 측정 지표 사용
model.fit([title_data, text_body_data, tags_data],
          [priority_data, department_data],
          epochs=1)                                                             // 모델 피팅
model.evaluate([title_data, text_body_data, tags_data],
               [priority_data, department_data])                                 // 모델 평가
priority_preds, department_preds = model.predict([title_data, text_body_data, tags_data])

 

* 컴파일 단계에서 각 출력의 형식이 다르기 때문에 서로 다른 손실 계산법과 측정 지표 사용함

 

위 코드와 동일한 모델, 입력, 출력 순서 신경쓰지 않을 수 있게 데이터를 딕셔너리로 전달하여 생성한 코드

model.compile(optimizer="rmsprop",
              loss={"priority": "mean_squared_error", "department": "categorical_crossentropy"},    // 딕셔너리로 각 출력에 대한 손실 지정
              metrics={"priority": ["mean_absolute_error"], "department": ["accuracy"]}) // 위와 유사
model.fit({"title": title_data, "text_body": text_body_data, "tags": tags_data},
          {"priority": priority_data, "department": department_data},
          epochs=1)
model.evaluate({"title": title_data, "text_body": text_body_data, "tags": tags_data},
               {"priority": priority_data, "department": department_data})
priority_preds, department_preds = model.predict(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})

 

 

함수형 API 의 장점

 

- 그래프 데이터 구조: 이전 그래프 노드를 새 모델의 일부로 재사용할 수 있음

- 모델 시각화, 특성 추출 가능

 

1) 모델 시각화: 앞서 생성한 모델의 연결 구조(토폴로지) 시각화

keras.utils.plot_model(model, "ticket_classifier_with_shape_info.png", show_shapes=True)

 

- 모델의 연결 구조와 각 층의 입출력 크기 정보 확인 가능

- None: 어떤 크기의 배치에서도 사용 가능하다는 뜻

- 층 연결 구조를 시각화해 참조하여 그래프에 있는 개별 노드를 조사하고 재사용(층 호출) 가능

 

2) 특성 추출

 

- model.layers: 모델에 있는 모든 층의 리스트를 가짐

model.layers
[<keras.engine.input_layer.InputLayer at 0x7f28403fbe90>,
 <keras.engine.input_layer.InputLayer at 0x7f28403fbed0>,
 <keras.engine.input_layer.InputLayer at 0x7f28403ca2d0>,
 <keras.layers.merging.concatenate.Concatenate at 0x7f28404363d0>,
 <keras.layers.core.dense.Dense at 0x7f28404368d0>,
 <keras.layers.core.dense.Dense at 0x7f28403e30d0>,
 <keras.layers.core.dense.Dense at 0x7f2840432c90>]

 

해당 리스트를 사용해 다른 모델에서 중간 특성을 재사용하는 모델을 만들 수 있음(특성 추출)

features = model.layers[4].output
difficulty = layers.Dense(3, activation="softmax", name="difficulty")(features)

new_model = keras.Model(
    inputs=[title, text_body, tags],
    outputs=[priority, department, difficulty])

 

layers 리스트의 인덱스 4번째 요소를 재사용해 모델을 만듬

difficulty 라는 새로운 범주형 분류 추가 > 모델 새로 만들 필요 없음

- 이전 모델의 중간 특성에서부터 훈련 시작하면 됨

 

 

* 새롭게 훈련하지 않고 나머지 층을 재사용함

* 위의 그래프와 비교했을 때 최종 출력에 변화가 생긴 것을 알 수 있음

 

 

 

3) Model 서브클래싱

 

- Model 클래스를 상속받아 사용자가 재정의하는 방법

 

순서

1) __init__() 메서드에서 모델이 사용할 층을 정의함

2) call() 메서드에서 앞서 만든 층을 사용하여 모델의 정방향 패스를 정의

3) 서브클래스의 객체를 만들고 데이터와 함께 호출하여 가중치를 만듬

 

 

- 이전의 모델을 서브클래싱으로 구현

class CustomerTicketModel(keras.Model):

    def __init__(self, num_departments):
        super().__init__()
        self.concat_layer = layers.Concatenate()
        self.mixing_layer = layers.Dense(64, activation="relu")
        self.priority_scorer = layers.Dense(1, activation="sigmoid")
        self.department_classifier = layers.Dense(
            num_departments, activation="softmax")

    def call(self, inputs):
        title = inputs["title"]
        text_body = inputs["text_body"]
        tags = inputs["tags"]

        features = self.concat_layer([title, text_body, tags])
        features = self.mixing_layer(features)
        priority = self.priority_scorer(features)
        department = self.department_classifier(features)
        return priority, department

 

 

* __init__() 메서드에서 모델이 사용할을 정의함

* call() 메서드에서 앞서 만든 층을 사용하여 모델의 정방향 패스를 정의

 

model = CustomerTicketModel(num_departments=4)

priority, department = model(
    {"title": title_data, "text_body": text_body_data, "tags": tags_data})

 

* 서브클래스의 객체를 만들고 데이터와 함께 호출하여 가중치를 만듬

 

model.compile(optimizer="rmsprop",
              loss=["mean_squared_error", "categorical_crossentropy"],
              metrics=[["mean_absolute_error"], ["accuracy"]])
model.fit({"title": title_data,
           "text_body": text_body_data,
           "tags": tags_data},
          [priority_data, department_data],
          epochs=1)
model.evaluate({"title": title_data,
                "text_body": text_body_data,
                "tags": tags_data},
               [priority_data, department_data])
priority_preds, department_preds = model.predict({"title": title_data,
                                                  "text_body": text_body_data,
                                                  "tags": tags_data})

 

* 컴파일하고 모델을 fitting 하는 과정은 앞선 두 방법과 동일함

* Sequential 모델이나  함수형 모델과 마찬가지로 Model을 상속받아서 만든 모델을 컴파일하고 훈련할 수 있음

 

*Layer 클래스 상속과 Model 클래스 상속의 차이

- 층은 모델을 만드는 데 사용하는 구성 요소

- 모델은 실제로 훈련하고 추론에 사용하는 최상위 객체

 

 

주의: 서브클래싱된 모델이 지원하지 않는 것

- 모델 로직을 더 많이 책임져야 함, 더 많은 디버깅 작업 필요

- 함수형 모델: 명시적인 데이터 구조 / 서브클래싱 모델: 한 덩어리의 바이트코드

- 층이 연결되는 방식이 call() 메서드 안에 감춰지기 때문에 이 정보를 활용할 수 없음

- summary() 메서드가 층의 연결 구조 출력 불가 

- 서브클래싱된 모델은 그래프가 없기 때문에 층 그래프 노드를 특성 추출을 위해 참고할 수 없음. 정방향 패스는 완전한 블랙박스가 됨

 

 

 

*여러 방식을 혼합하여 사용

 

예시1) __init__(), call() 메서드 재정의를 통해 서브클래싱 모델, 함수형 모델에서 이 서브클래싱된 모델을 사용함.

class Classifier(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        if num_classes == 2:
            num_units = 1
            activation = "sigmoid"
        else:
            num_units = num_classes
            activation = "softmax"
        self.dense = layers.Dense(num_units, activation=activation)

    def call(self, inputs):
        return self.dense(inputs)

inputs = keras.Input(shape=(3,))
features = layers.Dense(64, activation="relu")(inputs)
outputs = Classifier(num_classes=10)(features)
model = keras.Model(inputs=inputs, outputs=outputs)

 

 

예시2) 서브클래싱 층이나 모델의 일부로 함수형 모델을 사용함

inputs = keras.Input(shape=(64,))
outputs = layers.Dense(1, activation="sigmoid")(inputs)
binary_classifier = keras.Model(inputs=inputs, outputs=outputs)

class MyModel(keras.Model):

    def __init__(self, num_classes=2):
        super().__init__()
        self.dense = layers.Dense(64, activation="relu")
        self.classifier = binary_classifier

    def call(self, inputs):
        features = self.dense(inputs)
        return self.classifier(features)

model = MyModel()

 

 

* 그림으로 정리

 

> 사용성과 유연성 사이의 적절한 절충점 필요

>> 따라서 현재 하려는 작업에 적합한 모델 선정 중요

 

 

 

 

7-3) 내장된 훈련 루프와 평가 루프 사용하기

 

앞서 배운 간단한 워크플로를 커스터마이징하는 방법

1) 사용자 정의 측정 지표를 전달

2) fit() 메서드에 콜백을 전달하여 훈련하는 동안 특정 시점에 수행할 행동을 예약

 

 

1) 사용자 정의 측정 지표를 전달

- 케라스 지표는 keras.metrics.Metric 클래스를 상속한 클래스

 

예시) 평균 제곱근 오차 (Root Mean Squared Error, RMSE) 를 계산하는 사용자 정의 지표 

class RootMeanSquaredError(keras.metrics.Metric):

    def __init__(self, name="rmse", **kwargs):
        super().__init__(name=name, **kwargs)
        self.mse_sum = self.add_weight(name="mse_sum", initializer="zeros")
        self.total_samples = self.add_weight(
            name="total_samples", initializer="zeros", dtype="int32")

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.one_hot(y_true, depth=tf.shape(y_pred)[1])
        mse = tf.reduce_sum(tf.square(y_true - y_pred))
        self.mse_sum.assign_add(mse)
        num_samples = tf.shape(y_pred)[0]
        self.total_samples.assign_add(num_samples)

    def result(self):
        return tf.sqrt(self.mse_sum / tf.cast(self.total_samples, tf.float32))

    def reset_state(self):
        self.mse_sum.assign(0.)
        self.total_samples.assign(0)

 

 

- 기존 클래스를 오버라이딩해 작성

- __init__(): 생성자에서 상태 변수를 정의함

- update_state(): 상태 업데이트 로직을 구현, y_true : 타깃, y_pred : 이에 대한 모델의 예측

- result() : 현재 지표 값을 반환하는 메서드

- reset_state(): 객체를 다시 생성하지 않고 상태를 초기화해 객체를 훈련 반복, 훈련과 평가에 재사용할 수 있도록 하는 메서드

 

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy", RootMeanSquaredError()])
model.fit(train_images, train_labels,
          epochs=3,
          validation_data=(val_images, val_labels))
test_metrics = model.evaluate(test_images, test_labels)

 

사용자 정의 지표는 위와 같은 방식으로 내장 지표와 동일하게 사용이 가능

 

 

 

2) fit() 메서드에 콜백을 전달하여 훈련하는 동안 특정 시점에 수행할 행동을 예약함

 

콜백(callback) : fit() 메서드 호출 시에 모델에 전달되는 객체

- 훈련하는 동안의 모델의 상태와 성능에 대한 모든 정보 접근

- 훈련 중지, 모델 저장, 가중치 적재 또는 모델 상태 변경 등을 처리할 수 있음

 

 

콜백을 사용하는 사례

 

1) 모델 체크포인트 저장

2) 조기 종료

3) 훈련하는 동안 하이퍼파라미터 값을 동적으로 조정

4) 훈련과 검증 지표를 로그에 기록하거나 모델이 학습한 표현이 업데이트될 때마다 시각화함

 

 

예시) ModelCheckpoint, EarlyStopping 콜백

callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor="val_accuracy",
        patience=2,
    ),
    keras.callbacks.ModelCheckpoint(
        filepath="checkpoint_path.keras",
        monitor="val_loss",
        save_best_only=True,
    )
]

 

- ModelCheckpoint : 훈련 중 메 에포크 끝에서 가중치 저장, 가장 성능 좋은 모델 저장

- EarlyStopping : 성능 향상이 멈추면 훈련을 중지함

 

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=callbacks_list,
          validation_data=(val_images, val_labels))

 

- fit() 메서드 안에서 callbacks = callbacks_list로 앞서 생성한 두 콜백을 불러 검증 데이터를 기준으로 훈련을 모니터링함

 

 

* 사용자 정의 콜백 만들기

- keras.callbacks.Callback 클래스를 상속받아 구현함

- 훈련하는 동안 여러 지점에서 호출될 다음과 같은 메서드 구현

- 해당 메서드들은 모두 logs 매개변수와 함께 호출됨

 

from matplotlib import pyplot as plt

class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs):                             
        self.per_batch_losses = []                // 훈련 시작 시 빈 리스트 생성

    def on_batch_end(self, batch, logs):
        self.per_batch_losses.append(logs.get("loss"))             // 각 배치 끝에서 빈 리스트에 손실 저장함

    def on_epoch_end(self, epoch, logs):
        plt.clf()
        plt.plot(range(len(self.per_batch_losses)), self.per_batch_losses,     // 각 에포크 끝에서 배치 손실을 그래프로 시각화함  
                 label="Training loss for each batch")
        plt.xlabel(f"Batch (epoch {epoch})")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"plot_at_epoch_{epoch}")
        self.per_batch_losses = []

 

on_train_begin(self, logs): 훈련 시작 시 빈 리스트 생성

on_batch_end(self, batch, logs) : 각 배치 끝에서 빈 리스트에 손실 저장함

on_epoch_end(self, epoch, logs) :  에포크 끝에서 배치 손실을 그래프로 시각화함  

 

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
model.fit(train_images, train_labels,
          epochs=10,
          callbacks=[LossHistory()],
          validation_data=(val_images, val_labels))

 

- 사용자 정의 콜백 메서드를 fit() 메서드 안에서 호출할 수 있음

 

- 손실을 저장하고 이를 그래프로 시각화한 결과

 

 

 

*텐서보드를 사용한 모니터링과 시각화

 

- 좋은 모델의 개발을 위해선 지속적인 모니터링과 훈련 결과 시각화 필요

 

텐서보드 : 로컬에서 실행할 수 있는 브라우저 기반 애플리케이션, 모델 안에서 일어나는 모든 것들을 모니터링할 수 있도록 도와줌

 

 

텐서보드의 기능

- 훈련하는 동안 측정 지표를 시각적으로 모니터링

- 모델 구조를 시각화

- 활성화 출력과 그레이디언트 히스토그램을 그림

- 임베딩을 3D로 표현함

 

 

사용하는 법: keras.callbacks.TensorBoard 콜백을 이용

 

model = get_mnist_model()
model.compile(optimizer="rmsprop",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

tensorboard = keras.callbacks.TensorBoard(
    log_dir="./tb_logs",
)
model.fit(train_images, train_labels,
          epochs=10,
          validation_data=(val_images, val_labels),
          callbacks=[tensorboard])

 

- fit() 메서드 안에서 콜백으로 사용, 콜백의 로그를 저장할 위치 지정

- 모델이 실행될 때 지정된 위치에 로그 기록함

 

 

 

 

 

 

7-4) 사용자 정의 훈련, 평가 루프 만들기

 

- fit() 워크플로는 지도 학습에만 초점이 맞춰져 있음, 타깃과의 손실 계산에 집중

- 생성 학습, 자기지도 학습, 강화 학습과 같이 명확한 타깃이 없는 경우도 많음

- 따라서 fit() 메서드가 적합하지 않을 때 자기만의 훈련 로직 필요함

 

* 훈련 VS 추론

 

- 훈련 가능한 가중치: Dense 층의 커널과 편향처럼 모델의 손실을 최소화하기 위해 역전파로 업데이트됨

 

- 훈련되지 않는 가중치: 해당 층의 정방향 패스 동안 업데이트됨

예시) 얼마나 많은 배치를 처리했는지 카운트하는 사용자 정의 층이 필요하다면 이 정보를 훈련되지 않는 가중치에 저장하고 배치마다 1씩 값 증가시킴

 

 

* 측정 지표의 저수준 사용법

values = [0, 1, 2, 3, 4]
mean_tracker = keras.metrics.Mean()
for value in values:
    mean_tracker.update_state(value)
print(f"평균 지표: {mean_tracker.result():.2f}")

 

평균을 추적해야 할 때 keras.metrics.Mean() 처럼 추적해야 하는 값 단순 호출

 

 

* 완전한 훈련과 평가 루프

 

훈련 스텝 함수

model = get_mnist_model()

loss_fn = keras.losses.SparseCategoricalCrossentropy()    // 손실함수
optimizer = keras.optimizers.RMSprop()                                   // 옵티마이저
metrics = [keras.metrics.SparseCategoricalAccuracy()]          // 측정지표
loss_tracking_metric = keras.metrics.Mean()                // 손실 평균을 추적할 평균 지표

def train_step(inputs, targets):
    with tf.GradientTape() as tape:                                  // 정방향 패스 실행
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_weights)            // 역방향 패스 실행
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    logs = {}
    for metric in metrics:                                        // 측정 지표를 계산함
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)           // 손실의 평균을 계산함
    logs["loss"] = loss_tracking_metric.result()
    return logs                                                             // 지표와 손실의 현재 값을 반환함

 

지표 재설정: 매 에포크 시작 전과 평가 전에 지표를 재설정해야 함

def reset_metrics():
    for metric in metrics:
        metric.reset_state()
    loss_tracking_metric.reset_state()

 

단계별 훈련 루프 작성하기: 훈련 루프 자체

training_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3
for epoch in range(epochs):
    reset_metrics()
    for inputs_batch, targets_batch in training_dataset:      // 넘파이 데이터를 크기가 32인 배치로 순회함
        logs = train_step(inputs_batch, targets_batch)
    print(f"{epoch}번째 에포크 결과")
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

 

단계별 평가 루프 작성하기

def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)

    logs = {}
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()
for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
print("평가 결과:")
for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

 

- test_step() 함수를 반복하여 호출함 (for 루프로 하나의 배치 데이터를 처리하는 함수)

 

 

* tf.function으로 성능 높이기

- 기본적으로 라인 단위로 읽는 파이썬 코드를 전역적인 최적화가 가능하도록 만들어 줌

 

 

*fit()메서드를 사용자 정의 루프로 활용하기

- 케라스에 내장된 훈련 로직 활용 가능

- Model 클래스의 train_step() 메서드 오버라이딩해 구현

 

예시)

- keras,Model 상속한 새로운 클래스 만들기

- train_step() 메서드 오버라이딩

- 모델의 Metric 객체를 반환하는 metrics 속성을 구현, 매 에포크 시작에서 reset_state() 메서드 자동으로 호출함 > 수동으로 지표 재설정할 필요 없어짐

 

loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_tracker = keras.metrics.Mean(name="loss")                     // 배치 손실 평균 추적

class CustomModel(keras.Model):
    def train_step(self, data):                         // 오버라이딩
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))

        loss_tracker.update_state(loss)          // 손실 평균을 업데이트함
        return {"loss": loss_tracker.result()}      // 평균 손실을 구함

    @property
    def metrics(self):
        return [loss_tracker]

 

다음과 같이 사용 가능

class CustomModel(keras.Model):
    def train_step(self, data):
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)        // 직접 정의 메소드를 통해 손실 계산
        gradients = tape.gradient(loss, self.trainable_weights)            
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)         // 메서드에 전달될 지표 목록이 있는 객체, 모든 지표를 동시에 업데이트할 수 있음
        return {m.name: m.result() for m in self.metrics}                // 메서드에 전달한 실제 지표의 목록