PySpark时序数据描述

PySpark时序数据描述

PySpark时序数据描述
为更好的洞察和处理大规模时序数据的特性,本文针对大规模时序数据,从基本统计特性,分布,序列内部检测三方面,提供Spark和借助numpy,scipy,statsmodels封装的成UDF函数脚本与理论讲解。不仅帮助了解序列本身的规律,也是机器学习建模的特征,同时,亦可作为分类分层建模的依据。因为是针对大规模时序数据,所以常规的画图可视化的方式检测其分布等特性并不合适,所以需要基于统计计算的方式给出确定的检验和描述。

一、基本统计特性

基本统计特性包括长度,均值,间断时长,标准差等,用于对序列有大致了解。

1.序列长度

针对时序数据,一般首先会关注序列的长度 n n n,较短序列意味着数据的潜在可变性无法判断,随机成分较大,无法通过该序列历史数据揭示出规律性。从理论角度来说,样本的数量要比预测模型中参数个数更多。从经典时间序列分解理论出发,序列短则无法准确的推断其趋势、季节波动,对ARIMA等模型,将导致其自相关系数,平稳性检验等变得相对不可靠,机器学习模型,同样需要较大的样本量来避免过拟合风险。针对短序列,应该选择使用需要参数更少更简单的模型,比如移动平均,规则模型,线性回归,同时减低模型复杂。如果待预测问题属于一个较大粒度,比如门店-月销售额的短序列预测,相比较在门店-天-sku预测上更容易预测,因为潜在可变性较小,样本量(序列长度)问题相对没有那么突出。

2.销售时长

销售时长,描述序列发生销售的时间跨度,销售时长较短的序列表明是最近上线的新品。

S a l e s D a y s = m a x ( X t ) − m i n ( X t ) SalesDays=max(X_{t})-min(X_{t}) SalesDays=max(Xt​)−min(Xt​)

3.间断时长

中断时长,用来度量序列完整性,中断的时间点越少,序列越完整。

I n t e r m D a y s = 1 + m a x ( X t ) − m i n ( X t ) − n IntermDays=1+max(X_{t})-min(X_{t})-n IntermDays=1+max(Xt​)−min(Xt​)−n

4.缺失值占比

通过比例的方式度量销售中断(缺失)的严重程度
M i s s i n g R a t i o = n 1 + m a x ( X t ) − m i n ( X t ) MissingRatio=\frac{n}{1+max(X_{t})-min(X_{t})} MissingRatio=1+max(Xt​)−min(Xt​)n​

5.均值(mean)

序列的均值描述该序列发生销售大致量级。

x ˉ = ∑ i = 1 n x i n \bar{x}=\frac{\sum_{i=1}^{n} x_{i}}{n} xˉ=n∑i=1n​xi​​

6.标准差(std)

用于度量序列的波动性,标准差越大,说明序列波动性越强。

S = ∑ i = 1 n ( x i − x ˉ ) 2 n − 1 S=\sqrt{\frac{\sum_{i=1}^{n}\left(x_{i}-\bar{x}\right)^{2}}{n-1}} S=n−1∑i=1n​(xi​−xˉ)2​

7.C.V系数

需要比较两组序列离散度的时,如果不同序列之间测量尺度相差大,或量纲不同,使用标准差来进行比较不是合理的方式,此时应该使用变异系数这一指标,在标准差的基础上除以原始数据平均数,消除测量尺度和量纲的影响。

C . V = S x ˉ × 100 % C . V=\frac{S}{\bar{x}} \times 100 \% C.V=xˉS​×100%

二、分布特性

时序的分布特性包含,偏度,峰度和在此之上的Jarque-Bera测试,销售量 y y y的分布不总是连续正态分布,使用机器学习建模时,需要针对不同的分布设计不同的损失函数或者选择更合适的模型。

8.偏度(skewness)

偏度描述的是某总体取值分布的对称性,为样本的三阶标准矩。
Skew ⁡ ( X ) = E [ ( X − μ σ ) 3 ] 其 中 , μ : 样 本 均 值 , δ : 标 准 差 , E : 均 值 \operatorname{Skew}(X)=E\left[\left(\frac{X-\mu}{\sigma}\right)^{3}\right] \\ 其中,μ:样本均值,δ:标准差,E:均值 Skew(X)=E[(σX−μ​)3]其中,μ:样本均值,δ:标准差,E:均值
正态分布(偏度=0),右偏分布(正偏,偏度>0),左偏分布(负偏分布,偏度<0)

9.峰度(Kurtosis)

Kurt ⁡ [ X ] = E [ ( X − μ σ ) 4 ] 其 中 μ : 样 本 均 值 , δ : 标 准 差 , E : 取 均 值 操 作 \operatorname{Kurt}[X]=\mathrm{E}\left[\left(\frac{X-\mu}{\sigma}\right)^{4}\right] \\ 其中μ:样本均值,δ:标准差,E:取均值操作 Kurt[X]=E[(σX−μ​)4]其中μ:样本均值,δ:标准差,E:取均值操作

峰度是描述总体中所有取值分布形态陡缓程度的统计量,峰度定义为四阶标准矩。峰度为0表示该总体数据分布与正态分布的陡缓程度相同;峰度的绝对值数值越大表示其分布形态的陡缓程度与正态分布的差异程度越大。峰度大于0表示该总体数据分布与正态分布相比较为陡峭,为尖顶峰;峰度小于0表示该总体数据分布与正态分布相比较为平坦,为平顶峰。

10.雅克-贝拉检验(Jarque-Bera)

Jarque-Bera 是一种总体分布的正态性检验,当序列服从正态分布时,JB统计量:
J B = n ( S 2 6 + ( K − 3 ) 2 24 ) 其 中 : n 为 样 本 规 模 , S 、 K 分 别 为 偏 度 和 峰 度 ; 原 假 设 H 0 : 样 本 服 从 正 态 分 布 , 来 自 一 个 正 态 分 布 的 总 体 , 那 么 J B 检 验 渐 近 于 χ ( 2 ) 分 布 备 择 假 设 H 1 : 样 本 不 服 从 正 态 分 布 . J B=n\left(\frac{S^{2}}{6}+\frac{(K-3)^{2}}{24}\right) \\其中: n为样本规模,S、K分别为偏度和峰度; \\原假设 H0:样本服从正态分布,来自一个正态分布的总体,那么JB检验渐近于χ(2)分布 \\备择假设 H1:样本不服从正态分布. JB=n(6S2​+24(K−3)2​)其中:n为样本规模,S、K分别为偏度和峰度;原假设H0:样本服从正态分布,来自一个正态分布的总体,那么JB检验渐近于χ(2)分布备择假设H1:样本不服从正态分布.

注:该检验可以通过python的如下两个模块进行

#Jarque-Bera test
Scipy.stats.jarque_bera
statsmodels.stats.stattools.jarque_bera

当然关于正态性检验的常用方式还有如Kolmogorov-Smirnor test(KS),Shapiro-Wilk test等,

由于spark.sql可以直接计算偏度和峰度,Jarque-Bera可以通过二者计算得到,所以考虑到便捷和通用性。所以这里使用Jarque-Bera,以上基本统计特性和分布特征的实例代码Spark.SQL如下:

select 
shop_id,
sku_id,
avg(sale_qty) as sale_avg,
stddev_samp(sale_qty) as sale_std,
stddev_samp(sale_qty)/avg(sale_qty) as cv_coef,
skewness(sale_qty) as sale_skew,
kurtosis(sale_qty) as sale_kurt,
min(dt) as start_sale,
max(dt) as end_sale,
count(dt) as sale_days,
floor(datediff(max(dt), min(dt)))+1 as sale_span,
(floor(datediff(max(dt), min(dt)))+1)  - count(dt) as intermit_day,
count(dt)/(floor(datediff(max(dt), min(dt)))+1) as sale_ratio,
count(dt)*(((skewness(sale_qty)*skewness(sale_qty))+((kurtosis(sale_qty)-3)*(kurtosis(sale_qty)-3))/4)/6) as item_jb
from app.dataset_input_df_v1
where sale_qty>0
group by shop_id,sku_id

三、序列内部特性

11.长期趋势

长期趋势通过排除季节性和循环变动以及无规则变动等因素的影响,描述时序的未来大致走向和基本趋势规律

计算长期趋势可以通过移动平均和线性方程拟合来实现,以下代码为线性回归拟合方法。

from scipy.stats import linregress

def long_trend(data):
    """
    :param data: DataFrame of one series
    :return: DataFrame of data index(key) info and regression coefficient as long_trend
    """
    slop=lambda x: linregress(list(range(1,data.shape[0]+1)),data.label.values)
    trend=slop(data)[0]
    return pd.DataFrame({'shop_id':df['shop_id'].iloc[0],'sku_id':df['sku_id'].iloc[0], 'long_trend': [trend]})

注:使用线性拟合的长期趋势是,其放入的一列特征是时间点的索引,比如[2021-01-01,2021-01-02,2021-01-03]

三天的序列,其计算长期趋势时,使用其排序后的1,2,3索引作为x。

12.平稳性

使用ARMA/ARIMA这样的经典时序分析手段,其前提假设是序列具有平稳性,因此,需要对数据或其n阶差分进行平稳检验,而一种常见的方法就是ADF检验,即单位根检验,判定其生成过程是否会随时间而变化。

平稳性分为强平稳和弱平稳,强平稳的要求非常严格,它要求两组数据之间的统计性质不会随着时间改变。其要求过于严苛,理论上很难证明,实际中难以检验,所以实际上一般使用弱平稳。
E ( Y t ) = μ V a r ( Y t ) = E ( ( Y t − μ ) ) 2 = σ 2 C o v ( Y t , Y t + k ) = E [ ( Y t − μ ) ( Y t + k − μ ) ] = γ k E(Yt)=μ\\ Var(Yt)=E((Yt−μ))2=σ 2 \\ Cov(Yt ,Yt+k)=E[(Yt−μ)(Yt+k−μ)]=γk E(Yt)=μVar(Yt)=E((Yt−μ))2=σ2Cov(Yt,Yt+k)=E[(Yt−μ)(Yt+k−μ)]=γk
也就是

(1)随机变量的期望(均值)μ不随时间t改变;

(2)任意时刻二阶矩都存在;

(3)两个随机变量之间的自相关系数,只与这两个变量的时间间隔有关,而不随时间的推移而改变。

可调用以下函数来实现,详细的针对大规模序列检测,在文末给出。

from statsmodels.tsa.stattools import adfuller

13.周期性

不是所有的时序数据都具备明显的周期性,如果存在周期,则在分析建模的时需加入周期性这一特性。

常用的周期性函数通过正弦和余弦的函数来表示,假设f(x)是以2pi为周期的函数,那么傅里叶级数为:
s ( t ) = ∑ n = 1 N ( a n cos ⁡ ( 2 π n t P ) + b n sin ⁡ ( 2 π n t P ) ) 以 年 为 周 期 的 序 列 , p = 365 , n = 10 以 月 为 周 期 的 序 列 , p = 30 , n = 5 以 周 为 周 期 的 序 列 , p = 7 , n = 3 s(t)=\sum_{n=1}^{N}\left(a_{n} \cos \left(\frac{2 \pi n t}{P}\right)+b_{n} \sin \left(\frac{2 \pi n t}{P}\right)\right)\\ 以年为周期的序列,p=365,n=10\\ 以月为周期的序列,p=30,n=5\\ 以周为周期的序列,p=7,n=3\\ s(t)=n=1∑N​(an​cos(P2πnt​)+bn​sin(P2πnt​))以年为周期的序列,p=365,n=10以月为周期的序列,p=30,n=5以周为周期的序列,p=7,n=3
也可以使用基于n阶自相关系数的方式来检测,自相关(autocorrelation or lagged correlation)用于评估时序数据是否依赖于其过去的数据。 Y t Yt Yt与 Y t + h Yt+h Yt+h之间的相关系数记为自相关系数ρ(h),其函数称为自相关函数(autocorrelation function, ACF)通过计算序列 [ t 1 , t 2 , t 3 , t 4 , . . . , t n ] [{t_{1},t_{2},t_{3},t_{4},...,t_{n}}] [t1​,t2​,t3​,t4​,...,tn​]的 [ 1 , 2 , 3 , . . . n − 1 ] [1,2,3,...n-1] [1,2,3,...n−1]次ACF自相关,其n-1个自相关系数排列的局部最值点的最小公倍数,可以作为周期性,如最小公倍数为类似于7,30,12,则可以作为其周期长度。如,n-1个自相关局部最值为[7,14,21,28],则可以认为其周期长度为7,可能在实际工作中数据本身的特点,周末销售高峰,本身周期性应该是7,本该是礼拜天出现的销售最高值,但某些时候可能是礼拜六,或者在这批序列中跨越了节假日这些不寻常的情况,可能出现的局部最大自相关系数的下标是[ 7, 13, 21, 28, 32],故实践中,如只要有一个下标能被周期7整除,则判断为具有周期性,具体代码细节请查看文末的代码。

14.序列复杂度

序列数据中潜在规律成分多少可以通过计算序列复杂度来度量,复杂度越低,时序内部的有序成分越多,也更具规律性。了解序列本身蕴含的稳定或者波动性,也便于解释某些商品预测为何比其他商品预测准确性更差。近年来有不少学者提出可以使用排列熵(permutation entropy)针对时间序列本身具有的空间信号特性进行检验,该方法计算简单,抗噪声能力强,对时间敏感度高,输出结果直观,应用范围广泛。理论方面论文和详细推导可以参考《Practical considerations of permutation entropy: A tutorial review》。

from math import factorial
import numpy as np
def _embed(x, order=3, delay=1):
    N = len(x)
    Y = np.empty((order, N - (order - 1) * delay))
    for i in range(order):
        Y[i] = x[i * delay:i * delay + Y.shape[1]]
    return Y.T

def permutation_entropy(time_series, order=3, delay=1, normalize=False):
    x = np.array(time_series)
    hashmult = np.power(order, np.arange(order))
    # Embed x and sort the order of permutations
    sorted_idx = _embed(x, order=order, delay=delay).argsort(kind='quicksort')
    # Associate unique integer to each permutations
    hashval = (np.multiply(sorted_idx, hashmult)).sum(1)
    # Return the counts
    _, c = np.unique(hashval, return_counts=True)
    # Use np.true_divide for Python 2 compatibility
    p = np.true_divide(c, c.sum())
    pe = -np.multiply(p, np.log2(p)).sum()#【3】
    if normalize:
        pe /= np.log2(factorial(order))
    return pe

借助Spark.SQL实现大规模时间序列的平稳性和周期性检验的代码如下:

import datetime
import numpy as np
import pandas as pd
import scipy.signal as signal
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.stattools import adfuller
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType

schema_adfuller = StructType([
    StructField("shop_number", StringType()),
    StructField("item_number", StringType()),
    StructField("adfuller", IntegerType())])


@pandas_udf(schema_adfuller, functionType=PandasUDFType.GROUPED_MAP)
def adf_test(df):
    df.sort_values(by=['date'],ascending=[True],inplace=True)
    adf_values=adfuller(df['qty'],autolag='AIC')[1]
    is_adf_func = lambda x: 1 if x < 0.05 else 0
    is_adf = is_adf_func(adf_values)
    result=pd.DataFrame({'store_id':df['store_id'].iloc[0],'is_adf':[is_adf]})
    return result

schema_cycle = StructType([
    StructField("shop_number", StringType()),
    StructField("item_number", StringType()),
    StructField("cycle", IntegerType())])

@pandas_udf(schema_cycle, functionType=PandasUDFType.GROUPED_MAP)
def cycle_test(df,check_len=7):

    """
     check series the cycle
    :param df: data of series
    :param check_len: will check cycle length,eg weekly:7,month:30,year:12
    :return:int,1 is exit cycle,other is 0
    """

    assert check_len <=2,('cycle length must >2')
    df = df.sort_values(by=['dt'], ascending=True)
    acf_values=acf(df['label'],nlags=df.shape[0]-1)
    loc_max_index=signal.argrelextrema(acf_values,comparator=np.greater,order=check_len//2)

    #7 is weekly cycle if month data series can choice 12
    #occur local max index in cycle for len, as be exit cycle
    cycle_check=[i for i in loc_max_index[0] if i%check_len==0]
    is_cycle=lambda  x : 1 if cycle_check else 0
    cycle_result=is_cycle(cycle_check)
    result = pd.DataFrame({'shop_number':df['shop_number'].iloc[0],'item_number':df['item_number'].iloc[0], 'cycle': [cycle_result]})
    return result


def model_input():
    data_sql = """select shop_id,sku_id,dt,label from temp.dataset_feature"""
    data = spark.sql(data_sql)
    return data

if __name__ == '__main__':
    data=model_input()
    data =data.na.fill(0)
    item_adfuller = data.groupby(['shop_id','sku_id']).apply(adf_test)
    item_adfuller.createOrReplaceTempView('item_adfuller')
    item_cycle = data.groupby(['shop_id','sku_id']).apply(cycle_test)
    try:
        feature_df_tab.write.mode("append").format('hive').saveAsTable('temp.item_adfuller_cycle_table')
    except:
        item_cycle.createOrReplaceTempView('item_cycle')
        spark.sql("""drop table if exists temp.item_adfuller_cycle_table""")
        spark.sql("""create table temp.item_adfuller_cycle_table as 
        select a.shop_id,a.sku_id,a.cycle,b.adfuller
        from item_cycle a 
        left join item_adfuller b
        on a.shop_id=b.shop_id and a.sku_id=b.sku_id""")

以上只是工程实践角度实现对序列描述和检验的方法,其中如中断时长乃至计算方式并不是一个严格意义上的学术称谓,或许还有更严谨和先进的方式,欢迎交流。

上一篇:【CF117C】Cycle


下一篇:「SOL」Hamiltonian Cycle (AtCoder)