Miscellaneous notes on using the components of the Hadoop platform

>20161011: data import research
    0. Sqoop prints a warning that Accumulo needs to be installed;
    1. Download the Microsoft SQL Server JDBC driver (with IE) and put the JDBC 4.2 jar into Sqoop's lib directory; note that all automatically installed Hadoop-related software lives under /usr/hdp;
    2. sqoop list-databases --connect jdbc:sqlserver://172.4.25.98 --username sa --password sa12345
    3. sqoop list-tables --connect 'jdbc:sqlserver://172.4.25.98;database=wind_data;'  --username sa --password sa12345
    4. sqoop import --query 'select datatime,lastvalue from WT0001_R010 where $CONDITIONS' --connect 'jdbc:sqlserver://172.4.25.98;database=wind_data' --username sa --password sa12345 --hbase-table 'myy' --hbase-create-table --hbase-row-key datatime --split-by datatime -m 2 --column-family datatime
>20161013: MapReduce framework research
    1. Starting Spark from Ambari Server actually only starts the History Server; to bring up a standalone Spark master you also need start-master.sh;
    2. The MR job history web UI port is 19888; the Spark History Server web UI defaults to 18080 (the standalone master UI uses 8080 and each worker 8081);
    3. Running a "distributed" program inside the IDE is really local mode; to run truly distributed, the code has to be submitted to the cluster master, hence the approach of packaging a jar first and then calling a function in the program to register that jar (see the sketch after this list). As I understand it, the main value of this approach is that the program inside the jar is not the whole job: the driver program launches the distributed tasks automatically and then carries on with other work in the code;
    4. yarn logs -applicationId <applicationId>
    5. Error when running in cluster mode: "SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application." Before that there was also a missing main class error; in client mode the dependencies can be found (in the Hadoop_jar folder), but not in cluster mode;
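    As a reference for point 3, here is a minimal sketch (my own, not the author's code; the master URL and jar path are placeholders) of building the project into a jar and registering it on the SparkContext so executors can load the classes:

      import org.apache.spark.{SparkConf, SparkContext}

      object SubmitFromIde {
        def main(args: Array[String]): Unit = {
          // build the project into app.jar first, then point the driver at it
          val conf = new SparkConf()
            .setAppName("submit-from-ide")
            .setMaster("spark://master-host:7077") // placeholder standalone master URL
            .setJars(Seq("/path/to/app.jar"))      // jar containing this class and its dependencies
          val sc = new SparkContext(conf)
          // the driver now schedules distributed tasks and can go on with other work afterwards
          println(sc.parallelize(1 to 100).sum())
          sc.stop()
        }
      }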

6.<***org.apache.hadoop.mapred.Mapper***>:

  Maps input key/value pairs to a set of intermediate key/value pairs.

  Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. /***Note: this is unlike the map function in functional languages, which applies the same operation to every element of a list, so the number of records stays the same (though their type may change); the next sentence shows that in MR the number of records can also change after map, and on this point many MapReduce diagrams found online are wrong.***/ A given input pair may map to zero or many output pairs.

  The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. /***Why map can change the number of records: the input is effectively indexed and split by the InputFormat, rather than being a plain list.***/ Mapper implementations can access the JobConf for the job via the JobConfigurable.configure(JobConf) and initialize themselves. Similarly they can use the Closeable.close() method for de-initialization.

  The framework then calls map(Object, Object, OutputCollector, Reporter) for each key/value pair in the InputSplit for that task.

  All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the grouping by specifying a Comparator via JobConf.setOutputKeyComparatorClass(Class).

  The grouped Mapper outputs are partitioned per Reducer. /***Once the intermediate key/value pairs are produced, records with the same key are grouped automatically; this is the key step that hides the distributed processing.***/ Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.

Users can optionally specify a combiner, via JobConf.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

  The intermediate, grouped outputs are always stored in SequenceFiles. Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the JobConf.

  If the job has zero reduces then the output of the Mapper is directly written to the FileSystem without grouping by keys. /***Note this default behavior, which improves efficiency.***/

  <***org.apache.hadoop.mapreduce.Mapper***>:

  Unlike the Mapper interface above, this Mapper is a class: The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context) for each key/value pair in the InputSplit. Finally cleanup(org.apache.hadoop.mapreduce.Mapper.Context) is called.
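  To make the two quoted Javadocs concrete, here is a minimal sketch of a new-API Mapper written in Scala (my own illustration; the word-count logic and class name are not from the original notes). It shows the setup -> map-per-record -> cleanup lifecycle, and that a single input record may emit zero or many output pairs:

    import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
    import org.apache.hadoop.mapreduce.Mapper

    class TokenCountMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
      private val one  = new IntWritable(1)
      private val word = new Text()

      // called once per map task, before the first record
      override def setup(ctx: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {}

      // called once for each key/value pair of the task's InputSplit
      override def map(key: LongWritable, value: Text,
                       ctx: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
        // an empty line emits zero pairs, a long line emits many
        value.toString.split("\\s+").filter(_.nonEmpty).foreach { tok =>
          word.set(tok)
          ctx.write(word, one)
        }
      }

      // called once per map task, after the last record
      override def cleanup(ctx: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {}
    }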

  7. Before the map output is written to the buffer, a partition step takes place. MapReduce provides the Partitioner interface; its job is to decide, based on the key (or value) and the number of reduce tasks, which reduce task the current output pair should finally go to. The default is to hash the key and then take it modulo the number of reduce tasks. This default scheme only aims to spread the work evenly across reducers; if you need different behavior you can write your own Partitioner and set it on the job (a sketch follows the three points below). Note three things:

  a: the same key always hashes to the same value;

  b: taking the hash modulo the number of reduce tasks yields a value in 0 .. (number of reduce tasks - 1), which corresponds exactly to one of the reduce tasks;

  c: there is no guarantee that every reduce task handles the same number of keys;
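  A sketch of the default behaviour described above, expressed as a custom Partitioner in Scala (my own illustration; it simply reproduces hash-mod-numReduceTasks and would be registered with job.setPartitionerClass):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.Partitioner

    class HashModPartitioner extends Partitioner[Text, IntWritable] {
      // returns a value in 0 .. numPartitions-1: the same key always lands in the same partition,
      // but nothing guarantees every reducer receives the same number of keys
      override def getPartition(key: Text, value: IntWritable, numPartitions: Int): Int =
        (key.hashCode & Integer.MAX_VALUE) % numPartitions
    }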

  8. The first step of MapReduce, the split, has no direct relationship with how HDFS divides files into blocks; the Mapper reads the HDFS file through an InputFormat, which then performs the splitting. Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks (e.g. text mode's range splitting ensures that range splits occur only at line boundaries).

  9. Spark improves efficiency by sharing intermediate data, so intermediate key/value pairs do not have to be written to the DFS. Take iteration as an example: every step of the iteration produces an intermediate result that would otherwise go to the DFS. When I first met this idea I wondered: the intermediate result is only written to the DFS when the map function returns, so why not put the whole iteration inside one map function and avoid the DFS round trips?

  Actually:

  1. Consider defining iteration like this: a computation takes its own output as its next input; typically, the map function takes its own output as the input of the map function. Under this definition, the map function must return before the whole process is finished and then be executed again;

  2. Is that definition actually semantically right in practice? Wouldn't a recursive call inside the map function also implement iteration? And how does all of this relate to the in-memory abstraction?

  3. The real issue is probably not the points in 1 and 2; what genuinely needs an in-memory abstraction is data sharing between different components. Iteration is only one example; the real shortcoming of MapReduce is that it provides no such in-memory abstraction through which the programmer can share map results;

  4. A real problem close to 1 and 2 is this: the algorithm needs to reuse the result of a map, or needs to map over the result of a reduce, so the result has to be kept in memory, for example:

    val points = spark.textFile(...).map(parsePoint).persist() // the map result is reused, hence persist()
    var w = // random initial vector
    for (i <- 1 to ITERATIONS) {
      val gradient = points.map { p =>
        p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
      }.reduce((a, b) => a + b)
      w -= gradient // gradient only exists after the reduce
    }
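  For reference, a self-contained runnable version of the sketch above (a minimal sketch of mine: the input path, the CSV layout with the label in the last column, and ITERATIONS = 10 are assumptions, not part of the original notes):

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.math.exp
    import scala.util.Random

    object LogisticRegressionSketch {
      case class Point(x: Array[Double], y: Double)

      // assumed line format: feature1,feature2,...,label
      def parsePoint(line: String): Point = {
        val cols = line.split(",").map(_.toDouble)
        Point(cols.init, cols.last)
      }

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("lr-sketch"))
        // computed once, cached in memory, and reused by every iteration
        val points = sc.textFile("hdfs:///tmp/points.csv").map(parsePoint).persist()
        val dims = points.first().x.length
        var w = Array.fill(dims)(Random.nextDouble()) // random initial vector

        for (_ <- 1 to 10) { // ITERATIONS = 10, chosen arbitrarily here
          val gradient = points.map { p =>
            val dot   = w.zip(p.x).map { case (wi, xi) => wi * xi }.sum
            val scale = (1.0 / (1.0 + exp(-p.y * dot)) - 1.0) * p.y
            p.x.map(_ * scale)
          }.reduce((a, b) => a.zip(b).map { case (ai, bi) => ai + bi })
          w = w.zip(gradient).map { case (wi, gi) => wi - gi } // gradient only exists after the reduce
        }

        sc.stop()
      }
    }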

  10. The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be a <input-file-path, start, offset> tuple. The InputFormat also creates the RecordReader to read the InputSplit.
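  A small sketch of points 8 and 10 (my own illustration with a made-up path): a FileSplit is nothing more than a (path, start, length, hosts) record over a file that stays physically intact.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.lib.input.FileSplit

    object LogicalSplitDemo {
      def main(args: Array[String]): Unit = {
        // a hypothetical 128 MB split over one HDFS file; the file itself is not cut
        val split = new FileSplit(new Path("hdfs:///data/input.txt"), 0L, 128L * 1024 * 1024, Array("node1"))
        println(s"path=${split.getPath} start=${split.getStart} length=${split.getLength}")
      }
    }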

>20161103: Ambari platform administration research

  1. After an agent shows lost heartbeats, the ambari-agent must be restarted (ambari-agent restart) to recover;

>20161103: Spark-HBase programming research

  1.

 Master URL | Meaning
 local | Run Spark locally with one worker thread (i.e. no parallelism at all).
 local[K] | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
 local[*] | Run Spark locally with as many worker threads as logical cores on your machine.
 spark://HOST:PORT | Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
 mesos://HOST:PORT | Connect to the given Mesos cluster. The port must be whichever one your cluster is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
 yarn | Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.

  2.In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

  In other words, the master can be hardcoded in Spark code, but for easier configuration the application is usually launched from the shell (spark-submit) and the master is passed in as a parameter, so no master argument is needed in the code;
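  A minimal sketch of that convention (my own example, not from the notes): leave the master out of the code and let spark-submit supply it, e.g. spark-submit --master yarn --deploy-mode client --class Demo app.jar; for local testing, pass local[*] instead.

    import org.apache.spark.{SparkConf, SparkContext}

    object Demo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("demo") // no setMaster here; spark-submit provides --master
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 10).sum())
        sc.stop()
      }
    }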

  3. Dependency chain for understanding: spark --> rdd --> mapreduce --> hadoop inputformat;

  4. When data is fed to map, the input split is passed to the InputFormat; the InputFormat calls getRecordReader() to produce a RecordReader, and the RecordReader uses createKey() and createValue() to build the <key,value> pairs that map can process. In short, the InputFormat is what produces the <key,value> pairs consumed by map (see the sketch below).
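  As a sketch of that flow (my own illustration; the input path is made up), the old-API calls look like this, with TextInputFormat producing <byte offset, line> pairs:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, Reporter, TextInputFormat}

    object RecordReaderDemo {
      def main(args: Array[String]): Unit = {
        val conf = new JobConf()
        FileInputFormat.setInputPaths(conf, new Path("hdfs:///tmp/input"))

        val inputFormat = new TextInputFormat()
        inputFormat.configure(conf)
        val splits = inputFormat.getSplits(conf, 2) // logical splits; the file is not physically cut

        val reader = inputFormat.getRecordReader(splits(0), conf, Reporter.NULL)
        val key    = reader.createKey()   // LongWritable: byte offset of the line
        val value  = reader.createValue() // Text: the line itself
        while (reader.next(key, value)) {
          println(s"$key -> $value")
        }
        reader.close()
      }
    }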

  5. A Result is backed by an array of Cell objects, each representing an HBase cell defined by the row, family, qualifier, timestamp, and value. Note that the splits implemented by the stock TableInputFormat also return values of type Result; calling newHadoopRDD through Spark's Scala API therefore yields an RDD[(ImmutableBytesWritable, Result)].

  Note in particular: a Result represents the single row result of a Get or Scan query.

  To get a complete mapping of all cells in the Result, which can include multiple families and multiple versions, use getMap().

  To get a mapping of each family to its columns (qualifiers and values), including only the latest version of each, use getNoVersionMap().

  To get a mapping of qualifiers to latest values for an individual family use getFamilyMap(byte[]).

  To get the latest value for a specific family and qualifier use getValue(byte[], byte[]).
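  A minimal sketch of reading such an RDD (my own illustration: it uses the new-API org.apache.hadoop.hbase.mapreduce.TableInputFormat with newAPIHadoopRDD, and the table/family/qualifier names echo the sqoop import above rather than anything verified):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    object HBaseScanSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("hbase-scan-sketch"))

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "myy") // table written by the sqoop import above

        // each pair is (row key, single-row Result)
        val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])

        rdd.map { case (rowKey, result) =>
          val key       = Bytes.toString(rowKey.get())
          // getValue(family, qualifier) returns the latest version of that cell
          val lastValue = Bytes.toString(result.getValue(Bytes.toBytes("datatime"), Bytes.toBytes("lastvalue")))
          (key, lastValue)
        }.take(10).foreach(println)

        sc.stop()
      }
    }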

  6. HBase guarantees atomicity only at the row level, i.e. per-row thread safety: while a write is in progress on a row, reads of that row wait for the row lock to be released; for operations spanning multiple rows concurrently, atomicity is not guaranteed;
