python脚本实现Elasticsearch的冷热分离
起因
出于成本考虑,es 集群内节点机器配置有好有差,为了实现当天和前一天数据写在 ssd
节点上,历史数据迁移到普通节点,使用 python
写了一个小脚本,凌晨定时迁移分片,供参考. es 官方有类似 tags
的实现方式,等我实践后再来记录下
配置
启动脚本前必须修改下面两项配置,否则分片刚迁移过去,就又被均衡回来了
# 禁止集群自动分配,使主要分片或副本的分布失效
cluster.routing.allocation.disable_allocation: true
# 新增加的分片才可以参与分配
cluster.routing.allocation.enable: new_primaries
热更新配置:
curl -XPUT {ip}:9200/_cluster/settings -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable" : "new_primaries"}}'
脚本
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
迁移分片:
指定日期前的分片从ssd机器迁移到普通硬盘机器
当天新建索引分片从普通硬盘机器迁移到ssd
机器分布:
ssd机器: es1, es2
普通硬盘机器: es3, es4
'''
import urllib2
import json
import re
import time
import random
import datetime
es_address = 'http://x.x.x.x'
def http_get(url):
response = urllib2.urlopen(url)
return response.read()
def http_post(url, jsonData):
jdata = json.dumps(jsonData)
req = urllib2.Request(url, jdata)
response = urllib2.urlopen(req)
return response.read()
def move_shard(index, shard, fromNode, toNode):
url = es_address + '/_cluster/reroute'
try:
jsonData = {
"commands" : [
{
"move" : {
"index" : index,
"shard" : shard,
"from_node" : fromNode,
"to_node" : toNode
}
}
]
}
resp = http_post(url, jsonData)
except urllib2.HTTPError as e:
toNode = "es2" if toNode=="es1" else "es1"
jsonData = {
"commands" : [
{
"move" : {
"index" : index,
"shard" : shard,
"from_node" : fromNode,
"to_node" : "es2"
}
}
]
}
resp = http_post(url, jsonData)
return resp
except Exception as e:
raise e
if __name__ == '__main__':
keepDay = 1
today = day = time.strftime('%Y%m%d',time.localtime(time.time()))
dt = datetime.datetime.now() - datetime.timedelta(days=keepDay)
day = dt.strftime("%Y%m%d")
print ">>>>>>>>>>>>>", day, "<<<<<<<<<<<<<"
fromNodeList = ["es4", "es5"]
toNodeList = ["es1", "es2"]
resp = http_get(es_address + '/_cat/shards')
shardList = resp.split("\n")
for shard in shardList:
try:
info = re.split(r'\s*', shard)
index = info[0]
dateObj = re.match(r'(.*)-(\d+)[\.|-](\d+)[\.|-](\d+)', index)
if (dateObj == None):
continue
date = dateObj.group(2) + dateObj.group(3) + dateObj.group(4)
if (date==today and info[7] in toNodeList):
move_shard(index, info[1], info[7], random.choice(fromNodeList))
print "move today: ", shard
if (info[3]=='UNASSIGNED' or date>=day or info[7] not in fromNodeList):
continue
toNode = random.choice(toNodeList)
move_shard(index, info[1], info[7], toNode)
print info[7], '------------------->', toNode, shard
except urllib2.HTTPError as e:
print e, shard
except Exception as e:
print '-----------------------------', shard, e
raise e
if "gb" in info[5]:
sleepSeconds = 600 if info[5]>"2gb" else 300
else:
sleepSeconds = random.randint(10, 15)
print 'sleep %d seconds' % sleepSeconds
time.sleep(sleepSeconds)