@marlin
2016-08-31T15:39:50.000000Z
字数 1736
阅读 3304
Spark
Streaming
Exactly-Once
首先我们梳理Exactly once所涉及到的相关内容,并明确一个讨论基础:
1. 将Spark计算引擎看作是一个整体
2. Spark Streaming(以下简写SS)需要对接上下游
3. Exactly once语义(以下简写EO,或EO语义)涉及到
- 上游是否EO到SS
- SS作为整体是否保证了EO
- SS是否将数据EO地写出到了下游
4. 只有保证了上述三个过程的EO,才能保证SS是能够实现EO的
详情参考这里
简要说一下,Spark Streaming使用Direct模式对接上游kafka。无论kafka有多少个partition,使用Direct模式总能保证SS中有相同数量的partition与之相对,也就是说SS中的KafkaRDD的并发数量在Direct模式下是由上游kafka决定的。在这个模式下,kafka的offset是作为KafkaRDD的一部分存在,会存储在checkpoints中,由于checkpoints只存储offset内容,而不存储数据,这就使得checkpoints是相对轻的操作。这就使得SS在遇到故障时,可以从checkpoint中恢复上游kafka的offset,从而保证exactly once。
详情参考这里
首先输出操作是具有At-least Once语义的,也就是说SS可以保证需要输出的数据一定会输出出去,只不过由于失败等原因可能会输出多次。那么如何保证Exactly once?第一种“鸵鸟做法”,就是期望下游(数据)具有幂等特性。第二种使用事务更新,简要代码如下:
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
这样保证同一个partition要么一起更新成功,要么一起失败,通过uniqueId来标识这一次的更新,这就要求下游支持事务机制。
SS内部的实现机制是基于RDD模型的,RDD为保证计算过程中数据不丢失使用了checkpoint机制,也就是说其计算逻辑是RDD的变换过程,也就是DAG,可以在计算过程中的任何一个阶段(也就是这个阶段的RDD)上使用checkpoint方法,就可以保证当后续计算失败,可以从这个checkpoint重新算起,使得计算延续下去。当Spark Streaming场景下,其天然会进行batch操作,也就是说kafka过来的数据,每秒(一个固定batch的时间周期)会对当前kafka中的数据产生一个RDD,那么后续计算就是在这个RDD上进行的。只需要在kafkaRDD这个位置合理使用了checkpoint(这一点在前面已经讲过,可以保证)就能保证SS内部的Exactly once。
注意一点:SS中没有Tuple级别的ACK,其操作必然是在RDD的某个partition上的,要么全做,要么不做,要么失败,要么成功,都是基于RDD的partition的。
在Spark最近的release note中没有明确提及event time。应该是对event time的支持力度还是有限的。
以下是我对event time在SS中可能的使用方式的理解
由于SS是基于batch rdd来实现的,如果在kafka接入到SS时就已经使用event time对数据进行划分了,也就是说batch rdd的划分方式使用数据本身的时间戳就可以实现对业务时间的支持。