什么是时间序列分析:
时间序列通常用折线图绘制。 时间序列用于统计学、信号处理、模式识别、计量经济学、数学金融、天气预报、地震预测、脑电图、控制工程、天文学、通信工程以及主要涉及时间测量的应用科学和工程领域。
时间序列分析包括分析时间序列数据以提取有意义的统计数据和数据的其他特征的方法。 时间序列预测是使用模型根据先前观察到的值来预测未来值。 尽管回归分析通常以检验一个或多个独立时间序列的当前值影响另一个时间序列的当前值的理论的方式进行,但这种类型的时间序列分析并不称为“时间序列分析”,其重点是对单个时间序列或多个相关时间序列在不同时间点的值进行比较。 中断时间序列分析是对单个时间序列的干预分析。
时间序列数据具有自然的时间顺序。 这使得时间序列分析不同于横截面研究,横截面研究没有自然的观察顺序(例如,通过参考人们各自的教育水平来解释他们的工资,其中个人数据可以按任何顺序输入)。 时间序列分析也不同于空间数据分析,空间数据分析的观察结果通常与地理相关(例如,按位置计算房价和房屋的内在特征)。 随机时间序列模型通常反映这样一个事实:时间上紧密相连的观测值比相距较远的观测值之间的相关性更紧密。 此外,时间序列模型通常会利用时间的自然单向排序,因此给定时期的值将表示为源自过去的值而不是未来的值(请参阅时间可逆性)。
时间序列分析可应用于实值、连续数据、离散数值数据或离散符号数据(即字符序列,例如英文字母和单词)。
常见的时间序列分析:
时间序列常用的算法包括三大类:移动平均(MA, )、指数平滑(ES, -tial )和微分自回归移动平均模型(ARIMA, Auto-Model)。 每个类别又可以细分并扩展为各种算法。
当数据集较大、数据粒度丰富时,我们可以选择多种时间序列预测模式。 时间序列的数据粒度可以分为秒、分钟、小时、天、周、月、季度、年等,不同的粒度可以用于时间序列预测。
如果按每分钟时间戳的粒度有 3650 天(完整 10 年)的数据,那么这个数据集将有 5256 000(60 分钟 24 小时 365 天 * 10 年)条记录。 根据这个数据集来预测今天的总数据,可以考虑三种时间序列模式:
这三种方法的训练集都是3650条,即当天的数据粒度。 水平模式和垂直模式由于分割粒度更细,需要更多的模型,这意味着需要更多的时间进行训练和预测。 当第一种集成方式不能满足需要或者预测效果不好时,可以尝试后两种方式。
平稳性
平稳性是时间序列分析的先决条件。 所谓稳定通俗理解是指数据随着时间的推移不呈现明显的趋势和规律,如剧烈波动、增加、减少等,而是相对均匀、随机分布在均值附近。 ARIMA模型中的I是对数据进行差分以实现数据的稳定,ARIMA关键参数p、d、q中的d是时间序列变得稳定时所做的差分次数。
如何判断时间序列数据是否需要平稳处理? 一般有以下三种方法:
有几种方法可以实现数据稳定:
白噪声
白噪声(white Noise)测试,也称为随机性测试,用于测试时间序列的值之间是否存在相关性。 白噪声分布是应用时间序列分析的前提。 检测时间序列是否属于随机分布,可以采用图解法和Ljung Box法进行检验。
完成
import pandas as pd # pandas库
import numpy as np # numpy库
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf # acf和pacf展示库
from statsmodels.tsa.stattools import adfuller # adf检验库
from statsmodels.stats.diagnostic import acorr_ljungbox # 随机性检验库
from statsmodels.tsa.arima_model import ARMA # ARMA库
import matplotlib.pyplot as plt # matplotlib图形展示库
import prettytable # 导入表格库
# 由于pandas判断周期性时会出现warning,这里忽略提示
import warnings
warnings.filterwarnings('ignore')
#### 读取数据 ####
# 创建解析列的功能对象
date_parse = lambda dates: pd.datetime.strptime(dates, '%m-%d-%Y')
# 读取数据
df = pd.read_table('https://raw.githubusercontent.com/ffzs/dataset/master/time_series.txt', delimiter='\t', index_col='date', date_parser=date_parse)
# 将列转换为float32类型
ts_data = df['number'].astype('float32')
print ('data summary')
# 打印输出时间序列数据概况
print (ts_data.describe())
# 多次用到的表格
def pre_table(table_name, table_rows):
'''
:param table_name: 表格名称,字符串列表
:param table_rows: 表格内容,嵌套列表
:return: 展示表格对象
'''
table = prettytable.PrettyTable() # 创建表格实例
table.field_names = table_name # 定义表格列名
for i in table_rows: # 循环读多条数据
table.add_row(i) # 增加数据
return table
# 稳定性(ADF)检验
def adf_val(ts, ts_title, acf_title, pacf_title):
'''
:param ts: 时间序列数据,Series类型
:param ts_title: 时间序列图的标题名称,字符串
:param acf_title: acf图的标题名称,字符串
:param pacf_title: pacf图的标题名称,字符串
:return: adf值、adf的p值、三种状态的检验值
'''
plt.figure()
plt.plot(ts) # 时间序列图
plt.title(ts_title) # 时间序列标题
plt.show()
plot_acf(ts, lags=20, title=acf_title).show() # 自相关检测
plot_pacf(ts, lags=20, title=pacf_title).show() # 偏相关检测
adf, pvalue, usedlag, nobs, critical_values, icbest = adfuller(ts) # 稳定性(ADF)检验
table_name = ['adf', 'pvalue', 'usedlag', 'nobs', 'critical_values', 'icbest'] # 表格列名列表
table_rows = [[adf, pvalue, usedlag, nobs, critical_values, icbest]] # 表格行数据,嵌套列表
adf_table = pre_table(table_name, table_rows) # 获得平稳性展示表格对象
print ('stochastic score') # 打印标题
print (adf_table) # 打印展示表格
return adf, pvalue, critical_values, # 返回adf值、adf的p值、三种状态的检验值
# 白噪声(随机性)检验
def acorr_val(ts):
'''
:param ts: 时间序列数据,Series类型
:return: 白噪声检验的P值和展示数据表格对象
'''
lbvalue, pvalue = acorr_ljungbox(ts, lags=1) # 白噪声检验结果
table_name = ['lbvalue', 'pvalue'] # 表格列名列表
table_rows = [[lbvalue, pvalue]] # 表格行数据,嵌套列表
acorr_ljungbox_table = pre_table(table_name, table_rows) # 获得白噪声检验展示表格对象
print ('stationarity score') # 打印标题
print (acorr_ljungbox_table) # 打印展示表格
return pvalue # 返回白噪声检验的P值和展示数据表格对象
##### 原始数据检验 ####
# 稳定性检验
adf, pvalue1, critical_values = adf_val(ts_data, 'raw time series', 'raw acf', 'raw pacf')
# 白噪声检验
pvalue2 = acorr_val(ts_data)
# 数据平稳处理
def get_best_log(ts, max_log=5, rule1=True, rule2=True):
'''
:param ts: 时间序列数据,Series类型
:param max_log: 最大log处理的次数,int型
:param rule1: rule1规则布尔值,布尔型
:param rule2: rule2规则布尔值,布尔型
:return: 达到平稳处理的最佳次数值和处理后的时间序列
'''
if rule1 and rule2: # 如果两个规则同时满足
return 0, ts # 直接返回0和原始时间序列数据
else: # 只要有一个规则不满足
for i in range(1, max_log): # 循环做log处理
ts = np.log(ts) # log处理
adf, pvalue1, usedlag, nobs, critical_values, icbest = adfuller(ts) # 稳定性(ADF)检验
lbvalue, pvalue2 = acorr_ljungbox(ts, lags=1) # 白噪声(随机性)检验
rule_1 = (adf < critical_values['1%'] and adf < critical_values['5%'] and adf < critical_values[
'10%'] and pvalue1 < 0.01) # 稳定性(ADF)检验规则
rule_2 = (pvalue2 < 0.05) # 白噪声(随机性)规则
rule_3 = (i < 5)
if rule_1 and rule_2 and rule_3: # 如果同时满足条件
print ('The best log n is: {0}'.format(i)) # 打印输出最佳次数
return i, ts # 返回最佳次数和处理后的时间序列
# 还原经过平稳处理的数据
def recover_log(ts, log_n):
'''
:param ts: 经过log方法平稳处理的时间序列,Series类型
:param log_n: log方法处理的次数,int型
:return: 还原后的时间序列
'''
for i in range(1, log_n + 1): # 循环多次
ts = np.exp(ts) # log方法还原
return ts # 返回时间序列
#### 对时间序列做稳定性处理 #####
# 稳定性检验规则
rule1 = (adf < critical_values['1%'] and adf < critical_values['5%'] and adf < critical_values[
'10%'] and pvalue1 < 0.01)
# 白噪声检验的规则
rule2 = (pvalue2[0,] < 0.05)
# 使用log进行稳定性处理
log_n, ts_data = get_best_log(ts_data, max_log=5, rule1=rule1, rule2=rule2)
#### 稳定后数据进行检验 ####
# 稳定性检验
adf, pvalue1, critical_values = adf_val(ts_data, 'final time series', 'final acf', 'final pacf')
# 白噪声检验
pvalue2 = acorr_val(ts_data)
稳定性满足要求
# arma最优模型训练
def arma_fit(ts):
'''
:param ts: 时间序列数据,Series类型
:return: 最优状态下的p值、q值、arma模型对象、pdq数据框和展示参数表格对象
'''
max_count = int(len(ts) / 10) # 最大循环次数最大定义为记录数的10%
bic = float('inf') # 初始值为正无穷
tmp_score = [] # 临时p、q、aic、bic和hqic的值的列表
print ('each p/q traning record') # 打印标题
print('p q aic bic hqic')
for tmp_p in range(max_count + 1): # p循环max_count+1次
for tmp_q in range(max_count + 1): # q循环max_count+1次
model = ARMA(ts, order=(tmp_p, tmp_q)) # 创建ARMA模型对象
try:
# ARMA模型训练 disp不打印收敛信息 method条件平方和似然度最大化
results_ARMA = model.fit(disp=-1, method='css')
except:
continue # 遇到报错继续
finally:
tmp_aic = results_ARMA.aic # 模型的获得aic
tmp_bic = results_ARMA.bic # 模型的获得bic
tmp_hqic = results_ARMA.hqic # 模型的获得hqic
print('{:2d}|{:2d}|{:.8f}|{:.8f}|{:.8f}'.format(tmp_p, tmp_q, tmp_aic, tmp_bic, tmp_hqic))
tmp_score.append([tmp_p, tmp_q, tmp_aic, tmp_bic, tmp_hqic]) # 追加每个模型的训练参数和结果
if tmp_bic < bic: # 如果模型bic小于最小值,那么获得最优模型ARMA的下列参数:
p = tmp_p # 最优模型ARMA的p值
q = tmp_q # 最优模型ARMA的q值
model_arma = results_ARMA # 最优模型ARMA的模型对象
aic = tmp_bic # 最优模型ARMA的aic
bic = tmp_bic # 最优模型ARMA的bic
hqic = tmp_bic # 最优模型ARMA的hqic
pdq_metrix = np.array(tmp_score) # 将嵌套列表转换为矩阵
pdq_pd = pd.Dataframe(pdq_metrix, columns=['p', 'q', 'aic', 'bic', 'hqic']) # 基于矩阵创建数据框
table_name = ['p', 'q', 'aic', 'bic', 'hqic'] # 表格列名列表
table_rows = [[p, q, aic, bic, hqic]] # 表格行数据,嵌套列表
parameter_table = pre_table(table_name, table_rows) # 获得最佳ARMA模型结果展示表格对象
# print ('each p/q traning record') # 打印标题
# print (pdq_pd) # 打印输出每次ARMA拟合结果,包含p、d、q以及对应的AIC、BIC、HQIC
print ('best p and q') # 打印标题
print (parameter_table) # 输出最佳ARMA模型结果展示表格对象
return model_arma # 最优状态下的arma模型对象
# 模型训练和效果评估
def train_test(model_arma, ts, log_n, rule1=True, rule2=True):
'''
:param model_arma: 最优ARMA模型对象
:param ts: 时间序列数据,Series类型
:param log_n: 平稳性处理的log的次数,int型
:param rule1: rule1规则布尔值,布尔型
:param rule2: rule2规则布尔值,布尔型
:return: 还原后的时间序列
'''
train_predict = model_arma.predict() # 得到训练集的预测时间序列
if not (rule1 and rule2): # 如果两个条件有任意一个不满足
train_predict = recover_log(train_predict, log_n) # 恢复平稳性处理前的真实时间序列值
ts = recover_log(ts, log_n) # 时间序列还原处理
ts_data_new = ts[train_predict.index] # 将原始时间序列数据的长度与预测的周期对齐
RMSE = np.sqrt(np.sum((train_predict - ts_data_new) ** 2) / ts_data_new.size) # 求RMSE
# 对比训练集的预测和真实数据
plt.figure() # 创建画布
train_predict.plot(label='predicted data', style='--') # 以虚线展示预测数据
ts_data_new.plot(label='raw data') # 以实线展示原始数据
plt.legend(loc='best') # 设置图例位置
plt.title('raw data and predicted data with RMSE of %.2f' % RMSE) # 设置标题
plt.show() # 展示图像
return ts # 返回还原后的时间序列
# 训练最佳ARMA模型并输出相关参数和对象
model_arma = arma_fit(ts_data)
# 模型训练和效果评估
ts_data = train_test(model_arma, ts_data, log_n, rule1=rule1, rule2=rule2)
# 预测未来指定时间项的数据
def predict_data(model_arma, ts, log_n, start, end, rule1=True, rule2=True):
'''
:param model_arma: 最优ARMA模型对象
:param ts: 时间序列数据,Series类型
:param log_n: 平稳性处理的log的次数,int型
:param start: 要预测数据的开始时间索引
:param end: 要预测数据的结束时间索引
:param rule1: rule1规则布尔值,布尔型
:param rule2: rule2规则布尔值,布尔型
:return: 无
'''
predict_ts = model_arma.predict(start=start, end=end) # 预测未来指定时间项的数据
print ('-----------predict data----------') # 打印标题
if not (rule1 and rule2): # 如果两个条件有任意一个不满足
predict_ts = recover_log(predict_ts, log_n) # 还原数据
print (predict_ts) # 展示预测数据
# 展示预测趋势
plt.figure() # 创建画布
ts.plot(label='raw time series') # 设置推向标签
predict_ts.plot(label='predicted data', style='--') # 以虚线展示预测数据
plt.legend(loc='best') # 设置图例位置
plt.title('predicted time series') # 设置标题
plt.show() # 展示图像
#### 模型预测应用 ####
# 设置时间
start = '1991-07-28'
end = '1991-08-02'
# 预测
predict_data(model_arma, ts_data, log_n, start, end, rule1=rule1, rule2=rule2)
补充:ARMA中
常用的时间序列模型有AR、ARMA、ARIMA,都集中在.tsa下。 ARMA(p, q) 是 AR§ 和 MA(q) 模型的组合。 关于p和q的选择,一种方法是观察自相关图ACF和偏相关图PACF,通过删失和拖尾等信息进行分析并应用模型和p和q的顺序进行适配; 另一种方法是遍历所有条件,通过循环方法通过AIC和BIC值自动确定顺序。
ARMA(以及 中的其他算法)与 中的算法类似,并且都具有方法和属性。 常用的ARMA方法:
参考:
《数据分析与数字化运营》宋天龙