Skip to content
代码片段 群组 项目
online_rca.py 11.2 KB
Newer Older
openaiops's avatar
openaiops 已提交
import numpy as np
import math
import json
import os
import sys
import datetime
from anormaly_detector import trace_list_partition
from anormaly_detector import system_anormaly_detect
from preprocess_data import get_operation_duration_data
from preprocess_data import get_span
from preprocess_data import get_operation_slo
from preprocess_data import get_service_operation_list
from preprocess_data import get_pagerank_graph
from pagerank import trace_pagerank
from anormaly_detector import trace_list_partition
from anormaly_detector import system_anormaly_detect
import time
import datetime
from dateutil.parser import parse
import json
import csv
import codecs


def timestamp(datetime):
    timeArray = time.strptime(str(datetime), "%Y-%m-%d %H:%M:%S")
    ts = int(time.mktime(timeArray)) * 1000
    # print(ts)
    return ts


# need to replace
start = '2020-10-11 22:18:00'
end = '2020-10-11 22:19:00'

span_list = get_span(start=timestamp(start), end=timestamp(end))
# print(span_list)
operation_list = get_service_operation_list(span_list)
print(operation_list)
slo = get_operation_slo(
    service_operation_list=operation_list, span_list=span_list)
print(slo)


def calculate_spectrum_without_delay_list(anomaly_result, normal_result, anomaly_list_len, normal_list_len,
                                          top_max, normal_num_list, anomaly_num_list, spectrum_method):
    spectrum = {}

    for node in anomaly_result:
        spectrum[node] = {}
        # spectrum[node]['ef'] = anomaly_result[node] * anomaly_list_len
        # spectrum[node]['nf'] = anomaly_list_len - anomaly_result[node] * anomaly_list_len
        spectrum[node]['ef'] = anomaly_result[node] * anomaly_num_list[node]
        spectrum[node]['nf'] = anomaly_result[node] * \
            (anomaly_list_len - anomaly_num_list[node])
        if node in normal_result:
            #spectrum[node]['ep'] = normal_result[node] * normal_list_len
            #spectrum[node]['np'] = normal_list_len - normal_result[node] * normal_list_len
            spectrum[node]['ep'] = normal_result[node] * normal_num_list[node]
            spectrum[node]['np'] = normal_result[node] * \
                (normal_list_len - normal_num_list[node])
        else:
            spectrum[node]['ep'] = 0.0000001
            spectrum[node]['np'] = 0.0000001

    for node in normal_result:
        if node not in spectrum:
            spectrum[node] = {}
            #spectrum[node]['ep'] = normal_result[node] * normal_list_len
            #spectrum[node]['np'] = normal_list_len - normal_result[node] * normal_list_len
            spectrum[node]['ep'] = (
                1 + normal_result[node]) * normal_num_list[node]
            spectrum[node]['np'] = normal_list_len - normal_num_list[node]
            if node not in anomaly_result:
                spectrum[node]['ef'] = 0.0000001
                spectrum[node]['nf'] = 0.0000001

    # print('\n Micro Rank Spectrum raw:')
    # print(json.dumps(spectrum))
    result = {}

    for node in spectrum:
        # Dstar2
        if spectrum_method == "dstar2":
            result[node] = spectrum[node]['ef'] * spectrum[node]['ef'] / \
                (spectrum[node]['ep'] + spectrum[node]['nf'])
        # Ochiai
        elif spectrum_method == "ochiai":
            result[node] = spectrum[node]['ef'] / \
                math.sqrt((spectrum[node]['ep'] + spectrum[node]['ef']) * (
                    spectrum[node]['ef'] + spectrum[node]['nf']))

        elif spectrum_method == "jaccard":
            result[node] = spectrum[node]['ef'] / (spectrum[node]['ef'] + spectrum[node]['ep']
                                                   + spectrum[node]['nf'])

        elif spectrum_method == "sorensendice":
            result[node] = 2 * spectrum[node]['ef'] / \
                (2 * spectrum[node]['ef'] + spectrum[node]
                 ['ep'] + spectrum[node]['nf'])

        elif spectrum_method == "m1":
            result[node] = (spectrum[node]['ef'] + spectrum[node]
                            ['np']) / (spectrum[node]['ep'] + spectrum[node]['nf'])

        elif spectrum_method == "m2":
            result[node] = spectrum[node]['ef'] / (2 * spectrum[node]['ep'] + 2 * spectrum[node]['nf'] +
                                                   spectrum[node]['ef'] + spectrum[node]['np'])
        elif spectrum_method == "goodman":
            result[node] = (2 * spectrum[node]['ef'] - spectrum[node]['nf'] - spectrum[node]['ep']) / \
                (2 * spectrum[node]['ef'] + spectrum[node]
                 ['nf'] + spectrum[node]['ep'])
        # Tarantula
        elif spectrum_method == "tarantula":
            result[node] = spectrum[node]['ef'] / (spectrum[node]['ef'] + spectrum[node]['nf']) / \
                (spectrum[node]['ef'] / (spectrum[node]['ef'] + spectrum[node]['nf']) +
                 spectrum[node]['ep'] / (spectrum[node]['ep'] + spectrum[node]['np']))
        # RussellRao
        elif spectrum_method == "russellrao":
            result[node] = spectrum[node]['ef'] / \
                (spectrum[node]['ef'] + spectrum[node]['nf'] +
                 spectrum[node]['ep'] + spectrum[node]['np'])

        # Hamann
        elif spectrum_method == "hamann":
            result[node] = (spectrum[node]['ef'] + spectrum[node]['np'] - spectrum[node]['ep'] - spectrum[node]
                            ['nf']) / (spectrum[node]['ef'] + spectrum[node]['nf'] + spectrum[node]['ep'] + spectrum[node]['np'])

        # Dice
        elif spectrum_method == "dice":
            result[node] = 2 * spectrum[node]['ef'] / \
                (spectrum[node]['ef'] + spectrum[node]
                 ['nf'] + spectrum[node]['ep'])

        # SimpleMatching
        elif spectrum_method == "simplematcing":
            result[node] = (spectrum[node]['ef'] + spectrum[node]['np']) / (spectrum[node]
                                                                            ['ef'] + spectrum[node]['np'] + spectrum[node]['nf'] + spectrum[node]['ep'])

        # RogersTanimoto
        elif spectrum_method == "rogers":
            result[node] = (spectrum[node]['ef'] + spectrum[node]['np']) / (spectrum[node]['ef'] +
                                                                            spectrum[node]['np'] + 2 * spectrum[node]['nf'] + 2 * spectrum[node]['ep'])

    # Top-n节点列表
    top_list = []
    score_list = []
    for index, score in enumerate(sorted(result.items(), key=lambda x: x[1], reverse=True)):
        if index < top_max + 6:
            top_list.append(score[0])
            score_list.append(score[1])
            #print('%-50s: %.8f' % (score[0], score[1]))
    return top_list, score_list


def online_anomaly_detect_RCA(slo, operation_list):
    while True:
        current_time = datetime.datetime.strptime(datetime.datetime.now().strftime(
            "%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")-datetime.timedelta(minutes=1)

        start_time = current_time - datetime.timedelta(seconds=60)
        anormaly_flag = system_anormaly_detect(start_time=timestamp(start_time),
                                               end_time=timestamp(current_time), slo=slo, operation_list=operation_list)
        if anormaly_flag:
            detect_time = current_time
            start_time = detect_time - datetime.timedelta(seconds=5)
            end_time = detect_time + datetime.timedelta(seconds=55)
            year = str(time.strptime(str(detect_time),
                       "%Y-%m-%d %H:%M:%S").tm_year)
            mon = time.strptime(str(detect_time), "%Y-%m-%d %H:%M:%S").tm_mon
            day = time.strptime(str(detect_time), "%Y-%m-%d %H:%M:%S").tm_mday
            hour = time.strptime(str(detect_time), "%Y-%m-%d %H:%M:%S").tm_hour
            minute = time.strptime(
                str(detect_time), "%Y-%m-%d %H:%M:%S").tm_min
            if mon > 9:
                mon = str(mon)
            else:
                mon = "0" + str(mon)

            if day > 9:
                day = str(day)
            else:
                day = "0" + str(day)

            if minute >= 1:
                if hour > 9:
                    hour = str(hour)
                else:
                    hour = "0" + str(hour)
            else:
                if hour - 1 > 9:
                    hour = hour - 1
                    hour = str(hour)
                elif hour == 0:
                    hour = "23"
                    current_day = time.strptime(
                        str(detect_time), "%Y-%m-%d %H:%M:%S").tm_mday
                    current_day = current_day - 1
                    if current_day > 9:
                        day = str(current_day)
                    else:
                        day = "0" + str(current_day)
                else:
                    hour = hour - 1
                    hour = "0" + str(hour)
            # date = year + "-" + mon + "-" + day
            # print("checkpoint", date)

            middle_span_list = get_span(
                timestamp(start_time), timestamp(end_time))
            operation_count = get_operation_duration_data(
                operation_list, middle_span_list)
            anomaly_list, normal_list = trace_list_partition(
                operation_count=operation_count, slo=slo)

            print("anomaly_list", len(anomaly_list))
            print("normal_list", len(normal_list))
            print("total", len(normal_list) + len(anomaly_list))

            if len(anomaly_list) == 0 or len(normal_list) == 0:
                continue
            operation_operation, operation_trace, trace_operation, pr_trace \
                = get_pagerank_graph(normal_list, middle_span_list)

            normal_trace_result, normal_num_list = trace_pagerank(operation_operation, operation_trace, trace_operation,
                                                                  pr_trace, False)

            a_operation_operation, a_operation_trace, a_trace_operation, a_pr_trace \
                = get_pagerank_graph(anomaly_list, middle_span_list)
            anomaly_trace_result, anomaly_num_list = trace_pagerank(a_operation_operation, a_operation_trace,
                                                                    a_trace_operation, a_pr_trace,
                                                                    True)
            top_list, score_list = calculate_spectrum_without_delay_list(anomaly_result=anomaly_trace_result,
                                                                         normal_result=normal_trace_result,
                                                                         anomaly_list_len=len(
                                                                             anomaly_list),
                                                                         normal_list_len=len(
                                                                             normal_list),
                                                                         top_max=5,
                                                                         anomaly_num_list=anomaly_num_list,
                                                                         normal_num_list=normal_num_list,
                                                                         spectrum_method="dstar2")
            print(top_list, score_list)
            # sleep 5min after a fault
            time.sleep(240)
        time.sleep(60)


online_anomaly_detect_RCA(slo, operation_list)