python脚本实现Elasticsearch的冷热分离

起因

出于成本考虑,es 集群内各节点的机器配置有好有差。为了让当天和前一天的数据写在 ssd 节点上、历史数据迁移到普通节点,我用 python 写了一个小脚本,在凌晨定时迁移分片,供参考。es 官方也有基于 tags 的类似实现方式,等我实践后再来记录。

配置

启动脚本前必须修改下面两项配置,否则分片刚迁移过去,就又被均衡回来了

# 禁止集群自动分配,使主要分片或副本的分布失效
cluster.routing.allocation.disable_allocation: true 
# 只允许新建索引的主分片参与分配
cluster.routing.allocation.enable: new_primaries

热更新配置:

curl -XPUT {ip}:9200/_cluster/settings -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable" : "new_primaries"}}'

脚本


#!/usr/bin/env python
#  -*- coding:utf-8 -*-

'''
Shard migration:
    shards of indices older than the configured date are moved from the SSD machines to the ordinary-disk machines
    shards of indices created today are moved from the ordinary-disk machines to the SSD machines
Machine layout:
    SSD machines: es1, es2
    ordinary-disk machines: es3, es4

NOTE(review): the node lists in the __main__ section use es4/es5 and es1/es2,
which does not match the layout listed here -- confirm the real node names
before running.
'''

import urllib2
import json
import re
import time
import random
import datetime

# Base URL of the Elasticsearch HTTP API; request paths such as
# '/_cat/shards' and '/_cluster/reroute' are appended to it.
# The x.x.x.x value is a placeholder and must be replaced before running.
es_address = 'http://x.x.x.x'

def http_get(url):
    """Fetch ``url`` with a GET request and return the raw response body.

    Args:
        url: Absolute URL to request.

    Returns:
        The response body exactly as read from the connection.

    Raises:
        urllib2.URLError: If the request fails; ``urllib2.HTTPError`` (a
            subclass) is raised when the server answers with an error status.
    """
    response = urllib2.urlopen(url)
    try:
        return response.read()
    finally:
        # Close explicitly so the connection is released immediately instead
        # of lingering until the response object is garbage collected.
        response.close()

def http_post(url, jsonData):
    """POST ``jsonData`` (serialized as JSON) to ``url`` and return the raw body.

    Args:
        url: Absolute URL to post to.
        jsonData: Any object ``json.dumps`` can serialize; it becomes the
            request body.

    Returns:
        The response body exactly as read from the connection.

    Raises:
        urllib2.URLError: If the request fails; ``urllib2.HTTPError`` (a
            subclass) is raised when the server answers with an error status.
    """
    jdata = json.dumps(jsonData)
    # Declare the payload type explicitly: without it urllib2 labels the body
    # as form data, which newer Elasticsearch versions refuse to parse.
    req = urllib2.Request(url, jdata, {'Content-Type': 'application/json'})
    response = urllib2.urlopen(req)
    try:
        return response.read()
    finally:
        # Release the connection right away rather than at garbage collection.
        response.close()

def _move_command(index, shard, fromNode, toNode):
    """Build the reroute request body that moves one shard between two nodes."""
    return {
        "commands": [
            {
                "move": {
                    "index": index,
                    "shard": shard,
                    "from_node": fromNode,
                    "to_node": toNode,
                }
            }
        ]
    }


def move_shard(index, shard, fromNode, toNode):
    """Ask the cluster to move one shard, retrying once on another node.

    The move is posted to ``/_cluster/reroute``. If the server rejects it with
    an HTTP error, a single retry is sent to the alternate node: "es2" when
    ``toNode`` is "es1", otherwise "es1".

    NOTE(review): the alternate is chosen from the hard-coded pair es1/es2, so
    a failed move towards any other node is retried against "es1" -- confirm
    this is acceptable for every caller.

    Args:
        index: Name of the index the shard belongs to.
        shard: Shard number within the index.
        fromNode: Name of the node currently holding the shard.
        toNode: Name of the preferred destination node.

    Returns:
        The raw response body of whichever reroute request succeeded.

    Raises:
        urllib2.HTTPError: If the retry is rejected as well.
    """
    url = es_address + '/_cluster/reroute'
    try:
        return http_post(url, _move_command(index, shard, fromNode, toNode))
    except urllib2.HTTPError:
        # The preferred destination was refused; try the other node once.
        alternate = "es2" if toNode == "es1" else "es1"
        return http_post(url, _move_command(index, shard, fromNode, alternate))

# Index names ending in a date, e.g. "logstash-2016.01.31" or "app-2016-01-31".
INDEX_DATE_RE = re.compile(r'(.*)-(\d+)[.-](\d+)[.-](\d+)')


def parse_index_date(index):
    """Return the date embedded in an index name as 'YYYYMMDD', or None.

    Month and day are zero-padded so the result can be compared as a string
    with ``strftime('%Y%m%d')`` output.
    """
    matched = INDEX_DATE_RE.match(index)
    if matched is None:
        return None
    year, month, day_of_month = matched.group(2, 3, 4)
    return year + month.zfill(2) + day_of_month.zfill(2)


def sleep_seconds_for_store(store):
    """Return how many seconds to pause after moving a shard of size ``store``.

    ``store`` is the human-readable size column of ``_cat/shards`` (for
    example "512mb" or "2.5gb"). Shards above 2 GB get 600 seconds, other
    gigabyte-sized shards 300 seconds, anything smaller a random 10-15 seconds.
    The size is compared numerically: comparing the strings would rank "10gb"
    below "2gb".
    """
    size = store.lower()
    if size.endswith(('tb', 'pb')):
        return 600
    if size.endswith('gb'):
        try:
            gigabytes = float(size[:-2])
        except ValueError:
            # Unparseable number: fall back to the shorter gigabyte pause.
            return 300
        return 600 if gigabytes > 2 else 300
    return random.randint(10, 15)


def main():
    """Relocate shards according to the age of the index they belong to.

    Shards of today's index found on the archive nodes are moved to a random
    recent-data node; shards older than the cutoff (today minus ``keep_days``)
    found on the recent-data nodes are moved to a random archive node, with a
    pause after each of those moves.
    """
    keep_days = 1
    now = datetime.datetime.now()
    today = now.strftime('%Y%m%d')
    cutoff = (now - datetime.timedelta(days=keep_days)).strftime('%Y%m%d')
    print('>>>>>>>>>>>>> %s <<<<<<<<<<<<<' % cutoff)

    # NOTE(review): the module docstring lists es1/es2 as the SSD machines,
    # yet here es4/es5 are the nodes that receive today's shards -- confirm
    # the node names before running.
    recent_nodes = ["es4", "es5"]
    archive_nodes = ["es1", "es2"]

    resp = http_get(es_address + '/_cat/shards')
    for shard in resp.split("\n"):
        # Expected columns: index shard prirep state docs store ip node
        info = shard.split()
        if len(info) < 8:
            # Blank lines and shards without a node (e.g. UNASSIGNED) carry
            # fewer columns; there is nothing to move for them.
            continue
        index, shard_no, state, store, node = (
            info[0], info[1], info[3], info[5], info[7])

        date = parse_index_date(index)
        if date is None:
            continue

        try:
            if date == today and node in archive_nodes:
                move_shard(index, shard_no, node, random.choice(recent_nodes))
                print('move today:  %s' % shard)
            if (state == 'UNASSIGNED' or date >= cutoff
                    or node not in recent_nodes):
                # Recent enough, or not on a recent-data node: no pause needed.
                continue

            target = random.choice(archive_nodes)
            move_shard(index, shard_no, node, target)
            print('%s -------------------> %s %s' % (node, target, shard))
        except urllib2.HTTPError as e:
            # A rejected move is reported and the run carries on.
            print('%s %s' % (e, shard))
        except Exception as e:
            print('----------------------------- %s %s' % (shard, e))
            raise

        # Presumably gives the cluster time to finish the relocation before
        # the next one is requested.
        pause = sleep_seconds_for_store(store)
        print('sleep %d seconds' % pause)
        time.sleep(pause)


if __name__ == '__main__':
    main()

标签: Elasticsearch