数据处理
数据集成
首先要有可靠、完整的数据流,后续才能谈得上在上层对这些数据进行存储、可视化、分析
- ETL:提取 (Extract)——转换 (Transform)——加载 (Load),在数据源抽取后首先进行转换,然后将转换的结果写入目的地
- ELT:提取 (Extract)——加载 (Load)——变换 (Transform),在抽取后将结果先写入目的地,然后利用数据库的聚合分析能力或者外部计算框架在最后对数据做转换
ELT相比ETL,因为转换是在目的地,所以可以根据后续使用的情况做调整,比较灵活
数据清洗
需要一些清洗规则帮助确保数据质量和一致性,减少数据中的错误、不完整、重复等问题,使得数据标准、干净、连续
数据变换
- 数据变换是数据分析必须的预处理步骤
- 数据平滑:去除数据中的噪声,将连续数据离散化
- 数据聚集:对数据进行汇总
- 数据概化:将数据由较低的概念抽象成为较高的概念,减少数据复杂度,即用更高的概念替代更低的概念。比如说上海、杭州、深圳、北京可以概化为中国
- 数据规范化:使属性数据按比例缩放,这样就将原来的数值映射到一个新的特定区域中
- Min-max 规范化:将原始数据变换到[0,1]的空间中
- Z-Score 规范化:新数值 =(原数值 - 均值)/ 标准差
- 小数定标规范化:移动小数点的位置来进行规范化。小数点移动多少位取决于属性 A 的取值中的最大绝对值
- 属性构造:构造出新的属性并添加到属性集中
批处理
- 输入数据是有界且不可变的
- 除了输出 其他操作都没有副作用
UNIX的管道
MapReduce的不足
- 抽象层次不足,太原始
- 维护成本:每一步的 MapReduce 都有可能出错,为了这些异常处理,就需要协调系统,协调系统又是一个复杂度的来源
- 时间性能:对 MapReduce 的配置细节不理解,难以发挥其高性能,每一步计算都要进行硬盘的读取和写入
- 只支持批处理
需要的:
- 一种技术抽象让多步骤数据处理变得易于维护
- 不要复杂的配置,需要能自动进行性能优化
- 要能把数据处理的描述语言,与背后的运行引擎解耦合开来
- 要统一批处理和流处理的编程模型
- 要在架构层面提供异常处理和数据监控的能力
分布式批处理需要解决的问题
- 如何将输入数据分区
- 容错:任务可能随时会失败
流处理
- 复杂事件处理(CEP):存储一个搜索模式,在流数据流经时判断是否符合这样的模式
- 流分析:对一定窗口期内的数据进行计算、分析
- 通过流来进行RPC
消息系统
早期使用消息队列来实现流处理:
stateDiagram
direction LR
数据源 --> 消息队列1
消息队列1 --> 处理逻辑1
处理逻辑1 --> 消息队列2
消息队列2 --> 处理逻辑3
消息队列2 --> 处理逻辑4
处理逻辑3 --> 消息队列3
处理逻辑4 --> 消息队列3
消息系统与传统的数据库有着本质的区别:数据临时与永久之分
分区日志消息系统:结合了传统消息系统与数据库:既是流,又能存
流与数据库
- 数据库的变更通过流与系统异构存储保持同步
- 变更数据捕获(CDC):初始快照 + 后续变更操作
- 事件溯源:回放所有日志得到数据的最终状态
本质上就是状态复制机的实现
本质上数据库的日志就是流,数据库里的数据就是当前流重放的快照
DataFlow模型
核心概念:
- ParDo,地位相当于 MapReduce 里的 Map 阶段。所有的输入数据,都会被一个 DoFn,也就是处理函数处理
- GroupByKey,地位则是 MapReduce 里的 Shuffle 操作。把相同的 Key 汇总到一起,然后再通过一个 ParDo 下的 DoFn 进行处理
时间问题
流处理依赖于本地时间戳,时钟是不可靠的,同时考虑消息堆积、软件错误等问题,基于时间戳的流分析可能不准
窗口类型
- 轮转窗口:固定长度,相互之间没有重叠且紧邻 [1,3] [4,6]
- 跳跃窗口:固定长度,允许之间重叠以进行平滑过度 [1,3] [2,4]
- 滑动窗口
- 会话窗口:没有固定时间,将同一用户的事件组合在一起
容错
- 微批处理:将流切成固定大小的块,如果这个块发生错误,则丢弃这个块的所有输出
- 校验点:定期生成检查点,如果流处理发生错误,就回到上一个检查点重新跑
这需要消费端保证幂等性,否则为了容错会输出不止一次导致副作用
特征工程
利用工程手段从“用户信息”“物品信息”“场景信息”中提取特征的过程,在已有的、可获得的数据基础上,“尽量”保留有用信息是现实中构建特征工程的原则
常用特征
- 用户行为数据:区分隐式反馈与显式反馈,对用户行为数据的采集与使用与业务强相关
- 用户关系数据:人与人之间连接的记录,区分强关系(主动建立连接)与弱关系(间接的关系导致的连接)
- 属性、标签类数据:物品属性、人口属性、主动打的标签等
- 内容类数据:描述型文字、图片,甚至视频,需要进一步通过NLP、图像识别等转为结构化信息才能作为特征使用
- 场景信息:描述的是用户所处的客观的推荐环境,常见的有所处于什么时空
特征处理
进行特征处理的目的,是把所有的特征全部转换成一个数值型的特征向量
类别特征处理:
One-hot 编码(也被称为独热编码),它是将类别、ID 型特征转换成数值向量的一种最典型的编码方式。它通过把所有其他维度置为 0,单独将当前类别或者 ID 对应的维度置为 1 的方式生成特征向量
周二 => [0,1,0,0,0,0,0] -- 将一周7天视为7个维度,将周二所在的维度设为1
数值类特征处理:
- 归一化
- 分桶:将样本按照某特征的值从高到低排序,然后按照桶的数量找到分位数,将样本分到各自的桶中,再用桶 ID 作为特征值
分桶:
[1,1,1,1,1,1,5,8,10] => [(1,5),(5,10)]
Embedding
用一个数值向量“表示”一个对象(Object)的方法
词 Embedding:
在通过神经网络训练得到模型,一个词就可以通过模型推断,转为向量
图 Embedding:
- Deep Walk:在由物品组成的图结构上进行随机游走,产生大量物品序列,然后将这些物品序列作为训练样本输入 Word2vec 进行训练,最终得到物品的 Embedding
- Node2vec:通过调整随机游走跳转概率的方法,让 Graph Embedding 的结果在网络的同质性(Homophily)和结构性(Structural Equivalence)中进行权衡。同质性指的是距离相近节点的 Embedding 应该尽量近似,结构性指的是结构上相似的节点的 Embedding 应该尽量接近
- 为了使 Graph Embedding 的结果能够表达网络的“结构性”,在随机游走的过程中,需要让游走的过程更倾向于 BFS(Breadth First Search,广度优先搜索),因为 BFS 会更多地在当前节点的邻域中进行游走遍历,相当于对当前节点周边的网络结构进行一次“微观扫描”。当前节点是“局部中心节点”,还是“边缘节点”,亦或是“连接性节点”,其生成的序列包含的节点数量和顺序必然是不同的,从而让最终的 Embedding 抓取到更多结构性信息
- 而为了表达“同质性”,随机游走要更倾向于 DFS(Depth First Search,深度优先搜索)才行,因为 DFS 更有可能通过多次跳转,游走到远方的节点上。但无论怎样,DFS 的游走更大概率会在一个大的集团内部进行,这就使得一个集团或者社区内部节点的 Embedding 更为相似,从而更多地表达网络的“同质性”
Embedding 可以直接使用,在到 Embedding 向量之后,直接利用 Embedding 向量的相似性实现某些推荐系统的功能。也可以预先训练好物品和用户的 Embedding 之后,不直接应用,而是把这些 Embedding 向量作为特征向量的一部分,跟其余的特征向量拼接起来,作为推荐模型的输入参与训练。最后是一种 E2E 的应用,即不预先训练 Embedding,而是把 Embedding 的训练与深度学习推荐模型结合起来,采用统一的、端到端的方式一起训练,直接得到包含 Embedding 层的推荐模型。
非负矩阵因式分解
输入多个样本数据,每个样本数据都是一个m维数值向量,首先把我们的数据集用矩阵的形式写出来,每一列是一个数据,而每一行是这些数据对应维度的数值。于是我们就有了一个大小为m*n的输入矩阵。而算法的目标就是将这个矩阵分解为另外两个非负矩阵的积
模式
Workflow
复制模式
flowchart
数据集 --> 复制器
复制器 --> 工作流1
复制器 --> 工作流2
复制器 --> 工作流3
过滤模式
flowchart
数据集(1,2,3) --> 过滤器
过滤器 --> 工作流(1,2)
分离模式
flowchart
数据集(1,2,3) --> 分离器
分离器 --> 工作流1(1,2)
分离器 --> 工作流2(2,3)
合并模式
flowchart
数据集1(1,2) --> 合并器
数据集2(2,3) --> 合并器
合并器 --> 工作流(1,2,3)
发布订阅
架构
Lambda
完整的数据集 = λ (实时数据) * λ (历史数据)
stateDiagram-v2
数据 --> 批处理层
数据 --> 速度层
state 服务层 {
批处理数据
速度数据
}
速度层 --> 速度数据
批处理层 --> 批处理数据
应用 --> 批处理数据
应用 --> 速度数据
- 批处理层通过处理所有的已有历史数据来实现数据的准确性,是基于完整的数据集来重新计算的
- 速度层通过流处理,提供最新数据的实时视图来最小化延迟
Kappa
stateDiagram-v2
state 速度层 {
任务N
任务N+1
}
数据 --> 速度层
state 服务层 {
数据N
数据N+1
}
任务N --> 数据N
任务N+1 --> 数据N+1
应用 --> 数据N+1
- 一个可以重跑历史数据的消息队列
Spark
- Spark SQL 主要用于结构化数据的处理:支持以SQL语法查询各种数据源
- Spark Streaming:微批处理 达到类流处理
- MLlib:机器学习库
- Graphx:用于图形计算和图形并行计算的新组件
Spark 比 MapReduce 快的原因:更为简单的 RDD 编程模型减少了作业调度次数,以及优先使用内存
- SparkContext 启动 DAGScheduler 构造执行的 DAG 图,拆分成计算任务
- Driver 向 Cluster Manager 请求计算资源,分配 Worker
- Worker 向 Driver 注册并下载代码执行
RDD
- 弹性分布式数据集(Resilient Distributed Dataset)
分区:同一个 RDD 包含的数据被存储在系统的不同节点中,需要读取时根据ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理
不可变:一个 RDD 都是只读的,只可以对现有的 RDD 进行转换(Transformation)操作,得到新的 RDD 作为中间计算的结果
并行:由于上面两个特性,就可以并行对 RDD 进行操作
结构
SparkContext:所有 Spark 功能的入口,它代表了与 Spark 节点的连接,一个线程只有一个 SparkContext
SparkConf: 一些参数配置信息
Partitions:数据的逻辑结构,每个 Partition 会映射到某个节点内存或硬盘的一个数据块
Dependencies:每一步产生的 RDD 里都会存储它的依赖关系,即它是通过哪个 RDD 经过哪个转换操作得到的
窄依赖允许子 RDD 的每个分区可以被并行处理产生,而宽依赖则必须等父 RDD 的所有分区都被计算好之后才能开始处理
Checkpoint:对于一些计算过程比较耗时的 RDD,可以进行持久化,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系,这样在进行崩溃恢复的时候就不用在向前向父 RDD 回溯
Storage Level:记录 RDD 持久化时的存储级别,内存或内存硬盘 或在分区节点上内存、内存硬盘
Iterator:迭代函数,Compute:计算函数 都是用来表示 RDD 怎样通过父 RDD 计算得到的
数据操作
大部分操作跟Stream差不多
- 转换(Transformation):把一个 RDD 转换成另一个 RDD map、filter、mapPartitions,groupByKey
- 动作(Action):通过计算返回一个结果 collect、reduce、count,countByKey
Spark 的 Shuffle 操作跟 MapReduce 是一样的,其通过生产与消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换
调度系统
- DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet
- SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构
- 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源
- 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task
- 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。
存储系统
SparkSQL
DataSet
DataSet 所描述的数据都被组织到有名字的列中,就像关系型数据库中的表一样
DataFrame
可以被看作是一种特殊的 DataSet,但是它的每一列并不存储类型信息,所以在编译时并不能发现类型错误
SparkStreaming
Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的,通过提供了一个对于流数据的抽象 DStream 来描述数据流,底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD
主要缺点是实时计算延迟较高,这是由于 Spark Streaming 不支持太小的批处理的时间间隔
StructuredStreaming
输入的数据流按照时间间隔(以一秒为例)划分成数据段。每一秒都会把新输入的数据添加到表中,Spark 也会每秒更新输出结果。输出结果也是表的形式,输出表可以写入硬盘或者 HDFS。
Structured Streaming 提供一个 level 更高的 API,这样的数据抽象可以让开发者用一套统一的方案去处理批处理和流处理
相比 SparkStreaming,StructuredStreaming可以支持更小的时间间隔,2.3 也引入了连续处理模式,同时也有对事件时间的支持
Flink
架构
核心模型
最核心的数据结构是 Stream,它代表一个运行在多个分区上的并行流
当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow:
程序天生是并行和分布式的。一个 Stream 可以包含多个分区(Stream Partitions),一个操作符可以被分成多个操作符子任务,每一个子任务是在不同的线程或者不同的机器节点中独立执行的:
Beam
一个适配流处理、批处理的中间层
编程模型
- 窗口:将无边界数据根据事件时间分成了一个个有限的数据集
- 水位线:来表示与数据事件时间相关联的输入完整性的概念,用来测量数据进度
- 触发器:指的是表示在具体什么时候,数据处理逻辑会真正地触发窗口中的数据被计算
- 累加模式:如果在同一窗口中得到多个运算结果,我们应该如何处理这些运算结果
PCollection
- Parallel Collection,意思是可并行计算的数据集
特性:
- 需要编码器:需要将你的数据序列化/反序列化以在网络上传输
- 无序:以个 PCollection 被分配到不同的机器上执行,那么为了保证最大的处理输出,不同机器都是独立运行的,它的执行顺序就无从得知了
- 无界:Beam 要统一表达有界数据和无界数据,所以没有限制它的容量
- 不可变
Transform
stateDiagram-v2
数据1 --> Transform
数据2 --> Transform
Transform --> 数据3
Transform --> 数据4
常见的 Transform 接口:
- ParDo:类似于flatMap
- GroupByKey:把一个 Key/Value 的数据集按 Key 归并
Pipeline
stateDiagram-v2
输入 --> PCollection1: Transform1
PCollection1 --> PCollection2: Transform2
PCollection2 --> PCollection3: Transform3
PCollection3 --> 输出: Transform4
分布式环境下,整个数据流水线会启动 N 个 Workers 来同时处理 PCollection,在具体处理某一个特定 Transform 的时候,数据流水线会将这个 Transform 的输入数据集 PCollection 里面的元素分割成不同的 Bundle,将这些 Bundle 分发给不同的 Worker 来处理
在单个 Transfrom中,如果某一个 Bundle 里面的元素因为任意原因导致处理失败了,则这整个 Bundle 里的元素都必须重新处理
在多步骤的 Transform 上,如果处理的一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理
IO
- XXIO.read()
- XXIO.write()
StreamingSQL
/* 窗口:最近10个温度的平均值 */
Select bid, avg(t) as T From BoilerStream WINDOW HOPPING (SIZE 10, ADVANCE BY 1);
/* join */
from TempStream[temp > 30.0]#window.time(1 min) as T
join RegulatorStream[isOn == false]#window.length(1) as R
on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream; // Siddhi Streaming SQL
/* 某个模式有没有在特定的时间段内发生 */
from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]
within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;