A
adieldar
Introduction
Anomaly Detector, one of Azure AI services, enables you to monitor and detect anomalies in your time series data. This service is based on advanced algorithms, SR-CNN for univariate analysis and MTAD-GAT for multivariate analysis. This service is being retired by October 2026, and as part of the migration process
- The algorithms were open sourced and published by the new time-series-anomaly-detector · PyPI package.
- We offer a time series anomaly detection workflow in Microsoft Fabric data platform.
Time Series Anomaly Detection in Fabric RTI
There are few options for time series anomaly detection in Fabric RTI (Real Time Intelligence):
- For univariate analysis, KQL contains the native function series_decompose_anomalies() that can perform anomaly detection on thousands of time series in seconds. For further info on using this function take a look at Time series anomaly detection & forecasting in Azure Data Explorer.
- For multivariate analysis, there are few KQL library functions leveraging known multivariate analysis algorithms in scikit-learn , taking advantage of ADX capability to run inline Python as part of the KQL query. For further info see Multivariate Anomaly Detection in Azure Data Explorer - Microsoft Community Hub.
- For both univariate and multivariate analysis you can now use the new workflow, which is based on the time-series-anomaly-detector package, as described below.
Using time-series-anomaly-detector in Fabric
In the following example we shall
- Upload stocks change table to Fabric
- Train the multivariate anomaly detection model in a Python notebook using Spark engine
- Predict anomalies by applying the trained model to new data using Eventhouse (Kusto) engine
Below we briefly present the steps, see Multivariate anomaly detection - Microsoft Fabric | Microsoft Learn for the detailed tutorial.
Creating the environments
- Create a Workspace
- Create Eventhouse – to store the incoming streaming data
- Enable OneLake availability – so the older data that was ingested to the Eventhouse can be seamlessly accessed by the Spark Notebook for training the anomaly detection model
- Enable KQL Python plugin – to be used for real time predictions of anomalies on the new streaming data. Select 3.11.7 DL image that contains the time-series-anomaly-detector package
- Create a Spark environment that includes the time-series-anomaly-detector package
Training & storing the Anomaly Detection model
- Upload the stocks data to the Eventhouse
- Create a notebook to train the model
- Load the data from the Eventhouse using the OneLake path:
Code:
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI
abfss_uri = convert_onelake_to_abfss(onelake_uri)
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
- View the data:
Code:
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['AAPL'], mode='lines', name='AAPL'))
fig.add_trace(go.Scatter(x=df.index, y=df['AMZN'], mode='lines', name='AMZN'))
fig.add_trace(go.Scatter(x=df.index, y=df['GOOG'], mode='lines', name='GOOG'))
fig.add_trace(go.Scatter(x=df.index, y=df['MSFT'], mode='lines', name='MSFT'))
fig.add_trace(go.Scatter(x=df.index, y=df['SPY'], mode='lines', name='SPY'))
fig.update_layout(
title='Stock Prices change',
xaxis_title='Date',
yaxis_title='Change %',
legend_title='Tickers'
)
fig.show()
- Prepare the data for training:
Code:
features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = pd.to_datetime('2023-01-01')
train_df = df[df.Date < cutoff_date]
- Train the model:
Code:
import mlflow
from anomaly_detector import MultivariateAnomalyDetector
model = MultivariateAnomalyDetector()
sliding_window = 200
param s = {"sliding_window": sliding_window}
model.fit(train_df, params=params)
- Save the model in Fabric ML model registry
Code:
with mlflow.start_run():
mlflow.log_params(params)
mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
model_info = mlflow.pyfunc.log_model(
python_model=model,
artifact_path="mvad_artifacts",
registered_model_name="mvad_5_stocks_model",
)
- Extract the mode path (to be used by the Eventhouse for the prediction):
Code:
mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)
- Create a Query set and attached the Eventhouse to it
- Run the ‘.create-or-alter function’ query to define predict_fabric_mvad_fl() stored function:
Code:
.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
{
let s = artifacts_uri;
let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
let code = ```if 1:
import os
import shutil
import mlflow
model_dir = 'C:/Temp/mvad_model'
model_data_dir = model_dir + '/data'
os.mkdir(model_dir)
shutil.move('C:/Temp/MLmodel', model_dir)
shutil.move('C:/Temp/conda.yaml', model_dir)
shutil.move('C:/Temp/requirements.txt', model_dir)
shutil.move('C:/Temp/python_env.yaml', model_dir)
shutil.move('C:/Temp/python_model.pkl', model_dir)
features_cols = kargs["features_cols"]
trim_result = kargs["trim_result"]
test_data = df[features_cols]
model = mlflow.pyfunc.load_model(model_dir)
predictions = model.predict(test_data)
predict_result = pd.DataFrame(predictions)
samples_offset = len(df) - len(predict_result) # this model doesn't output predictions for the first sliding_window-1 samples
if trim_result: # trim the prefix samples
result = df[samples_offset:]
result.iloc[:,-4:] = predict_result.iloc[:, 1:] # no need to copy 1st column which is the timestamp index
else:
result = df # output all samples
result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
```;
samples
| evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
}
- Run the prediction query that will detect multivariate anomalies on the 5 stocks, based on the trained model, and render it as anomalychart. Note that the anomalous points are rendered on the first stock (AAPL), though they represent multivariate anomalies, i.e. anomalies of the vector of the 5 stocks in the specific date.
Code:
let cutoff_date=datetime(2023-01-01);
let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count); // number of latest points to predict
let sliding_window=200; // should match the window that was set for model training
let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
let num_samples = prefix_score_len + num_predictions;
demo_stocks_change
| top num_samples by Date desc
| order by Date asc
| extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
| invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
// NOTE: Update artifacts_uri to model path
artifacts_uri='enter your model URI here',
trim_result=true)
| summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
| render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
Summary
The addition of the time-series-anomaly-detector package to Fabric makes it the top platform for univariate & multivariate time series anomaly detection. Choose the anomaly detection method that best fits your scenario – from native KQL function for univariate analysis at scale, through standard multivariate analysis techniques and up to the best of breed time series anomaly detection algorithms implemented in the time-series-anomaly-detector package. For more information see the overview and tutorial.
Continue reading...