ᕕ( ᐛ )ᕗ Jimyag's Blog

大数据技术及应用 - 复习资料

大数据技术及应用的复习资料。

大数据概述

大数据概念

指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力来适应海量、高增长率和多样化的信息资产。

大数据特性

  1. 海量性
  2. 多样性
  3. 真实性
  4. 价值密度低
  5. 高速性
  6. 可变性

img

大数据的影响

  1. 数据的运行、计算速度越来越快

  2. 数据存储成本下降

  3. 实现信息对等解放脑力,机器拥有人的智慧

大数据的关键技术

  1. 分布式系统基础架构 Hadoop 的出现,为大数据带来了新的曙光;

  2. HDFS 为海量的数据提供了存储;

  3. MapReduce 则为海量的数据提供了并行计算,从而大大提高了计算效率;

  4. Spark、Storm、Impala、Flink 等各种各样的技术进入人们的视野。

大数据与云计算物联网的关系

物联网、云计算和大数据三者互为基础。

物联网产生大数据,大数据需要云计算。

物联网在将物品和互联网连接起来,进行信息交换和通信,以实现智能化识别、定位、跟踪、监控和管理的过程中,产生的大量数据,云计算解决万物互联带来的巨大数据量,所以三者互为基础,又相互促进。如果不那么严格的说,它们三者可以看做一个整体,相互发展、相互促进。

以下关于云计算、大数据和物联网之间的关系,论述错误的是:B

A、物联网可以借助于大数据实现海量数据的分析

B、云计算侧重于数据分析

C、物联网可以借助于云计算实现海量数据的存储

D、云计算、大数据和物联网三者紧密相关,相辅相成

hadoop 简介

Hadoop 简介

hadoop 是一个分布式系统基础架构,hadoop 的框架最核心的设计是 HDFS 和 MapReduce,HDFS 为海量的数据提供了存储能力,MapReduce 为海量数据提供了计算能力

Hadoop 特性/优点

  1. 高可靠性:采用冗余数据存贮方式,即使一个副本发生故障,其他副本也可以保证对外工作的正常进行

  2. 高扩展性:采用冗余数据存贮方式,即使一个副本发生故障,其他副本也可以保证对外工作的正常进行。

  3. 高效性:作为并行分布式计算平台,hadoop 采用分布式存贮和分布式处理两大核心技术,能够高效的处理 PB 级别的数据

  4. 高容错性:采用冗余数据存贮方式,自动保存数据的多个副本,并且能够自动将失败的任务重新分配。

  5. 经济性:hadoop 采用廉价的计算机集群,普通的用户也可以 pc 机搭建环境

  6. 运行在 linux 平台上,hadoop 是基于 java 语言开发的,可以较好的运行在 linux 的平台上

  7. 支持多种编程语言,如:C++ 等/

Hadoop 缺点

在当前 Hadoop 的设计中,所有的 metadata 操作都要通过集中式的 NameNode 来进行,NameNode 有可能是性能的瓶颈。

当前 Hadoop 单一 NameNode、单一 Jobtracker 的设计严重制约了整个 Hadoop 可扩展性和可靠性。

首先,NameNode 和 JobTracker 是整个系统中明显的单点故障源。再次,单一 NameNode 的内存容量有限,使得 Hadoop 集群的节点数量被限制到 2000 个左右,能支持的文件系统大小被限制在 10-50PB,最多能支持的文件数量大约为 1.5 亿左右。实际上,有用户抱怨其集群的 NameNode 重启需要数小时,这大大降低了系统的可用性。

Hadoop 项目结构

详细介绍

img

1,core/common

Hadoop Common 原名为 Hadoop Core,0.20 版本之后改为 common。自 0.21 版本之后,HDFS 和 MapReduce 被分离出来作为单独的子项目,其余部分构成 Hadoop Common。

Common 是为 Hadoop 及其他子项目提供支持的常用工具,主要包括文件系统,RPC 和串行化库,他们为在廉价的硬件上搭建云计算环境提供基本的服务,同时也为运行在该平台上的软件开发提供所需要的 API。

2,Avro

Avro 是 Hadoop 的一个子项目,也是 Apache 中的一个独立项目。

Avro 是一个用于数据序列化的系统,提供了丰富的数据结构类型,快速可压缩的二进制数据格式,存储持久性数据的文件集,远程调用的功能和简单的动态语言集成功能。

Avro 可以将数据结构或对象转化成便于存储和传输的格式,节约数据存储空间和网络传输带宽,Hadoop 的其它子项目的客户端与服务端之间的数据传输都采用 Avro。

3,HDFS

HDFS 是 Hadoop 项目的两大核心之一,它是针对谷歌文件系统(GFS)的开源实现。

HDFS 具有处理超大数据,流式处理,可以运行在廉价商用服务器上等优点。

HDFS 在设计之初就是要运行在廉价的大型服务器集群上,因此,在设计上就把硬件故障作为一种常态来考虑,可以保证在部分硬件发生故障的情况下,仍能保证文件系统的整体的可用性和可靠性。

HDFS 放宽了一部分 POSIX 约束,从而实现以流的形式访问文件系统中的数据。

HDFS 在访问应用程序数据时候,可以具有很高的吞吐量,因此,对于超大数据集的应用程序而言,选择 HDFS 作为底层数据存储是较好的选择。

4,HBase

HBase 是一个提供高可靠性,高性能,可伸缩,实时读写,分布式的列式数据库,一般采用 HDFS 作为其底层数据存储。

HBase 是针对谷歌的 BigTable 的开源实现,二者都采用了相同的数据模型,具有强大的非结构化数据存储能力。HBase 与传统关系数据库的一个重要区别就是,前者是基于列的存储,而后者采用基于行的存储。HBase 具有良好的横向扩展能力,可以通过不断增加廉价的商用服务器来增加存储能力。

5,MapReduce

Hadoop MapReduce 是这对 google 的 MapReduce 的实现。

MapReduce 是一种编程模型,用于大规模数据集的并行计算,它将复杂,运行于大规模集群上的并行计算过程高度的抽象到了两个函数——Map 和 Reduce,并允许用户在不了解分分布式系统底层细节的情况下开发并行应用程序,并将其运行于廉价计算机集群上,完成海量数据的处理。

6,Zookeeper

Zookeeper 是针对谷歌 Chubby 的一个开源实现,是高效和可靠的协同工作系统,提供分布式锁之类的基本服务,用于构建分布式应用,减轻分布式应用程序锁承担的协调任务。

Zookeeper 使用 Java 编写,很容易编程接入,它使用了一个和文件树结构相似的数据模型,可以使用 Java 或者 C 来进行编程接入。

7,Hive

Hive 是一个基于 Hadoop 的数据仓库工具,可以用于对 Hadoop 文件中的数据集进行数据整理,特殊查询和分析存储。

Hive 的学习门槛较低,因为,它提供了类似于关系数据 SQL 语言的特殊查询语言——Hive QL,可以通过 Hive QL 语句快速实现简单的 MapReduce 统计,Hive 自身可以将Hive QL 语句转换为 MapReduce 任务进行运行,而不必开发专门的 MapReduce 应用,因而十分适合数据仓库的统计分析。

8,Pig

Pig 是一种数据流语言和运行环境,适合于使用 Hadoop 和 MapReduce 平台来查询大型半结构化数据集。

Pig 的出现大大简化了 Hadoop 常见的工作任务,它在 MapReduce 的基础上创建了更简单的过程语言抽象,为 Hadoop 应用程序提供了一种更加接近结构化查询语言的接口。

Pig 是一个相对简单的语言,它可以执行 SQL 语句,因此,当我们需要从大型数据集中搜索满足某个给定搜索条件的记录时,采用 Pig 要比 MapReduce 具有明显的优势,前者只需要编写一个简单的脚本在集群中自动并行处理与分发,而后者则需要编写一个单独的 MapReduce 应用程序。

9,Sqoop

Sqoop 可以改进数据的互操作性,主要用来在 Hadoop 和关系数据库直接交换数据。

通过 Sqoop,我们可以方便的将关系数据库之中的数据导入 Hadoop,或者将 Hadoop 中的数据导入关系数据库。Sqoop 主要通过 JDBC和关系数据库进行交互,理论上,支持 JDBC 的关系数据库都可以使 Sqoop 和 Hadoop 进行数据交互。

Sqoop 是专门为大数据集设计的,支持增量更新,可以将新纪录添加到最近一次到处的数据源上,或者指定上次修改的时间戳。

10,Chukwa

Chukwa 是一个开源的,用于监控大型分布式系统的数据收集系统,可以将各种类型的数据收集成合适的 Hadoop 处理的文件,并保存在 HDFS 中供 Hadoop 进行各种 MapReduce 操作。

Chukwa 构建在 Hadoop 的 HDFS 和 MapReduce 框架之上,继承了 Hadoop 的可伸缩性和可扩展性。

Chukwa 内置了一个强大而灵活的工具集,可用于展示,监控和分析已收集的数据。

Hadoop 生态系统

详细介绍

1.Hive 2.Hbase 3.Pig 4.Sqoop 5.Flume 6.Zookeeper 7.Spark 8.Storm 9.Avr

img

Hdfs

HDFS( Hadoop Distributed File System) 是一个易于扩展的分布式文件系统

Hdfs 体系结构

HDFS 采用的是 master/slaves 主从结构模型来管理数据。

这种结构模型主要由四个部分组成:

Client(客户端)、Namenode(名称节点)、Datanode(数据节点) 和 SecondaryNamenode(第二名称节点,辅助 Namenode)。

一个真正的 HDFS 集群包括一个 Namenode 和若干数目的 Datanode。

Namenode 是一个中心服务器,负责管理文件系统的命名空间 (Namespace ) 及客户端对文件的访问。

集群中的 Datanode 一般是一个节点运行一个 Datanode 进程,负责管理客户端的读写请求,在 Namenode 的统一调度下进行数据块的创建、删除和复制等操作。

Client 的主要功能

  1. 在上传文件时将文件切分为 Block,在文件下载时将文件合并;
  2. 上传与下载数据文件时,与 NameNode 交互,获取文件元数据;
  3. 上传与下载数据文件时,与 DataNode 交互,读取或写入数据。

NameNode 介绍

  1. 主要功能提供名称查询服务,用来保存 metadata 信息
  2. 管理文件系统的命名空间;(它维护着文件系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上
  3. 管理元数据:文件的位置、所有者、权限、数据块 block 信息
  4. 管理 Block 副本策略:多少个副本,默认 3 个副本;
  5. 处理客户端读写请求,为 DataNode 分配任务。

DataNode 介绍

  1. 主要功能保存 Block。
  2. Slave 工作节点(可大规模扩展);
  3. 存储 Block 和数据校验和执行客户端发送的读写操作;
  4. 通过心跳机制定期(默认 3 秒)向 NameNode 汇报运行状态和 Block 列表信息,如果 NN10 分钟没有收到 DN 的心跳,则认为其已经 lost,并复制其上的 block 到其它 DN;
  5. 集群启动时,DataNode 向 NameNode 提供 Block 列表信息。(数据块的位置并不是由 namenode 维护的,而是以块列表的形式,存储在 datanode 中,在安全模式中,datanode 会向 namenode 发送最新的块列表信息。)

Bloack 数据块

  1. HDFS 是 HDFS 的最小存储单元;
  2. 文件写入 HDFS 会被切分成若干个 Block;
  3. Block 大小固定,默认为 128MB,可自定义;
  4. 若一个 Block 的大小小于设定值,物理上不会占用整个块空间;
  5. 默认情况下每个 Block 有 3 个副本。
  6. Block 和元数据分开存储:Block 存储于 DataNode,元数据存储于 NameNode;
  7. 如何设置 Block 大小:
    1. 目标:最小化寻址开销,降到 1% 以下
    2. 默认大小:128M
    3. 块太小:寻址时间占比过高
    4. 块太大:Map 任务数太少,作业执行速度变慢
  8. Block 多副本:
    1. 以 DataNode 节点为备份对象
    2. 机架感知:将副本存储到不同的机架上,实现数据的高容错
    3. 副本均匀分布:提高访问带宽和读取性能,实现负载均衡

元数据存储

  1. 元数据的两种存储形式:
    1. 内存元数据(NameNode)
    2. 文件元数据(edits + fsimage)

Block 副本放置机制

  1. 第一个副本:放置在上传文件的 DN 上,如果是集群外提交,则随机挑选一台磁盘不太满、CPU 不太忙的节点上;
  2. 第二个副本:与第一个不同机架的节点上;
  3. 第三个副本:与第一个机架相同的其他节点上

节点选择:同等条件下优先选择空闲节点

image-20220427153152628

Block 大小和副本数由 Client 端上传文件到 HDFS 时设置,其中副本数可以变更,Block 是不可以在上传后变更的。

不一次性写三份,而是由一个 dn 写入另一个 dn,目的是防止阻塞,防止并发量过大。

安全模式

什么是安全模式

  1. 安全模式是 HDFS 的一种特殊状态,在这种状态下,HDFS 只接收读数据请求,而不接收写入、删除、修改等变更请求;
  2. 安全模式是 HDFS 确保 Block 数据安全的一种保护机制;
  3. Active NameNode 启动时,HDFS 会进入安全模式,DataNode 主动向 NameNode 汇报可用 Block 列表等信息,在系统达到安全标准前,HDFS 一直处于“只读”状态。

何时正常离开安全模式

  1. Block 上报率:DataNode 上报的可用 Block 个数 / NameNode 元数据记录的 Block 个数;
  2. 当 Block 上报率 >= 阈值时,HDFS 才能离开安全模式,默认阈值为 0.999;
  3. 不建议手动强制退出安全模式。

触发安全模式的原因

  1. NameNode 重启
  2. NameNode 磁盘空间不足
  3. Block 上报率低于阈值
  4. DataNode 无法正常启动
  5. 日志中出现严重异常
  6. 用户操作不当,如:强制关机(特别注意!)

HDFS 文件的读取 - 重点

  1. 客户端向 NameNode 请求读取文件
  2. NameNode 查找目录树,查询块和 NameNode 的关系
  3. 按照 NameNode 与客户端的距离由近到远的顺序列表返回给客户端
  4. 客户端与最近的 DataNode 连接
  5. DataNode 返回相应 Block 的数据
  6. 客户端组装 block 成一个文件

image-20220425234518340

HDFS 文件的写入

  1. 客户端请求上传文件
  2. NameNode 检查目录中是否存在这个文件,并返回是否可以上传
  3. 客户端将文件切块
  4. 客户端向 NamNode 提出上传的各个 block 的列表
  5. NameNode 检查 DataNode 信息
  6. NameNode 返回可以上传的 DataNode 列表
  7. 客户端请求与 DataNode 建立传输 Block 的通道
    1. 上传到一个结点 A 之后,通过这个结点 A 复制到另一个结点 B
    2. 再通过复制的结点 B 复制到新的结点 C
  8. 客户端以 Packet 为单位发送数据
  9. 客户端通知 NameNode 成功写入 Block

image-20220425234554417

Hdfs 存储原理

HDFS 采用 master/slave 架构。一个 HDFS 集群是由一个 Namenode 和一定数目的 Datanodes 组成。NameNode 作为 master 服务,它负责管理文件系统的命名空间和客户端对文件的访问。DataNode 作为 slave 服务,在集群中可以存在多个。通常每一个 DataNode 都对应于一个物理节点。DataNode 负责管理节点上它们拥有的存储,它将存储划分为多个 block 块,管理 block 块信息,同时周期性的将其所有的 block 块信息发送给 NameNode。

Hadoop1.0 与 2.0 的区别

  1. 提出 HDFS Federation,它让多个 NameNode 分管不同的目录进而实现访问隔离和横向扩展,同时彻底解决了NameNode 单点故障问题
  2. 针对 Hadoop1.0 中的 MapReduce 在扩展性和多框架支持等方面的不足,它将 JobTracker 中的资源管理和作业控制分开,分别由 ResourceManager(负责所有应用程序的资源分配)和 ApplicationMaster(负责管理一个应用程序)实现,即引入了资源管理框架 Yarn。通用的资源管理模块,可为各类应用程序进行资源管理和调度

HDFS 的主要组件及功能

Block 是 HDFS 最小存储单元,大小固定,1.X 默认是 64MB2.X 默认为 128MB,可自定义。默认情况下每个 Block 有(至少)三个副本,通过水平复制,达到数据冗余度的要求。

单一 master(NameNode)来协调存储元数据。

nameNode DataNode
存储元数据 存储我呢见数据
元数据保存在内存中 文件保存在磁盘上
保存文件,Block,DataNode 之间的映射关系 维护了 block id 到 datanode 本地文件的映射关系

HDFS 适用场景

  1. 超大文件
  2. 流式数据访问
    1. 一次写入、多次读取
    2. 传输时间和寻址时间

不适用

  1. 低延时
  2. 大量小文件
  3. 多用户写入、任意修改文件

Shell 语法

ls:查看文件

hadoop fs -ls / 查看HDFS文件系统上的文件
hadoop fs -ls -R / 查看HDFS文件系统多层文件夹

mkdir:创建文件夹

hadoop fs -mkdir /test/ 创建test文件夹
hadoop fs -mkdir -p /a/b 创建多层文件夹

put:上传文件

hadoop fs -put 1.tar /test 把当前目录的1.tar上传到hdfs的test目录

cat/text :查看文件内容

hadoop fs -cat 1.txt
hadoop fs -text 1.txt

get:下载文件

hadoop fs -get /test/1.tar test.tar 把hdfs的test目录下的1.tar下载到本地,命名为test.tar

rm:删除文件

hadoop fs -rm /test/1.tar 删除HDFS系统上test目录下的1.tar文件
hadoop fs rm -r /test/ 删除HDFS系统上test目录
或者
hadoop fs -rmr /test/ 删除HDFS系统上test目录

MapReduce

特点

  1. 无需管理 master、slave 和分布式,程序员只需关注业务本身。
  2. 计算跟着数据走
  3. 良好的扩展性:计算能力随着节点数增加,近似线性递增
  4. 高容错
  5. 状态监控
  6. 适合海量数据的离线批处理
  7. 降低了分布式编程的门槛
  8. MapReduce 框架采用了 Master/Slave 架构,包括一个 Master 和若干个 Slave。
  9. Master 上运行 JobTracker,负责作业管理、状态监控和任务调度等,Slave 上运行 TaskTracker,负责任务的执行和任务状态的汇报;

适用场景

  1. 数据统计,如:网站的 PV(page view)、UV(user visit)统计,搜索引擎构建索引
  2. 海量数据查询、复杂数据分析算法实现

不适用场景

  1. OLAP(On-Line Analytical Processing)联机分析处理

  2. 要求毫秒或秒级返回结果

  3. 流计算

    流计算的输入数据集是动态的,而 MapReduce 是静态的

  4. 多步骤的复杂计算任务

MapReduce 体系结构

主要由四个部分组成,分别是 Client、JobTracker、TaskTracker 以及 Task。

image-20220427162140197

Client

  1. 用户编写的 MapReduce 程序通过 Client 提交到 JobTracker 端;
  2. 用户可通过 Client 提供的一些接口查看作业运行状态。

JobTracker

  1. JobTracker 负责资源监控和作业调度;
  2. JobTracker 监控所有 TaskTracker 与 Job 的健康状况,一旦发现失败,就将相应的任务转移到其他节点;
  3. JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源。

TaskTracker

  1. TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker,同时接收 JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等);
  2. TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个 TaskTracker 上的空闲 slot 分配给 Task 使用。slot 分为 Map slot 和 Reduce slot 两种,分别供 MapTask 和 Reduce Task 使用。

Task

  1. Task 分为 Map Task 和 Reduce Task 两种,均由 TaskTracker 启动。

MapReduce 工作流程

MapReduce 就是将输入进行分片,交给不同的 Map 任务进行处理,然后由 Reduce 任务合并成最终的解。

image-20220427204105219

  1. 对输入的数据进行分片格式化。
  2. 执行 MapTask。每个切片分配一个 map() 任务,map() 对其中的数据进行计算,对每个数据用键值对的形式记录。
  3. 对 MapTask 进行 Shuffle,形成内部有序,整体无序的小文件
  4. 将小文件传到 Reduce() 中执行,,然后进行归并排序,最终输出

注意

  1. 不同的 Map 任务之间不会进行通信
  2. 不同的 Reduce 任务之间也不会发生任何信息交换
  3. 用户不能显式地从一台机器向另一台机器发送消息
  4. 所有的数据交换都是通过 MapReduce 框架自身去实现的

WordCount 流程

image-20220427205450352

  1. 我们将任务切为三份,所以启动三个 map 任务。
  2. 我们会启动四个 reduce 任务,所以数据被重构,重新分布成四份,每份对应一个 reduce。
  3. map 将数据转换为键值对,reduce 将键值对合并。

其中:spliting 和 Mapping 是用户实现的,Shuffling 是框架实现的,Reducing 是用户实现

image-20220427213043596

Job & Task(作业与任务)

  1. 作业是客户端请求执行的一个工作单元,如整个 wordcount 计算作业;
  2. 包括输入数据、MapReduce 程序、配置信息
  3. 任务是将作业分解后得到的细分工作单元,如其中的一个 map 任务。
  4. 分为 Map 任务和 Reduce 任务两类

Split(切片)

  1. 输入数据被划分成等长的小数据块,称为输入切片(Input Split),简称切片;
  2. 每个 Split 交给一个 Map 任务处理,Split 的数量决定 Map 任务的数量;
  3. Split 的划分方式由程序设定,按照 HDFS block 是其中的一种;
  4. Split 越小,负载越均衡,但集群的开销越大;

Shuffle 阶段(洗牌)

  1. Map、Reduce 阶段的中间环节,负责执行 Partition(分区)、Sort(排序)、Spill(溢写)、Merge(合并)、抓取(Fetch)等工作;
  2. Partition 决定了 Map 任务输出的每条数据放入哪个分区,交给哪个 Reduce 任务处理;
  3. Reduce 任务的数量决定了 Partition 数量;(reduce 任务的数量并非由输入数据的大小决定,而是特别指定的。 )
  4. Partition 编号 = Reduce 任务编号 =“hash(key) % reduce task number”;
  5. 避免和减少 Shuffle 是 MapReduce 程序调优的重点。

Mapper、Partition、Reducer 数目的确定与关系?

  1. Mapper:由客户端分片情况决定,客户端获取到输入路径的所有文件,依次对每个文件执行分片,分片大小通过最大分片大小、最小分片大小、hdfs 的 blocksize 综合确定,分片结果写入 job.split 提交给 yarn,对每个分片分配一个 Mapper,即确定了数目。
  2. Partition:由 PartitionerClass 中的逻辑确定,默认情况下使用的 HashPartitioner 中使用了 hash 值与 reducerNum 的余数,即由 reducerNum 决定,等于 Reducer 数目。如果自定义的 PartitionerClass 中有其他逻辑比如固定了,也可以与 Reducer 数目无关,但注意这种情况下,如果 reducerNum 小于分区数则会报错,如果大于则会产生无任务的 reduecer 但不会影响结果。但是如果 reducerNum 只有 1 个,则不会报错而是所有分区都交给唯一的 reducer。
  3. Reducer:通过 job.setNumReduceTasks 手动设置决定。

MapReduce shuffle 过程

img

Map 端的 shuffle 过程

image-20220427221211907

  1. 每个 Map 任务分配一个缓存

    MapReduce 默认 100MB 缓存

  2. 设置溢写比例 0.8

  3. 分区默认采用哈希函数

  4. 排序是默认的操作

  5. 排序后可以合并(Combine)(自定义)

  6. 合并不能改变最终结果

  7. 在 Map 任务全部结束之前进行归并

  8. 归并得到一个大的文件,放在本地磁盘

  9. 文件归并时,如果溢写文件数量大于预定值(默认是 3)则可以再次启动 Combiner,少于 3 不需要

  10. JobTracker 会一直监测 Map 任务的执行,并通知 Reduce 任务来领取数据

合并(Combine)和归并(Merge)的区别:

两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1»

image-20220427222720142

Reduce 端的 Shuffle 过程

  1. Reduce 任务通过 RPC(远程过程调用)向 JobTracker 询问 Map 任务是否已经完成,若完成,则领取数据;
  2. Reduce 领取数据先放入缓存,来自不同 Map 机器,先归并,再合并,写入磁盘;
  3. 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的;
  4. 当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给 Reduce。

image-20220427222754270

image-20220427222847111

Shuffle 详解 - 及这个就行

Map 端
  1. Map 任务将中间结果写入专用内存缓冲区 Buffer(默认 100M),同时进行 Partition 和 Sort(先按“key hashcode % reduce task number”对数据进行分区,分区内再按 key 排序)
  2. 当 Buffer 的数据量达到阈值(默认 80%)时,将数据溢写(Spill)到磁盘的一个临时文件中,文件内数据先分区后排序
  3. Map 任务结束前,将多个临时文件合并(Merge)为一个 Map 输出文件,文件内数据先分区后排序
  4. 所有 Map 任务完成后,Map 阶段结束,一般每个 Map 任务都有输出
Reduce 端
  1. Reduce 任务从多个 Map 输出文件中主动抓取(Fetch)属于自己的分区数据,先写入 Buffer,数据量达到阈值后,溢写到磁盘的一个临时文件中
  2. 数据抓取完成后,将多个临时文件合并为一个 Reduce 输入文件,文件内数据按 key 排序

YARN 模式(Hadoop 2.X)

image-20220427223407142

image-20220427223414098

HBase

HBase 概念

HBase 是一个高可靠、高性能、面向列、可伸缩的分布式数据库,主要用来存储非结构化和半结构化的松散数据。

HBase 与传统数据库的对比

数据类型:传统数据库数据类型较丰富,Hbase 数据类型更加简单。

数据操作:传统数据库涉及多表连接,Hbase 不存在。

存储模式:关系数据库是基于行模式存储的。HBase 是基于列存储的。

数据索引:关系数据库可以针对不同列构建多个索引,HBase 只有行键索引。

数据维护:传统数据库更新会丢失版本旧的数据,Hbase 更新会保留版本旧的数据。

可伸缩性:关系数据库很难实现横向扩展,纵向扩展的空间也比较有限。Hbase 相反。

HBase 适用场景

  1. 并发查询

    1. 海量数据
    2. 高并发
    3. 简单条件查询
  2. 半结构化和非结构化数据存储

    1. 10K~10M 的结构化和非结构化数据

HBase 数据模型

数据模型概述

HBase 是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳

数据模型相关概念

1. 表:HBase 采用表组织数据,由行和列构成

2. 行:由行键来标识

3. 列族:基本的访问控制单元

4. 列限定符:数据通过列限定符定位

5. 单元格:通过行、列、列限定符确定一个单元格

6. 时间戳:每个单元格都保存着同一份数据的不同版本,这些版本采用时间戳进行索引。

四维模型

image-20220427230835914

HBase 中需要根据行键、列族、列限定符和时间戳来确定一个单元格,因此,可以视为一个“四维坐标”,即[行键, 列族, 列限定符, 时间戳]

image-20220427231407273

image-20220427231550636

Hbase 实现原理

功能组件

  1. 库函数

  2. 一个 Master 主服务器 : 负责管理和维护 HBase 表的分区信息,维护 Region 服务器列表,分配 Region,负载均衡

  3. 许多个 Region 服务器:负责存储和维护分配给自己的 Region,处理来自客户端的读写请求;客户端并不是直接从 Master 主服务器上读取数据,而是在获得 Region 的存储位置信息后,直接从 Region 服务器上读取数据

Region
  1. 分布式存储和负载的最小单元;
  2. 系统将表水平划分(按行)为多个 Region,每个 Region 保存表的一段连续数据;
  3. 默认每张表开始只有一个 Region,随着数据不断写入,Region 不断增大,当 Region 大小超过阀值时,当前 Region 会分裂成两个子 Region。
  4. 每个 Region 默认大小是 100MB 到 200MB(2006 年以前的硬件配置)
    1. 每个 Region 的最佳大小取决于单台服务器的有效处理能力
    2. 目前每个 Region 最佳大小建议 1GB-2GB(2013 年以后的硬件配置)
  5. 同一个 Region 不会被分拆到多个 Region 服务器
  6. 每个 Region 服务器存储 10-1000 个 Region
  7. 表被切分成多个 Regions,分布到多个 RegionServers 上

image-20220427233651462

Hbase 三层结构

一、Zookeeper 文件:记录了-ROOT-表的位置信息

二、ROOT-表:记录了.META.表的 Region 位置信息

三、META.表:-记录了数据表的 Region 位置信息

为了加快访问速度,.META.表的全部 Region 都会被保存在内存中

image-20220427234006224

客户端访问数据时的“三级寻址”

  1. 为了加速寻址,客户端会缓存位置信息,同时,需要解决缓存失效问题;
  2. 寻址过程客户端只需要询问 Zookeeper 服务器,不需要连接 Master 服务器。

HBase 系统架构

image-20220427234840843

客户端

客户端包含访问 HBase 的接口,同时在缓存中维护着已经访问过的 Region 位置信息,用来加快后续数据访问过程。

Zookeeper 服务器

  1. Zookeeper 可以帮助选举出一个 Master 作为集群的总管,并保证在任何时刻总有唯一一个 Master 在运行,这就避免了 Master 的“单点失效”问题;

    image-20220427235003425

  2. 监控 RegionServer 的上下线信息,并通知 Master;存储元数据的寻址入口;存储所有 Region 的寻址入口。

  3. Zookeeper 是一个很好的集群管理工具,被大量用于分布式计算,提供配置维护、域名服务、分布式同步、组服务等。

Master

Region 服务器(Slave)

Region 服务器工作原理

 Region 服务器

Region 按照列族,分为多个 Store。(下图)
  1. 一个 Region 由多个 Store 组成,每个 Store 存储一个列族。Region 是分布式存储的最小单元,而 Store 是存储落盘的最小单元。
  2. Store 由一个 MemStore 和若干 StoreFile 组成。

image-20220428094104999

MemStore 与 StoreFile
  1. MemStore 是 Store 的内存缓冲区,StoreFile 是 MemStore 的磁盘溢写文件,在 HDFS 中被称为 HFile。
  2. 数据读写都先访问 MemStore。Client 读取数据时,先找 MemStore,再找 StoreFile。写数据时,先写 MemStore,当数据量超过阈值时,RegionServer 会将 MemStore 中的数据溢写磁盘,每次溢写都生成一个独立的 StoreFile(HFile);
    1. 读数据时
      1. client 先找 MemStore,再找 StoreFile
    2. 写数据时
      1. 先再 MemStore 中写入,
      2. 数据量超过阈值时,RegionServer 将 MemStore 中的数据溢写磁盘。每次溢写独立生成一个 StoreFile

image-20220428094921168

用户写入数据

  1. 用户写入数据时,被分配到相应 Region 服务器去执行
  2. 用户数据首先被写入到 MemStore 和 Hlog 中
  3. 只有当操作写入 Hlog 之后,commit() 调用才会将其返回给客户端
向 Hbase 写入数据
  1. 访问 ZK,获取 meta 表所在 RegionServer 和 Region
  2. 读取 meta 表,获取所有 Region 在所有 RegionServer 上的分布,和每个 Region 中维护的表数据的范围。
  3. 根据主键和 meta 表,得出待写数据归属的 Region 和 RegionServer,向特定 RegionServer 发送数据
RegionServer 接受数据
  1. RegionServer 收到数据。先将操作写入 HLog,再将数据写入 MemStore。当 MemStore 的数据量超过阈值时,将数据溢写磁盘,生成一个 StoreFile 文件。
  2. 当 Store 中 StoreFile 的数量超过阈值时,将若干小 StoreFile 合并。
  3. 当 Region 中最大 Store 的大小超过阈值时,Region 分裂成两个子 Region
HLog
  1. 含义:以 WAL(Write Ahead Log,预写日志)方式写数据时产生的日志文件
  2. 目的:RegionServer 意外宕机时的数据恢复
  3. 先写 HLog,再写 MemStore,最后写 StoreFile,每个 RegionServer 维护一个 HLog
  4. 定期删除 HLog 过期数据
  5. 用户更新数据必须首先写入日志后,才能写入 MemStore 缓存,并且,直到 MemStore 缓存内容对应的日志已经写入磁盘,该缓存内容才能被刷写到磁盘。
  6. Zookeeper 会实时监测每个 Region 服务器的状态,当某个 Region 服务器发生故障时,Zookeeper 会通知 Master;
  7. Master 首先会处理该故障 Region 服务器上面遗留的 HLog 文件,这个遗留的 HLog 文件中包含了来自多个 Region 对象的日志记录;
  8. 系统会根据每条日志记录所属的 Region 对象对 HLog 数据进行拆分,分别放到相应 Region 对象的目录下,然后,再将失效的 Region 重新分配到可用的 Region 服务器中,并把与该 Region 对象相关的 HLog 日志记录也发送给相应的 Region 服务器;
  9. Region 服务器领取到分配给自己的 Region 对象以及与之相关的 HLog 日志记录以后,会重新做一遍日志记录中的各种操作,把日志记录中的数据写入到 MemStore 缓存中,然后,刷新到磁盘的 StoreFile 文件中,完成数据恢复;
  10. 共用日志优点:提高对表的写操作性能;缺点:恢复时需要分拆日志。

image-20220428100325890

用户读数据

image-20220428100532053

Client 从 Hbase 读取数据

  1. 访问 ZK,获取 meta 表所在 RegionServer 和 Region
  2. 读取 meta 表,获取所有 Region 在所有 RegionServer 上的分布,和每个 Region 中维护的表数据的范围。
  3. 缓存 meta 表位置和内容。根据表空间、表名、主键和 meta 表内容,得出待读数据归属的 Region 和 RegionServer,从特定 RegionServer 读数据。
  4. RegionServer 先从 MemStore 读取数据,如未找到,再从 StoreFile 中读取。

Shell

查看当前 namespace,

list_namespace

创建一个新的 namespace“test”,

create_namespace 'test'

并设置最大建表数为 10

alter_namespace 'test', {METHOD => 'set', 'hbase.namespace.quota.maxtables' => '10'}

创建一个表 test01,有两个列族 f1,f2,设置 f1 的最大版本数为 5,设置表的预分区为 3 个

create 'test:test01', {NAME=>'f1', VERSIONS=>5}, 'f2', SPLITS => ['10','20','30']

创建另一个表 test02,有两个列族 g1,g2

create 'test:test02','g1','g2'

查看 namespace test 中的表

list

删除表 test02 中的 g2 列

alter 'test:test02, 'delete'=>'g2'

修改表 test02 的的属性 MAX_FILESIZE 为 256128256

alter 'test:test02', MAX_FILESIZE='256128256'

为表 test02 增加一列 s1,设置 s1 的最大版本数为 5

alter 'test:test02', {NAME=>'s1', VERSIONS=>'5'}

删除表 test02

drop 'test:test02'

为表 test01 添加数据

put 'test:test01', 'r01', 'f1:name', 'zhang'
put 'test:test01', 'r01', 'f1:num', '10'
put 'test:test01', 'r01', 'f1:addr', 'shanghai'
put 'test:test01', 'r02', 'f1:name', 'wang'
put 'test:test01', 'r02', 'f2:addr', 'hangzhou'

全表扫描数据

scan 'test:test01'

查看 r02 行的 name 列数据

get 'test:test01', 'r02', 'f1:name'

删除 r02 行 f2 列的数据

delete 'test:test01', 'r02', 'f2'

清空表 test01 的数据

truncate_preserve 'test:test01'

现有以下关系型数据库中的表和数据,要求将其转换为适合于 HBase 存储的表,绘出表格,然后插入数据,并查看数据。建议用列族的方式来创建。

学号(S_No) 姓名(S_Name) 年龄(S_Age)
2018001 Lily 21
2018002 Jacky 22
2018003 Mouse 21
课程号(C_No) 课程名(C_Name)
123001 English
123002 Computer
学号(SC_Sno) 课程号(SC_Cno) 成绩(SC_Score)
2018001 123001 89
2018001 123002 78
2018002 123001 90
2018002 123002 69
2018003 123001 78
2018003 1230023 65

构造的 HBase 表格可以为(仅供参考)

image-20220428111607380

主键的列名是随机分配的,因此无需创建主键列。

创建表:create 表名,字段名 1 / 列族 1,字段名 2 / 列族 2,……

create 'scs','basic_info','score'

插入数据:put 表名,rowkey,字段名 1,字段值 1

put 'scs','s001','basic_info:s_no','2018001'
put 'scs','s001','basic_info:s_name','Lily'
put 'scs','s001','basic_info:s_age','21'
put 'scs','s001','score:english','89'
put 'scs','s001','score:computer','78'

查看数据:scan 表名

scan 'scs'

如果在学生表中为学生增加一项“联系电话(S_Tel)”,如何完成?插入数据后,查看数据。(提示:使用列族,添加列限定符)

put 'scs','s001','basic_info:tel','185CCCCCCCC'

若查看 rowkey 为“s001”(值是示例,根据你自己设置的 rowkey 值进行查询)的所有课程成绩(SC 表),如何完成?(提示:get 表名 key 值)

get 'scs','s001','score'

Hive

Hive 概念

Hive 是一个数据仓库基础工具在 Hadoop 中用来处理结构化数据。

Hive 特点

  1. Hive 是一个构建于 Hadoop 顶层的数据仓库工具
  2. 支持大规模数据存储、分析,具有良好的可扩展性
  3. 某种程度上可以看作是用户编程接口,本身不存储和处理数据
  4. 依赖分布式文件系统 HDFS 存储数据
  5. 依赖分布式并行计算模型 MapReduce 处理数据
  6. 定义了简单的类似 SQL 的查询语言——HiveQL
  7. 用户可以通过编写的 HiveQL 语句运行 MapReduce 任务
  8. 可以很容易把原来构建在关系数据库上的数据仓库应用程序移植到 Hadoop 平台上
  9. 是一个可以提供有效、合理、直观组织和使用数据的分析工具

image-20220428115338830

Hive 具有的特点非常适用于数据仓库

  1. 采用批处理方式处理海量数据

  2. 提供适合数据仓库操作的工具

Hive 缺点

  1. 延迟较高:默认 MR 为执行引擎,MR 延迟较高。

  2. 不支持物化视图 :Hive 支持普通视图,不支持物化视图(数据转换成表)。Hive 不能在视图上更新、插入、删除数据。

  3. 不适用 OLTP :暂不支持列级别的数据添加、更新、删除操作。

  4. 暂不支持存储过程

Hive 应用场景

  1. 数据挖掘

  2. 非实时分析

  3. 数据汇总

  4. 数据仓库

Hive 系统架构

由用户接口模块、驱动模块和元数据存储模块构成

image-20220428120944208

相关概念

  1. Metastore,存储元数据的角色。Hive 将元数据存储在传统的关系型数据库(mysql、derby)中。
  2. Hive 中的元数据包括:表的名字、表的数据所在的 HDFS 目录、数据在目录中的分布规则、以及其他表属性。
  3. 正如 Oracle 使用的 SQL 方言是 PL/SQL,Hive 所使用的 SQL 方言是 HQL。
  4. Hive 将 HQL 语句转换成分布式的 MapReduce 计算任务。
  5. Hive 计算引擎可以是 Apache MapReduce 或者 Apache Spark。

Hive 数据存储模型

image-20220428123540480

内部表与外部表

  1. 表(Table)
    1. 表是数据管理和存储的基本对象,由元数据和表数据组成
    2. 元数据保存在 Metastore 中
    3. 表的数据保存在存储引擎中,如 HDFS 文件系统
  2. Hive+HDFS
    1. 表数据保存在 HDFS,每个表对应一个目录,表名=目录名。每个数据库对应一个目录,目录名=数据库名.db,表数据是目录内的文件。表目录在数据库目录下。在表目录下,数据还可以按照分区和分桶方式分布。
内部表(Table)/托管表
  1. 内部表与关系数据库中的 Table 在概念上类似;
  2. 每个 Table 在 Hive 中都有一个相应的目录存储数据;
  3. 所有的 Table 数据(不包括 External Table)都保存在这个目录中;
  4. 内部表的创建过程和数据加载过程,可以分别独立完成,也可以在同一个语句中完成,在加载数据的过程中,数据会被移动到数据仓库目录中;之后对数据访问将会直接在数据仓库目录中完成。
  5. 删除表时,元数据与数据都会被删除。
外部表(External Table)
  1. 外部表指向已经在 HDFS 中存在的数据。
  2. 它和内部表在元数据的组织上是相同的,而实际数据的存储则有较大的差异。
  3. 外部表只有一个过程,创建表和加载数据同时完成(CREATE EXTERNAL TABLE …… LOCATION),实际数据是存储在 LOCATION 后面指定的 HDFS 路径中,并不会移动到数据仓库目录中。
  4. 删除表时,仅删除该链接,不删除数据。
注意
  1. 默认创建内部表/托管表,Hive 会将数据移动到数据仓库目录。
  2. 创建外部表,这时 Hive 会到仓库目录以外的位置访问数据。
  3. 如果所有处理都由 Hive 完成,建议使用内部表/托管表。
  4. 如果要用 Hive 和其它工具来处理同一个数据集,建议使用外部表。
内部表 外部表
创建加载可以独立完成 数据移到仓库目录 数据位置不移动
创建加载同时完成 元数据和数据会被一起删除 只删除元数据

分区分桶

分区

通过特定条件将表的数据分发到分区目录中,或者将分区中的数据分发到子分区目录中。

  1. 分区的作用:减少不必要的全表扫描,提升查询效率。

image-20220428133543362

分桶

通过分桶键哈希取模的方式,将表或分区中的数据随机、均匀地分发到 N 个桶中,桶数 N 一般为质数,桶编号为 0, 1, …, N-1

  1. 分桶的作用:提高取样效率,提高 Join 查询效率,对应桶抽取数据就好

image-20220428133755400

区别
  1. 分区:
    1. 数据表可以按照某个字段的值划分分区。
    2. 每个分区是一个目录。
    3. 分区数量不固定。
    4. 分区下可再有分区或者桶。
  2. 分桶
    1. 数据可以根据桶的方式将不同数据放入不同的桶中。
    2. 每个桶是一个文件。
    3. 建表时指定桶个数,桶内可排序。
    4. 数据按照某个字段的值 Hash 后放入某个桶中。

文件格式

Text 表
  1. 系统默认的表类型,无压缩,行存储,仅支持批量 Insert
  2. 分析查询的性能较低,主要用于导入原始文本数据时建立过渡表
ORC 表
  1. 优化的列式存储,轻量级索引,压缩比高,仅支持批量 Insert
  2. Hive 计算的主要表类型,主要用于数仓的离线分析,通常由 Text 表生成

用户向 Hive 输入一段命令或查询时,Hive 需要与 Hadoop 交互工作来完成该操作

  1. 驱动模块接收该命令或查询编译器
  2. 对该命令或查询进行解析编译
  3. 由优化器对该命令或查询进行优化计算
  4. 该命令或查询通过执行器进行执行

Shell

首先通过 show databases 命令查看已存在的数据库。然后使用 create 命令创建一个新的数据库,在本实验中命名为“demo+ 学号后 4 位”

create database 'demo0737';

使用 USE 命令,将你创建的 demo 数据库设置为当前使用的数据库

use 'demo0737';

通过 create table 命令创建一个表,表名 users,创建完成后,用 describe 表名 命令查看建表结果,确保建表成功。表属性包含:

id:int,记录编号,具有唯一性

uid:string,用户 id

item_id:string,商品 id

behavior_type:int,包括浏览、收藏、加购物车、购买,分别为 1、2、3、4

item_category:string,商品分类

visit_date:date,该记录产生时间

province:string,用户所在省份

create external  table users(
 id int,uid string ,item_id string,behavior_type int ,item_category string, visit_date date,province string 
)row format delimited fields terminated by ‘\t;

按照给定的附件中表样式(users_table.txt,字段间隔为 tab)自己编写更多的 txt 数据,然后使用 load data local inpath +‘路径’命令,注意:local 表示加载本地系统中文件的数据,而不是 HDFS 中的。

load data local inpath './users_table.txt' into table users;

查询 visit_date 为“2019-11-11”的记录。(给定检索条件)

select * from demo0737 where visit_date='2019-11-11';

查询 visit_date 为“2019-11-11”的前 5 条用户购买商品的商品分类和用户所在省份。(给定检索条件,并限定检索数量,limit)

select item_category,province from demo0737 where visit_date='2019-11-11' limit 5;

使用聚合函数 count()计算出表内有多少行数据

select count(*) from demo0737;

查询 uid 不重复的数据有多少条。(distinct)

select distinct uid from demo0737;

Spark

Spark 特点

1.运行速度快

2.容易使用

3.通用性

4.运行模式多样

与 Hadoop 的关系

Spark 在借鉴 Hadoop MapReduce 优点的同时,

  1. Spark 编辑模型比 Hadoop 更灵活,
  2. spark 提高了内存计算,对于迭代运算效率更高。
  3. Spark 基于 DAG 的任务调度执行机制优于 Hadoop 的迭代执行机制。

Spark 生态系统

主要包含了 Spark Core、Spark SQL、Spark Streaming、MLLib 和 GraphX 等组件。

Spark 生态系统组件应用场景

  1. 复杂的批量数据处理

  2. 基于历史数据的交互式查询

  3. 基于实时数据流的数据处理

  4. 基于历史数据的数据挖掘

  5. 图结构数据的处理

Spark 运行架构

集群资源管理器(Cluster Manager)

运行作业任务的工作节点(Worker Node)

每个应用的任务控制节点(Driver)

每个工作节点上负责具体任务的执行进程(Executor)

image-20220428185119904

  1. 一个应用由一个任务控制节点 Driver 和若干个作业 Job 构成,一个作业由多个阶段 Stage 构成,一个阶段由多个没有 Shuffle 关系的任务 Task 组成。
  2. 当执行一个应用时,Driver 会向集群管理器申请资源,启动 Executor,并向 Executor 发送应用程序代码和文件,然后在 Executor 上执行任务,运行结束后,执行结果会返回给 Driver,或者写到 HDFS 或者其他数据库中。

image-20220428185344469

Spark 运行基本流程

image-20220428190140394

SparkContext对象代表了和一个集群的连接

  1. 首先为应用构建起基本的运行环境,即由 Driver 创建一个 SparkContext,进行资源的申请、任务的分配和监控;
  2. 资源管理器为 Executor 分配资源,并启动 Executor 进程;
  3. SparkContext 根据 RDD 的依赖关系构建 DAG 图,DAG 图提交给 DAGScheduler 解析成 Stage,然后把一个个 TaskSet 提交给底层调度器 TaskScheduler 处理;Executor 向 SparkContext 申请 Task,Task Scheduler 将 Task 发放给 Executor 运行,并提供应用程序代码;
  4. Task 在 Executor 上运行,把执行结果反馈给 TaskScheduler,然后反馈给 DAGScheduler,运行完毕后写入数据并释放所有资源。

RDD 工作原理

RDD(弹性分布式数据集) 概念

一个只读的分区记录集合。不能直接修改,只能基于稳定的物理存储中的数据集创建 RDD,或者通过在其他 RDD 上执行确定的转换操作(如 map、join 和 group by)而创建得到新的 RDD。

RDD 执行过程

  1. RDD 读入外部数据源进行创建
  2. RDD 经过一系列的转换(Transformation)操作,每一次都会产生不同的 RDD,供给下一个转换操作使用
  3. 最后一个 RDD 经过“动作”操作进行转换,并输出到外部数据源

这一系列处理称为一个 Lineage(血缘关系),即 DAG 拓扑排序的结果。 优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单。

image-20220428192622942

RDD 特性

  1. 高效的容错性
  2. 中间结果持久化到内存
  3. 存放的数据可以是未序列化的 Java 对象

宽依赖与窄依赖

image-20220428192802610

窄依赖

一个父 RDD 的分区对应于一个子 RDD 的分区或多个父 RDD 的分区对应于一个子 RDD 的分区。多对一或一对一

宽依赖

存在一个父 RDD 的一个分区对应一个子 RDD 的多个分区。多对多

fork/join的优化原理

image-20220428193606941

所以,如果连续的变换操作序列都是窄依赖,就可以把多个 fork/join 合并为一个,这个过程称为“流水线(pipeline)优化”。

  1. 窄依赖可以实现“流水线”优化
  2. 宽依赖无法实现“流水线”优化(节点间需 shuffle)

阶段划分

Spark 通过分析各个 RDD 的依赖关系生成了 DAG,再通过分析各个 RDD 中的分区之间的依赖关系来决定如何划分 Stage,具体划分方法是:

  1. 在 DAG 中进行反向解析,遇到宽依赖就断开
  2. 遇到窄依赖就把当前的 RDD 加入到 Stage 中
  3. 将窄依赖尽量划分在同一个 Stage 中,可以实现流水线计算

image-20220428195422536

被分成三个 Stage,在 Stage2 中,从 map 到 union 都是窄依赖,这两步操作可以形成一个流水线操作

RDD 运行原理

  1. 创建 RDD 对象;
  2. SparkContext 负责计算 RDD 之间的依赖关系,构建 DAG;
  3. DAGScheduler 负责把 DAG 图分解成多个 Stage,每个 Stage 中包含了多个 Task,每个 Task 会被 TaskScheduler 分发给各个 WorkerNode 上的 Executor 去执行。

image-20220428221524926

Spark SQL 工作原理

  1. 将 SQL 转换成抽象语法树

  2. 将抽象语法树转换成查询块

  3. 将查询块转换成逻辑查询计划

  4. 重写逻辑查询计划

  5. 讲逻辑计划转成物理计划

  6. 选择最佳优化查询策略

Spark Mllib 基本原理

MLlib 是 Spark 的机器学习库,旨在简化机器学习的工程实践工作。Mllib 常见机器学习问题:分类、回归、聚类、协同过滤。

DataFrame

使用 SparkSQL 中的 DataFrame 作为数据集,可以容纳各种数据类型。较之 RDD,DataFrame 包含了 schema 信息,更加类似传统数据库中的二维表。

它被 ML Pipeline 用来存储源数据。

Transformer

转换器,是一种可以将一个 DataFrame 转换为另一个 DataFrame 的算法。如一个模型就是一个 Transformer。它可以把一个不包含预测标签的测试集的 DataFrame 打上标签,转化成另一个包含预测标签的 DataFrame

大致方法原型为 DataFrame Transformer.transform(DataFrame)

Estimator

估计器或评估器,它是某种学习算法,或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生成一个 Transformer

从技术上将,估计器有一个抽象方法 fit () 需要被具体算法去实现,它接收一个 DataFrame 并产生一个转换器。

大致方法原型如下 Transformer Estimator.fit(DataFrame)

即通过 Estimator 对某个数据集进行 fit 操作后得到 Transformer。

Parameter

参数,参数被用来设置 Transformer 或者 Estimator 的参数。现在所有转换器和估计器可共享用于指定参数的公共 API。

PipeLine

流水线或管道,流水线将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

注意流水线本身也是一个 Estimator,在执行完 fit 操作后,产生一个 PipelineModel,它也是一个 Transformer。

参考

Hadoop 项目结构_weixin_33727510 的博客-CSDN 博客

#复习资料