Hadoop

关键技术

  1. 数据分布在多台机器

    可靠性:每个数据块都复制到多个节点

    性能:多个节点同时处理数据

  2. 计算随数据走

    网络IO速度 << 本地磁盘IO速度,大数据系统会尽量地将任务分配到离数据最近的机器上运行(程序运行时,将程序及其依赖包都复制到数据所在的机器运行)

    代码向数据迁移,避免大规模数据时,造成大量数据迁移的情况,尽量让一段数据的计算发生在同一台机器上

  3. 串行IO取代随机IO

    传输时间 << 寻道时间,一般数据写入后不再修改

  4. Hadoop可运行于一般的商用服务器上,具有高容错、高可靠性、高扩展性等特点

  5. 特别适合写一次,读多次的场景

    适合: 大规模数据, 流式数据(写一次,读多次), 商用硬件(一般硬件)
    不适合: 低延时的数据访问, 大量的小文件, 频繁修改文件(基本就是写1次)

列出Hadoop集群的Hadoop守护进程和相关的角色。

  • Block数据块;

    基本存储单位,一般大小为64M(配置大的块主要是因为:1)减少搜寻时间,一般硬盘传输速率比寻道时间要快,大的块可以减少寻道时间;2)减少管理块的数据开销,每个块都需要在NameNode上有对应的记录;3)对数据块进行读写,减少建立网络的连接成本)

    一个大文件会被拆分成一个个的块,然后存储于不同的机器。如果一个文件少于Block大小,那么实际占用的空间为其文件的大小

    基本的读写单位,类似于磁盘的页,每次都是读写一个块

    每个块都会被复制到多台机器,默认复制3份

  • NameNode:

    存储文件的metadata,运行时所有数据都保存到内存,整个HDFS可存储的文件数受限于NameNode的内存大小

    一个Block在NameNode中对应一条记录(一般一个block占用150字节),如果是大量的小文件,会消耗大量内存。同时map task的数量是由splits来决定的,所以用MapReduce处理大量的小文件时,就会产生过多的map task,线程管理开销将会增加作业时间。处理大量小文件的速度远远小于处理同等大小的大文件的速度。因此Hadoop建议存储大文件

    数据会定时保存到本地磁盘,但不保存block的位置信息,而是由DataNode注册时上报和运行时维护(NameNode中与DataNode相关的信息并不保存到NameNode的文件系统中,而是NameNode每次重启后,动态重建)

    NameNode失效则整个HDFS都失效了,所以要保证NameNode的可用性

  • Secondary NameNode:

    定时与NameNode进行同步(定期合并文件系统镜像和编辑日志,然后把合并后的传给NameNode,替换其镜像,并清空编辑日志,类似于CheckPoint机制),但NameNode失效后仍需要手工将其设置成主机

  • DataNode:

    保存具体的block数据

    负责数据的读写操作和复制操作

    DataNode启动时会向NameNode报告当前存储的数据块信息,后续也会定时报告修改信息

    DataNode之间会进行通信,复制数据块,保证数据的冗余性

Hadoop 新旧架构对比

旧的MapReduce架构

  • JobTracker:

    这是运行在Namenode上,负责提交和跟踪MapReduce Job的守护程序。负责资源管理,跟踪资源消耗和可用性,作业生命周期管理(调度作业任务,跟踪进度,为任务提供容错)

  • TaskTracker:

    这是Datanode上运行的守护进程。它在Slave节点上负责具体任务的运行。

  • 旧框架存在问题:

    JobTracker是MapReduce的集中处理点,存在单点故障

    JobTracker完成了太多的任务,造成了过多的资源消耗,当MapReduce job 非常多的时候,会造成很大的内存开销。这也是业界普遍总结出老Hadoop的MapReduce只能支持4000 节点主机的上限

    在TaskTracker端,以map/reduce task的数目作为资源的表示过于简单,没有考虑到cpu/ 内存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现OOM

    在TaskTracker端,把资源强制划分为map task slot和reduce task slot, 如果当系统中只有map task或者只有reduce task的时候,会造成资源的浪费,也就集群资源利用的问题

YARN就是将JobTracker的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理和一个每个作业的管理(ApplicationMaster) ResourceManager和NodeManager提供了计算资源的分配和管理,而ApplicationMaster则完成应用程序的运行

  • ResourceManager: 全局资源管理和任务调度

  • NodeManager: 单个节点的资源管理和监控

  • ApplicationMaster: 单个作业的资源管理和任务监控

  • Container: 资源申请的单位和任务运行的容器

旧架构 基本流程

YARN 基本流程

  1. Job submission

    从ResourceManager中获取一个Application ID 检查作业输出配置,计算输入分片 拷贝作业资源(job jar、配置文件、分片信息)到HDFS,以便后面任务的执行

  2. Job initialization

    ResourceManager 将作业递交给 Scheduler(有很多调度算法,一般是根据优先级)Scheduler 为作业分配一个 Container, ResourceManager 就加载一个 application master process 并交给 NodeManager 管理, ApplicationMaster 主要是创建一系列的监控进程来跟踪作业的进度,同时获取输入分片,为每一个分片创建一个 Map task 和相应的 reduce task 。 Application Master 还决定如何运行作业,如果作业很小(可配置),则直接在同一个JVM下运行

  3. Task assignment

    ApplicationMaster 向Resource Manager 申请资源(一个个的Container,指定任务分配的资源要求)一般是根据 data locality 来分配资源

  4. Task execution

    Application Master 根据 ResourceManager 的分配情况,在对应的 NodeManager 中启动 Container 。从 HDFS 读取任务所需资源(job jar,配置文件等),然后执行该任务

  5. Progress and status update

    定时将任务的进度和状态报告给 ApplicationMaster, Client 定时向 ApplicationMaster 获取整个任务的进度和状态

  6. Job completion

    Client 定时检查整个作业是否完成 作业完成后,会清空临时文件、目录等

Yarn Failover

  1. 任务失败

    运行时异常或者JVM退出都会报告给 ApplicationMaster

    通过心跳来检查挂住的任务(timeout),会检查多次(可配置)才判断该任务是否失效

    一个作业的任务失败率超过配置,则认为该作业失败

    失败的任务或作业都会有 ApplicationMaster 重新运行

  2. ApplicationMaster 失败

    ApplicationMaster 定时发送心跳信号到 ResourceManager,通常一旦 ApplicationMaster 失败,则认为失败,但也可以通过配置多次后才失败

    一旦 ApplicationMaster 失败,ResourceManager 会启动一个新的 ApplicationMaster

    新的 ApplicationMaster 负责恢复之前错误的 ApplicationMaster 的状态 (yarn.app.mapreduce.am.job.recovery.enable=true),这一步是通过将应用运行状态保存到共享的存储上来实现的,ResourceManager 不会负责任务状态的保存和恢复

    Client也会定时向 ApplicationMaster 查询进度和状态,一旦发现其失败,则向 ResouceManager 询问新的 ApplicationMaster

  3. NodeManager失败

    NodeManager 定时发送心跳到 ResourceManager,如果超过一段时间没有收到心跳消息,ResourceManager 就会将其移除

    任何运行在该 NodeManager 上的任务和 ApplicationMaster 都会在其他 NodeManager 上进行恢复

    如果某个 NodeManager 失败的次数太多,ApplicationMaster 会将其加入黑名单(ResourceManager没有),任务调度时不在其上运行任务

  4. ResourceManager失败

    通过 checkpoint 机制,定时将其状态保存到磁盘,然后失败的时候,重新运行

    通过 zookeeper 同步状态和实现透明的 High Available

可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和 zookeeper 来ֹ实现 HA

Zookeeper

Zookeeper: 为分布式应用提供分布式协作(协调)服务。使用类似文件系统的树形结构。目的是分布式服务不再需要由于协作冲突而另外实现协作服务。

Zookeeper集群统一为其他分布式应用(也是集群,例如HDFS集群、Yarn集群等等)集群提供协作服务。

Zookeeper 的节点名称叫做 znode,以路径标注, 每个节点存储上限是 1MB。数据访问是原子的。

znode 有临时性和永久性之分。临时性 znode 在客户端会话结束后删除。临时性 znode 对所有客户端也是可见的。

一个顺序的 znode 由 Zookeeper 给定一个序号作为其名称一部分,如’/a/b-3’,3 即为指定的序号。可用来实现分布式锁。

Watch 在 znode 变更时通知客户端。客户端调用 znode 读操作时会同时获得一个 Watch,当 znode 发生写操作时,此 Watch 会被通知一次且仅一次。

更新 znode 必须声明版本号(乐观锁),如果版本号错误则更新失败,可指定版本号为 -1 进行强制更新。

Zookeeper 抛弃了文件系统的操作原语,因为文件非常小且整体读写,所以不需要打开、关闭、寻址操作。

支持 同步 和 异步 两种 API。

znode 维护一个 ACL 列表,决定谁可以访问那些操作。

Zookeeper 运行在一组叫做 ensemble 的集群上,服务器数量是奇数个,2N+1, N代表允许宕机的个数。

Zookeeper角色:Leader, Follower, Client

Zookeeper 采用 Zab 协议,分两个阶段:

  1. 领导者选举,且要求大部分更随者同领导者同步状态。

  2. 原子广播,所有写操作被传送给领导者,并通过广播将更新信息告诉跟随者,当大部分跟随者都同步了写操作,则返回成功给客户端。

每次写操作都有一个唯一全局标识,zxid,按先后顺序排序。所有的更新都是,顺序的,原子的,单系统镜像的,容错的,合时的(不过太过时)。

读操作由内存提供,可能读到过时信息,可调用 sync 强制同领导者同步。

tick time 是基本时长,其他时间均为该时间倍数。

案例:

  1. 配置服务:每次读操作获得一个 Watch,在更新时被通知,重新读即可。

  2. 分布式锁:指明一个lock znode,所有客户端在该 znode下创建临时 znode,拥有最小序列号的 znode 获得锁。避免链接丢失而重复创建锁,获得锁时,应检查属于该客户会话的锁是否存在。

Hadoop 读取数据

通过 InputFormat 决定读取的数据的类型,然后拆分成一个个 InputSplit ,每个 InputSplit 对应一个 Map 处理,RecordReader 读取 InputSplit 的内容给 Map。

InputSplit 代表一个个逻辑分片,并没有真正存储数据,只是提供了一个如何将数据分片的方法,Split 内有 获取长度和位置的方法,长度可排序大的先执行,位置利于数据局部化。

RecordReader 将 InputSplit 拆分成一个个 key, value 对给 Map 处理

大量小文件如何处理:CombineFileInputFormat 可以将若干个 Split 打包成一个

通常一个 split 就是一个 block,这样做的好处是使得 Map 可以在存储有当前数据的节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度

通过 mapred.min.split.size, mapred.max.split.size, block.size 来控制拆分的大小

分片间的数据如何处理:忽略最后一条记录,后一个split远程读该条记录

HDFS

不适用场景:低延迟访问,大量小文件,多用户写入。HDFS文件只有一个写入者,且总是在文件末尾。

HDFS中小于 64M 时不会占用整块空间,大容量块是为了减少寻址开销。

Namenode高可用:写入本地同时写入一个 NFS 挂载,第二Namenode

JAVA 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// URL
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
InputStream in = new URL("..").openStream();
IOUtils.copyBytes(in, System.out, 4096, false);

//FileSystem API
FileSystem fs = FileSystem.get(URI.create(".."), conf);
fs.oepn(new Path("..")) // return a FSDataInputStream, 支持随机访问和获取当前位置

//写入
FileSystem fs = FileSystem.get(URI.create(".."), conf);
OutputStream out = fs.create(new Path(".."), new Progressable(){}); // return a FSDataOutputStream, 支持获取当前位置
IOUtils.copyBytes(in, out, 4096, true);

fs.mkdirs(Path);

fs.exists(Path);

fs.getFileStatus(Path);

fs.listStatus(Path);

fs.globStatus(Path, PathFilter); // 返回路径上匹配的文件

fs.delete(Path, boolean recursive);

HDFS 读取数据

Namenode 只提供索引,不提供数据。错误时跳过,损坏时读副本。检查校验和。

网络拓扑

HDFS 写入数据

如果某datanode失败,会重新执行确认队列中的数据,且向namenode注册新身份,跳过故障节点。

写入副本的放置

一致模型:写入一个块即可看到该块,如果要同步读取,调用 sync() 方法。

distcp 分布式复制,多线程,两个 HDFS 之间传输。以 MapReduce 方式进行,只有 Map。

Hadoop Archives: 多个小文件合并成 .har 存储。以副本方式归档、归档后不可修改。

Hadoop I/O

CRC-32 校验和,每512字节计算32字节。

压缩Gzip,ZIP,bzip2,LZO。CompressionCodec 编码器,使用 native library 加速。CodecPool 编码器池。

压缩后无法分割,造成一个 map 处理多个块,无法数据本地化。

序列划:

  1. Writable 接口,write(DataOutput) 和 readFields(DataInput) 方法。
  2. 两个比较器,Writable 接口和RawComparator 接口。前者对象比较 CompareTo(A, B),后者字节比较 compare(byte[], int, int, byte[], int, int),默认的 compare 的实现会调用 compareTo, 因此也会反序列化成对象。

Writable 类:BooleanWritable, ByteWritable, IntWritable, VIntWritable, FloatWritable, LongWritable, VLongWritable, DoubleWritable.

Text: 严格按照字节寻址,unicode会拆成两个地址。

NullWritable: 单实例,除此之外,其他 Writable 都可以修改。

ObjectWritable:可封装多种类型。

GenericWritable:可封装多种类型,但通过数组索引指明类型。

Writable集合:ArrayWritbale,TwoDArrayWritable,MapWritable,SortedMapWritable。数组,二维数字,Map,sortedMap。

HashPartitioner 使用 writable 中的 hashCode() 方法来选择分区。

SequenceFile:用来打包多个小文件,读取提供,next(),getCurrentValue() 方法。seek() 可定位,但不是记录边界时会出错,可用 sync() 定位到下一个同步点。文件为 name.seq。

MapFile:经过排序带有索引的 SequenceFile。文件为 name.map。其中包含一个 index 和一个 data 文件。index 中只保留部分索引,提供 get() 和 getClost() 读取方法。可用 fix() 方法重建索引,或者从 SequenceFile 转为 MapFile。

Hadoop 测试

对 Map 和 Reduce 单元测试

本地作业上测试驱动程序,MiniDFSCluster 和 MiniMRCluster 测试。

远程调试:设置 keep.failed.task.files 为 true,保存失败任务输入,登录目标节点,用 IsolationRunner 再次执行。

Shuffle 和 排序

Map 端:缓冲区 100M,超过 80% 开始内存溢写,分区,且分区内排序,排序后执行combiner,多个溢写文件会合并,少于3个不合并。可压缩。

Reduce 端:

  • 复制阶段:5个线程复制,先复制到内存,超出阈值溢写到磁盘。需解压。
  • 排序阶段:根据合并系数进行合并,可能最后有多个文件。
  • reduce 阶段:多个文件输入 reduce 函数。

优化原则:为 shuffle 指定更多空间,避免多次磁盘溢写。对于Map,主要就是避免把文件写入磁盘,例如使用 Combiner,增大 io.sort.mb 的值。对于 Reduce ,主要是把 Map 的结果尽可能地保存到内存中,同样也是要避免把中间结果写入磁盘。默认情况下,所有的内存都是分配给 Reduce 方法的。使用压缩,增大buffer size。

任务执行

speculative execution, 某个任务过慢,开启相同任务作为备份,谁先完成用谁,另外一个则终止。

JVM 重用

跳过坏记录:skipping 模式,失败两次后,第三次失败则记录并跳过。

多个实例向同一个文件写:先写入临时文件夹,先完成的允许修改。

Hadoop 流格式

Input:

  • FileInputFormat,过滤隐藏文件,分割原则:max(min_split, min(max_split, block_size))
  • CombineFileInputFormat,把多个小文件打包到一个输入中。
  • TextInputForamt
  • KeyValueTextInputForamt,第一个 tab 分开 key 和 value
  • NLineInputForamt,指定每个 map 收到多少行
  • SteamInputForamt, 处理 XML
  • SequenceFileInputForamt,可处理 SequenceFile
  • SequenceFileAsTextInputForamt,键值变成 Text
  • SequenceFileAsBinaryInputForamt,键值变成 Binary
  • MultipleInputs,在每一个路径上规定 Format
  • DBInputFormat

Output:

  • TextOutputForamt
  • SequenceFileOutputForamt
  • SequenceFileAsBinaryOutputForamt
  • MapFileOutputForamt
  • MltipleOutputFormat, 可控制文件名
  • MultipleOutpus,文件名后跟’-r-分区索引’,但灵活,功能多
  • LazyOutputForamt,有记录时才写文件,避免空文件

计数器

任务失败计数器可能减少。枚举定义计数器,也可以用组名定义动态计数器。可通过 Web 界面,命令行 和 JAVA API 获取计数器。

排序

  • 部分排序:利用 MapReduce,
  • 部分排序查找:输出变为 MapFile
  • 全局排序:partition 对 key 从小到大分区,利用采样器 Sampler 均匀分区,partition 文件写入分布式缓存全局共享。
  • 二次排序:复合键,单键分区(到同一个reduce),单键分组(到同一组,setOutputValueGroupingComparator),自定义 comparator,按照双键排序(setOutputKeyComparatorClass)。

python 流中,可用 KeyFieldBasedPartitioner 进行单键分割,用 KeyFiledBasedComparator 比较双键比较。

链接

速度 全内存链接 > 部分内存链接 > MapReduce

  • Map 端:要求严格,用来链接多个作业输出,相同数量reducer,相同键,输出文件不可分。
  • Reduce 端:比如 1对多 链接,左半部分 key 为(链接键,0),右半部分 key 为(链接键,1),分区分组都按照连接键进行,0 和 1 保证了到达的先后顺序。

分布式缓存

使用 -files -archives,让APPlicationMaster直接去拷贝。引用计数和缓存清理。

或者通过API,addCacheFile() 和 addCacheArchive() 去指定拷贝。添加 symlink 符号链接。

增加和移除节点

Pig

Hbase

Hbase 是一个分布的,面向列的,排序的 map 存储。

基于Google Bigtable的开源实现,是一个具有高可靠性、高性能、面向列、可伸缩性、典型的key/value分布式存储的nosql数据库系统,主要用于海量结构化和半结构化数据存储。它介于nosql和RDBMS之间,仅能通过行键(row key)和行键的range来检索数据,行数据存储是原子性的,仅支持单行事务(可通过hive支持来实现多表join等复杂操作)。HBase查询数据功能很简单,不支持join等复杂操作,不支持跨行和跨表事务

单元格内容为无解释的字节数组,行键也是字节数组,表以行键排序。面向列族的存储。

把表按照行切分成区域,区域分散在 HBase 集群上。

行级事务,行的更新时原子的。

两个目录表,-ROOT- 表包含 .META. 表所在区域。.META. 表包含所有用户空间区域列表。

区域自动平衡。

Hive

Hive 中存放是表,是和 hdfs 的映射关系,hive 是逻辑上的数据仓库,实际操作的都是 hdfs 上的文件,HQL 就是用 sql 语法来写的 mr 程序。

Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供HQL语句(类SQL语言)查询功能,存储依赖于HDFS。支持多种计算引擎,如Spark、MapReduce(默认)、Tez;支持多种存储格式,如TextFile、SequenceFile、RCFile、ORC、Parquet(常用);支持多种压缩格式,如gzip、lzo、snappy(常用)、bzip2

Sqoop

hadoop生态圈上的数据传输工具。

可以将关系型数据库的数据导入非结构化的hdfs、hive或者bbase中,也可以将hdfs中的数据导出到关系型数据库或者文本文件中。

使用的是mr程序来执行任务,使用jdbc和关系型数据库进行交互。

Flume

Flume 一个分布式、可靠的、高可用的用于数据采集、聚合和传输的系统。常用于日志采集系统中,支持定制各类数据发送方用于收集数据、通过自定义拦截器对数据进行简单的预处理并传输到各种数据接收方如HDFS、HBase、Kafka中。之前由Cloudera开发,后纳入Apache

ALS, canopy, 朴素贝叶斯分类, apriori

Spark

Spark是一个快速、通用、可扩展、可容错的、内存迭代式计算的大数据分析引擎。目前生态体系主要包括用于批数据处理的SparkRDD、SparkSQL,用于流数据处理的SparkStreaming、Structured-Streaming,用于机器学习的Spark MLLib,用于图计算的Graphx以及用于统计分析的SparkR,支持Java、Scala、Python、R多种数据语言