ElasticSearch 集群冷热数据分离

ElasticSearch 集群冷热数据分离

背景

当前的 es 集群每天产生大量的数据,对存储和索引行成较大压力,因此我们计划使用集群冷热分离方案,来将 20天以前的数据标记为冷数据,并将其转移到集群中的冷节点。

冷热方案(hot-warm architecture)大致思想如下:

  • 在集群中将具有高配置的节点设置为 hot 节点,让他们承担索引和存储需求;
  • 在集群中添加普通配置节点,用来存储非频繁查询的只读索引,一般是几周或几个月前的陈旧索引;

该方案的好处是可以良好的平衡硬件资源需要,提高性能的同时大幅降低成本,需要为每个 ES 节点设置标签,相同标签的会视为一个分组。

我们利用冷热方案,可以减轻已有 es 节点的存储压力,实现数据长期存储和备份的功能。

当然还需要新增 es 节点,并且使得新增节点只担当数据节点功能,不作为 master 角色,也不提供查询请求接收服务,因此不需要监听 http.port: 9200

下面是详细操作过程:

设置 hot 节点

由于只有 elasticsearch cluster 相关配置可以通过 API 接口修改,而 node 节点的配置不支持,只能修改配置文件后重启服务才能生效,因此这一步需要重启每个节点。

重启节点不会导致集群数据丢失,重启过程中会有索引文档处于 yellow 和 red 状态,但不用担心。

重启节点至集群恢复为 Green 状态这个时间段,集群处于维护宕机状态,不同环境需要选择合理的操作时间。

为节点设置 hot 标签流程如下:

  • 确保当前节点数量不低于主分片数量
    节点数低于分片数时,发生节点宕机更容易丢失数据。这个要求不是必须的,但很有必要。

  • 停止 logstash
    停止 logstash,避免它在 ES 宕机期间往 ES 写入数据,引起数据不一致的问题。

    ssh 10.0.10.10"supervisorctl stop logstash"
    ssh 10.0.10.11 "supervisorctl stop logstash01"
    ssh 10.0.10.11 "supervisorctl stop logstash02"
    
  • 停止 es 分片分配功能
    这一做法避免 ES 节点停机期间发生数据重新分配,而在节点恢复后再次重新分配,导致不必要的资源和时间消耗。

    PUT /_cluster/settings
    {"transient":{"cluster.routing.allocation.enable": "none"}}
    
  • 设置分片延时重新分配
    这一步是为了避免在上面步骤遗漏的情况下,节点停机后立即引发分片重新平衡。
    关于重平衡 Rebalance 介绍详见:Shard Rebalancing Settings
    当 es 节点重启、宕机、内存oom、负载过高、响应迟缓等情况,我们希望延迟 es 重新分配 shards,可以这样设置:

    PUT _all/_settings
    {"settings":{"index.unassigned.node_left.delayed_timeout":"10m"}}
      
    POST /_flush/synced
    
  • 修改 es 配置并重启服务
    为节点新增 node.attr.box_type: hot 配置,这是一个标签,可以设置为其它值。
    这一步可以多个节点同时操作,减小宕机时间和 index 自我修复的次数。

    sed -e '/node.attr.rack/a \node.attr.box_type: hot' -i elasticsearch.yml
    ps -ef | awk '/elasticsearch/{print $2}' | xargs kill
    cd /home/worker/elasticsearch/bin/ && su worker -c "elasticsearch -d"
    
  • 等待服务启动及节点加入集群
    可以使用下面的命令查看:

    GET _cluster/health    # number_of_nodes 符合实际值
    GET /_cat/nodes?v
    GET /_cat/nodeattrs?v  # 如果每个节点都有 `box_type hot` 这样的结果,则设置成功
    
  • 恢复集群分片分配功能
    恢复设置,状态为 red 的 index 会自动修复为 yellow,red 恢复完成会自动修复 yellow 为 green,这个时间较长,视情况等待。

    PUT /_cluster/settings
    {"transient":{"cluster.routing.allocation.enable": "all"}}
    
  • 恢复 logstash
    logstash 在停服过程中,没有服务消费 kafka 消息,kafka 会出现消息堆积的告警,但数据会落盘存储不会丢失,因此也许有尽快恢复 logstash。

    ssh 10.0.1.52 "supervisorctl start logstash"
    ssh 10.0.1.252 "supervisorctl start logstash01"
    ssh 10.0.1.252 "supervisorctl start logstash02"
    

新增 data 节点

新增的集群节点不作为 master 节点,我们也不希望有索引被自动创建或重新分配到这些节点上,因此需要将已有索引通过 hot 标签绑定到原始节点上,同时设置索引模版,使得新增的索引也不会创建到新节点上。

操作过程如下:

  • 设置新建索引分配到 hot 节点

    PUT _template/default
    {
        "index_patterns": ["*"],
        "settings": {
    		"number_of_shards": "3",
    		"number_of_replicas":"1",
    		"refresh_interval": "10s",
    		"routing.allocation.require.box_type": "hot",
    		"unassigned.node_left.delayed_timeout": "10m"
      }
    }
    
  • 设置已有索引不会被分配到新节点上
    即将已有索引绑定在具有 hot 标签的节点上。有两种方案:

    • 当集群索引较少并且单个索引较小时,可以使用 API 接口一次性修改:
      PUT _all/_settings
      {"settings":{"index.routing.allocation.require.box_type":"hot"}}
      
    • 当集群索引较多时,使用上述方式会接口会出现 timeout,不能保证全部索引都被正确修改,则使用下面的脚本逐个修改:
      #!/bin/bash
      
      #funtion
      Usage(){
          echo "Usage:"
          echo "Change all indexes' box_type = hot"
          echo "  $0 es_uri"
          echo "  example: $0 http://127.0.0.1:9200"
      }
      
      # check env
      [[ -z "$1" ]] && Usage && exit 1
      
      ES_URI=$1
      
      curl -sNL -XGET "${ES_URI}/_cat/indices?h=index" > /tmp/indexes_all.info
      [[ ! -s /tmp/indexes_all.info ]] && echo "Get es indexes failed, exit." && exit 1
      
      num=1
      cat /tmp/indexes_all.info | while read index
      do
        let num++
        echo "$num, Set index tag \"box_type\":\"hot\" for ${index}"
        curl -H "Content-Type: application/json" -XPUT "${ES_URI}/${index}/_settings" -d '{"settings":{"index.routing.allocation.require.box_type":"hot"}}'
        echo
      done
      
      echo "Set index tag completed."
      
      索引较多的情况下,使用循环遍历会有点慢,但相对可靠。
  • 添加新节点
    设置 master 角色为 false,并禁用监听 http 9200:

    多个节点依次添加相应配置。
    vim conf/elasticsearch.yml

    # ---------------------------------- Cluster -----------------------------------
    cluster.name: glen-es-cluster
    #
    # ------------------------------------ Node ------------------------------------
    node.name: es-node04
    #
    # 设置为专用 data 节点
    node.master: false 
    node.data: true 
    node.ingest: false 
    search.remote.connect: false 
    #
    # 设置冷热分离标签
    node.attr.box_type: cool
    #
    # ----------------------------------- Paths ------------------------------------
    path.data: /data1/es-data
    path.logs: /data1/es-logs
    #
    # ----------------------------------- Memory -----------------------------------
    #
    # ---------------------------------- Network -----------------------------------
    transport.host: ["_eth0_"]
    transport.tcp.port: 9300
    #
    # disable HTTP on data-only nodes
    http.enabled: false
    # http.port: 9200
    # network.host: ["_eth0_"]
    #
    # --------------------------------- Discovery ----------------------------------
    discovery.zen.ping.unicast.hosts: ["es-node01", "es-node02", "es-node03"]
    #
    # ---------------------------------- Gateway -----------------------------------
    gateway.recover_after_nodes: 2
    xpack.security.enabled: false
    # (master_eligible_nodes / 2) + 1
    discovery.zen.minimum_master_nodes: 2
    #
    # ---------------------------------- Various -----------------------------------
    #action.destructive_requires_name: true
    
  • 根据节点内存大小调整 jvm 参数

    vim config/jvm.options
    -Xms4g
    -Xmx4g
    
  • 检查系统优化参数

    vim /etc/sysctl.conf
    vm.max_map_count = 262144
    
    # 修改最大文件描述符
    vim /etc/security/limits.d/20-nproc.conf
    *       soft    nproc   100000
    *       soft    nofile  100000
    *       hard    nofile  1000000
    *       hard    nproc   1000000
    
  • 启动节点服务:

    cd /home/worker/elasticsearch/bin/ && su worker -c "elasticsearch -d"
    
  • 检查情况

    GET /_cat/nodes?v
    GET /_cat/nodeattrs?v
    ip          heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
    10.10.10.10           58          97  20    3.11    3.08     2.95 mdi       *      node-1
    10.10.10.11           53          97  12    1.66    1.68     1.63 mdi       -      node-4
    10.10.10.12           12          75   0    0.00    0.01     0.05 d         -      node-5
    10.10.10.13           58          98  26    2.91    3.47     3.47 mdi       -      node-2
    10.10.10.14            6          97   0    0.00    0.04     0.05 d         -      node-6
    10.10.10.15           66          96  25    1.89    2.47     2.47 mdi       -      node-3
    10.10.10.16            9          97   0    0.04    0.07     0.07 d         -      node-7
    

    其中 node.role 标识对应为 master, data, ingest 角色类型。

自动转换冷热数据

当新的节点已经完全加入集群时,我们就可以做 index 的冷热迁移了。

这里我们将 10 天以前的 index 标签更改为 cool,使之自动从 hot 节点移动到 cool 节点。

  • 修改 index 标签
    要修改 10 天以前的 index 标签,但如果每天都更新10天以前的全部索引,随着时间增加要更新的索引数量会越来越多。
    因此,先一次性更新10天以前的全部索引,再使用定时任务每天更新第 10天和 11天之间的索引即可(多一天可防止定时任务执行期间新产生的索引被遗漏)。脚本如下:

    #!/bin/bash
    
    #funtion
    Usage(){
        echo "Usage:"
        echo "Move indexes X days before to cool nodes."
        echo "  $0 es_uri number"
        echo "  example: $0 http://127.0.0.1:9200 10"
    }
    
    # check env
    [[ -z "$1" ]] && Usage && exit 1
    [[ -z "$2" ]] && Usage && exit 1
    
    ES_URI=$1
    ES_NUM=$2
    
    # get index date in timestamp format; use creation.date.string to human readable format
    curl -sNL -XGET "${ES_URI}/_cat/indices?h=i,creation.date&s=creation.date" > /tmp/indexes-dates.info
    [[ ! -s /tmp/indexes-dates.info ]] && echo "Get es indexes failed, exit." && exit 1
    
    num=0
    # get the past 10th day and change it to timestamp format append with 3bits nanoseconds
    LAST_X_DAY=$(date --date="${ES_NUM} days ago" +"%s%3N")
    LAST_X1_DAY=$(date --date="$(expr ${ES_NUM} + 1) days ago" +"%s%3N")
    
    while read index datetime; do
        # if [[ ${datetime} -lt ${LAST_TWENTIETH_DAY} ]]; then # 一次性更新X天以前的全部索引
        if [[ ${datetime} -lt ${LAST_X_DAY} ]] && [[ ${datetime} -gt ${LAST_X1_DAY} ]]; then # 只更新过去X天到X=1天之间的索引,减少操作时间
            let num++
            # print JSON like log
            echo -e {\"number\": \"${num}\"\, \"now\": \"$(date +'%D %T')\"\, \"index\": \"${index}\"}
            curl -s -H "Content-Type: application/json" -XPUT "${ES_URI}/${index}/_settings" -d '{"settings":{"index.routing.allocation.require.box_type":"cool"}}'
            #sleep 60
            echo
        else
            continue
        fi
    done < <(cat /tmp/indexes-dates.info)
    
    echo "$num indexes found"
    
    X=$(date -d @${LAST_X_DAY:0:10} +"%Y/%m/%d %H:%M:%S")
    X1=$(date -d @${LAST_X1_DAY:0:10} +"%Y/%m/%d %H:%M:%S")
    echo "Indices created between date ${X1} and ${X} was moved to cool nodes."
    

    以上脚本使用如下方式从 wihle 循环获取自增变量的值:
    注意done 后面的写法:done < <(command)

    #!/bin/bash
    i=1
    while read x
    do
       i=$((i+1))
       echo $i
    done < <(find . -type f)
    echo $i
    

    使用如下方式将时间戳截取后解码并按照指定格式输出:

    X=1581436809566  #有3位精度时间戳,使用 date +"%s%3N" 生成
    date -d @${X:0:10} +"%Y/%m/%d %H:%M:%S"
    #2020/02/12 00:00:09
    

    正常情况下,修改了 index 标签会触发集群自动迁移 index 到新的节点。
    如果自动迁移 index 没有发生,需要使用 reroute 接口手动更新。

  • 检查节点 index 分配状况
    这个可以在 x-pack 监控界面查看,可以发现集群状态为 Green,新加节点也有分片存才了,并且在不断增加:

参考文档

  • master 节点配置
# ---------------------------------- Cluster -----------------------------------
cluster.name: glen-es-cluster
#
# ------------------------------------ Node ------------------------------------
node.name: es-node01
#
# 设置为专用 Master + Data 节点
node.master: true 
node.data: true 
node.ingest: true 
search.remote.connect: false 
#
# 设置冷热分离标签
node.attr.box_type: hot
#
# ----------------------------------- Paths ------------------------------------
path.data: /data1/es-data
path.logs: /data1/es-logs
#
# ----------------------------------- Memory -----------------------------------
#bootstrap.memory_lock: true
#
# ---------------------------------- Network -----------------------------------
transport.host: ["_eth0_"]
transport.tcp.port: 9300
http.enabled: true
http.port: 9200
network.host: ["_eth0_"]
#
# --------------------------------- Discovery ----------------------------------
discovery.zen.ping.unicast.hosts: ["es-node01", "es-node02", "es-node03"]
#
# ---------------------------------- Gateway -----------------------------------
gateway.recover_after_nodes: 2
xpack.security.enabled: false
# (master_eligible_nodes / 2) + 1
discovery.zen.minimum_master_nodes: 2
#
# ---------------------------------- Various -----------------------------------
#action.destructive_requires_name: true