Source code for models

from fireTS.core import GeneralAutoRegressor
from sklearn.utils.validation import check_X_y, check_array
from sklearn.metrics import r2_score, mean_squared_error
import numpy as np
from collections import deque


[docs]class NARX(GeneralAutoRegressor): r""" NARX stands for `Nonlinear AutoRegressive eXogenous model <https://en.wikipedia.org/wiki/Nonlinear_autoregressive_exogenous_model>`_. The model equation is written as follows. .. math:: y(t + 1) &=& f(y(t), ..., y(t-p+1), \\ & & x_1(t - d_1), ..., x_1(t-d_1-q_1+1), \\ & & ..., x_m(t - d_1), ..., x_m(t - d_m - q_m + 1)) + e(t) :label: narx :param object base_estimator: an estimator object that implements the scikit-learn API (fit, and predict). The estimator will be used to fit the function :math:`f` in equation :eq:`narx`. :param int auto_order: the autoregression order :math:`p` in equation :eq:`narx`. :param list exog_order: the exogenous input order, a list of integers representing the order for each exogenous input, i.e. :math:`[q_1, q_2, ..., q_m]` in equation :eq:`narx`. :param list exog_delay: the delays of the exogenous inputs, a list of integers representing the delay of each exogenous input, i.e. :math:`[d_1, d_2, ..., d_m]` in equation :eq:`narx`. By default, all the delays are set to 0. :param dict base_params: other keyword arguments for base_estimator. """ def __init__(self, base_estimator, auto_order, exog_order, exog_delay=None, **base_params): super(NARX, self).__init__( base_estimator, auto_order, exog_order, exog_delay=exog_delay, pred_step=1, **base_params)
[docs] def score(self, X, y, step=1, method="r2"): """ Produce multi-step prediction of y, and compute the metrics against y. Nan is ignored when computing the metrics. :param array-like X: exogenous input time series, shape = (n_samples, n_exog_inputs) :param array-like y: target time series to predict, shape = (n_samples) :param int step: prediction step. :param string method: could be "r2" (R Square) or "mse" (Mean Square Error). :return: prediction metric. Nan is ignored when computing the metrics. """ ypred = self.predict(X, y, step=step) mask = np.isnan(y) | np.isnan(ypred) if method == "r2": return r2_score(y[~mask], ypred[~mask]) elif method == "mse": return mean_squared_error(y[~mask], ypred[~mask])
[docs] def predict(self, X, y, step=1): r""" Produce multi-step prediction of y. The multi-step prediction is done recursively by using the future inputs in X. The prediction equation is as follows: .. math:: \hat{y}(t + k) &=& f(\hat{y}(t + k - 1), ..., \hat{y}(t + k - p), \\ & &x_1(t + k - 1 - d_1), ..., x_1(t + k - d_1 - q_1) \\ & &..., x_m(t + k - 1 - d_m), ..., x_m(t + k - d_m - q_m)) :param array-like X: exogenous input time series, shape = (n_samples, n_exog_inputs) :param array-like y: target time series to predict, shape = (n_samples) :param int step: prediction step. :return: k-step prediction time series, shape = (n_samples). The :math:`i` th value of the output is the k-step prediction of the :math:`i` th value of the input ``y``. The first ``step + max(auto_order - 1, max(exog_order + exog_delay) - 1)`` values of the output is ``np.nan``. """ X, y = self._check_and_preprocess_X_y(X, y) p = self._get_lag_feature_processor(X, y) features = p.generate_lag_features() for k in range(step): yhat = self._predictNA(features) if k == step - 1: break features = p.update(yhat) ypred = np.concatenate([np.empty(step) * np.nan, yhat])[0:len(y)] return ypred
[docs] def forecast(self, X, y, step=1, X_future=None): r""" Forecast y multiple step ahead given the exogenous input history X, output history y and the future exogenous input X_future. X_future is assumed to be all zeros if not specified. :param array-like X: exogenous input time series, shape = (n_samples, n_exog_inputs) :param array-like y: target time series to predict, shape = (n_samples) :param int step: prediction step. :param array-like X_futrue: future exogenous input time series, shape = (step - 1, n_exog_inputs) :return: multi-step forecasted time series, shape = (step). """ assert step > 0 X, y = self._check_and_preprocess_X_y(X, y) if X_future is None: X_future = np.zeros((step - 1, self.num_exog_inputs)) X_future = check_array(X_future, ensure_min_samples=0) if X_future.shape[0] != step - 1: raise ValueError('The row number of X_future ({}) must to step - 1 ({})!'.format(X_future.shape[0], step - 1)) auto_regressor = deque(y[:(-1 - self.auto_order):-1]) exog_regressors = [ deque(X[(-1 - d):(-1 - d - q):-1, i]) for i, (d, q) in enumerate(zip(self.exog_delay, self.exog_order)) ] cur_step = 0 y_forecast = [] while cur_step < step: X_base = np.concatenate([np.array(auto_regressor), np.concatenate(exog_regressors)]).reshape(1, -1) y_hat = self.base_estimator.predict(X_base) y_forecast.append(y_hat[0]) if cur_step == step - 1: break # update regressors with the newly obtained values auto_regressor.pop() auto_regressor.appendleft(y_forecast[-1]) for exog_reg, X_next in zip(exog_regressors, X_future[cur_step, :]): exog_reg.pop() exog_reg.appendleft(X_next) cur_step += 1 return np.array(y_forecast)
[docs]class DirectAutoRegressor(GeneralAutoRegressor): r""" This model performs autoregression with exogenous inputs on the k-step ahead output directly. The model equation is written as follows. .. math:: y(t + k) &=& f(y(t), ..., y(t-p+1), \\ & & x_1(t - d_1), ..., x_1(t-d_1-q_1+1), \\ & & ..., x_m(t - d_1), ..., x_m(t - d_m - q_m + 1)) + e(t) :label: direct :param object base_estimator: an estimator object that implements the scikit-learn API (fit, and predict). The estimator will be used to fit the function :math:`f` in equation :eq:`direct`. :param int auto_order: the autoregression order :math:`p` in equation :eq:`direct`. :param list exog_order: the exogenous input order, a list of integers representing the order for each exogenous input, i.e. :math:`[q_1, q_2, ..., q_m]` in equation :eq:`direct`. :param int pred_step: the prediction step :math:`k` in equation :eq:`gar`. By default, it is set to 1. :param list exog_delay: the delays of the exogenous inputs, a list of integers representing the delay of each exogenous input, i.e. :math:`[d_1, d_2, ..., d_m]` in equation :eq:`direct`. By default, all the delays are set to 0. :param dict base_params: other keyword arguments for base_estimator. """ def __init__(self, base_estimator, auto_order, exog_order, pred_step, exog_delay=None, **base_params): super(DirectAutoRegressor, self).__init__( base_estimator, auto_order, exog_order, exog_delay=exog_delay, pred_step=pred_step, **base_params)
[docs] def predict(self, X, y): r""" Produce multi-step prediction of y. The multi-step prediction is done directly. No future X inputs are used in the prediction. The prediction equation is as follows: .. math:: \hat{y}(t + k) &=& f(y(t), ..., y(t - p + 1), \\ & & x_1(t - d_1), ..., x_1(t - d_1 - q_1 + 1) \\ & & ..., x_m(t - d_m), ..., x_m(t - d_m - q_m + 1)) :param array-like X: exogenous input time series, shape = (n_samples, n_exog_inputs) :param array-like y: target time series to predict, shape = (n_samples) :param int step: prediction step. :return: k-step prediction time series, shape = (n_samples). The :math:`i` th value of the output is the k-step prediction of the :math:`i` th value of the input ``y``. The first ``pred_step + max(auto_order - 1, max(exog_order + exog_delay) - 1)`` values of the output is ``np.nan``. """ X, y = self._check_and_preprocess_X_y(X, y) p = self._get_lag_feature_processor(X, y) features = p.generate_lag_features() yhat = self._predictNA(features) ypred = np.concatenate([np.empty(self.pred_step) * np.nan, yhat])[0:len(y)] return ypred
[docs] def score(self, X, y, method="r2", verbose=False): """ Produce multi-step prediction of y, and compute the metrics against y. Nan is ignored when computing the metrics. :param array-like X: exogenous input time series, shape = (n_samples, n_exog_inputs) :param array-like y: target time series to predict, shape = (n_samples) :param string method: could be "r2" (R Square) or "mse" (Mean Square Error). :return: prediction metric. Nan is ignored when computing the metrics. """ ypred = self.predict(X, y) mask = np.isnan(y) | np.isnan(ypred) if verbose: print('Evaluating {} score, {} of {} data points are evaluated.'. format(method, np.sum(~mask), y.shape[0])) if method == "r2": return r2_score(y[~mask], ypred[~mask]) elif method == "mse": return mean_squared_error(y[~mask], ypred[~mask]) else: raise ValueError('{} method is not supported. Please choose from \"r2\" or \"mse\".')