Spark 教程

Spark SQL

Spark 笔记

Spark MLlib

original icon
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.knowledgedict.com/tutorial/spark-dataframe-batch-write-to-redis.html

spark dataframe 数据批量写入 redis(pipeline、分批提交)

Spark DataFrame 原理及操作详解 Spark DataFrame 原理及操作详解


使用 python 的 spark 模块,即 pyspark,通过 spark sql 从 hive 或 mysql 等存储层读取相关数据,对应 spark 的 dataframe 对象,经过一系列的数据变化操作后(若需要),将最终数据通过 pipeline,并利用 spark 的分布式形式分批写入 redis。

推荐解决方式

假设 dataframe 数据中 id 为其中一列,其余为对应的统计数据列,以 hash 方式写入每行数据,基于 pyspark 封装了如下函数:

from pyspark.sql import DataFrame
from redis import Redis
from time import sleep


def df_2_redis(df: DataFrame, num_partitions=2, batch_size=500, sleep_secs=0.2):
    """
    将 spark sql 的 DataFrame 数据分布批量写入 Redis

    ``df`` 要写入的 spark DataFrame 对象数据。

    ``num_partitions`` int 类型,指定数据的分区数,默认为 2。

    ``batch_size`` int 类型,指定每个 worker 批量执行的数据条数,默认为 500。

    ``sleep_secs`` float 类型,指定每个 worker 批量执行数据后的睡眠时间,单位为秒,默认为 0.2。
    """
    count = df.count()
    if count == 0:
        print('df count 0')
        return

    col_names = df.columns

    def _save_2_redis(ite):
        redis_conn = Redis(
            host='xxxx.xxxxx.xxx',
            port=6379,
            password='xxxxxxxx'
        )
        pipe = redis_conn.pipeline(transaction=False)
        # 7 天有效期
        ex = 60 * 60 * 24 * 7
        idx = 1
        for i, row in enumerate(ite, start=1):
            idx = i
            redis_key = 'id_' + str(row.id)
            fv = {}
            for x in col_names:
                if x == 'id':
                    continue
                val = getattr(row, x, None)
                if val is not None:
                    fv[x] = val
            pipe.hmset(redis_key, fv)
            pipe.expire(redis_key, ex)
            if i % batch_size == 0:
                pipe.execute()
                #   批量提交后,睡眠指定时间,控制写入频率
                sleep(sleep_secs)
        if idx % batch_size != 0:
            #   最后不够 batch_size 的数据批量提交
            pipe.execute()

    #   分布批量写入
    df.repartition(num_partitions).foreachPartition(_save_2_redis)