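# NOTE: The lines below are page chrome captured from the code-hosting web UI,
# not part of the module; they are kept as comments so that the file parses.
#   Skip to content
#   Snippets / Groups / Projects
#   callbacks.py 46.2 KB
#   Newer / Older
#   openaiops's avatar
#   Committed by openaiops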
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from enum import IntFlag
from logging import getLogger
from typing import *

import numpy as np

from .checkpoint import BaseCheckpoint, CheckpointManager
from .errors import NaNMetricError
from .formatting import MetricsFormatter, format_duration, format_as_asctime
from .logging_ import print_with_time
from .metrics import ScalarMetricsLogger, ScalarMetricCollector
from .mlstorage import ExperimentDoc
from .stateful import StatefulObjectGroup, StatefulObject
from .utils import NOT_SET

# Public names of this module, i.e. what a wildcard import of it exports.
__all__ = [
    'CallbackData', 'Callback', 'CallbackList',
    'LoggerMode', 'LoggerCallback', 'StopOnNaN',
    'BaseTrainCallback', 'BaseCheckpointCallback',
    'AutoCheckpoint', 'EarlyStopping',
]


@dataclass
class CallbackData(object):
    """
    Data carried by a cycle begin/end event from :class:`Callback`.

    An instance of this class is the single ``data`` argument accepted by
    every ``on_*`` hook of :class:`Callback`.  All seven fields are required
    by the generated ``__init__`` (none of them declares a default), in the
    order they are declared below.  Per the field notes, `end_timestamp`,
    `exc_time` and `metrics` are only available at the cycle end.
    """

    # Explicit slots: instances carry no per-instance ``__dict__``, so only
    # the names listed here can be assigned.  The tuple must stay in sync
    # with the annotated fields below.  A field cannot be given a class-level
    # default, because such a class variable would conflict with its slot.
    __slots__ = ('stage', 'index', 'size', 'start_timestamp',
                 'end_timestamp', 'exc_time', 'metrics')

    # Forward reference: ``Stage`` is not defined in the visible part of
    # this module.
    stage: 'Stage'
    """The stage that calls the callback."""

    index: Optional[int]
    """Index of the epoch or batch, start from 1."""

    size: Optional[int]
    """The size of the batch."""

    start_timestamp: float
    """Start timestamp of the stage/epoch/batch."""

    end_timestamp: Optional[float]
    """End timestamp of the stage/epoch/batch, available at the cycle end."""

    exc_time: Optional[float]
    """Execution time of the stage/epoch/batch, available at the cycle end."""

    metrics: Optional[Dict[str, Any]]
    """Metrics dict, available at the cycle end."""


class Callback(object):
    """
    Base class for callbacks attached to a machine learning stage.

    Every ``on_*`` hook takes a single :class:`CallbackData` argument and
    does nothing by default; subclasses override only the hooks they need.
    """

    priority: int = 0
    """
    Ordering key of the callback.  A callback with a smaller priority should
    be called before callbacks with larger priorities.
    """

    # ---- metrics ----

    def on_metrics(self, data: CallbackData):
        """Hook for the metrics event; no-op by default."""

    # ---- general events ----

    def on_stage_begin(self, data: CallbackData):
        """Hook for the stage-begin event; no-op by default."""

    def on_stage_end(self, data: CallbackData):
        """Hook for the stage-end event; no-op by default."""

    def on_epoch_begin(self, data: CallbackData):
        """Hook for the epoch-begin event; no-op by default."""

    def on_epoch_end(self, data: CallbackData):
        """Hook for the epoch-end event; no-op by default."""

    def on_batch_begin(self, data: CallbackData):
        """Hook for the batch-begin event; no-op by default."""

    def on_batch_end(self, data: CallbackData):
        """Hook for the batch-end event; no-op by default."""

    # ---- train events ----

    def on_train_begin(self, data: CallbackData):
        """Hook for the train-begin event; no-op by default."""

    def on_train_end(self, data: CallbackData):
        """Hook for the train-end event; no-op by default."""

    def on_train_epoch_begin(self, data: CallbackData):
        """Hook for the train-epoch-begin event; no-op by default."""

    def on_train_epoch_end(self, data: CallbackData):
        """Hook for the train-epoch-end event; no-op by default."""

    def on_train_batch_begin(self, data: CallbackData):
        """Hook for the train-batch-begin event; no-op by default."""

    def on_train_batch_end(self, data: CallbackData):
        """Hook for the train-batch-end event; no-op by default."""

    # ---- validation events ----

    def on_validation_begin(self, data: CallbackData):
        """Hook for the validation-begin event; no-op by default."""

    def on_validation_end(self, data: CallbackData):
        """Hook for the validation-end event; no-op by default."""

    def on_validation_batch_begin(self, data: CallbackData):
        """Hook for the validation-batch-begin event; no-op by default."""

    def on_validation_batch_end(self, data: CallbackData):
        """Hook for the validation-batch-end event; no-op by default."""

    # ---- test events ----

    def on_test_begin(self, data: CallbackData):
        """Hook for the test-begin event; no-op by default."""

    def on_test_end(self, data: CallbackData):
        """Hook for the test-end event; no-op by default."""

    def on_test_batch_begin(self, data: CallbackData):
        """Hook for the test-batch-begin event; no-op by default."""

    def on_test_batch_end(self, data: CallbackData):
        """Hook for the test-batch-end event; no-op by default."""

    # ---- predict events ----

    def on_predict_begin(self, data: CallbackData):
        """Hook for the predict-begin event; no-op by default."""

    def on_predict_end(self, data: CallbackData):
        """Hook for the predict-end event; no-op by default."""

    def on_predict_batch_begin(self, data: CallbackData):
        """Hook for the predict-batch-begin event; no-op by default."""

    def on_predict_batch_end(self, data: CallbackData):
        """Hook for the predict-batch-end event; no-op by default."""


class CallbackList(Sequence[Callback]):
    """
    A callback list, which maintains the orders of callbacks according to
    their priority.
    """

    _SORTED = object()

    def __init__(self,
                 callbacks: Optional[Iterable[Callback]] = None,
                 *,
                 _sorted=None):
        """
        Construct a new callback list.

        Args:
            callbacks: The callbacks to hold.  Any iterable is accepted and
                is copied into a new list.  If omitted or None, the list
                starts out empty.
            _sorted: Private marker.  When it is the `_SORTED` sentinel of
                this class, `callbacks` is taken to be in priority order
                already and is copied without sorting.  Any other value
                causes the callbacks to be sorted by their `priority`.
        """
        if callbacks is None:
            # Always store a real list, so that len(), iteration, indexing
            # and comparison work on a default-constructed instance instead
            # of failing on None.
            callbacks = []
        elif _sorted is self._SORTED:
            callbacks = list(callbacks)
        else:
            # sorted() is stable: callbacks that share the same priority
            # keep the relative order in which they were supplied.
            callbacks = sorted(callbacks, key=lambda cb: cb.priority)
        self._callbacks = callbacks

    def __len__(self) -> int:
        """Return how many callbacks this list currently holds."""
        callbacks = self._callbacks
        return len(callbacks)

    def __iter__(self):
        """Return an iterator over the callbacks in their stored order."""
        callbacks = self._callbacks
        return iter(callbacks)

    def __eq__(self, other):
        """
        Compare with another object.

        Only another :class:`CallbackList` can be equal to this one, and
        only if both hold equal callbacks in the same order.  Any other
        kind of object compares as False.
        """
        if not isinstance(other, CallbackList):
            return False
        return self._callbacks == other._callbacks

    def __getitem__(self, item):
        """
        Return the callback(s) selected by `item`.

        The lookup is delegated to the underlying container as is, so the
        result of a slice is whatever that container returns for it rather
        than a new :class:`CallbackList`.
        """
        callbacks = self._callbacks
        return callbacks[item]

    def __delitem__(self, item):
        """Remove the callback(s) selected by `item` from this list in place."""
        callbacks = self._callbacks
        del callbacks[item]

    def __copy__(self):
        """
        Hook used by :func:`copy.copy`; returns whatever `clone()` returns.

        NOTE(review): `clone()` is defined outside the visible part of this
        class -- confirm that it produces an independent list.
        """
        duplicate = self.clone()
        return duplicate