>20161011: Data import notes
0.Sqoop reports a warning at startup; Accumulo needs to be installed to silence it;
1.Download the Microsoft SQL Server JDBC driver (downloaded with IE) and put the version-4.2 jar into Sqoop's lib directory; note that all automatically installed Hadoop-related software is placed under /usr/hdp
2.sqoop list-databases --connect jdbc:sqlserver://172.4.25.98 --username sa --password sa12345
3.sqoop list-tables --connect 'jdbc:sqlserver://172.4.25.98;database=wind_data;' --username sa --password sa12345
4. sqoop import --query 'select datatime,lastvalue from WT0001_R010 where $CONDITIONS' --connect 'jdbc:sqlserver://172.4.25.98;database=wind_data' --username sa --password sa12345 --hbase-table 'myy' --hbase-create-table --hbase-row-key datatime --split-by datatime -m 2 --column-family datatime
>20161013: MapReduce framework notes
1.Starting Spark from Ambari Server actually only starts the History Server; to bring up Spark itself you still need start-master.sh;
2.The MR job history web port is 19888; the Spark job history port is 8081 (8080+1);
3.Running a distributed program inside the IDE is really local mode; for truly distributed execution the code must be submitted to the cluster's master. Hence the pattern of packaging a jar first and then calling a function in the program to set the jar path. As I understand it, the main value of this approach is that the program inside the jar is not the whole application: the top-level program automates submission of the distributed job and then goes on to run other tasks in code;
4.yarn logs -applicationId <application_id>
5.Error when running in cluster mode: "SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application." Before that there was also a "main class not found" error; in client mode the dependencies can be found (in the Hadoop_jar folder), but in cluster mode they cannot;
6.<***org.apache.hadoop.mapred.Mapper***>:
Maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. /***Note: this differs from the map function in functional languages, which applies the same operation to every list element and therefore leaves the number of records unchanged (though their type may change). The next sentence shows that under MR the number of records may also change after map; in this respect many MR diagrams online are wrong.***/ A given input pair may map to zero or many output pairs.
The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. /***Why map can change the record count: the input data is in fact indexed by the InputFormat, not by an ordinary list.***/ Mapper implementations can access the JobConf for the job via JobConfigurable.configure(JobConf) and initialize themselves. Similarly they can use the Closeable.close() method for de-initialization.
The framework then calls map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task.
All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf.setOutputKeyComparatorClass(Class).
The grouped Mapper outputs are partitioned per Reducer. /***After the intermediate key/value pairs are produced, records with the same key are grouped automatically; this is the key step that hides the distributed processing.***/ Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally specify a combiner, via JobConf.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
The intermediate, grouped outputs are always stored in SequenceFiles. Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the JobConf.
If the job has zero reduces then the output of the Mapper is directly written to the FileSystem without grouping by keys. /***Note this default behavior, which improves efficiency.***/
<***org.apache.hadoop.mapreduce.Mapper***>:
Unlike the Mapper interface above, this Mapper is a class: the framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context) for each key/value pair in the InputSplit. Finally, cleanup(org.apache.hadoop.mapreduce.Mapper.Context) is called.
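The setup/map/cleanup contract can be pictured with a plain-Java simulation of the run() loop (this is a stand-in, not the real Hadoop class; the List here plays the role of the InputSplit's record stream):

```java
import java.util.Iterator;
import java.util.List;

public class MapperLifecycleDemo {
    // Minimal stand-in for org.apache.hadoop.mapreduce.Mapper's run() loop:
    // setup() once, map() per record, cleanup() once at the end.
    static StringBuilder trace = new StringBuilder();

    static void setup()            { trace.append("setup;"); }
    static void map(String record) { trace.append("map(").append(record).append(");"); }
    static void cleanup()          { trace.append("cleanup;"); }

    static String run(List<String> split) {
        trace.setLength(0);
        setup();
        try {
            Iterator<String> it = split.iterator(); // plays the role of InputSplit iteration
            while (it.hasNext()) {
                map(it.next());
            }
        } finally {
            cleanup(); // always called, mirroring the framework contract
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
        // setup;map(a);map(b);cleanup;
    }
}
```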
7.Before map output is written to the buffer, a partition operation takes place. MapReduce provides the Partitioner interface, whose job is to decide, based on the key (or value) and the number of reduce tasks, which reduce task should handle a given output pair. The default is to hash the key and take the result modulo the number of reduce tasks. This default only aims to spread the load evenly across reducers; users with their own partitioning needs can implement a custom Partitioner and set it on the job. Three things to note:
a: identical keys always hash to the same value;
b: taking the hash modulo the number of reduce tasks yields values in the range 0 to (number of reduce tasks - 1), which maps exactly onto the reduce tasks;
c: there is no guarantee that every reduce task handles the same number of keys;
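The default hash-mod scheme, the same formula used by Hadoop's HashPartitioner, fits in a few lines of plain Java; the `& Integer.MAX_VALUE` mask clears the sign bit so negative hash codes do not produce negative partition indices:

```java
public class HashPartitionDemo {
    // Same formula as Hadoop's default HashPartitioner:
    // partition = (hash & Integer.MAX_VALUE) % numReduceTasks
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        // Point (a): the same key always lands on the same reducer.
        System.out.println(getPartition("wind", reducers) == getPartition("wind", reducers));
        // Point (b): the result always lies in [0, numReduceTasks - 1].
        for (String k : new String[]{"a", "bb", "ccc", "dddd"}) {
            int p = getPartition(k, reducers);
            System.out.println(p >= 0 && p < reducers);
        }
    }
}
```

Point (c) follows directly: nothing in the formula balances how many distinct keys fall into each bucket.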
8.The first step of MapReduce, the split, has no direct relationship to HDFS file blocks: the Mapper reads HDFS files through an InputFormat, which then performs the splitting. Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks (e.g. text mode's range splitting ensures that range splits occur only at line boundaries).
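The line-boundary rule can be illustrated with a plain-Java sketch (my own simplification, not Hadoop's TextInputFormat): given a nominal split end offset, slide the end forward to just past the next newline so no line is cut in half.

```java
public class LineBoundarySplitDemo {
    // Given file bytes and a nominal end offset, move the split end
    // forward until it sits just past a '\n', so records stay whole.
    static int adjustSplitEnd(byte[] data, int nominalEnd) {
        int end = Math.min(Math.max(nominalEnd, 1), data.length); // keep end in bounds
        while (end < data.length && data[end - 1] != '\n') {
            end++;
        }
        return end;
    }

    public static void main(String[] args) {
        byte[] file = "first line\nsecond line\n".getBytes();
        // A nominal split at offset 5 would cut "first line" in half;
        // the adjusted split ends just after the first '\n' (offset 11).
        System.out.println(adjustSplitEnd(file, 5)); // 11
    }
}
```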
9.Spark improves efficiency by sharing intermediate data, avoiding writing intermediate key/value pairs to the DFS. Take iteration as an example: each step of an iteration produces an intermediate result that would otherwise be written to the DFS. When I first met this idea I wondered: intermediate results are only written to the DFS when the map function returns, so why not put the whole iteration inside a single map function and avoid the DFS round trips?
In fact:
1.Consider this definition of iteration: a computation that feeds its own output back in as its next input; typically, a map function takes its own output as the input of the next map. Under this definition, the map function must return and be re-executed before the process ends;
2./**********/Is the definition above semantically sound in practice? Couldn't a recursive call inside the map function also implement iteration? And how does all this relate to the memory abstraction?
3.The problem may not lie in the points raised in 1 and 2; what truly needs a memory abstraction is data sharing across different components, and iteration is just one example. MapReduce's real shortcoming is that it offers the programmer no such memory abstraction for sharing map results;
4.The real problem, close to what 1 and 2 describe, is that an algorithm may need to reuse the result of a map repeatedly, or to map over the result of a reduce, which requires keeping results in memory. For example:
val points = spark.textFile(...).map(parsePoint).persist() // the map result is reused, so persist() keeps it in memory
var w = ... // random initial vector
for (i <- 1 to ITERATIONS) {
  val gradient = points.map { p =>
    p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
  }.reduce((a, b) => a + b)
  w -= gradient // gradient only exists after the reduce
}
10.The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be an <input-file-path, start, offset> tuple. The InputFormat also creates the RecordReader to read the InputSplit.
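A logical split of this shape can be modeled directly (a plain-Java illustration; the field names follow the <input-file-path, start, offset> tuple above, and the file path is hypothetical):

```java
public class LogicalSplitDemo {
    // A logical split names a byte range of a file;
    // the file itself is never physically cut apart.
    static final class FileSplit {
        final String path;
        final long start;
        final long length;
        FileSplit(String path, long start, long length) {
            this.path = path; this.start = start; this.length = length;
        }
    }

    // How many logical splits cover one file of totalLen bytes.
    static int countSplits(long totalLen, long splitSize) {
        return (int) ((totalLen + splitSize - 1) / splitSize); // ceiling division
    }

    public static void main(String[] args) {
        // A 250-byte file with 100-byte splits yields 3 logical splits;
        // the file on disk is untouched.
        System.out.println(countSplits(250, 100)); // 3
        FileSplit last = new FileSplit("/data/wind.txt", 200, 50);
        System.out.println(last.path + " [" + last.start + ", +" + last.length + ")");
    }
}
```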
>20161103: Ambari platform management notes
1.After lost heartbeats, the ambari-agent must be restarted (ambari-agent restart) to recover;
>20161103: Spark-HBase programming notes
1.
| Master URL | Meaning |
|---|---|
| local | Run Spark locally with one worker thread (i.e. no parallelism at all). |
| local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
| local[*] | Run Spark locally with as many worker threads as logical cores on your machine. |
| spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
| mesos://HOST:PORT | Connect to the given Mesos cluster. The port must be whichever one your cluster is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://... . To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher. |
| yarn | Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |
2.In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass "local" to run Spark in-process.
In other words, the master can be hardcoded in a Spark program, but for ease of configuration applications are usually launched from the shell, with the master passed in as a spark-submit argument, so the program itself needs no master parameter;
3.Dependency chain for understanding: spark-->rdd-->mapreduce-->hadoop inputformat;
4.When data is fed to map, the input split is handed to the InputFormat, whose getRecordReader() method produces a RecordReader; the RecordReader then builds the <key,value> pairs that map consumes, via its createKey() and createValue() methods. In short, the InputFormat is what produces the <key,value> pairs that map processes.
5.A Result is backed by an array of Cell objects, each representing an HBase cell defined by the row, family, qualifier, timestamp, and value. Note that the split operation implemented by the system-provided TableInputFormat also returns values of type Result; calling newAPIHadoopRDD from Spark's Scala API yields an RDD[(ImmutableBytesWritable, Result)].
Note in particular that a Result represents the single row result of a Get or Scan query.
To get a complete mapping of all cells in the Result, which can include multiple families and multiple versions, use getMap().
To get a mapping of each family to its columns (qualifiers and values), including only the latest version of each, use getNoVersionMap().
To get a mapping of qualifiers to latest values for an individual family use getFamilyMap(byte[]).
To get the latest value for a specific family and qualifier use getValue(byte[], byte[]).
6.HBase guarantees atomicity only at the row level, i.e. thread safety per row: while a writer thread is operating on a row, reader threads of that row wait for the row lock to be released; atomicity is not guaranteed for concurrent operations spanning multiple rows;
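The row-level guarantee can be pictured with a plain-Java sketch, using one ReadWriteLock per row key (my own simplification to show the semantics; HBase's actual locking implementation differs):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RowLockDemo {
    // One lock per row key: a write to a row excludes readers of that
    // same row, but rows lock independently, so a multi-row operation
    // is not atomic as a whole.
    static final Map<String, ReadWriteLock> rowLocks = new ConcurrentHashMap<>();
    static final Map<String, String> table = new ConcurrentHashMap<>();

    static ReadWriteLock lockFor(String rowKey) {
        return rowLocks.computeIfAbsent(rowKey, k -> new ReentrantReadWriteLock());
    }

    static void put(String rowKey, String value) {
        ReadWriteLock lock = lockFor(rowKey);
        lock.writeLock().lock();          // readers of this row now wait
        try {
            table.put(rowKey, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    static String get(String rowKey) {
        ReadWriteLock lock = lockFor(rowKey);
        lock.readLock().lock();           // blocks only while this row is being written
        try {
            return table.get(rowKey);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        put("WT0001", "12.5");
        put("WT0002", "9.8"); // a different row: its lock is independent
        System.out.println(get("WT0001")); // 12.5
    }
}
```

Because "WT0001" and "WT0002" hold separate locks, a crash between the two puts would leave one row updated and the other not, which is exactly the multi-row non-atomicity described above.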