Skip to content
代码片段 群组 项目
analyze_nll.py 4.7 KB
Newer Older
openaiops's avatar
openaiops 已提交
import math
import os
import sys
import traceback
from functools import wraps
from typing import *

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.metrics import f1_score

from .fscore_utils import *

__all__ = ['analyze_anomaly_nll']


def analyze_anomaly_nll(nll_list: np.ndarray,
                        label_list: np.ndarray,
                        up_sample_normal: int = 1,
                        threshold: Optional[float] = None,
                        proba_cdf_file: Optional[str] = None,
                        auc_curve_file: Optional[str] = None,
                        method: Optional[str] = None,
                        dataset: Optional[str] = None,
                        save_dict: bool = False,
                        save_filename: str = 'baseline.csv'
                        ) -> Dict[str, float]:

    def log_error(method, default_value=None):
        @wraps(method)
        def wrapper(*args, **kwargs):
            try:
                return method(*args, **kwargs)
            except Exception:
                print(''.join(traceback.format_exception(*sys.exc_info())), file=sys.stderr)
                return default_value
        return wrapper

    def call_plot(fn_, *args, output_file, **kwargs):
        if output_file == ':show:':
            fig = fn_(*args, **kwargs)
            plt.show()
            plt.close()
        else:
            fn_(*args, output_file=output_file, **kwargs)

    # up sample normal nll & label if required
    if up_sample_normal and up_sample_normal > 1:
        normal_nll = nll_list[label_list == 0]
        normal_label = label_list[label_list == 0]
        nll_list = np.concatenate(
            [normal_nll] * (up_sample_normal - 1) + [nll_list],
            axis=0
        )
        label_list = np.concatenate(
            [normal_label] * (up_sample_normal - 1) + [label_list],
            axis=0
        )

    # prepare for analyze
    result_dict = {}
    is_anomaly_list = label_list != 0

    # separated nlls for different labels
    result_dict['nll_normal'] = float(np.mean(nll_list[label_list == 0]))
    result_dict['nll_drop'] = float(np.mean(nll_list[label_list == 1]))
    result_dict['nll_latency'] = float(np.mean(nll_list[label_list == 2]))

    # auc score
    result_dict['auc'] = float(auc_score(nll_list, is_anomaly_list))

    # best f-score
    F = log_error(best_fscore, default_value=(math.nan, math.nan))

    def best_fscore_for_label(label):
        not_label = 2 if label == 1 else 1
        mask = label_list != not_label
        return F(nll_list[mask], label_list[mask] != 0)

    best_fscore_total, _, best_pr_total, best_rc_total = F(nll_list, is_anomaly_list)
    best_fscore_drop, _, best_pr_drop, best_rc_drop = best_fscore_for_label(1)
    best_fscore_latency, best_threshold_latency, best_pr_latency, best_rc_latency = best_fscore_for_label(2)
    result_dict.update({
        'best_fscore': float(best_fscore_total),
        'best_fscore_drop': float(best_fscore_drop),
        'best_fscore_latency': float(best_fscore_latency),
        'best_pr': float(best_pr_total),
        'best_rc': float(best_rc_total),
        'best_pr_drop': float(best_pr_drop),
        'best_rc_drop': float(best_rc_drop),
        'best_pr_latency': float(best_pr_latency),
        'best_rc_latency': float(best_rc_latency),
        'best_threshold_latency': float(best_threshold_latency)
    })

    # f-score
    F = log_error(f1_score, default_value=math.nan)

    def fscore_for_label(label):
        not_label = 2 if label == 1 else 1
        mask = label_list != not_label
        return F(label_list[mask] != 0, nll_list[mask] > threshold)

    if threshold is not None:
        result_dict.update({
            'fscore': float(F(is_anomaly_list, nll_list > threshold)),
            'fscore_drop': float(fscore_for_label(1)),
            'fscore_latency': float(fscore_for_label(2)),
        })

    # save result
    if save_dict and method and dataset:
        dataset = dataset.rstrip('/')

        result_to_save = result_dict.copy()
        result_to_save['dataset'] = dataset
        result_to_save['method'] = method

        if os.path.exists(f'paper-data/{save_filename}'):
            df = pd.read_csv(f'paper-data/{save_filename}')
            
            if not df[(df['dataset']==dataset)&(df['method']==method)].empty:
                df.iloc[df[(df['dataset']==dataset)&(df['method']==method)].index[0]] = result_to_save
            else:
                df = df.append(result_to_save, ignore_index=True)
        else:
            df = pd.DataFrame()
            df = df.append(result_to_save, ignore_index=True)

        os.makedirs('paper-data', exist_ok=True)
        df.to_csv(f'paper-data/{save_filename}', index=False)

    return result_dict