Anomaly Detection in Darts#

This notebook showcases some of the functionalities of Darts’ Anomaly Detection Module. We’ll look at Anomaly Scorers, Detectors, Aggregators and Anomaly Models.

  • Scorers: compute anomaly scores time series, either only on the target series or between the target series and a forecasted/predicted series. They are the core of the anomaly detection module.

  • Detectors: transform time series (such as anomaly scores) into binary anomaly time series. The presence of an anomaly is flagged with 1, and 0 otherwise.

  • Aggregators: reduce a multivariate binary time series (e.g., where each component represents the anomaly score of a different series component/model) into a univariate binary time series.

  • Anomaly Models: offer a convenient way to produce anomaly scores from any of Darts’ global forecasting models or filtering models by comparing the models’ predictions with actual observations. Each Anomaly Model takes one forecasting/filtering model and one or multiple scorers. The model produces some predictions, which are fed together with the actual series to the scorer(s). It will return anomaly scores for each scorer.

The figure below illustrates the different input/output for each tool:

The notebook is devided into two sections:

  • How to use ForecastingAnomalyModel to find anomalies in the number of taxi passengers in New York.

  • How to use an AnomalyScorer and the importance of its windowing capabilities on two toy datasets.

First, some necessary imports:

import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import pandas as pd

from darts import TimeSeries
from darts.ad import (
    ForecastingAnomalyModel,
    KMeansScorer,
    NormScorer,
    WassersteinScorer,
)
from darts.ad.utils import (
    eval_metric_from_scores,
    show_anomalies_from_scores,
)
from darts.dataprocessing.transformers import Scaler
from darts.datasets import TaxiNewYorkDataset
from darts.metrics import mae, rmse
from darts.models import RegressionModel
/Users/dennisbader/miniconda3/envs/darts310_test/lib/python3.10/site-packages/statsforecast/utils.py:237: FutureWarning: 'M' is deprecated and will be removed in a future version, please use 'ME' instead.
  "ds": pd.date_range(start="1949-01-01", periods=len(AirPassengers), freq="M"),

Anomaly Model: Taxi passengers in NY#

Load and visualize the data#

Information on the data:

  • Univariate Time Series (represents the number of taxi passengers in New York)

  • During a period of 8 months (2014-07 to 2015-01)

  • Frequency of 30 minutes

In this example, anomalies are subjective. It can be defined as periods where the demand for taxis is abnormal (different than what should be expected). Based on this definition, the following five dates can be considered anomalies:

  • NYC Marathon - 2014-11-02

  • Thanksgiving - 2014-11-27

  • Christmas - 2014-12-24/25

  • New Years - 2015-01-01

  • Snow Blizzard - 2015-01-26/27

# load the data
series_taxi = TaxiNewYorkDataset().load()

# define start and end dates for some known anomalies
anomalies_day = {
    "NYC Marathon": ("2014-11-02 00:00", "2014-11-02 23:30"),
    "Thanksgiving ": ("2014-11-27 00:00", "2014-11-27 23:30"),
    "Christmas": ("2014-12-24 00:00", "2014-12-25 23:30"),
    "New Years": ("2014-12-31 00:00", "2015-01-01 23:30"),
    "Snow Blizzard": ("2015-01-26 00:00", "2015-01-27 23:30"),
}
anomalies_day = {
    k: (pd.Timestamp(v[0]), pd.Timestamp(v[1])) for k, v in anomalies_day.items()
}

# create a series with the binary anomaly flags
anomalies = pd.Series([0] * len(series_taxi), index=series_taxi.time_index)
for start, end in anomalies_day.values():
    anomalies.loc[(start <= anomalies.index) & (anomalies.index <= end)] = 1.0

series_taxi_anomalies = TimeSeries.from_series(anomalies)

# plot the data and the anomalies
fig, ax = plt.subplots(figsize=(15, 5))
series_taxi.plot(label="Number of taxi passengers", linewidth=1, color="#6464ff")
(series_taxi_anomalies * 10000).plot(label="5 known anomalies", color="r", linewidth=1)
plt.show()
../../_images/a5558a6ba3dcede61d46845472031aa0bc2113cfcadec023deacace5e8fd6294.png
def plot_anom(selected_anomaly, delta_plotted_days):
    one_day = series_taxi.freq * 24 * 2
    anomaly_date = anomalies_day[selected_anomaly][0]
    start_timestamp = anomaly_date - delta_plotted_days * one_day
    end_timestamp = anomaly_date + (delta_plotted_days + 1) * one_day

    series_taxi[start_timestamp:end_timestamp].plot(
        label="Number of taxi passengers", color="#6464ff", linewidth=0.8
    )

    (series_taxi_anomalies[start_timestamp:end_timestamp] * 10000).plot(
        label="Known anomaly", color="r", linewidth=0.8
    )
    plt.title(selected_anomaly)
    plt.show()


for anom_name in anomalies_day:
    plot_anom(anom_name, 3)
    break  # remove this to see all anomalies
../../_images/57d9a354f46507816e1c95970a264f8a47a772b15ee88f2b9db9b630e1d8fd0f.png

The goal would be to detect these five irregular periods and identify other possible abnormal days.

Train a Darts forecasting model#

We will use a RegressionModel to predict the number of taxi passengers. The first 4500 timestamps will be used to train the model. The training set is considered to be anomaly-free, the five considered anomalies are located after the 4500th timestamps. The number of lags is set to 1 week, assuming the demand follows a periodicity of 1 week. To help the model, additional information on the targeted series is passed as covariates (the hour and the day of the week).

# split the data in a training and testing set
s_taxi_train = series_taxi[:4500]
s_taxi_test = series_taxi[4500:]

# Add covariates (hour and day of the week)
add_encoders = {
    "cyclic": {"future": ["hour", "dayofweek"]},
}

# one week corresponds to (7 days * 24 hours * 2) of 30 minutes
one_week = 7 * 24 * 2

forecasting_model = RegressionModel(
    lags=one_week,
    lags_future_covariates=[0],
    output_chunk_length=1,
    add_encoders=add_encoders,
)
forecasting_model.fit(s_taxi_train)
RegressionModel(lags=336, lags_past_covariates=None, lags_future_covariates=[0], output_chunk_length=1, output_chunk_shift=0, add_encoders={'cyclic': {'future': ['hour', 'dayofweek']}}, model=None, multi_models=True, use_static_covariates=True)

Use a Forecasting Anomaly Model#

The anomaly model consists of two inputs:

  • a fitted GlobalForecastingModel (you can find a list here. If the model hasn’t been fitted, set parameter allow_model_training to True when calling fit())

  • a single or list of AnomalyScorer (trainable or not)

For this example, three scorers will be used:

  • NormScorer (window is by default set to 1)

  • WassersteinScorer with a half-day window (24 timestamps) and no window aggregation

  • WassersteinScorer with a full-day window (48 timestamps) including window aggregation

The window parameter is an integer value indicating the window size used by the scorer to transform the series into an anomaly score. A scorer will slice the given series into subsequences of size W and returns a value indicating how anomalous these subset of W values are.

The window_agg can be used to transform the window-wise scores into point-wise scores by aggregating all anomaly scores from each window that the point was included in.

The following figure illustrates the mechanism of a Forecasting Anomaly model:

Using the main functions: fit(), score(), eval_metric() and show_anomalies()#

# with timestamps of 30 minutes
half_a_day = 2 * 12
full_day = 2 * 24

# instantiate the anomaly model with: one fitted model, and 3 scorers
anomaly_model = ForecastingAnomalyModel(
    model=forecasting_model,
    scorer=[
        NormScorer(ord=1),
        WassersteinScorer(window=half_a_day, window_agg=False),
        WassersteinScorer(window=full_day, window_agg=True),
    ],
)

Now let’s train the anomaly model with fit(). In sequence it will:

  • fit the forecasting model on the given series if it has not been fitted yet and allow_model_training=True.

  • generate historical forecasts for the given series.

  • feed the historical forecasts to each fittable/trainable scorer:

    • compute the differences between the forecasts and the given series (controled by the scorers diff_fn, see Darts “per time step” metrics here)

    • train the scorer on these differences

You can control how the historical forecasts are generated when calling fit() (the supported parameters are here).

START = 0.1
anomaly_model.fit(s_taxi_train, start=START, allow_model_training=False, verbose=True)
<darts.ad.anomaly_model.forecasting_am.ForecastingAnomalyModel at 0x2905d0130>

We call the function score() to compute the anomaly scores of a new series s_taxi_test. It returns the scores from each scorer in the anomaly model. We will use the results in the next section. With return_model_prediction=True, we can additionally get the historical forecasts generated by the forecasting model.

anomaly_scores, model_forecasting = anomaly_model.score(
    s_taxi_test, start=START, return_model_prediction=True, verbose=True
)
pred_start = model_forecasting.start_time()
# compute the MAE and RMSE on the test set
print(
    f"On testing set -> MAE: {mae(model_forecasting, s_taxi_test)}, RMSE: {rmse(model_forecasting, s_taxi_test)}"
)

# plot the data and the anomalies
fig, ax = plt.subplots(figsize=(15, 5))
s_taxi_test.plot(label="Number of taxi passengers")
model_forecasting.plot(label="Prediction of the model", linewidth=0.9)
plt.show()
On testing set -> MAE: 595.190366262045, RMSE: 896.6287614972252
../../_images/9312d277b029509a7e129b49ad54da8d11a01ff49ec9122fdfe94f2d8e50e131.png

To evaluate the anomaly model, we call the function eval_metric(). It outputs the score of an agnostic threshold metric (“AUC-ROC” or “AUC-PR”), between the predicted anomaly score time series and some known binary ground-truth time series indicating the presence of actual anomalies.

It will return a dictionary containing the name of the scorer and its score.

metric_names = ["AUC_ROC", "AUC_PR"]
metric_data = []
for metric_name in metric_names:
    metric_data.append(
        anomaly_model.eval_metric(
            anomalies=series_taxi_anomalies,
            series=s_taxi_test,
            start=START,
            metric=metric_name,
        )
    )
pd.DataFrame(data=metric_data, index=metric_names).T
AUC_ROC AUC_PR
Norm (ord=1)_w=1 0.658074 0.215601
WassersteinScorer_w=24 0.884915 0.609469
WassersteinScorer_w=48 0.950035 0.687788

We can see that the anomaly model, using the WassersteinScorer, can separate the abnormal days from the normal ones. The AUC ROC is above 0.9. Additionally, a window of size 48 timestamps (24 hours) is a better option than a window of size 24 timestamps (12 hours).

We call the function show_anomalies() to visualize the results. It plots the forecasts, predicted scores, the input series, and the actual anomalies (if provided). The scorers with different windows will be separated. It is possible to compute a metric that will be shown next to the scorer’s name.

anomaly_model.show_anomalies(
    series=s_taxi_test,
    anomalies=series_taxi_anomalies[pred_start:],
    start=START,
    metric="AUC_ROC",
)
../../_images/30478d4cdfc6e3e55a9bd52336c47cd4a4b113b09a0845f8100243b0b0b9199a.png

Convert an anomaly score to a binary prediction with a Detector#

Darts’ Anomaly Detectors convert anomaly scores into binary anomlies/predictions. In this example, we’ll use the QuantileDetector.

It detects anomalies based on the quantile values (high_quantile and/or low_quantile) of historical data. It flags times as anomalous when the values exceed these quantile thresholds. In this example, the anomaly scores were computed for the absolute residuals of the model. It is lower-bound by 0. We set low_quantile=None (default), as we only want to flag values above high_quantile.

We set high_quantile to 0.95. This value must be chosen carefully, as it will convert the (1- high_quantile) * 100 % biggest anomaly scores into a prediction of anomalies. In our case, we want to see the 5% most anomalous timestamps.

Note: You can also use ThresholdDetector to define some fixed value thresholds for anomaly detection

from darts.ad.detectors import QuantileDetector

contamination = 0.95
detector = QuantileDetector(high_quantile=contamination)
# use the anomaly score that gave the best AUC ROC score: Wasserstein anomaly score with a window of 'full_day'
best_anomaly_score = anomaly_scores[-1]

# fit and detect on the anomaly scores, it will return a binary prediction
anomaly_pred = detector.fit_detect(series=best_anomaly_score)

# plot the binary prediction
fig, (ax1, ax2) = plt.subplots(nrows=2, sharex=True)
anomaly_pred.plot(label="Prediction", ax=ax1)
series_taxi_anomalies[anomaly_pred.start_time() :].plot(
    label="Known anomalies", ax=ax2, color="red"
)
fig.tight_layout()
../../_images/24efc88f68329e05b054db92382bcdb99834cf629a68191050d1fe923a60a363.png
for metric_name in ["accuracy", "precision", "recall", "f1"]:
    metric_val = detector.eval_metric(
        pred_scores=best_anomaly_score,
        anomalies=series_taxi_anomalies,
        window=full_day,
        metric=metric_name,
    )
    print(metric_name + f": {metric_val:.2f}/1")
accuracy: 0.91/1
precision: 0.76/1
recall: 0.32/1
f1: 0.45/1

Using the functions show_anomalies_from_scores(), eval_metric_from_scores()#

Internally, methods eval_metric() and show_anomalies() call eval_metric_from_scores() and show_anomalies_from_scores(), respectively. We can also call them directly with pre-computed anomaly scores to avoid having to re-generate the scores each time.

Let’s reproduce the results from above. Both functions require the window sizes used to compute each of the anomaly scores. In our case, the window sizes were 1, 24 (half_a_day), 48 (full_day).

windows = [1, half_a_day, full_day]
scorer_names = [f"{scorer}_{w}" for scorer, w in zip(anomaly_model.scorers, windows)]

metric_data = {"AUC_ROC": [], "AUC_PR": []}
for metric_name in metric_data:
    metric_data[metric_name] = eval_metric_from_scores(
        anomalies=series_taxi_anomalies,
        pred_scores=anomaly_scores,
        window=windows,
        metric=metric_name,
    )

pd.DataFrame(index=scorer_names, data=metric_data)
AUC_ROC AUC_PR
Norm (ord=1)_1 0.658074 0.215601
WassersteinScorer_24 0.884915 0.609469
WassersteinScorer_48 0.950035 0.687788

As expected, the AUC ROC and AUC PR values are identical to before.

For visualizing the anomalies:

  • if we want to compute a metric, we need to specify the window sizes as well

  • optionally, we can indicate the scorers’ names that generated the scores

show_anomalies_from_scores(
    series=s_taxi_test,
    anomalies=series_taxi_anomalies[pred_start:],
    pred_scores=anomaly_scores,
    pred_series=model_forecasting,
    window=windows,
    title="Anomaly results using a forecasting method",
    names_of_scorers=scorer_names,
    metric="AUC_ROC",
)
../../_images/8a2b2afaa9a45e20ac3059e88c5da3177f3acb49e1f8a556674adc304b6acb19.png

A zoom on each anomalies: visualize the results#

def plot_anom_eval(selected_anomaly, delta_plotted_days):
    one_day = series_taxi.freq * 24 * 2
    anomaly_date = anomalies_day[selected_anomaly][0]
    start = anomaly_date - one_day * delta_plotted_days
    end = anomaly_date + one_day * (delta_plotted_days + 1)

    # input series and forecasts
    series_taxi[start:end].plot(
        label="Number of taxi passengers", color="#6464ff", linewidth=0.8
    )
    model_forecasting[start:end].plot(
        label="Model prediction", color="green", linewidth=0.8
    )

    # actual anomalies and predicted scores
    (series_taxi_anomalies[start:end] * 10000).plot(
        label="Known anomaly", color="r", linewidth=0.8
    )
    # Scaler transforms scores into a value range between (0, 1)
    (Scaler().fit_transform(best_anomaly_score)[start:end] * 10000).plot(
        label="Anomaly score", color="black", linewidth=0.8
    )
    plt.legend(loc="upper center", ncols=2)
    plt.title(selected_anomaly)
    fig.tight_layout()
    plt.show()


for anom_name in anomalies_day:
    plot_anom_eval(anom_name, 3)
    break  # remove this to see all anomalies
../../_images/3971280ba0a3321114887f3b137dd44c8375b6ea6806fd5328929e1354f610f8.png