[HEADING=1]Introduction[/HEADING]

Anomaly Detector, one of the Azure AI services, enables you to monitor and detect anomalies in your time series data. The service is based on advanced algorithms: SR-CNN for univariate analysis and MTAD-GAT for multivariate analysis. The service is being retired by October 2026, and as part of the migration process its algorithms were open sourced and published as the new time-series-anomaly-detector package on PyPI. In addition, we now offer a time series anomaly detection workflow on the Microsoft Fabric data platform.

[HEADING=1]Time Series Anomaly Detection in Fabric RTI[/HEADING]

There are a few options for time series anomaly detection in Fabric RTI (Real Time Intelligence):

- For univariate analysis, KQL contains the native function series_decompose_anomalies(), which can perform anomaly detection on thousands of time series in seconds. For more information on using this function, see Time series anomaly detection & forecasting in Azure Data Explorer.
- For multivariate analysis, there are a few KQL library functions that leverage well-known multivariate analysis algorithms from scikit-learn, taking advantage of the ADX capability to run inline Python as part of the KQL query. For more information, see Multivariate Anomaly Detection in Azure Data Explorer - Microsoft Community Hub.
- For both univariate and multivariate analysis, you can now use the new workflow, which is based on the time-series-anomaly-detector package, as described below.

[HEADING=1]Using time-series-anomaly-detector in Fabric[/HEADING]

In the following example we shall:

1. Upload a stocks change table to Fabric.
2. Train the multivariate anomaly detection model in a Python notebook using the Spark engine.
3. Predict anomalies by applying the trained model to new data using the Eventhouse (Kusto) engine.

Below we briefly present the steps; see Multivariate anomaly detection - Microsoft Fabric | Microsoft Learn for the detailed tutorial.

[HEADING=1]Creating the environments[/HEADING]

1. Create a Workspace.
2. Create an Eventhouse – to store the incoming streaming data.
3. Enable OneLake availability – so the older data that was ingested into the Eventhouse can be seamlessly accessed by the Spark notebook for training the anomaly detection model.
4. Enable the KQL Python plugin – to be used for real-time predictions of anomalies on the new streaming data. Select the 3.11.7 DL image, which contains the time-series-anomaly-detector package.
5. Create a Spark environment that includes the time-series-anomaly-detector package.
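The Fabric workflow below drives this package through notebooks and the Eventhouse, but the underlying API is a plain Python fit/predict interface. As a minimal standalone sketch (the toy data and column names here are illustrative assumptions; only the fit/predict call shapes mirror the tutorial that follows):

```python
# Minimal standalone sketch of the time-series-anomaly-detector package
# (pip install time-series-anomaly-detector). Toy data and column names
# are illustrative assumptions, not part of the tutorial below.
import numpy as np
import pandas as pd
from anomaly_detector import MultivariateAnomalyDetector

# toy multivariate time series indexed by date
idx = pd.date_range("2022-01-01", periods=1000, freq="D")
df = pd.DataFrame(np.random.randn(1000, 3), index=idx,
                  columns=["series_a", "series_b", "series_c"])

model = MultivariateAnomalyDetector()
model.fit(df, params={"sliding_window": 200})  # same call shape as in the tutorial
predictions = model.predict(df)                # anomaly flags and scores per row
```

In the Fabric workflow, only the data loading and model storage are platform-specific; the fit and predict calls are the same.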
[HEADING=1]Training & storing the Anomaly Detection model[/HEADING]

1. Upload the stocks data to the Eventhouse.
2. Create a notebook to train the model.
3. Load the data from the Eventhouse using the OneLake path:

```python
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI
# convert_onelake_to_abfss() is a helper defined in the full tutorial
abfss_uri = convert_onelake_to_abfss(onelake_uri)
df = spark.read.format('delta').load(abfss_uri)
df = df.toPandas().set_index('Date')
```

4. View the data:

```python
import plotly.graph_objects as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['AAPL'], mode='lines', name='AAPL'))
fig.add_trace(go.Scatter(x=df.index, y=df['AMZN'], mode='lines', name='AMZN'))
fig.add_trace(go.Scatter(x=df.index, y=df['GOOG'], mode='lines', name='GOOG'))
fig.add_trace(go.Scatter(x=df.index, y=df['MSFT'], mode='lines', name='MSFT'))
fig.add_trace(go.Scatter(x=df.index, y=df['SPY'], mode='lines', name='SPY'))
fig.update_layout(
    title='Stock Prices change',
    xaxis_title='Date',
    yaxis_title='Change %',
    legend_title='Tickers'
)
fig.show()
```

[ATTACH type="full" alt="Stocks Price Changes.png"]63585[/ATTACH]

5. Prepare the data for training:

```python
import pandas as pd

features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
cutoff_date = pd.to_datetime('2023-01-01')
train_df = df[df.index < cutoff_date]  # 'Date' is the index after set_index('Date') above
```

6. Train the model:

```python
import mlflow
from anomaly_detector import MultivariateAnomalyDetector

model = MultivariateAnomalyDetector()
sliding_window = 200
params = {"sliding_window": sliding_window}
model.fit(train_df, params=params)
```

7. Save the model in the Fabric ML model registry:

```python
with mlflow.start_run():
    mlflow.log_params(params)
    mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    model_info = mlflow.pyfunc.log_model(
        python_model=model,
        artifact_path="mvad_artifacts",
        registered_model_name="mvad_5_stocks_model",
    )
```

8. Extract the model path (to be used by the Eventhouse for the prediction):

```python
mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
model_abfss = mi.latest_versions[0].source
print(model_abfss)
```
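Before switching to the Eventhouse, you can optionally sanity-check the registered model from the same notebook. The sketch below is an assumption, not part of the original tutorial: it assumes the model was registered as version 1 under the name used above, and it reuses the df, features_cols, and sliding_window variables from the earlier cells.

```python
# Optional sanity check (a sketch; not part of the original tutorial).
# Loads the registered model back through the MLflow registry and scores
# the most recent rows. Assumes version 1 of the model registered above,
# and reuses df, features_cols, and sliding_window from earlier cells.
import mlflow

loaded = mlflow.pyfunc.load_model("models:/mvad_5_stocks_model/1")

# the detector needs a prefix of history before it emits predictions,
# so pass at least a couple of sliding windows of data
recent = df[features_cols].iloc[-2 * sliding_window:]
print(loaded.predict(recent))
```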
9. Create a KQL queryset and attach the Eventhouse to it.
10. Run the '.create-or-alter function' query to define the predict_fabric_mvad_fl() stored function:

````kusto
.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
{
    let s = artifacts_uri;
    let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'),
                             'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                             'requirements.txt', strcat(s, '/requirements.txt;impersonate'),
                             'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                             'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
    let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
    let code = ```if 1:
        import os
        import shutil
        import mlflow
        model_dir = 'C:/Temp/mvad_model'
        model_data_dir = model_dir + '/data'
        os.mkdir(model_dir)
        shutil.move('C:/Temp/MLmodel', model_dir)
        shutil.move('C:/Temp/conda.yaml', model_dir)
        shutil.move('C:/Temp/requirements.txt', model_dir)
        shutil.move('C:/Temp/python_env.yaml', model_dir)
        shutil.move('C:/Temp/python_model.pkl', model_dir)
        features_cols = kargs["features_cols"]
        trim_result = kargs["trim_result"]
        test_data = df[features_cols]
        model = mlflow.pyfunc.load_model(model_dir)
        predictions = model.predict(test_data)
        predict_result = pd.DataFrame(predictions)
        samples_offset = len(df) - len(predict_result)  # this model doesn't output predictions for the first sliding_window-1 samples
        if trim_result:  # trim the prefix samples
            result = df[samples_offset:]
            result.iloc[:,-4:] = predict_result.iloc[:, 1:]  # no need to copy 1st column which is the timestamp index
        else:
            result = df  # output all samples
            result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
    ```;
    samples
    | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
}
````

11. Run the prediction query to detect multivariate anomalies on the 5 stocks, based on the trained model, and render the result as an anomaly chart. Note that the anomalous points are rendered on the first stock (AAPL), though they represent multivariate anomalies, i.e. anomalies of the vector of the 5 stocks on the specific date.

```kusto
let cutoff_date=datetime(2023-01-01);
let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);    // number of latest points to predict
let sliding_window=200;    // should match the window that was set for model training
let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
let num_samples = prefix_score_len + num_predictions;
demo_stocks_change
| top num_samples by Date desc
| order by Date asc
| extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
| invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
            // NOTE: Update artifacts_uri to model path
            artifacts_uri='enter your model URI here', trim_result=true)
| summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
| render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changes in % with Anomalies')
```

[ATTACH type="full" alt="31 - Anomaly chart.png"]63586[/ATTACH]

[HEADING=1]Summary[/HEADING]

The addition of the time-series-anomaly-detector package to Fabric makes it the top platform for univariate and multivariate time series anomaly detection. Choose the anomaly detection method that best fits your scenario – from the native KQL function for univariate analysis at scale, through standard multivariate analysis techniques, up to the best-of-breed time series anomaly detection algorithms implemented in the time-series-anomaly-detector package. For more information, see the overview and tutorial.