#!/usr/bin/python3
# coding=utf-8-sig
# @Time : 2021.9.22
# @Author : Coly
# @version: V1.0
# @Des : data analysis. Spark has three main engines: Spark Core, Spark SQL,
# and Spark Streaming.
# Spark Core's key abstractions are SparkContext and RDD;
# Spark SQL's key abstractions are SparkSession and DataFrame;
# Spark Streaming's key abstractions are StreamingContext and DStream.
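# For reference, a minimal sketch (illustrative only, not used by this
# script) of how each engine's key abstraction is typically created; the
# app name "demo" and the 1-second batch interval are assumptions:
#
#   from pyspark.sql import SparkSession
#   from pyspark.streaming import StreamingContext
#   spark = SparkSession.builder.appName("demo").getOrCreate()  # Spark SQL: SparkSession -> DataFrame
#   sc = spark.sparkContext                       # Spark Core: SparkContext -> RDD
#   ssc = StreamingContext(sc, 1)                 # Streaming: StreamingContext -> DStream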
import os

# Point Spark at the local JDK and Python interpreter before findspark
# locates the Spark installation (both paths are machine-specific).
os.environ['JAVA_HOME'] = '/usr/lib/jdk8/jdk1.8.0_301'
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"

import findspark
findspark.init()

import numpy as np
import pandas as pd
if __name__ == "__main__":
    # Read the vehicle trajectory data from a local file.
    data_frame = pd.read_csv("data2")
    # Read the charging-station data (longitude, latitude, range).
    CS_DF = pd.read_csv("cs.csv", delimiter=",", names=['a', 'b', 'c'])
    # Keep only the columns of interest from the trajectory data.
    data_frame = data_frame.iloc[:, [0, 3, 4, 5, 7, 8]]
    data_frame.columns = ["EV_ID", "longitude", "latitude", "date_time", "speed", "direction"]
    # Duplicate each station's longitude and latitude columns so that each
    # copy can be expanded into one side of a bounding box below.
    CS_DF = CS_DF.iloc[:, [0, 0, 1, 1, 2]]
    CS_DF.columns = ["cs_lon_min", "cs_lon_max", "cs_lat_min", "cs_lat_max", 'range']
    # Expand each station's point coordinates into a bounding box. Dividing
    # the range by 100000 presumably converts a distance in metres into an
    # approximate offset in degrees (1 degree of latitude is about 111 km).
    Area_covered = 100000
    CS_DF['cs_lon_min'] = CS_DF['cs_lon_min'] - CS_DF['range'] / Area_covered
    CS_DF['cs_lon_max'] = CS_DF['cs_lon_max'] + CS_DF['range'] / Area_covered
    CS_DF['cs_lat_min'] = CS_DF['cs_lat_min'] - CS_DF['range'] / Area_covered
    CS_DF['cs_lat_max'] = CS_DF['cs_lat_max'] + CS_DF['range'] / Area_covered
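    # Worked example under that unit assumption: with range = 1000 m, the box
    # grows by 1000 / 100000 = 0.01 degrees per side, roughly 1.1 km of latitude.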
    print(CS_DF)
    # For each station, select the trajectory points that fall inside its
    # bounding box and are stationary (speed == 0).
    for i in range(len(CS_DF)):
        in_box = (
            (data_frame["longitude"] > CS_DF.loc[i, 'cs_lon_min'])
            & (data_frame["longitude"] < CS_DF.loc[i, 'cs_lon_max'])
            & (data_frame["latitude"] > CS_DF.loc[i, 'cs_lat_min'])
            & (data_frame["latitude"] < CS_DF.loc[i, 'cs_lat_max'])
        )
        location_data = data_frame.loc[in_box & (data_frame["speed"] == 0)]
        print(location_data)
    print('The running is ok!')
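
# The header configures Spark via findspark, but the filtering above runs in
# pandas. A minimal sketch (an assumption, not the author's method) of the
# same per-station stationary-point filter using Spark SQL DataFrames,
# assuming the trajectory file carries the same column names:
#
#   from pyspark.sql import SparkSession, functions as F
#   spark = SparkSession.builder.appName("data-analysis").getOrCreate()
#   sdf = spark.read.csv("data2", header=True, inferSchema=True)
#   box = CS_DF.loc[i]
#   result = sdf.filter(
#       (F.col("longitude") > float(box["cs_lon_min"]))
#       & (F.col("longitude") < float(box["cs_lon_max"]))
#       & (F.col("latitude") > float(box["cs_lat_min"]))
#       & (F.col("latitude") < float(box["cs_lat_max"]))
#       & (F.col("speed") == 0)
#   )
#   result.show()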