如何更有效率地拉取数据

邹业盛 2021-10-19 03:25 更新
  1. 背景
  2. 问题定义
  3. 问题解决
  4. 整体代码

1. 背景

以前从来没有想过,拉取数据这事,要把它做得好一点点,也不是那么容易和直接的。

不同于自己能控制的系统中的数据的导入导出,当你用别个系统提供的有限的手段,去处理量大一点的待获取数据时,就会发现有很多很现实的问题需要自己想办法去解决。

如果是自己系统,比如 clickhouse 吧,直接:

clickhouse-client --query="select * from xx FORMAT csv" > out.csv

就可以把“全部”数据都导出到 out.csv 文件中了。

一次性地这个“全部”可以是多少呢?

对于 2G 左右的数据量,6, 7 个字段数量的话大概是 1 亿条,你机器内存比 16G 多一点就可以一次导出了,导出的文件不到 15G , tar.gz 能压缩到 2G 内。即使内存不够,通过时间分片一下,2, 3 次也很容易处理完。整体时间应该不会超过 5 分钟。

不过,如果是从其它系统拉取数据,假设单次最多 5000 条,那么 1 亿条最少需要请求 2 万次,平均每次 1 秒时间,就是 2 万秒时间, 5 个多小时吧。但是注意,这是完全理想情况,现实中如果不附加额外的处理机制,只是简单写写,要想顺利完成全部数据的获取,这 5 个小时可能变成 50 个小时。即使尽量优化,最终的时间最快估计也要 10+ 个小时。

因为,在保证可行的前提下,你是没有办法得知单次请求如何才能取到足够的 5000 条数据的。

select * from t order by date asc limit 5000 offset ?

offset 在 100 万的时候,系统估计已经跑不动了。

通常这种情况,我们是通过一个单调键(一般是日期),不断顺序地获取数据:

select * from t where date >= A and date < B

但是这时,就没有办法保证 [A, B) 段正好有 5000 条数据。

嗯,我是想说,在 [A, B) 如果有 1000 万的数据被直接查询,那系统也肯定挂了,不是对方挂就是你自己挂,或者中间请求超时挂,反正结果一样,你数据拿不全。

2. 问题定义

这里,只关注获取的策略本身就行,不用额外考虑系统的鲁棒性,假设网络环境稳定,系统也不会有任何不良反应。

我们也不考虑“并发”。可以理解成,这本身是并发之后面对的状况,本来有 10 亿数据, 10 个并发,现在还剩 1 亿要想办法去解决。

关于并发,多说几句,有两个方向可以处理并发时的数据分片:

按时间,或者其它单调键分片。比如 2 年的时间,分成 4 个半年,4 个任务并行执行。

这可能是最直观的分片处理办法,但是,这种方式会给任务的中断与恢复带来麻烦。同时在“部署”时,也必须依赖其它的配套机制,才能把 4 个任务顺利分派出去。

另一种分片方式,是依赖某个维度字段分,比如,按类型,或按国家,每个并行的任务是处理的不同类型,或者不同国家的数据。执行时,就是带了一个过滤条件。

这种方式不会带来任何的额外问题,但是任务数量,受限于维度字段本身的成员数量及数据事实上的分布情况。比如,想并发 4 个任务,那么这个维度的成员数量,必须 >= 4 ,一般也要 < 10 ,就是它的数量本身和最终的任务数,要能匹配。

当然,无论用哪种方式执行并行的任务,最终的时间肯定是大大的多于 1/4 总时间的,原因是数据的分布本身不均匀,时间上不均匀,维度上也不均匀。所以最终的完成时间依赖分到数据最多的那个任务的时间(也是一个木桶原理的例子)。

3. 问题解决

下面介绍解决问题的各个单项思路。因为每个单项之间是相互依赖的,所以,很难有一个正确的顺序。尽量单个地能把一个点说清楚吧。

3.1. 理解情况

1 亿条数据,每次 5000 条,理解情况 20000 次就能拿完,共计 20000 秒, 5.5 小时。

什么情况下能达到这种最理解的情况?在你曾经拥有过这 1 亿条数据的情况下。

(其实,服务提供方应该提供下面说的数据,这有利于整体系统的效率)

比如,你以前数据已经全量获取过,但是某一天你的磁盘坏了,要重新获取。如果你之前保存过这理想 20000 次的时间区间分布,那么恭喜你,依赖这些精准的时间区间,你每次请求几乎都可以拿到 5000 条数据。

可以使用类似以下的 SQL ,来对于全量数据,得到每 5000 条数据的时间区间分布:

select min(grass_date) as t, max(grass_date) + INTERVAL 1 SECOND, sum(ct), g from
(
    select grass_date, ct, c , intDiv(c, 4990) as g from 
    (
        select grass_date, count(id) as ct, sum(ct) over (order by grass_date) as c from
        data_table
        group by grass_date
    )
)
group by g order by t;

我在 SQL 方面并不擅长,如果不使用窗口函数,或者不使用 from 子查询,也能达到目的,请告诉我。

最终的数据类似于:

"2021-10-10 00:00:00","2021-10-10 00:19:00",4987,0
"2021-10-10 00:19:01","2021-10-10 00:35:48",4988,1
"2021-10-10 00:35:48","2021-10-10 00:51:19",4991,2
"2021-10-10 00:51:19","2021-10-10 01:06:03",4992,3
"2021-10-10 01:06:03","2021-10-10 01:20:31",4990,4
"2021-10-10 01:20:31","2021-10-10 01:34:16",4990,5
"2021-10-10 01:34:16","2021-10-10 01:47:32",4990,6
"2021-10-10 01:47:32","2021-10-10 01:59:16",4988,7
"2021-10-10 01:59:16","2021-10-10 02:11:10",4992,8
"2021-10-10 02:11:10","2021-10-10 02:22:10",4991,9
"2021-10-10 02:22:10","2021-10-10 02:33:09",4989,10

请求时,按 1,2 列的时间区间去请求数据,得到的就是第 3 列的数据量。

3.2. 如何才能更有效率

回到一般情况,没有上帝视角。

如果每次都能获取到被允许的上限(比如 5000 条)的数据条数,则整体取数请求数最少,花的总时间也越少。

这里补充一下,一般分析型的数据系统,单次请求的“固定成本”远比数据量的“变动成本”要大得多,也就是说,一次取 2000 条,花 1 秒,一次取 1 条,可能也要花 1 秒。

在不了解数据分布的情况下,是没有办法做到每次都刚好取到 5000 条数据的,所以,我们的方向就是,只能不断去试,碰到具体情况,再动态调节。

如果仅从平均条数看,应该每 2 分钟取数一次。这样,1 小时的范围要取数 30 次,2 年就要取数 52 万次,一次 1 秒,共计 144 小时。

实际的情况还要远多于 144 小时,因为很多时间段是 0 数据的同时,还有一些时间段数据不止 5000 条,不能一次取完。

144 小时对比理想情况的 5.5 小时,相差巨大吧。

有人可能会说,想快,为啥不 1 天取一次,这样不到 1000 轮就可以取完。这个问题前面已经说过了,每次只能取 5000 ,要取完平均每天 27 万的数据:

select * from data where date >= A and date < B limit 5000 offset 265000

平均情况下,都可能出现 offset 265000 ,碰到极限一点的情况, offset 再大,对方系统不挂,请求也肯定超时。

而且,取同样的数,系统执行上,随着 offset 的变大,系统的响应时间也会变慢,那时本来 1 秒响应的,可能变成 10 秒响应。

所以,不能使用过大的 offset ,要把 144 小时的时间减少,只能调节每次迭代的 [A, B) 范围。

考虑数据的业务背景,数据量的分布,肯定是服从固定的分布规律的,比如,每天下午 14 点到 16 点,是业务量最大,也是数据量最大的时候。半夜一般就没什么数据量。

因为没有办法事先得到具体的分布(也没有必要事先关心),所以,我们的思路变成,在过程中,根据实际的数据,不断地去刻画这种分布的内在规律,再把积累的信息,用于后面的时间范围分配。比如,当发现 14 点 16 点数据量很大的时候,就每 2 分钟迭代一次。而晚上几乎没有数据,就直接取 6 小时的时间范围。这样可以整体上减少请求量,大幅度提高效率。

目前我的做法是:

比如,初始选择以 10 分钟为粒度迭代,因为 day_per_minute_avg 中的每分钟平均值就是 5000 / 10 = 500 条。

第一天,每个迭代得到的时间区间,肯定都是 10 分钟一个,因为 500 * 10 ,刚好 5000 。

假设很不幸,每一天跑完,0 条数量。那么这时,因为 day_per_minute_count 中的成员都 +1 ,重新计算后的 day_per_minute_avg 中的每个成员的值现在都是 250 了。

第二天再迭代的时候,返回的时间区,就都会以 20 分钟为间隔了。因为 250 * 20 = 5000

这个过程也可以看成,系统认为数据数量少,可以尝试更大的时间范围,以便可以更快的迭代完整个过程。

当然,现实中,肯定不会整天没有数据,而是晚上没有数据,白天某个时间段有很多的数据。所以,通过积累的每分钟平均数据量,就可以实现“当发现 14 点 16 点数据量很大的时候,我就每 2 分钟迭代一次。而晚上几乎没有数据,就直接取 6 小时的时间范围”。

3.3. 判断数据是否获取完整

下一步介绍碰到数据量很大的时间片时,如何处理。在此之前,先简单说一下,某个时间片数据是否取完的判断。

在面对限制返回条数的服务上,我们可以顶着上限去 limit ,但是无法直接判断在指定时间区间内的所有数据,是否已经全部返回。

select * from data limit 5000 where date >= A and date < B

对于这条 SQL ,如果返回的数据 < 5000 ,那么可以断定数据全部取完。 == 5000 的情况,则数据可能没有取完。

我们不能在取数前先通过 select count() 检查数据条数再选择具体的取数策略,因为多一次查询就会多 1 秒时间,整体上,理想情况下就会多出一倍的时间。只在必要的使用使用 select count() 进行查询。

一般,如果碰到了 == 5000 的情况,那么同样的取数,我们会加上 offset 再次尝试。直到 offset 我们认为已经超大,甚至是再通过这种方式尝试获取所有数据,已经不行了。这时,就要切换到其它的方案上。

3.4. 兜底解决极限情况

前面,有了一个大体上的优化过程,不过现实中什么情况都可能出现,平均数统计只是历史数据,下一个迭代就可能碰到一个巨大的数据量。

目前的兜底方案:

3.5. 最小粒度和时间区间原则

最小粒度只能取到“秒”,不能取“毫秒”,保存数据的时候就应该考虑清楚这个问题。

对时间区间进行分片的时候,我们需要遵循“前闭后开”的原则,即 [A, B) 这样的区间。

个人使用过程中体会到的好处有:

3.6. 时间片的工具

它最基本 API ,大概如下:

timeline = Timeline(start, end, grain, factor)
for start, end timeline.iter():
    sql = get_sql(start, end)
    count = get_data(sql)
    timeline.add_statistics(start, end, count)

所以, Timeline 的最主要作用是三个:

Timeline 的大体代码结构是:

# -*- coding: utf-8 -*-
 
import datetime


class Statistics(object):
    def datetime_to_order(self, dt):
        ...

    def __init__(self, init_minute_value=0):
        self.len = 24 * 60
        self.value = [init_minute_value] * self.len
        self.counter = [1] * self.len

    def record(self, start, end, value):
        ...


    def get_value(self, start, end):
        if start >= end: return 0
        n = 0
        current = start
        while True:
            n += 1
            current = current + datetime.timedelta(minutes=1)
            if current >= end: break

        if n == 0:
            return 0


        result = 0
        order = self.datetime_to_order(start)

        while True:
            if n == 0: break
            idx = order % self.len
            result += self.value[idx]
            n -= 1
            order += 1

        return result



class Timeline(object):
    def __init__(self, start=None, end=None, grain='days'):
        ...


    def __contains__(self, timeline):
        ...


    def expand(self, timeline, force=False):
        ...

    def __str__(self):
        ...

    def setup_statistics(self, want, delta):
        self.iter_want = want
        sum_minute = 1
        current = delta
        mark = datetime.timedelta(minutes=1)
        while True:
            if current <= mark:
                break
            current -= mark
            sum_minute += 1

        self.statistics = Statistics(want / sum_minute)

    def add_statistics(self, start, end, value):
        self.statistics.record(start, end, value)

    def iter(self, grain='hours', factor=1, want=None):
        per = datetime.timedelta(**{grain: 1})
        per = per / factor
        if want is not None:
            self.setup_statistics(want, per)

        start = self.start
        range = []
        while True:
            end = start + per
            if end >= self.end:
                yield start, self.end
                return

            if not self.statistics:
                yield start, end
                start = end
                continue

            if range:
                range[1] = end
            else:
                range = [start, end]

            v = self.statistics.get_value(range[0], range[1])
            if (v < self.iter_want) and (abs(v - self.iter_want) > 1):
                start = end
                continue
            else:
                yield range[0], range[1]
                range = []
                start = end

4. 整体代码

有了 Timeline ,整体的功能代码就很简单了:

@tornado.gen.coroutine
def fetch_seriously(self, count, start, end, grain, factor):
    '1秒内数据量可能很大,没有通用办法'

    c = yield self.fetch(start, end, grain, factor)
    return c


@tornado.gen.coroutine
def fetch_carefully(self, start, end, grain, factor):
    '数据量可能很大,不断减半时间区间'

    sql = self.get_count_sql(start, end)
    res = yield self.service.query(sql).call()
    if res['code'] != 0:
        raise Exception(res['msg'])

    count = res['data'][0]['count']
    if count == 0: return 0

    #数据量已经在正常范围内了,可以取数操作
    if count < (self.limit * 10):
        c = yield self.fetch(start, end, grain, factor)
        return c

    diff = (end - start).total_seconds()
    half = math.floor(diff / 2)
    if half == 0:
        c = yield self.fetch_seriously(count, start, end, grain, factor)
        return c

    half_dt = start + datetime.timedelta(seconds=half)
    pre = yield self.fetch_carefully(start, half_dt, grain, factor)
    post = yield self.fetch_carefully(half_dt, end, grain, factor)
    return pre + post


@tornado.gen.coroutine
def fetch(self, start, end, grain, factor):
    data = []
    offset = 0
    limit = self.limit
    count = 0
    while True:
        sql = self.get_sql(start, end, offset, limit)
        res = yield self.service.query(sql).call()
        if res['code'] != 0: raise Exception(res['msg'])
        data.extend(res['data'])

        #数据取完了
        if len(res['data']) < limit: break
        else: offset += limit

        # no ways
        if (end - start).total_seconds() <= 1: continue

        #达到上限,开始时间区间减半的方案
        if offset >= (self.limit * 10):
            count = yield self.fetch_carefully(start, end, grain, factor)
            return count

    count = len(data)

    obj_list_filter = filter(lambda o: o['message_id'] in to_insert_id_set, data)

    try:
        result_count = yield self.insert(obj_list_filter)
    except Exception as e:
        yield tornado.gen.sleep(5)
        try:
            result_count = yield self.insert(obj_list_filter)
        except Exception as e:
            result_count = 0
            raise Exception(traceback.format_exc())

    return count



@tornado.gen.coroutine
def execute(self):
    range = self.get_range()
    timeline = Timeline(start=range[0], end=range[1], grain=self.get_grain())
    iter = timeline.iter(grain=self.get_grain(), factor=self.get_factor(), want=self.limit)

    for start, end in iter:
        retry = 0
        while True:
            try:
                count = yield self.fetch(start, end, grain=self.get_grain(), factor=self.get_factor())
            except:
                retry +=1 
                if retry == 10: raise Exception(traceback.format_exc())
                yield tornado.gen.sleep(10)
            else:
                break

        # 数据量太大,看成是异常情况。为避免影响统计效果,不计入统计
        if count < self.limit * 100:
            timeline.add_statistics(start, end, count)
        else:
            self.logger.info('HUGE NUMBER [{}] at [{}, {}]'.format(count, start, end))
评论
©2010-2021 zouyesheng.com All rights reserved. Powered by GitHub , txt2tags , MathJax