1. Using Spark to process a CSV file and write it into a MySQL table
Introduction to Spark
Spark is a fast (in-memory), general-purpose, scalable compute engine written in Scala. It was created in 2009 at UC Berkeley's AMP Lab, open-sourced in 2010, and entered the Apache Incubator in June 2013; that same year several of the project's founders from the AMP Lab started Databricks, the commercial company behind Spark. Spark became a top-level Apache project in 2014, and since 2009 more than 1,200 developers have contributed to it.
Spark supports Java, Scala, Python, R, and SQL, and provides dozens (currently 80+) of high-performance operators; implementing these ourselves would be close to impossible.
Spark is backed by many companies, including Alibaba, Tencent, JD.com, Ctrip, Baidu, Youku, Tudou, IBM, Cloudera, and Hortonworks.
Spark is an improvement on Hadoop: a Hadoop-MapReduce-like general-purpose parallel computing framework open-sourced by the UC Berkeley AMP Lab. It implements distributed computation on the MapReduce model and keeps the advantages of Hadoop MapReduce, but unlike MapReduce it can hold intermediate job output and results in memory rather than writing them to and reading them back from HDFS. That makes Spark far better suited to iterative MapReduce-style algorithms, such as those used in data mining and machine learning.
Spark is an in-memory computing framework, so it is very fast, but it only does computation; it does not store data itself, and in practice it is paired with external data sources such as HDFS.
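As a small illustration of that in-memory model, an iterative job can cache its working set once and then reuse it across passes without re-reading storage. A minimal, self-contained sketch (the object name CacheDemo and the toy data are made up for illustration):

import org.apache.spark.sql.SparkSession

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cacheDemo").getOrCreate()
    // cache() pins the dataset in memory once the first action has computed it
    val nums = spark.sparkContext.parallelize(1 to 1000000).cache()
    // each subsequent pass reads from memory instead of recomputing the lineage
    for (i <- 1 to 3) {
      println(s"pass $i, sum = ${nums.map(_.toLong).reduce(_ + _)}")
    }
    spark.stop()
  }
}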
1. Create the MySQL table
Create the database and the table in MySQL:
CREATE DATABASE /*!32312 IF NOT EXISTS*/`job_crawel` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `job_crawel`;
/*Table structure for table `jobdetail` */
DROP TABLE IF EXISTS `jobdetail`;
CREATE TABLE `jobdetail` (
  `job_id` int(11) NOT NULL AUTO_INCREMENT,
  `job_name` varchar(900) DEFAULT NULL,
  `job_url` varchar(900) DEFAULT NULL,
  `job_location` varchar(900) DEFAULT NULL,
  `job_salary` varchar(900) DEFAULT NULL,
  `job_company` text,
  `job_experience` text,
  `job_class` text,
  `job_given` text,
  `job_detail` text,
  `company_type` text,
  `company_person` text,
  `search_key` varchar(900) DEFAULT NULL,
  `city` varchar(900) DEFAULT NULL,
  PRIMARY KEY (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
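Before wiring up Spark, it can be worth confirming that the table is reachable over JDBC. A minimal sketch using plain java.sql (the object name CheckTable is hypothetical, and root/123456 are placeholder credentials that must match your MySQL setup):

import java.sql.DriverManager

object CheckTable {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8"
    // placeholder credentials; use your own MySQL user and password
    val conn = DriverManager.getConnection(url, "root", "123456")
    try {
      val rs = conn.createStatement().executeQuery("SHOW COLUMNS FROM jobdetail")
      // should print job_id, job_name, ..., search_key, city
      while (rs.next()) println(rs.getString("Field"))
    } finally {
      conn.close()
    }
  }
}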
2. Create a Maven project and add the dependencies
<properties>
    <spark.version>2.3.3</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-annotations</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-net</groupId>
                <artifactId>commons-net</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>net.java.dev.jets3t</groupId>
                <artifactId>jets3t</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.code.findbugs</groupId>
                <artifactId>jsr305</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.sun.jersey</groupId>
                <artifactId>jersey-json</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive-thriftserver_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <!-- Pin jackson-module-scala to 2.9.2 -->
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
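With the maven-scala-plugin's compile and testCompile goals bound as above, a standard mvn clean package should compile the Scala sources alongside any Java code and produce the project jar (assuming the conventional src/main/scala layout).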
3. Develop the Spark program to process the CSV data
Use Spark to read the CSV file and then write the data into the MySQL table:
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object CSVOperate {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("sparkCSV")
    val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    session.sparkContext.setLogLevel("WARN")

    // Read the CSV file: '@'-delimited, with a header row, allowing
    // multi-line fields and inferring column types from the data.
    val frame: DataFrame = session
      .read
      .format("csv")
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("header", "true")
      .option("delimiter", "@")
      .option("multiLine", "true")
      .option("ignoreLeadingWhiteSpace", true)
      .load("file:///D:\\1、课程6.0版本课程资料\\20、公开课与训练营\\1、用数据告诉你职业发展之路该如何选择\\3、数据资料集\\job_detail4.csv")

    // Register a temp view so the data can be inspected with SQL if needed.
    frame.createOrReplaceTempView("job_detail")
    //session.sql("select job_name,job_url,job_location,job_salary,job_company,job_experience,job_class,job_given,job_detail,company_type,company_person,search_key,city from job_detail where job_company = '北京无极慧通科技有限公司' ").show(80)

    // Append the DataFrame into the MySQL table over JDBC.
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    frame.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8", "job_crawel.jobdetail", prop)
  }
}
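To confirm the load, the table can be read back through the same JDBC connection. Below is a minimal sketch; the object name VerifyLoad is hypothetical, and it reuses the connection settings of the job above:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object VerifyLoad {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("verifyLoad")
      .getOrCreate()
    val prop = new Properties()
    prop.put("user", "root")       // same credentials as the load job
    prop.put("password", "123456")
    // read the jobdetail table back over the same JDBC URL
    val check: DataFrame = session.read.jdbc(
      "jdbc:mysql://localhost:3306/job_crawel?useUnicode=true&characterEncoding=UTF-8",
      "jobdetail", prop)
    check.printSchema()            // columns should match the CREATE TABLE above
    println(s"rows loaded: ${check.count()}")
    session.stop()
  }
}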