A small instance of visual analytics basing Spark(Python)

A small instance of visual analytics basing Spark(Python)

The total delay time of the major airlines in a certain month


1.Preparation

1.1.Data

      This data set was downloaded from the U.S. Department of Transportation, Office of the Secretary of Research on November 30, 2014 and represents flight data for the domestic United States in April of 2014.

      The following CSV files are lookup tables:

    - airlines.csv

    - airports.csv

      And provided detailed information about the reference codes in the main data set. These files have header rows to identify their fields.

      The flights.csv contains flight statistics for April 2014 with the following fields:

- flight date     (yyyy-mm-dd)

- airline id      (lookup in airlines.csv)

- flight num

- origin          (lookup in airports.csv)

- destination     (lookup in airports.csv)

- departure time  (HHMM)

- departure delay (minutes)

- arrival time    (HHMM)

- arrival delay   (minutes)

- air time        (minutes)

- distance        (miles)

 

1.2.Codes—a basic template

     A basic template for writing a Spark application in Python is as follows:

## Spark Application - execute with spark-submit

## Imports

from pyspark import SparkConf, SparkContext

## Module Constants

APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality

def main(sc):

    pass

if __name__ == "__main__":

    # Configure Spark

    conf = SparkConf().setAppName(APP_NAME)

    conf = conf.setMaster("local[*]")

    sc   = SparkContext(conf=conf)

    # Execute Main functionality

    main(sc)


 

1.3. Codes—a basic template

The entire app is as follows:

 
## Spark Application - execute with spark-submit
## Imports
import csv
import matplotlib.pyplot as plt
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
## Main functionality
def main(sc):
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])
    # Show a bar chart of the delays
    plot(delays)
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
    # Execute Main functionality
    main(sc)

 

 

1.4. equipments

     A Ubuntu computer with spark、jdk、Scala

 

2.Steps

2.1.overview

data

A small instance of visual analytics basing Spark(Python)

codes

A small instance of visual analytics basing Spark(Python)

 

2.2. use the spark-submit command as follows:

A small instance of visual analytics basing Spark(Python)

 

2.3.result

A small instance of visual analytics basing Spark(Python)

2.4.Analysis

       what is this code doing? Let's look particularly at the main function which does the work most directly related to Spark. First, we load up a CSV file into an RDD, then map the split function to it. The split function parses each line of text using the csvmodule and returns a tuple that represents the row. Finally we pass the collect action to the RDD, which brings the data from the RDD back to the driver as a Python list. In this case, airlines.csv is a small jump table that will allow us to join airline codes with the airline full name. We will store this jump table as a Python dictionary and then broadcast it to every node in the cluster using sc.broadcast.

       Next, the main function loads the much larger flights.csv. After splitting the CSV rows, we map the parse function to the CSV row, which converts dates and times to Python dates and times, and casts floating point numbers appropriately. It also stores the row as a NamedTuple called Flight for efficient ease of use.

       With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD to a series of key-value pairs where the key is the name of the airline and the value is the sum of the arrival and departure delays. Each airline has its delay summed together using the reduceByKey action and the add operator, and this RDD is collected back to the driver (again the number airlines in the data is relatively small). Finally the delays are sorted in ascending order, then the output is printed to the console as well as visualized using matplotlib.

3.Q&A

3.1.ImportError: No module named matplotlib.pyplot

A small instance of visual analytics basing Spark(Python)

http://www.codeweblog.com/importerror-no-module-named-matplotlib-pyplot/

 

 

Note:

this demo came initially from the website

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

I also find the chinese version

http://blog.jobbole.com/86232/

data come from github

https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/ontime.zip



上一篇:特斯拉、吉利车上的那些ADAS系统你都会用吗?


下一篇:使用Oracle Stream Analytics 21步搭建大数据实时流分析平台