标签 Elasticsearch 下的文章

es报错 es_rejected_execution_exception[status: 429]

描述

使用 go-mysql-elasticsearch 同步 mysql 数据到 elasticsearch 时, 由于量大出现一个错误, 如下:

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_room, type: room, id: 556314, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@16211f2a on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_vdoid, type: vdoid, id: 6926660, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@50d7ff2 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

分析

从报错信息初步判断是并发量大, 可用的8个线程和50个长度的队列不够用了, 处理不过来

解决

参考官方文档, 注意我这是 2.4 版本的文档, 各位看官可以根据自己的 es 版本在右侧选择对应版本的文档, 在目录中依次点击Modules - Thread pool 看你使用的版本说明, 其实这块基本通用, 不同版本变化不大

elasticsearch.yml 文件末尾添加如下配置后重启es即可:

threadpool.bulk.type: fixed
threadpool.bulk.size: 8
threadpool.bulk.queue_size: 1000

其中:

  • type 是要配置的线程池类型

    • bulk 批量操作, 也就是我们上面报错里提示的
    • index 用于索引/删除操作
    • search 计数/搜索/建议操作
    • get 获取操作
    • snapshot 用于快照/恢复操作
    • refresh 用于刷新操作
  • size 线程数量, 一般设置为 cpu 核数
  • queue_size 等待线程处理的队列容量

其它

增加线程和队列容量是一种解决办法, 另外节点数量和分片的分布也是影响原因
以批量操作为例, 队列默认配置是50个容量, 如果3个节点都是8核, 那批量操作的并发最大是 50 3 8 = 1200, 如果分片都在同一台机器上, 那可能只有 400
所以, 增加节点数和均匀分布分片也很重要


Elasticsearch 添加权限管理

Elasticsearch 默认是没有权限管理的, 只要能 ping 通地址的地方就可以读写数据, 所以还是很危险的, 这里选择使用插件 shield 来实现

环境版本


  • Elasticsearch 2.4.4

插件安装


# 这是收费插件, 安装后免费使用一个月, 到期后集群功能不能用,但基本api不受影响
bin/plugin install license 
bin/plugin install shield
service elasticsearch restart

添加用户


# lion 是用户名, 可以改成自己想要的
bin/shield/esusers useradd lion -r admin
# 再输入两次密码即可

常用用户管理命令:

bin/shield/esusers  -h    # 查看帮助
bin/shield/esusers list    # 查看用户列表
bin/shield/esusers  passwd lion # 修改密码
bin/shield/esusers userdel lion    # 删除用户

在 cli 环境下操作 elasticsearch 加 - u 用户名

curl -u lion x.x.x.x:9200/_cat/indices?pretty
# 按提示输出密码

Kibana 配置


kibana 配置文件 KAFKA_PATH/config/kibana.yml 里添加帐号密码

elasticsearch.username: "lion"
elasticsearch.password: "xxxxxxx"

再重启 kibana

Logstash 配置


elasticsearch-output 里添加两项:

elasticsearch {
    hosts => ...
    # 添加下面
    user => "lion"
    password => "xxxxxxx"
}

Hangout 配置


Hangout 是携程团队用 java 开发的代替 logstash 的一个日志手机工具, 还未提供 http ssl 认证支持...

php-elasticsearch 配置


php-elasticsearch 里初始化 elasticsearch 连接一般使用这种方式

$client = ClientBuilder::create()->setHosts($hosts)->build();

修改 $hosts 这里

第一种方式 :

$hosts = [
    // This is effectively equal to: "https://username:password!#$?*abc@foo.com:9200/"
    [
        'host' => 'foo.com',
        'port' => '9200',
        'scheme' => 'https',
        'user' => 'username',
        'password' => 'password!#$?*abc'
    ],

    // This is equal to "http://localhost:9200/"
    [
        'host' => 'localhost',    // Only host is required
    ]
];

第二种方式, 简单, 推荐

$hosts = [
    'http://user:pass@localhost:9200',       // HTTP Basic Authentication
    'http://user2:pass2@other-host.com:9200' // Different credentials on different host
];

取消权限认证


如果设置后想取消, 光删除用户是没用的,需要卸载 shield 插件

bin/plugin remove shield

python脚本实现Elasticsearch的冷热分离

起因

出于成本考虑,es 集群内节点机器配置有好有差,为了实现当天和前一天数据写在 ssd 节点上,历史数据迁移到普通节点,使用 python 写了一个小脚本,凌晨定时迁移分片,供参考. es 官方有类似 tags 的实现方式,等我实践后再来记录下

配置

启动脚本前必须修改下面两项配置,否则分片刚迁移过去,就又被均衡回来了

# 禁止集群自动分配,使主要分片或副本的分布失效
cluster.routing.allocation.disable_allocation: true 
# 新增加的分片才可以参与分配
cluster.routing.allocation.enable: new_primaries

热更新配置:

curl -XPUT {ip}:9200/_cluster/settings -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable" : "new_primaries"}}'

脚本


#!/usr/bin/env python
#  -*- coding:utf-8 -*-

'''
迁移分片:
    指定日期前的分片从ssd机器迁移到普通硬盘机器
    当天新建索引分片从普通硬盘机器迁移到ssd
机器分布:
    ssd机器: es1, es2
    普通硬盘机器: es3, es4
'''

import urllib2
import json
import re
import time
import random
import datetime

es_address = 'http://x.x.x.x'

def http_get(url):
    response = urllib2.urlopen(url)
    return response.read()

def http_post(url, jsonData):
    jdata = json.dumps(jsonData)
    req = urllib2.Request(url, jdata)
    response = urllib2.urlopen(req)
    return response.read()

def move_shard(index, shard, fromNode, toNode):
    url = es_address + '/_cluster/reroute'
    try:
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : toNode
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
    except urllib2.HTTPError as e:
        toNode = "es2" if toNode=="es1" else "es1"
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : "es2"
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
        return resp
    except Exception as e:
        raise e

if __name__ == '__main__':
    keepDay = 1
    today = day = time.strftime('%Y%m%d',time.localtime(time.time()))
    dt = datetime.datetime.now() - datetime.timedelta(days=keepDay)
    day = dt.strftime("%Y%m%d")
    print ">>>>>>>>>>>>>", day, "<<<<<<<<<<<<<"
    fromNodeList = ["es4", "es5"]
    toNodeList = ["es1", "es2"]
    resp = http_get(es_address + '/_cat/shards')
    shardList = resp.split("\n")
    for shard in shardList:
        try:
            info = re.split(r'\s*', shard)
            index = info[0]
            dateObj = re.match(r'(.*)-(\d+)[\.|-](\d+)[\.|-](\d+)', index)
            if (dateObj == None):
                continue
            date = dateObj.group(2) + dateObj.group(3) + dateObj.group(4)
            if (date==today and info[7] in toNodeList):
                move_shard(index, info[1], info[7], random.choice(fromNodeList))
                print "move today: ", shard
            if (info[3]=='UNASSIGNED' or date>=day or info[7] not in fromNodeList):
                continue

            toNode = random.choice(toNodeList)
            move_shard(index, info[1], info[7], toNode)
            print info[7], '------------------->', toNode, shard
        except urllib2.HTTPError as e:
            print e, shard
        except Exception as e:
            print '-----------------------------', shard, e
            raise e

        if "gb" in info[5]:
            sleepSeconds = 600 if info[5]>"2gb" else 300
        else:
            sleepSeconds = random.randint(10, 15)
        print 'sleep %d seconds' % sleepSeconds
        time.sleep(sleepSeconds)

Elasticsearch查询实现类似MySQL的in/not in

别问我为什么要单独一篇文章写这个,我会告诉你这问题困扰我很久了?搞定后的酸爽谁试谁知道

数据说明

假如es里有index为lion_db_vdoid, type为t_vdoid
有两条记录,字段vdoid分别为1和2

实现IN

查询vdoid字段包含2,3的数据

  "query": {
    "terms": {
      "vdoid": [
        "2", "3"
      ]
    }
  }

mysql_in

实现NOT IN

查询vdoid字段不等于2,3的数据

  "query": {
    "bool": {
      "must_not": [
        {
          "terms": {
            "vdoid": [
              "2", "3"
            ]
          }
        }
      ]
    }
  }

mysql_not_in

启发

灵活使用must_not,结合其他系统参数,达到不可思议的结果

处理Elasticsearch集群yellow和red状态

RED

原因

red表示不是所有的主分片都可用,通常时由于某个索引的住分片为分片unassigned,只要找出这个索引的分片,手工分配即可

处理

  • 官方文档的详细说明
  • 通过curl GET http://{ESIP}:9200/_cluster/health?level=indices查看所有索引信息,查看下是哪个索引的status是red导致了集群都red了(集群索引多的时候一眼看不出来,直接把结果拷出来,搜red关键字就跟踪到索引和分片了)
  • 如果这个索引不是很重要,直接delete掉省事,集群状态马上恢复green完事~
  • 通过reroute强制分配该分片(见下文)

YELLOW

原因

yellow表示所有主分片可用,但不是所有副本分片都可用,最常见的情景是单节点时,由于es默认是有1个副本,主分片和副本不能在同一个节点上,所以副本就是未分配unassigned

处理

  • 过滤查看所有未分配索引的方式, curl -s "http://10.19.22.142:9200/_cat/shards" | grep UNASSIGNED结果如下,第一列表示索引名,第二列表示分片编号,第三列p是主分片,r是副本
curl -s "http://{ESIP}:9200/_cat/shards" | grep UNASSIGNED
eslog1                 3 p UNASSIGNED
eslog1                 3 r UNASSIGNED
eslog1                 1 p UNASSIGNED
eslog1                 1 r UNASSIGNED

分配分片

知道哪个索引的哪个分片就开始手动修复,通过reroute的allocate分配

curl -XPOST '{ESIP}:9200/_cluster/reroute' -d '{
    "commands" : [ {
          "allocate" : {
              "index" : "eslog1",
              "shard" : 4,
              "node" : "es1",
              "allow_primary" : true
          }
        }
    ]
}'

分配时可能遇到的坑,需要注意的地方


  • 分配副本时必须要带参数"allow_primary" : true, 不然会报错
  • 当集群中es版本不同时,如果这个未分配的分片是高版本生成的,不能分配到低版本节点上,反过来低版本的分片可以分配给高版本,如果遇到了,只要升级低版本节点的ES版本即可
  • (升级ES版本详见官方详细文档,我是ubuntu系统apt安装的,直接apt-get install elasticsearch升级的,elasticsearch.yml配置文件没变不用修改,但是/usr/share/elasticsearch/bin/elasticsearch文件中有个内存配置ES_HEAP_SIZE=6G需要再手动加一下&重启es)

Elk使用笔记(坑)(2017-02-17更新)

主要记录使用过程终于到的一些坑和需要注意的地方,有些坑想不起来了,以后再完善补上

elk版本


  • elasticsearch: 2.4.1
  • kibana: 4.6.3
  • logstash: 2.4.1

问题


Elasticseasrch报错,Request Timeout after 30000ms,log中无日志

一般重启elasticsearch即可,重启后节点状态状态Elasticsearch is still initializing the kibana index,这时候是elasticsearch在恢复数据(恢复数据时间可以自定义,具体看我另外一篇关于集群配置的介绍传送门),再等待几秒再观察节点状态

service elasticsearch restart 

Elasticseasrch查询list大于10000条报错,Result window is too large

貌似是2.x才有的问题,修改max_result_window默认配置,调大默认值

curl -XPUT http://{ip:port}/{mapping}/_settings -d '{ "index" : { "max_result_window" : 500000}}'

Elasticsearch服务service方式启动fail

让人着急的是日志没有信息,这时候大多是与内存有关,毕竟Es是个能吃内存的货。去安装目录bin下执行启动脚本,报Cannot allocate memory`不能分配内存。
解决:修改脚本bin/elasticsearch,注释或调小之前加的内存配置,具体看我之前写的一篇内存优化,就不往这迁移了(主要是懒)

Elasticsearch-PHP报错Elasticsearch\Common\Exceptions\NoNodesAvailableException No alive nodes found in your cluster

首先是觉得集群节点挂了,检查集群,检查节点,包括elk都正常,坑了我一圈,发现是Elasticsearch-PHP包的配置问题,官方文档中标明hosts可以写ip:port,ip,domains:port,domains的格式,结果现实并不是这样,因为我es绑定了内网地址,外网要访问,我用Nginx的domains 80端口将外网请求代理转发到内网的9200端口上,然后在Elasticsearch-PHP中就省去了80,只写了一个hosts上绑定的domains。所以只要在配置中把端口带上就好了domains:80,还没来得及看源码,应该扩展包检测没有配置端口就默认指定了9200端口,所以才连接不上

(2017年02月17日更新) 手动迁移副本,迁移成功后,原有节点的分片又跑了

例索引有两个分片1,2分别在两个节点A,B上,将分片1迁移到节点B的时候,虽然成功了,但是节点B上原有的分片2却自动移到了节点A上,因为没有开启这个配置项

curl -XPUT ip:9200/_cluster/settings -d'{
    "transient":{
        "cluster.routing.allocation.disable_allocation":true
    }
}'

优化


统一大小写

  • 操作: 在logstash中将所有字符串数据转换为小写,查询dsl语句中统一使用小写
  • 原因: kibana中查询貌似是不区分大小写的,然而有些查询聚合语句系统默认是区分了大小写,比如aggsterms.exclude
  • 执行: 应该在es的mapping template模板中设置也可以,暂还没找到具体执行办法,有知道的看官跪求告知
# logstash conf
filter{
    # 如果有json转换配置需要放置在json前
    mutate {
            lowercase => "message"
    }
}

常用命令


查询 curl -GET {command}
增改 curl -PUT {command}
删除 curl -DELETE {command}
url末尾加?pretty会自动美化结果的json数据,方便命令行下查看,例如:curl -GET "http://localhost:9200/_nodes/stats?pretty"查看节点状态

  • 查询集群状态(强烈推荐使用es的head插件管理集群)
/_cluster/health?pretty
  • 查看es节点状态
/_nodes/stats
  • 添加索引别名
/index1/_alias/index_alias
*/_alias/index_alias
  • es清理缓存(发现没什么卵用,理论上清理后第二次查询和第一次查询的速度一样慢,结果并不是,怀疑没清掉)
curl -XPOST "localhost:9200/_cache/clear"
  • es设置mapping模板
curl -XPUT 'http://{ip}:{port}/_template/eslog?pretty' -d@/tmp/mylog.template.json

扩展资料

  • logstash和kafka的整合传送
  • elkkafka的配置字段部分说明 传送
  • elasticsearch配置文件说明 传送
  • elasticsearch数据导出导入,有个elasticdump工具,还没有试验过,我推荐一个网友分享的python脚本,自己根据需求改一下就好了,elasticdump安装npm install elasticdump -g, 使用说明

Elasticsearch集群优化笔记

设置集群和节点名称

  • 设置集群名称,es默认集群名称都是elasticsearchg,是防止同网段内其他es机器自动加入了该集群
  • 设置节点名称,是方便节点管理时识别,不设置的话es每次重启会重新自动生成一串字符
cluster.name: elasticsearch-product
node.name: es-node1

集群使用单播方式通信

  • es默认是多播方式,网段内有相同集群名称的节点自动加入集群,安全上不可靠,尤其是公有机房
  • 多数云机房关闭了多播,只能使用单播咯
# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值,常用设置是N/2+1
discovery.zen.minimum_master_nodes: 2
# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂
discovery.zen.ping.timeout: 3s
# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧
discovery.zen.ping.multicast.enabled: false
# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测
discovery.zen.ping.unicast.hosts: ["192.168.10.2", "192.168.10.3", "192.168.10.4"]

设置数据恢复条件

  • 设置可以在集群重启的时候避免过多的分片交换,避免来回移动数据,消耗磁盘和带宽等资源
  • 它们是的关系,哪个条件先达到就开始恢复
# 满足几个节点时开始回复
gateway.expected_nodes: 10
# 满足时间开始数据回复
gateway.recover_after_time: 5m

指定主节点(2017年01月24日更新)

主节点设置node.master设置为true,其他节点全是false是,记住所有节点的discovery.zen.minimum_master_nodes: 1一定要改设置为1,不然集群组不起来

Elasticsearch之out_of_memory_error:Java heap space错误

环境

  • Ubuntu 14.04
  • ElasticSearch 2.4.1
  • CPU 4核i5
  • 内存 4G

背景

  • 使用elasticsearch-php执行elasticsearch搜索,报错内存溢出如下:
[Elasticsearch\Common\Exceptions\ServerErrorResponseException]  
  out_of_memory_error: Java heap space                            

[Elasticsearch\Common\Exceptions\ServerErrorResponseException]                                                                                                                         
  {"error":{"root_cause":[{"type":"out_of_memory_error","reason":"Java heap space"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":t  
  rue,"failed_shards":[{"shard":0,"index":"api-dev-2016-11-18","node":"zuUJrHUXRDWm1RX_D-0OjQ","reason":{"type":"out_of_memory_error","reason":"Java heap space"}}]},"status":500}

解决

修改elasticsearch安装目录下的bin/elasticsearch,添加

ES_HEAP_SIZE=5G

或者(Xms表示最小内存,Xmx表示最大内存)

ES_JAVA_OPTS="-Xms5g -Xmx5g"

重启elasticsearch,执行ps -ef |grep elasticsearch查看是否生效,找到-Xms5G -Xmx5G字段表示修改已生效

补充

物理内存的限制,也会导致这个内存溢出的报错,我遇到的就是这种情况,单条100W+数据的索引,本机只有4G内存,分到es上也没多少,改再大也没用,尴尬~~~

建议多查看节点状态

curl "localhost:9200/_nodes/stats"

jvm.mem.heap_used_percent如果长期在75以上,就是内存不足,该调大调大,该加内存加内存

参考elasticsearch官网说明 走你

特别建议:为了预防问题发生,都建议根据机器配置调整下集群中每台机器这个参数值,es默认512m-1g数据量大点就不够用了