Guest adieldar

[HEADING=1]Introduction[/HEADING]

 

Anomaly Detector, one of the Azure AI services, enables you to monitor and detect anomalies in your time series data. The service is based on advanced algorithms: SR-CNN for univariate analysis and MTAD-GAT for multivariate analysis. It is being retired by October 2026, and as part of the migration process:

 

  • The algorithms were open sourced and published as the new time-series-anomaly-detector package on PyPI.
  • We offer a time series anomaly detection workflow in the Microsoft Fabric data platform.

[HEADING=1]Time Series Anomaly Detection in Fabric RTI[/HEADING]

 

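There are a few options for time series anomaly detection in Fabric RTI (Real-Time Intelligence), ranging from the native KQL function for univariate analysis, through standard multivariate analysis techniques, to the algorithms implemented in the time-series-anomaly-detector package.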

 

[HEADING=1]Using time-series-anomaly-detector in Fabric[/HEADING]

 

In the following example we shall:

 

  • Upload a table of stock price changes to Fabric
  • Train the multivariate anomaly detection model in a Python notebook using the Spark engine
  • Predict anomalies by applying the trained model to new data using the Eventhouse (Kusto) engine

 

Below we briefly present the steps; see Multivariate anomaly detection - Microsoft Fabric | Microsoft Learn for the detailed tutorial.

 

 

 

[HEADING=1]Creating the environments[/HEADING]

  1. Create a Workspace
  2. Create an Eventhouse – to store the incoming streaming data
    • Enable OneLake availability – so that older data ingested into the Eventhouse can be seamlessly accessed by the Spark notebook for training the anomaly detection model
    • Enable the KQL Python plugin – to be used for real-time prediction of anomalies on the new streaming data. Select the 3.11.7 DL image, which contains the time-series-anomaly-detector package
  3. Create a Spark environment that includes the time-series-anomaly-detector package

[HEADING=1]Training & storing the Anomaly Detection model[/HEADING]

  1. Upload the stocks data to the Eventhouse
  2. Create a notebook to train the model

  • Load the data from the Eventhouse using the OneLake path:

 

onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
abfss_uri = convert_onelake_to_abfss(onelake_uri)
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
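
The snippet above calls convert_onelake_to_abfss() without defining it. Below is a minimal sketch of such a helper, not necessarily the tutorial's own implementation. It assumes the OneLake URI has the form https://<host>/<workspace>/<item path> and that the matching ABFSS form is abfss://<workspace>@<host>/<item path>; the example URI is hypothetical.

```python
from urllib.parse import urlparse


def convert_onelake_to_abfss(onelake_uri: str) -> str:
    """Rewrite an https:// OneLake URI as an abfss:// URI (assumed layout, see above)."""
    parsed = urlparse(onelake_uri)
    if parsed.scheme != "https" or not parsed.netloc:
        raise ValueError("Expected an https:// OneLake URI")
    # The first path segment is treated as the workspace, the rest as the item path.
    workspace, _, item_path = parsed.path.lstrip("/").partition("/")
    if not workspace or not item_path:
        raise ValueError("Expected https://<host>/<workspace>/<item path>")
    return f"abfss://{workspace}@{parsed.netloc}/{item_path}"


# Hypothetical URI, for illustration only.
example_uri = "https://onelake.dfs.fabric.microsoft.com/my_workspace/my_item/Tables/demo_stocks_change"
print(convert_onelake_to_abfss(example_uri))
```

Define the helper in the notebook before the cell that loads the data.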

 

 

 

  • View the data:

 

import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['AAPL'], mode='lines', name='AAPL'))
fig.add_trace(go.Scatter(x=df.index, y=df['AMZN'], mode='lines', name='AMZN'))
fig.add_trace(go.Scatter(x=df.index, y=df['GOOG'], mode='lines', name='GOOG'))
fig.add_trace(go.Scatter(x=df.index, y=df['MSFT'], mode='lines', name='MSFT'))
fig.add_trace(go.Scatter(x=df.index, y=df['SPY'], mode='lines', name='SPY'))
fig.update_layout(
   title='Stock Prices change',
   xaxis_title='Date',
   yaxis_title='Change %',
   legend_title='Tickers'
)

fig.show()

 

[ATTACH type="full" alt="Stocks Price Changes.png"]63585[/ATTACH]

 

 

 

  • Prepare the data for training:

 

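import pandas as pd

features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = pd.to_datetime('2023-01-01')
# 'Date' was set as the index when loading the data, so filter on df.index.
# If the index is timezone-aware, localize cutoff_date to match before comparing.
train_df = df[df.index < cutoff_date]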

  • Train the model:

 

import mlflow
from anomaly_detector import MultivariateAnomalyDetector

model = MultivariateAnomalyDetector()
sliding_window = 200  # the prediction query must use the same window
params = {"sliding_window": sliding_window}
model.fit(train_df, params=params)

  • Save the model in the Fabric ML model registry:

 

with mlflow.start_run():
   mlflow.log_params(params)
   mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")

   model_info = mlflow.pyfunc.log_model(
       python_model=model,
       artifact_path="mvad_artifacts",
       registered_model_name="mvad_5_stocks_model",
   )

 

 

 

  • Extract the model path (to be used by the Eventhouse for the prediction):

 

mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)

  3. Create a Queryset and attach the Eventhouse to it
    • Run the ‘.create-or-alter function’ query to define the predict_fabric_mvad_fl() stored function:

 

.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
{
   let s = artifacts_uri;
   let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                            'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                            'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
   let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
   let code = ```if 1:
       import os
       import shutil
       import mlflow
       model_dir = 'C:/Temp/mvad_model'
       model_data_dir = model_dir + '/data'
       os.mkdir(model_dir)
       shutil.move('C:/Temp/MLmodel', model_dir)
       shutil.move('C:/Temp/conda.yaml', model_dir)
       shutil.move('C:/Temp/requirements.txt', model_dir)
       shutil.move('C:/Temp/python_env.yaml', model_dir)
       shutil.move('C:/Temp/python_model.pkl', model_dir)
       features_cols = kargs["features_cols"]
       trim_result = kargs["trim_result"]
       test_data = df[features_cols]
       model = mlflow.pyfunc.load_model(model_dir)
       predictions = model.predict(test_data)
       predict_result = pd.DataFrame(predictions)
       samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
       if trim_result:                                       # trim the prefix samples
           result = df[samples_offset:]
           result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
       else:
           result = df                                       # output all samples
           result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
       ```;
   samples
   | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
}
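
The embedded Python at the end of the function writes the predictions into the trailing rows of the input table, because the model returns fewer rows than it receives. The following standalone sketch illustrates that alignment on toy data. The helper name attach_predictions, the column names and the values are hypothetical, and unlike the stored function it matches columns by name rather than taking the last four columns by position.

```python
import pandas as pd


def attach_predictions(samples: pd.DataFrame, predictions: pd.DataFrame,
                       trim_result: bool = False) -> pd.DataFrame:
    """Copy predictions into the trailing rows of samples, matching rows by position."""
    offset = len(samples) - len(predictions)  # leading rows that received no prediction
    result = samples.copy()
    for col in predictions.columns:
        col_pos = result.columns.get_loc(col)
        # to_numpy() makes the assignment purely positional (no index alignment).
        result.iloc[offset:, col_pos] = predictions[col].to_numpy()
    return result.iloc[offset:] if trim_result else result


# Hypothetical toy data: 5 input rows, predictions for the last 3 only.
samples = pd.DataFrame({
    "value": [1.0, 2.0, 3.0, 4.0, 5.0],
    "is_anomaly": [False] * 5,
    "score": [float("nan")] * 5,
})
predictions = pd.DataFrame({
    "is_anomaly": [False, True, False],
    "score": [0.1, 0.9, 0.2],
})

full = attach_predictions(samples, predictions)                        # keeps all 5 rows
trimmed = attach_predictions(samples, predictions, trim_result=True)   # keeps the last 3 rows
print(full)
print(trimmed)
```

With trim_result=False the leading rows keep their placeholder values; with trim_result=True they are dropped, which is what the prediction query relies on.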

  • Run the prediction query, which detects multivariate anomalies on the 5 stocks based on the trained model and renders the result as an anomalychart. Note that the anomalous points are rendered on the first stock (AAPL), though they represent multivariate anomalies, i.e. anomalies of the vector of the 5 stocks on the specific date.

 

let cutoff_date=datetime(2023-01-01);
let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
let sliding_window=200;                                                                 //  should match the window that was set for model training
let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
let num_samples = prefix_score_len + num_predictions;
demo_stocks_change
| top num_samples by Date desc 
| order by Date asc
| extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
| invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
           // NOTE: Update artifacts_uri to model path
           artifacts_uri='enter your model URI here',
           trim_result=true)
| summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
| render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changes in % with Anomalies')
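
The query passes more rows to the model than it wants scored, since (as the comment in predict_fabric_mvad_fl notes) the model produces no predictions for the leading samples. Here is a small Python sketch of the same arithmetic as the prefix_score_len and num_samples lines, using integer division as KQL does for integer operands. The value of 50 predictions is a hypothetical example.

```python
def prefix_score_len(sliding_window: int) -> int:
    """Number of leading rows needed as context before the first scored row."""
    half = sliding_window // 2
    return half + min(half, 200) - 1


def num_samples(sliding_window: int, num_predictions: int) -> int:
    """Total rows to fetch: context prefix plus the rows to be scored."""
    return prefix_score_len(sliding_window) + num_predictions


print(prefix_score_len(200))   # 100 + 100 - 1 = 199
print(num_samples(200, 50))    # 199 + 50 = 249
```

For sliding_window=200 the prefix is 199 rows, i.e. sliding_window-1, so the table must hold at least that many rows before the cutoff date for every point after it to be scored.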

 

 

 

[ATTACH type="full" alt="31 - Anomaly chart.png"]63586[/ATTACH]

 

 

 

[HEADING=1]Summary[/HEADING]

 

The addition of the time-series-anomaly-detector package to Fabric makes it the top platform for univariate and multivariate time series anomaly detection. Choose the anomaly detection method that best fits your scenario: the native KQL function for univariate analysis at scale, standard multivariate analysis techniques, or the best-of-breed time series anomaly detection algorithms implemented in the time-series-anomaly-detector package. For more information, see the overview and tutorial.

 
