搜索系统 基础教程

搜索 query 分析

搜索系统 索引教程

搜索系统 高级教程

搜索系统 排序层

搜索系统 笔记

original icon
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.knowledgedict.com/tutorial/search-query-table-create.html

搜索系统 query 表的设计与建立实战


如果搭建的搜索系统是基于 query 的搜索的,建立 query 表是搭建 search system 的最基础工作,它是后续分词、chunk 分析、意图分类、term weight 等 query 分析工作的铺垫工作。

query 表的目的

建立 query 表主要是为了如下几点:

  • 标注正确的分词、chunk 分析、意图分类、term weight 等,同时也可以作为对应模型的训练所用;
  • 基于搜索业务建立的 term、短语抱团(chunk),作为分词器的自定义词表,使倒排索引更准确。

建立步骤

这里分享笔者建立 query 的步骤:

  1. 统计搜索系统一定时间内的 top 热词,实际是根据搜索词的埋点日志,按照出现次数进行排序后,以一定次数作为截断(如只同步出现 5 次及以上的)读取;
  2. 聚合 query 读取的,还需要进行一些预处理(如 去掉首尾空格、中间空格合并最多一个、英文小写化等等);
  3. 最后将处理后的 query,先查询是否已写入,若未写入,则插入操作;
  4. 每天按照如上 3 个步骤进行同步 query 操作,建立 query 同步机制。

代码实现

一般搜索词埋点数据在 hdfs 上,通过 hive 读取,query 表采用 mysql,我们通过 spark 任务每天调度同步任务,具体示例代码如下:

"""
  同步 query 任务
"""
import logging
import sys
import traceback
from datetime import datetime, timedelta
from time import sleep

import pymysql
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col, regexp_replace, desc, lower


def get_and_save_query(session: SparkSession, start_date: str, end_date: str, min_cnt: int, batch_size=1000, sleep_sec=1.0):
    sql = """
    SELECT 
        query, 
        COUNT(1) AS cnt
    FROM (
        SELECT 
            query
        FROM xxx.maidian_log 
        WHERE dt >= '{start_date}' 
            AND dt <= '{end_date}' 
            AND action_id = 188
        ) a
    GROUP BY query
    HAVING cnt >= {min_cnt}
    ORDER BY cnt DESC 
    """.format(start_date=start_date, end_date=end_date, min_cnt=min_cnt)
    logging.info(sql)
    df = session.sql(sql)
    logging.info('df schema ===> %s', df)
    #   去除首尾空格
    df = df.withColumn('query', trim(col('query')))
    #   中间若有多个空格,则只保留一个空格
    df = df.withColumn('query', regexp_replace('query', '\s+', ' '))
    #   英文字符小写化
    df = df.withColumn('query', lower(col('query')))
    df = df.filter('query is not null')
    df = df.orderBy(desc('cnt'))

    df.cache()
    df_count = df.count()
    logging.info("df count %s", df_count)
    count = 0
    for row in df.collect():
        if row.query == '':
            logging.info('query emtpy')
            continue
        if len(row.query) > 30:
            logging.info('query %s length > 30', row.query)
            continue
        if 'http' in row.query:
            logging.info('query %s contain http', row.query)
            continue
        if '¥' in row.query:
            logging.info('query %s contain ¥', row.query)
            continue
        if '@' in row.query:
            logging.info('query %s contain @', row.query)
            continue
        conn = connect_db(single_conn)
        save_2_mysql(conn, row.query)
        count += 1
        if count % batch_size == 0:
            logging.info('count %s, sleep %s sec', count, sleep_sec)
            sleep(sleep_sec)
    # noinspection PyBroadException
    try:
        conn.close()
        logging.info('connection close successfully')
    except Exception:
        traceback.print_exc()


single_conn = None


def connect_db(sc):
    if sc is None:
        sc = pymysql.connect(
            host=xxx.xx.xx.xxx',
            port=3306,
            user='xxxx',
            password='xxx',
            database='search',
            charset='utf8mb4'
        )
    return sc


def save_2_mysql(conn, query):
    conn.ping(reconnect=True)
    cursor = conn.cursor()
    # noinspection PyBroadException
    try:
        cursor.execute('SELECT * FROM tb_query where query = %s', query)
        select_result = cursor.fetchone()
        if select_result:
            logging.info('exists ===> %s', query)
        else:
            cursor.execute('INSERT INTO tb_query SET query = %s', query)
            conn.commit()
            logging.info('insert =========> %s', query)
    except Exception:
        traceback.print_exc()
    cursor.close()


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s  %(filename)s : %(levelname)s  %(message)s',
    )
    logging.info(sys.argv)
    dp = sys.argv[1]
    delta_day = 6
    min_count = 5
    yesterday_dt = datetime.strptime(dp, "%Y-%m-%d")
    start_dt_str = (yesterday_dt - timedelta(days=delta_day)).strftime("%Y-%m-%d")
    spark_session = SparkSession.builder.appName('sync_query').getOrCreate()
    spark_session.sparkContext.setLogLevel('ERROR')

    get_and_save_query(spark_session, start_dt_str, dp, min_count, batch_size=1000, sleep_sec=1.0)

 

一个健壮的搜索系统都有敏感词屏蔽模块,敏感词过滤的做法有很多,从最原始的方法,query 从头到尾搜索一遍,看是否存在此敏感词,再到稍微高级 ...
本章列出了搜索系统开发中常涉及的内容及常见问题的解决方案。 ...
在搜索系统的排序层的最后一环往往是业务干预逻辑,主要是根据业务需求针对排序结果进行人工调整,其中主要分为提权类、降权类、固定类以及其他。 ...
搜索系统的排序层又称为精排层,主要是基于离线训练好的模型,结合模型所用的特征给每个 item 进行打分,然后根据分数进行降序排序。搜索排序模 ...