如何更有效率地拉取数据
1. 背景
以前从来没有想过,拉取数据这事,要把它做得好一点点,也不是那么容易和直接的。
不同于自己能控制的系统中的数据的导入导出,当你用别个系统提供的有限的手段,去处理量大一点的待获取数据时,就会发现有很多很现实的问题需要自己想办法去解决。
如果是自己系统,比如 clickhouse 吧,直接:
clickhouse-client --query="select * from xx FORMAT csv" > out.csv
就可以把“全部”数据都导出到 out.csv
文件中了。
一次性地这个“全部”可以是多少呢?
对于 2G 左右的数据量,6, 7 个字段数量的话大概是 1 亿条,你机器内存比 16G 多一点就可以一次导出了,导出的文件不到 15G , tar.gz 能压缩到 2G 内。即使内存不够,通过时间分片一下,2, 3 次也很容易处理完。整体时间应该不会超过 5 分钟。
不过,如果是从其它系统拉取数据,假设单次最多 5000 条,那么 1 亿条最少需要请求 2 万次,平均每次 1 秒时间,就是 2 万秒时间, 5 个多小时吧。但是注意,这是完全理想情况,现实中如果不附加额外的处理机制,只是简单写写,要想顺利完成全部数据的获取,这 5 个小时可能变成 50 个小时。即使尽量优化,最终的时间最快估计也要 10+ 个小时。
因为,在保证可行的前提下,你是没有办法得知单次请求如何才能取到足够的 5000 条数据的。
select * from t order by date asc limit 5000 offset ?
offset 在 100 万的时候,系统估计已经跑不动了。
通常这种情况,我们是通过一个单调键(一般是日期),不断顺序地获取数据:
select * from t where date >= A and date < B
但是这时,就没有办法保证 [A, B) 段正好有 5000 条数据。
嗯,我是想说,在 [A, B) 如果有 1000 万的数据被直接查询,那系统也肯定挂了,不是对方挂就是你自己挂,或者中间请求超时挂,反正结果一样,你数据拿不全。
2. 问题定义
- 通过 SQL ,获取 1 亿条数据,假设是 2 年范围的吧。
- 单次最多获取 5000 条。
- 单次请求,假设需要 1 秒时间。
- 这 1 亿条都是历史事实数据,即不会再变化的数据。
- 如何能在尽量短的时间内,把数据全部获取完?
这里,只关注获取的策略本身就行,不用额外考虑系统的鲁棒性,假设网络环境稳定,系统也不会有任何不良反应。
我们也不考虑“并发”。可以理解成,这本身是并发之后面对的状况,本来有 10 亿数据, 10 个并发,现在还剩 1 亿要想办法去解决。
关于并发,多说几句,有两个方向可以处理并发时的数据分片:
按时间,或者其它单调键分片。比如 2 年的时间,分成 4 个半年,4 个任务并行执行。
这可能是最直观的分片处理办法,但是,这种方式会给任务的中断与恢复带来麻烦。同时在“部署”时,也必须依赖其它的配套机制,才能把 4 个任务顺利分派出去。
另一种分片方式,是依赖某个维度字段分,比如,按类型,或按国家,每个并行的任务是处理的不同类型,或者不同国家的数据。执行时,就是带了一个过滤条件。
这种方式不会带来任何的额外问题,但是任务数量,受限于维度字段本身的成员数量及数据事实上的分布情况。比如,想并发 4 个任务,那么这个维度的成员数量,必须 >= 4
,一般也要 < 10
,就是它的数量本身和最终的任务数,要能匹配。
当然,无论用哪种方式执行并行的任务,最终的时间肯定是大大的多于 1/4
总时间的,原因是数据的分布本身不均匀,时间上不均匀,维度上也不均匀。所以最终的完成时间依赖分到数据最多的那个任务的时间(也是一个木桶原理的例子)。
3. 问题解决
下面介绍解决问题的各个单项思路。因为每个单项之间是相互依赖的,所以,很难有一个正确的顺序。尽量单个地能把一个点说清楚吧。
3.1. 理解情况
1 亿条数据,每次 5000 条,理解情况 20000 次就能拿完,共计 20000 秒, 5.5 小时。
什么情况下能达到这种最理解的情况?在你曾经拥有过这 1 亿条数据的情况下。
(其实,服务提供方应该提供下面说的数据,这有利于整体系统的效率)
比如,你以前数据已经全量获取过,但是某一天你的磁盘坏了,要重新获取。如果你之前保存过这理想 20000 次的时间区间分布,那么恭喜你,依赖这些精准的时间区间,你每次请求几乎都可以拿到 5000 条数据。
可以使用类似以下的 SQL ,来对于全量数据,得到每 5000 条数据的时间区间分布:
select min(grass_date) as t, max(grass_date) + INTERVAL 1 SECOND, sum(ct), g from ( select grass_date, ct, c , intDiv(c, 4990) as g from ( select grass_date, count(id) as ct, sum(ct) over (order by grass_date) as c from data_table group by grass_date ) ) group by g order by t;
- 这是在 clickhouse 上实际可用的一条 SQL 。(你直接在服务提供方的环境下,想通过 SQL 在有限时间内得到这个结果不一定现实)
- 它依赖窗口函数
sum(ct) over (...)
功能,及其它的整除intDiv
,日期运算+ INTERVAL 1 SECOND
功能。 - 使用 4990 而不是 5000 ,是因为在“秒”这个粒度上,可能刚好在临界值上不止 1 条数据。尽量避免结果中有重叠的时间段。
- 最小粒度,比如“秒”,内,有多于 5000 条的数据的话,那么时间段的重叠不可避免。这需要额外处理。
我在 SQL 方面并不擅长,如果不使用窗口函数,或者不使用 from
子查询,也能达到目的,请告诉我。
最终的数据类似于:
"2021-10-10 00:00:00","2021-10-10 00:19:00",4987,0 "2021-10-10 00:19:01","2021-10-10 00:35:48",4988,1 "2021-10-10 00:35:48","2021-10-10 00:51:19",4991,2 "2021-10-10 00:51:19","2021-10-10 01:06:03",4992,3 "2021-10-10 01:06:03","2021-10-10 01:20:31",4990,4 "2021-10-10 01:20:31","2021-10-10 01:34:16",4990,5 "2021-10-10 01:34:16","2021-10-10 01:47:32",4990,6 "2021-10-10 01:47:32","2021-10-10 01:59:16",4988,7 "2021-10-10 01:59:16","2021-10-10 02:11:10",4992,8 "2021-10-10 02:11:10","2021-10-10 02:22:10",4991,9 "2021-10-10 02:22:10","2021-10-10 02:33:09",4989,10
请求时,按 1,2 列的时间区间去请求数据,得到的就是第 3 列的数据量。
3.2. 如何才能更有效率
回到一般情况,没有上帝视角。
如果每次都能获取到被允许的上限(比如 5000 条)的数据条数,则整体取数请求数最少,花的总时间也越少。
这里补充一下,一般分析型的数据系统,单次请求的“固定成本”远比数据量的“变动成本”要大得多,也就是说,一次取 2000 条,花 1 秒,一次取 1 条,可能也要花 1 秒。
在不了解数据分布的情况下,是没有办法做到每次都刚好取到 5000 条数据的,所以,我们的方向就是,只能不断去试,碰到具体情况,再动态调节。
- 先根据数据总量,比如 2 年 1 亿条数据,可以算出平均每天 27 万。
- 晚上肯定没那么多业务量,所以算平均小时的数据量,按 1 天 18 小时算, 27 万 / 18 小时,平均每小时就是 15 万。
- 15 万对于 5000 ,还太大,继续减小粒度,平均每分钟 2500 条。
如果仅从平均条数看,应该每 2 分钟取数一次。这样,1 小时的范围要取数 30 次,2 年就要取数 52 万次,一次 1 秒,共计 144 小时。
实际的情况还要远多于 144 小时,因为很多时间段是 0 数据的同时,还有一些时间段数据不止 5000 条,不能一次取完。
144 小时对比理想情况的 5.5 小时,相差巨大吧。
有人可能会说,想快,为啥不 1 天取一次,这样不到 1000 轮就可以取完。这个问题前面已经说过了,每次只能取 5000 ,要取完平均每天 27 万的数据:
select * from data where date >= A and date < B limit 5000 offset 265000
平均情况下,都可能出现 offset 265000
,碰到极限一点的情况, offset
再大,对方系统不挂,请求也肯定超时。
而且,取同样的数,系统执行上,随着 offset
的变大,系统的响应时间也会变慢,那时本来 1 秒响应的,可能变成 10 秒响应。
所以,不能使用过大的 offset
,要把 144 小时的时间减少,只能调节每次迭代的 [A, B)
范围。
考虑数据的业务背景,数据量的分布,肯定是服从固定的分布规律的,比如,每天下午 14 点到 16 点,是业务量最大,也是数据量最大的时候。半夜一般就没什么数据量。
因为没有办法事先得到具体的分布(也没有必要事先关心),所以,我们的思路变成,在过程中,根据实际的数据,不断地去刻画这种分布的内在规律,再把积累的信息,用于后面的时间范围分配。比如,当发现 14 点 16 点数据量很大的时候,就每 2 分钟迭代一次。而晚上几乎没有数据,就直接取 6 小时的时间范围。这样可以整体上减少请求量,大幅度提高效率。
目前我的做法是:
- 维护一个
day_per_minute_avg = []
的列表,用于保存 1 天中,每一个分钟的,不断更新的平均数据量。这个列表一共有 1440 个成员。 - 每次通过
[A, B)
范围,获取到具体数据时,结果都会反馈到day_per_minute_avg
当中。比如[A, B)
的范围是 1 天,获取了 3000 条数据,那么计算一下,平均摊到每分钟的量是 2 左右。再通过保存的每分钟的day_per_minute_count
计数,重新更新day_per_minute_avg
中每个成员的值。 - 我把
day_per_minute_avg = []
中的值初始化为5000 / 粒度
,day_per_minute_count
初始值都是 1 。 - 后面会讲到
Timeline
,它返回迭代时间范围时,会根据day_per_minute_avg
预计算这个范围内的数据量,如果数据量不够 5000 ,则不会返回,会加大时间范围,继续判断,直接某个时间范围,根据day_per_minute_avg
中的值计算出来的数据量不少于 5000 ,才返回这个时间区间。
比如,初始选择以 10 分钟为粒度迭代,因为 day_per_minute_avg
中的每分钟平均值就是 5000 / 10 = 500
条。
第一天,每个迭代得到的时间区间,肯定都是 10 分钟一个,因为 500 * 10
,刚好 5000 。
假设很不幸,每一天跑完,0 条数量。那么这时,因为 day_per_minute_count
中的成员都 +1
,重新计算后的 day_per_minute_avg
中的每个成员的值现在都是 250
了。
第二天再迭代的时候,返回的时间区,就都会以 20 分钟为间隔了。因为 250 * 20 = 5000
。
这个过程也可以看成,系统认为数据数量少,可以尝试更大的时间范围,以便可以更快的迭代完整个过程。
当然,现实中,肯定不会整天没有数据,而是晚上没有数据,白天某个时间段有很多的数据。所以,通过积累的每分钟平均数据量,就可以实现“当发现 14 点 16 点数据量很大的时候,我就每 2 分钟迭代一次。而晚上几乎没有数据,就直接取 6 小时的时间范围”。
3.3. 判断数据是否获取完整
下一步介绍碰到数据量很大的时间片时,如何处理。在此之前,先简单说一下,某个时间片数据是否取完的判断。
在面对限制返回条数的服务上,我们可以顶着上限去 limit
,但是无法直接判断在指定时间区间内的所有数据,是否已经全部返回。
select * from data limit 5000 where date >= A and date < B
对于这条 SQL ,如果返回的数据 < 5000
,那么可以断定数据全部取完。 == 5000
的情况,则数据可能没有取完。
我们不能在取数前先通过 select count()
检查数据条数再选择具体的取数策略,因为多一次查询就会多 1 秒时间,整体上,理想情况下就会多出一倍的时间。只在必要的使用使用 select count()
进行查询。
一般,如果碰到了 == 5000
的情况,那么同样的取数,我们会加上 offset
再次尝试。直到 offset
我们认为已经超大,甚至是再通过这种方式尝试获取所有数据,已经不行了。这时,就要切换到其它的方案上。
3.4. 兜底解决极限情况
前面,有了一个大体上的优化过程,不过现实中什么情况都可能出现,平均数统计只是历史数据,下一个迭代就可能碰到一个巨大的数据量。
目前的兜底方案:
- 如果一次 5000 没有取完数据,则加上
offset
最多会尝试 10 轮,即offset
最大会到 45000 。 - 如果 10 轮之后,数据还没有完(大于 50000 了),则将时间范围区间缩小一半,用一半的区间,再去递归取数。
- 时间范围最多减少到只有 1 秒,在 1 秒范围内,
offset
将不再受限。 - 因为我们的精度只做到 1 秒。1 秒之内,数据量再大到
offset
搞不定的程度,就没有通用的办法了。 - 目前,实际遇到的,1 秒最多不超过 5 万条,离
offset
的上限还远。
3.5. 最小粒度和时间区间原则
最小粒度只能取到“秒”,不能取“毫秒”,保存数据的时候就应该考虑清楚这个问题。
- 原则上,需要精度的地方,都应该避免使用小数。
- timestamp 的定义,就是“秒”。
- 不同的环境,一定能取到整数的秒,不一定能取到整数的毫秒。
- “日期表示”,一般到 YYYY-mm-dd HH:MM:SS ,带上毫秒会写成
YYYY-mm-dd HH:MM:SS.xxx
。 - 大部分数据场景连真正的“实时”都尚且做不到,或者说没有必要,那精确到毫秒除了增加处理和维护成本也毫无意义。
对时间区间进行分片的时候,我们需要遵循“前闭后开”的原则,即 [A, B)
这样的区间。
个人使用过程中体会到的好处有:
- 在连续分片中可以自然衔接,避免重叠造成重复数据。比如第一个迭代是
[A, B)
,下一个就是[B, C)
。 - 对于“直到现在”的场景,不取“当前时间”是合理的。
- 大部分场景我使用 Python ,这个原则与 Python 的
range
对象是一致的。
3.6. 时间片的工具
它最基本 API ,大概如下:
timeline = Timeline(start, end, grain, factor) for start, end timeline.iter(): sql = get_sql(start, end) count = get_data(sql) timeline.add_statistics(start, end, count)
start
,end
,是时间线的范围。grain
粒度,表示的是这个对象的相关行为,是按“天”,“小时”,或者是“分钟”。factor
是进一步对粒度的拆分,表示的1 / factor
粒度,如果是2
,就是“半天”,“半小时”,“半分钟”等。iter()
会返回一个可迭代对象,使用它可以遍历完整的[start, end)
区间,每次的跨度,和grain
和factor
有关。
所以, Timeline
的最主要作用是三个:
- 处理粒度的边界问题。
- 方便在粒度条件下迭代时间区间。
- 维护前面提到的
day_per_minute_avg
,通过add_statistics
的反馈,动态改变iter()
每次弹出的start
和end
。
Timeline
的大体代码结构是:
# -*- coding: utf-8 -*- import datetime class Statistics(object): def datetime_to_order(self, dt): ... def __init__(self, init_minute_value=0): self.len = 24 * 60 self.value = [init_minute_value] * self.len self.counter = [1] * self.len def record(self, start, end, value): ... def get_value(self, start, end): if start >= end: return 0 n = 0 current = start while True: n += 1 current = current + datetime.timedelta(minutes=1) if current >= end: break if n == 0: return 0 result = 0 order = self.datetime_to_order(start) while True: if n == 0: break idx = order % self.len result += self.value[idx] n -= 1 order += 1 return result class Timeline(object): def __init__(self, start=None, end=None, grain='days'): ... def __contains__(self, timeline): ... def expand(self, timeline, force=False): ... def __str__(self): ... def setup_statistics(self, want, delta): self.iter_want = want sum_minute = 1 current = delta mark = datetime.timedelta(minutes=1) while True: if current <= mark: break current -= mark sum_minute += 1 self.statistics = Statistics(want / sum_minute) def add_statistics(self, start, end, value): self.statistics.record(start, end, value) def iter(self, grain='hours', factor=1, want=None): per = datetime.timedelta(**{grain: 1}) per = per / factor if want is not None: self.setup_statistics(want, per) start = self.start range = [] while True: end = start + per if end >= self.end: yield start, self.end return if not self.statistics: yield start, end start = end continue if range: range[1] = end else: range = [start, end] v = self.statistics.get_value(range[0], range[1]) if (v < self.iter_want) and (abs(v - self.iter_want) > 1): start = end continue else: yield range[0], range[1] range = [] start = end
4. 整体代码
有了 Timeline
,整体的功能代码就很简单了:
@tornado.gen.coroutine def fetch_seriously(self, count, start, end, grain, factor): '1秒内数据量可能很大,没有通用办法' c = yield self.fetch(start, end, grain, factor) return c @tornado.gen.coroutine def fetch_carefully(self, start, end, grain, factor): '数据量可能很大,不断减半时间区间' sql = self.get_count_sql(start, end) res = yield self.service.query(sql).call() if res['code'] != 0: raise Exception(res['msg']) count = res['data'][0]['count'] if count == 0: return 0 #数据量已经在正常范围内了,可以取数操作 if count < (self.limit * 10): c = yield self.fetch(start, end, grain, factor) return c diff = (end - start).total_seconds() half = math.floor(diff / 2) if half == 0: c = yield self.fetch_seriously(count, start, end, grain, factor) return c half_dt = start + datetime.timedelta(seconds=half) pre = yield self.fetch_carefully(start, half_dt, grain, factor) post = yield self.fetch_carefully(half_dt, end, grain, factor) return pre + post @tornado.gen.coroutine def fetch(self, start, end, grain, factor): data = [] offset = 0 limit = self.limit count = 0 while True: sql = self.get_sql(start, end, offset, limit) res = yield self.service.query(sql).call() if res['code'] != 0: raise Exception(res['msg']) data.extend(res['data']) #数据取完了 if len(res['data']) < limit: break else: offset += limit # no ways if (end - start).total_seconds() <= 1: continue #达到上限,开始时间区间减半的方案 if offset >= (self.limit * 10): count = yield self.fetch_carefully(start, end, grain, factor) return count count = len(data) obj_list_filter = filter(lambda o: o['message_id'] in to_insert_id_set, data) try: result_count = yield self.insert(obj_list_filter) except Exception as e: yield tornado.gen.sleep(5) try: result_count = yield self.insert(obj_list_filter) except Exception as e: result_count = 0 raise Exception(traceback.format_exc()) return count @tornado.gen.coroutine def execute(self): range = self.get_range() timeline = Timeline(start=range[0], end=range[1], grain=self.get_grain()) iter = timeline.iter(grain=self.get_grain(), factor=self.get_factor(), want=self.limit) for start, end in iter: retry = 0 while True: try: count = yield self.fetch(start, end, grain=self.get_grain(), factor=self.get_factor()) except: retry +=1 if retry == 10: raise Exception(traceback.format_exc()) yield tornado.gen.sleep(10) else: break # 数据量太大,看成是异常情况。为避免影响统计效果,不计入统计 if count < self.limit * 100: timeline.add_statistics(start, end, count) else: self.logger.info('HUGE NUMBER [{}] at [{}, {}]'.format(count, start, end))