Source code for detect_anomaly
from typing import List
import numpy as np
import pandas as pd
from pandas.tseries.frequencies import to_offset
from pkg import transform as tfr
[docs]def get_anomaly_period(
df: pd.DataFrame,
width_detect: str,
width_step: str) -> (np.ndarray, np.ndarray):
"""
異常検知を行なう期間の開始日と最終日を返す。
Parameters
----------
df : pandas.DataFrame of shape (n_datetimes, n_objects)
m_ap30 の表。
width_detect : str, tuple, datetime.timedelta, DateOffset or None
異常検知を行なう時間幅。
width_step : str or DateOffset
異常検知を行なうステップ幅。
Returns
-------
anomaly_st : numpy.ndarray of str
異常検知を行なう期間の開始日。
anomaly_en : numpy.ndarray of str
異常検知を行なう期間の最終日。
"""
anomaly_st = pd.date_range(start=df.index[0].date(),
end=df.index[-1].date(), freq=width_step).date
anomaly_en = anomaly_st \
+ pd.to_timedelta(to_offset(width_detect) - to_offset('D'))
anomaly_en[anomaly_en > df.index[-1].date()] = df.index[-1].date()
anomaly_en = np.unique(anomaly_en)
anomaly_st = anomaly_st[:len(anomaly_en)]
anomaly_st[-1] = anomaly_en[-1] \
- pd.to_timedelta(to_offset(width_detect) - to_offset('D'))
anomaly_st = anomaly_st.astype('str')
anomaly_en = anomaly_en.astype('str')
anomaly_st = anomaly_st.astype(object)
anomaly_en = anomaly_en.astype(object)
return anomaly_st, anomaly_en
[docs]def get_anomaly_data_period(
df: pd.DataFrame,
width_detect: str,
width_step: str) -> (List[str], List[str]):
"""
異常検知を行なう期間の、データが存在する開始日と最終日を返す。
Parameters
----------
df : pandas.DataFrame of shape (n_datetimes, n_objects)
m_ap30 の表。
width_detect : str, tuple, datetime.timedelta, DateOffset or None
異常検知を行なう時間幅。
width_step : str or DateOffset
異常検知を行なうステップ幅。
Returns
-------
anomaly_data_st : list of str
異常検知を行なう期間の、データが存在する開始日。
anomaly_data_en : list of str
異常検知を行なう期間の、データが存在する最終日。
"""
anomaly_st, anomaly_en = get_anomaly_period(df, width_detect=width_detect,
width_step=width_step)
anomaly_data_st = []
anomaly_data_en = []
for i in range(len(anomaly_st)):
df_slice = df[anomaly_st[i]:anomaly_en[i]]
if len(df_slice) > 0:
anomaly_data_st.append(str(df_slice.index[0].date()))
anomaly_data_en.append(str(df_slice.index[-1].date()))
anomaly_data_period = np.empty(len(anomaly_data_st), dtype=object)
for i in range(len(anomaly_data_period)):
anomaly_data_period[i] = anomaly_data_st[i] + 'to' + anomaly_data_en[i]
anomaly_data_period = np.unique(anomaly_data_period)
anomaly_data_st = []
anomaly_data_en = []
for str_period in anomaly_data_period:
str_date = str_period.split('to')
anomaly_data_st.append(str_date[0])
anomaly_data_en.append(str_date[1])
return anomaly_data_st, anomaly_data_en
[docs]def get_y_pred(df, **kwargs):
"""
異常検知アルゴリズムを適用し、異常かどうかのラベルを返す。
Parameters
----------
df : pandas.DataFrame of shape (n_objects, n_features)
m_ap30 の表。
**kwargs
Arbitrary keyword arguments.
"""
return None
[docs]def get_outlier_idx(df, **kwargs):
"""
異常検知アルゴリズムを適用し、異常と判定された行ラベルを返す。
Parameters
----------
df : pandas.DataFrame of shape (n_objects, n_features)
m_ap30 の表。
**kwargs
Arbitrary keyword arguments.
Returns
-------
outlier_idx : list
異常と判定された行ラベル。
"""
y_pred = get_y_pred(df, **kwargs)
outlier_idx = [df.index[i] for i in range(len(y_pred)) if y_pred[i] == -1]
return outlier_idx
[docs]def detect_anomaly_per_period(df, width_detect, width_step, **kwargs):
"""
入力された pandas.DataFrame に対して切り出した期間ごとに異常検知を行ない、
異常と判定された idx、mjd_st、mjd_en を columns にもつ
pandas.DataFrame を返す。
Parameters
----------
df : pandas.DataFrame of shape (n_datetimes, n_objects)
m_ap30 の表。
width_detect : str, tuple, datetime.timedelta, DateOffset or None
異常検知を行なう時間幅。
width_step : str or DateOffset
異常検知を行なうステップ幅。
**kwargs
Arbitrary keyword arguments.
Returns
-------
df_outlier : pandas.DataFrame
異常と判定された object の idx とその期間をリストアップしたもの。
"""
date_st, date_en = get_anomaly_data_period(df, width_detect=width_detect,
width_step=width_step)
df_outlier = pd.DataFrame(columns=['objectid', 'mjd_st', 'mjd_en'])
row_label = 0
for i in range(len(date_st)):
outlier_idx = get_outlier_idx(df[date_st[i]:date_en[i]].T, **kwargs)
mjd_st, mjd_en = tfr.convert_date_into_mjd(date_st[i], date_en[i])
for idx in outlier_idx:
df_outlier.loc[row_label] = [idx, mjd_st, mjd_en]
row_label += 1
return df_outlier