分类 数据 下的文章

es报错 es_rejected_execution_exception[status: 429]

描述

使用 go-mysql-elasticsearch 同步 mysql 数据到 elasticsearch 时, 由于量大出现一个错误, 如下:

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_room, type: room, id: 556314, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@16211f2a on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_vdoid, type: vdoid, id: 6926660, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@50d7ff2 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

分析

从报错信息初步判断是并发量大, 可用的8个线程和50个长度的队列不够用了, 处理不过来

解决

参考官方文档, 注意我这是 2.4 版本的文档, 各位看官可以根据自己的 es 版本在右侧选择对应版本的文档, 在目录中依次点击Modules - Thread pool 看你使用的版本说明, 其实这块基本通用, 不同版本变化不大

elasticsearch.yml 文件末尾添加如下配置后重启es即可:

threadpool.bulk.type: fixed
threadpool.bulk.size: 8
threadpool.bulk.queue_size: 1000

其中:

  • type 是要配置的线程池类型

    • bulk 批量操作, 也就是我们上面报错里提示的
    • index 用于索引/删除操作
    • search 计数/搜索/建议操作
    • get 获取操作
    • snapshot 用于快照/恢复操作
    • refresh 用于刷新操作
  • size 线程数量, 一般设置为 cpu 核数
  • queue_size 等待线程处理的队列容量

其它

增加线程和队列容量是一种解决办法, 另外节点数量和分片的分布也是影响原因
以批量操作为例, 队列默认配置是50个容量, 如果3个节点都是8核, 那批量操作的并发最大是 50 3 8 = 1200, 如果分片都在同一台机器上, 那可能只有 400
所以, 增加节点数和均匀分布分片也很重要


Elasticsearch 添加权限管理

Elasticsearch 默认是没有权限管理的, 只要能 ping 通地址的地方就可以读写数据, 所以还是很危险的, 这里选择使用插件 shield 来实现

环境版本


  • Elasticsearch 2.4.4

插件安装


# 这是收费插件, 安装后免费使用一个月, 到期后集群功能不能用,但基本api不受影响
bin/plugin install license 
bin/plugin install shield
service elasticsearch restart

添加用户


# lion 是用户名, 可以改成自己想要的
bin/shield/esusers useradd lion -r admin
# 再输入两次密码即可

常用用户管理命令:

bin/shield/esusers  -h    # 查看帮助
bin/shield/esusers list    # 查看用户列表
bin/shield/esusers  passwd lion # 修改密码
bin/shield/esusers userdel lion    # 删除用户

在 cli 环境下操作 elasticsearch 加 - u 用户名

curl -u lion x.x.x.x:9200/_cat/indices?pretty
# 按提示输出密码

Kibana 配置


kibana 配置文件 KAFKA_PATH/config/kibana.yml 里添加帐号密码

elasticsearch.username: "lion"
elasticsearch.password: "xxxxxxx"

再重启 kibana

Logstash 配置


elasticsearch-output 里添加两项:

elasticsearch {
    hosts => ...
    # 添加下面
    user => "lion"
    password => "xxxxxxx"
}

Hangout 配置


Hangout 是携程团队用 java 开发的代替 logstash 的一个日志手机工具, 还未提供 http ssl 认证支持...

php-elasticsearch 配置


php-elasticsearch 里初始化 elasticsearch 连接一般使用这种方式

$client = ClientBuilder::create()->setHosts($hosts)->build();

修改 $hosts 这里

第一种方式 :

$hosts = [
    // This is effectively equal to: "https://username:password!#$?*abc@foo.com:9200/"
    [
        'host' => 'foo.com',
        'port' => '9200',
        'scheme' => 'https',
        'user' => 'username',
        'password' => 'password!#$?*abc'
    ],

    // This is equal to "http://localhost:9200/"
    [
        'host' => 'localhost',    // Only host is required
    ]
];

第二种方式, 简单, 推荐

$hosts = [
    'http://user:pass@localhost:9200',       // HTTP Basic Authentication
    'http://user2:pass2@other-host.com:9200' // Different credentials on different host
];

取消权限认证


如果设置后想取消, 光删除用户是没用的,需要卸载 shield 插件

bin/plugin remove shield

Kafka 迁移 Topics

Kafka 扩展为集群后, 需要把原单机上的部分大 topic 平衡到新 broker 上
旧 kafka 的 broker id 为 0, 新机器的是 1

迁移

第一步

创建 move.json

{
    "topics": [{
        "topic": "test2"
    }],
    "version": 1
}

第二步

生成迁移分配规则 json 文件

注意 broker-list 我想所有分区都迁移到新机器, 所以只写了 1, 可以写 0,1

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file move.json --broker-list "1" --generate

执行结果:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}

第三步

拷贝生成的 json 内容(第二段)到新文件 reassignment.json 中, 然后执行

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute

第四步

查看 topics

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test2
Topic:test2    PartitionCount:16       ReplicationFactor:2     Configs:
        Topic: test2   Partition: 0    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 1    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 3    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 4    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 5    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 6    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 7    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 8    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 9    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 10   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 11   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 12   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 13   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 14   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 15   Leader: 0       Replicas: 1,0   Isr: 0

如果 topics 比较大, 迁移需要一会儿, 这个时候 Replicas 是 broker 的 0,1 共有, 稍后一会儿迁移完成后再查看

Topic:test2    PartitionCount:16       ReplicationFactor:1     Configs:
        Topic: test2   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 4    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 6    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 7    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 8    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 10   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 11   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 12   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 13   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 14   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 15   Leader: 1       Replicas: 1     Isr: 1

参考文档: http://blog.csdn.net/louisliaoxh/article/details/51605146

Kafka和Zookeeper单机扩展为集群的笔记(内附一个小问题的解决)

kafka版本 kafka_2.11-0.10.1.0
我用的 zookeeper 是 kafka 自带的
旧机器ip: x.x.x.x 新机器 y.y.y.y

起因

之前 kafka 一直是单机跑, 由于容量需要扩容, 所以新增一台机器将 zookeeperkafka 都扩展为集群
文章最后附 ansible 的安装脚本

Zookeeper

旧机器的编号是 0, 新机器编号 1, 主要修改下配置就可以自动加入集群了

#新机器上添加 zookeeper 编号
echo 1 > /data/zookeeper/myid

#新旧机器都修改 zookeeper.properties 文件
server.0=x.x.x.x:2888:3888
server.1=y.y.y.y:2888:3888

Kafka

kafka 的 broker 也需要编号, 需要在新机器上修改的配置如下, server.properties 文件

broker.id=1
advertised.listeners=PLAINTEXT://y.y.y.y:9092
zookeeper.connect=x.x.x.x:2181,y.y.y.y:2181

查看是否成功

bin/zookeeper-shell.sh localhost:2181
ls /brokers/ids/

遇到的问题

  • Kafka 启动的时候报错
[2017-06-13 10:07:24,025] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.NoClassDefFoundError: Could not initialize class kafka.network.RequestChannel$
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:114)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
        at kafka.network.Processor.run(SocketServer.scala:417)
        at java.lang.Thread.run(Thread.java:748)

一番百度搜索后, 有人说是 java 的小版本号不一致问题, 有人说要停掉旧 kafka 机器再启动. 然而我觉得并不是这么肤浅的原因, 然后没招就去搜旧的 java 安装包没找到, 果断放弃了, 最终只要加下 hosts 就解决了

# 新机器上 vim /etc/hosts
y.y.y.y y-y-y-y 

后来请教朋友得到的解释是, 光 ping 是没用的,Linux 上的很多服务都会先解析主机名的, 好吧又涨见识了

Ansible 部署脚本

# tasks/main.yml
---
- name: 检测是否已安装
  stat:
    path: /usr/local/kafka
  register: kafka

- name: 解压安装包
  unarchive: src=src/kafka_2.11-0.10.1.0.tgz dest=/usr/local/
  when: kafka.stat.exists == False

- name: 重命名目录
  shell: "{{item}}"
  with_items:
    - "mv /usr/local/kafka_2.11-0.10.1.0 /usr/local/kafka"
  when: kafka.stat.exists == False

- name: 创建 kafka 目录
  file:
    path: "{{item}}"
    state: directory
  with_items:
    - /data/zookeeper
    - /data/log/kafka

- name: init
  shell: "echo 1 > /data/zookeeper/myid"

- name: 复制 kafka & zookeeper 配置文件
  template:
    src: "{{ item }}"
    dest: "/usr/local/kafka/config/{{ item }}"
  with_items:
    - 'server.properties'
    - 'zookeeper.properties'

- name: 复制 supervisor 配置文件
  template:
    src: "{{ item }}"
    dest: "/data/supervisor/conf.d/{{ item }}"
  with_items:
    - 'kafka.conf'
    - 'zookeeper.conf'

- name: 启动 supervisor
  shell: "supervisorctl update"

删除kafka的consumer和topics

谨慎操作
kafka 版本 0.10

删除 consumers

原因: 重置 offset, 或者是强迫症想清空不用的 consumer
操作:

# 进入控制台
bin/zookeeper-shell.sh localhost:2181
# 查看所有消费者
ls /consumers
# fuck
rmr /consumers/hangou
# 再查看 没了
ls /consumers

删除 topics

原因: 腾出空间
操作:

  • 物理删除数据
# 进入 server.properties 里配置的数据目录 log.dirs=/data/kafka-logs 
cd /data/kafka-logs/
# 删除对应 topic 目录, 配置了多少分区这就有多少目录
rm -rf lion_sql-log*
  • 删除 zookeeper 里的记录
# 进入控制台
bin/zookeeper-shell.sh localhost:2181
# 查看当前的所有 topics, 发现刚删除的 topics 还在这里躺着
ls /brokers/topics/
# fuck you
rmr /brokers/topics/lion_sql-log
# 再查看 没了
ls /brokers/topics

基于ELK的日志统计系统实践

总结下近期的日志系统

目前的架构如图:
请输入图片描述

部分说明


Logstash

主要用来做数据收集和传输, 至于为什么选择它, 很尴尬的说因为早期版本只有 elk, 所以也没怎么特别针对它优化。目前我们使用了三台 Logstash 节点, 一台用于收集数据(主要是UDP)传输到 Kafka 中, 这里要提醒一下, Logstash 的性能并不是很理想, 后面也会考虑其他替代方式, 如 hangout Flume 等。

Kafka

用 Kafka 做缓存, 主要是基于两个考虑。一是穷, 没那么大内存, Kafka 是存储硬盘的, 效率也挺好。二是 Kafka 支持多重消费。其它的自动支持分布式, 对日志友好, 部署简单等等。

Elasticsearch

这是重头戏, 在这过程中也摔过很多跟头, 过程就不多说了, 直接说我们现在做的一些策略。

  • 分集群。针对不同业务, 不同场景, 分多个集群。比如用于离线数据分析的集群, 心跳数据集群以及给业务端用于实时搜索的集群, 保证一个集群异常后互不影响, 也方便针对性做集群扩展。
  • 冷热分离。这个我有专门写了篇博文来介绍( 传送门 ), 这里就不再赘述。
  • 关闭索引。是的,你没有看错。尤其是日志数据, 一般短期内的数据还会被用于数据统计分析, 历史数据再被翻阅的可能性比较小, 由于倒排词典的索引常驻内存,无法 GC, 尤其随着日积月累索引越来越多, 内存捉襟见肘(主要是穷), 动辄 gc 几十秒, 告警个不停, 或者直接 gg 了, 索引我们做了定时关闭历史索引, 只保留近期一段时间的索引开启着, 如果需要查询历史数据再单独开启。
  • 其它基于配置的一些小优化, 如 slowlog, fielddata.cache 啊, 网上都有。

Kibana, Grafana

这俩都是纯做数据展示的, 初期仅有 Kibana, 针对不同集群分别配置多个 Kibana, 主要用于日志查询, 开发调试跟踪, 这里推荐一个 Kibana 的插件 Sense, 用于调试 Elasticsearch 特别好使, 支持代码提示补全, 个人觉得比 Head 里的查询好使。又多了一个 Grafana 主要也是两方面考虑, 一是 Kibana 不支持权限分配管理, 尤其是 Dashboard 做数据维度展示的时候, 任何一个浏览者都可以随意改动, 用我们运营童鞋的话来说"不敢乱点", 第二个考虑就是因为老板不喜欢, "用不惯", 囧~~~。 Grafana 支持权限分配管理, 数据源支持也很丰富, 不同 Elasticsearch 集群的数据在同一个页面展示, 尤其后期我们可能也会考虑 influxdb, 可以更好的结合 Grafana。

Elasticsearch Monitor

主要用于监控 Elasticsearch 节点健康, 很简单的逻辑, 直接去访问所有节点ip:9200, 1s 不返回结果就立即告警。

告警

先说下为什么不选用 Elastalert, 一是我们想和 Elasticsearch 接偶, 不想在它身上绑太多挂件。目前比较简单, 是直接访问 Kafka, 根据日志里定义的级别, 自己写告警规则和通知, 更灵活一点, 后面可能会考虑平台化。

Spark Streaming

实时分析, 基于 Elasticsearch 的分析已经不能满足某些业务的需求, 这一块还在试水, 后面再来分析。

Mysql to Elasticsearch

主要是基于 Elasticsearch 的全文搜索和分词功能, 同步 MySQL 的数据到 Elasticsearch 中, 给业务端提供搜索服务。由于我们数据库是在云上, 服务商不给 binlog, 早期选用的是 Elasticsearch-jdbc (注意对应elasticsearch版本), 后来同步的表增加了之后, 这玩意儿莫名就会把 Elasticsaerch 整挂了(原因没查到), 现在使用的是 Logstash 的同步插件。这里主要的问题是更新数据和删除数据, 查询出来的语句设置好唯一主键对应 Elasticsarch 里的 _id, 后面根据数据表里的时间字段增量塞到 Elasticsearch中时, _id字段相同的数据会覆盖, 达到更新的目的。 删除主要是靠软删除, 目前还没啥好办法。 最后值得一提的是, MySQL 同步 Elasticsearch 需要注意数据的敏感性, 指定需要同步的字段, 不必要的字段就别同步了, 你懂得。

API 搜索

看各编程语言的插件了, 我使用的 elasticsearch-php 这货, 先在 Kibana 的 Sense 插件里手工调试好后, 再移植到代码里写 DSL 语句。

python脚本实现Elasticsearch的冷热分离

起因

出于成本考虑,es 集群内节点机器配置有好有差,为了实现当天和前一天数据写在 ssd 节点上,历史数据迁移到普通节点,使用 python 写了一个小脚本,凌晨定时迁移分片,供参考. es 官方有类似 tags 的实现方式,等我实践后再来记录下

配置

启动脚本前必须修改下面两项配置,否则分片刚迁移过去,就又被均衡回来了

# 禁止集群自动分配,使主要分片或副本的分布失效
cluster.routing.allocation.disable_allocation: true 
# 新增加的分片才可以参与分配
cluster.routing.allocation.enable: new_primaries

热更新配置:

curl -XPUT {ip}:9200/_cluster/settings -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable" : "new_primaries"}}'

脚本


#!/usr/bin/env python
#  -*- coding:utf-8 -*-

'''
迁移分片:
    指定日期前的分片从ssd机器迁移到普通硬盘机器
    当天新建索引分片从普通硬盘机器迁移到ssd
机器分布:
    ssd机器: es1, es2
    普通硬盘机器: es3, es4
'''

import urllib2
import json
import re
import time
import random
import datetime

es_address = 'http://x.x.x.x'

def http_get(url):
    response = urllib2.urlopen(url)
    return response.read()

def http_post(url, jsonData):
    jdata = json.dumps(jsonData)
    req = urllib2.Request(url, jdata)
    response = urllib2.urlopen(req)
    return response.read()

def move_shard(index, shard, fromNode, toNode):
    url = es_address + '/_cluster/reroute'
    try:
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : toNode
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
    except urllib2.HTTPError as e:
        toNode = "es2" if toNode=="es1" else "es1"
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : "es2"
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
        return resp
    except Exception as e:
        raise e

if __name__ == '__main__':
    keepDay = 1
    today = day = time.strftime('%Y%m%d',time.localtime(time.time()))
    dt = datetime.datetime.now() - datetime.timedelta(days=keepDay)
    day = dt.strftime("%Y%m%d")
    print ">>>>>>>>>>>>>", day, "<<<<<<<<<<<<<"
    fromNodeList = ["es4", "es5"]
    toNodeList = ["es1", "es2"]
    resp = http_get(es_address + '/_cat/shards')
    shardList = resp.split("\n")
    for shard in shardList:
        try:
            info = re.split(r'\s*', shard)
            index = info[0]
            dateObj = re.match(r'(.*)-(\d+)[\.|-](\d+)[\.|-](\d+)', index)
            if (dateObj == None):
                continue
            date = dateObj.group(2) + dateObj.group(3) + dateObj.group(4)
            if (date==today and info[7] in toNodeList):
                move_shard(index, info[1], info[7], random.choice(fromNodeList))
                print "move today: ", shard
            if (info[3]=='UNASSIGNED' or date>=day or info[7] not in fromNodeList):
                continue

            toNode = random.choice(toNodeList)
            move_shard(index, info[1], info[7], toNode)
            print info[7], '------------------->', toNode, shard
        except urllib2.HTTPError as e:
            print e, shard
        except Exception as e:
            print '-----------------------------', shard, e
            raise e

        if "gb" in info[5]:
            sleepSeconds = 600 if info[5]>"2gb" else 300
        else:
            sleepSeconds = random.randint(10, 15)
        print 'sleep %d seconds' % sleepSeconds
        time.sleep(sleepSeconds)

Elasticsearch查询实现类似MySQL的in/not in

别问我为什么要单独一篇文章写这个,我会告诉你这问题困扰我很久了?搞定后的酸爽谁试谁知道

数据说明

假如es里有index为lion_db_vdoid, type为t_vdoid
有两条记录,字段vdoid分别为1和2

实现IN

查询vdoid字段包含2,3的数据

  "query": {
    "terms": {
      "vdoid": [
        "2", "3"
      ]
    }
  }

mysql_in

实现NOT IN

查询vdoid字段不等于2,3的数据

  "query": {
    "bool": {
      "must_not": [
        {
          "terms": {
            "vdoid": [
              "2", "3"
            ]
          }
        }
      ]
    }
  }

mysql_not_in

启发

灵活使用must_not,结合其他系统参数,达到不可思议的结果

处理Elasticsearch集群yellow和red状态

RED

原因

red表示不是所有的主分片都可用,通常时由于某个索引的住分片为分片unassigned,只要找出这个索引的分片,手工分配即可

处理

  • 官方文档的详细说明
  • 通过curl GET http://{ESIP}:9200/_cluster/health?level=indices查看所有索引信息,查看下是哪个索引的status是red导致了集群都red了(集群索引多的时候一眼看不出来,直接把结果拷出来,搜red关键字就跟踪到索引和分片了)
  • 如果这个索引不是很重要,直接delete掉省事,集群状态马上恢复green完事~
  • 通过reroute强制分配该分片(见下文)

YELLOW

原因

yellow表示所有主分片可用,但不是所有副本分片都可用,最常见的情景是单节点时,由于es默认是有1个副本,主分片和副本不能在同一个节点上,所以副本就是未分配unassigned

处理

  • 过滤查看所有未分配索引的方式, curl -s "http://10.19.22.142:9200/_cat/shards" | grep UNASSIGNED结果如下,第一列表示索引名,第二列表示分片编号,第三列p是主分片,r是副本
curl -s "http://{ESIP}:9200/_cat/shards" | grep UNASSIGNED
eslog1                 3 p UNASSIGNED
eslog1                 3 r UNASSIGNED
eslog1                 1 p UNASSIGNED
eslog1                 1 r UNASSIGNED

分配分片

知道哪个索引的哪个分片就开始手动修复,通过reroute的allocate分配

curl -XPOST '{ESIP}:9200/_cluster/reroute' -d '{
    "commands" : [ {
          "allocate" : {
              "index" : "eslog1",
              "shard" : 4,
              "node" : "es1",
              "allow_primary" : true
          }
        }
    ]
}'

分配时可能遇到的坑,需要注意的地方


  • 分配副本时必须要带参数"allow_primary" : true, 不然会报错
  • 当集群中es版本不同时,如果这个未分配的分片是高版本生成的,不能分配到低版本节点上,反过来低版本的分片可以分配给高版本,如果遇到了,只要升级低版本节点的ES版本即可
  • (升级ES版本详见官方详细文档,我是ubuntu系统apt安装的,直接apt-get install elasticsearch升级的,elasticsearch.yml配置文件没变不用修改,但是/usr/share/elasticsearch/bin/elasticsearch文件中有个内存配置ES_HEAP_SIZE=6G需要再手动加一下&重启es)

Elk使用笔记(坑)(2017-02-17更新)

主要记录使用过程终于到的一些坑和需要注意的地方,有些坑想不起来了,以后再完善补上

elk版本


  • elasticsearch: 2.4.1
  • kibana: 4.6.3
  • logstash: 2.4.1

问题


Elasticseasrch报错,Request Timeout after 30000ms,log中无日志

一般重启elasticsearch即可,重启后节点状态状态Elasticsearch is still initializing the kibana index,这时候是elasticsearch在恢复数据(恢复数据时间可以自定义,具体看我另外一篇关于集群配置的介绍传送门),再等待几秒再观察节点状态

service elasticsearch restart 

Elasticseasrch查询list大于10000条报错,Result window is too large

貌似是2.x才有的问题,修改max_result_window默认配置,调大默认值

curl -XPUT http://{ip:port}/{mapping}/_settings -d '{ "index" : { "max_result_window" : 500000}}'

Elasticsearch服务service方式启动fail

让人着急的是日志没有信息,这时候大多是与内存有关,毕竟Es是个能吃内存的货。去安装目录bin下执行启动脚本,报Cannot allocate memory`不能分配内存。
解决:修改脚本bin/elasticsearch,注释或调小之前加的内存配置,具体看我之前写的一篇内存优化,就不往这迁移了(主要是懒)

Elasticsearch-PHP报错Elasticsearch\Common\Exceptions\NoNodesAvailableException No alive nodes found in your cluster

首先是觉得集群节点挂了,检查集群,检查节点,包括elk都正常,坑了我一圈,发现是Elasticsearch-PHP包的配置问题,官方文档中标明hosts可以写ip:port,ip,domains:port,domains的格式,结果现实并不是这样,因为我es绑定了内网地址,外网要访问,我用Nginx的domains 80端口将外网请求代理转发到内网的9200端口上,然后在Elasticsearch-PHP中就省去了80,只写了一个hosts上绑定的domains。所以只要在配置中把端口带上就好了domains:80,还没来得及看源码,应该扩展包检测没有配置端口就默认指定了9200端口,所以才连接不上

(2017年02月17日更新) 手动迁移副本,迁移成功后,原有节点的分片又跑了

例索引有两个分片1,2分别在两个节点A,B上,将分片1迁移到节点B的时候,虽然成功了,但是节点B上原有的分片2却自动移到了节点A上,因为没有开启这个配置项

curl -XPUT ip:9200/_cluster/settings -d'{
    "transient":{
        "cluster.routing.allocation.disable_allocation":true
    }
}'

优化


统一大小写

  • 操作: 在logstash中将所有字符串数据转换为小写,查询dsl语句中统一使用小写
  • 原因: kibana中查询貌似是不区分大小写的,然而有些查询聚合语句系统默认是区分了大小写,比如aggsterms.exclude
  • 执行: 应该在es的mapping template模板中设置也可以,暂还没找到具体执行办法,有知道的看官跪求告知
# logstash conf
filter{
    # 如果有json转换配置需要放置在json前
    mutate {
            lowercase => "message"
    }
}

常用命令


查询 curl -GET {command}
增改 curl -PUT {command}
删除 curl -DELETE {command}
url末尾加?pretty会自动美化结果的json数据,方便命令行下查看,例如:curl -GET "http://localhost:9200/_nodes/stats?pretty"查看节点状态

  • 查询集群状态(强烈推荐使用es的head插件管理集群)
/_cluster/health?pretty
  • 查看es节点状态
/_nodes/stats
  • 添加索引别名
/index1/_alias/index_alias
*/_alias/index_alias
  • es清理缓存(发现没什么卵用,理论上清理后第二次查询和第一次查询的速度一样慢,结果并不是,怀疑没清掉)
curl -XPOST "localhost:9200/_cache/clear"
  • es设置mapping模板
curl -XPUT 'http://{ip}:{port}/_template/eslog?pretty' -d@/tmp/mylog.template.json

扩展资料

  • logstash和kafka的整合传送
  • elkkafka的配置字段部分说明 传送
  • elasticsearch配置文件说明 传送
  • elasticsearch数据导出导入,有个elasticdump工具,还没有试验过,我推荐一个网友分享的python脚本,自己根据需求改一下就好了,elasticdump安装npm install elasticdump -g, 使用说明

Elasticsearch集群优化笔记

设置集群和节点名称

  • 设置集群名称,es默认集群名称都是elasticsearchg,是防止同网段内其他es机器自动加入了该集群
  • 设置节点名称,是方便节点管理时识别,不设置的话es每次重启会重新自动生成一串字符
cluster.name: elasticsearch-product
node.name: es-node1

集群使用单播方式通信

  • es默认是多播方式,网段内有相同集群名称的节点自动加入集群,安全上不可靠,尤其是公有机房
  • 多数云机房关闭了多播,只能使用单播咯
# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值,常用设置是N/2+1
discovery.zen.minimum_master_nodes: 2
# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂
discovery.zen.ping.timeout: 3s
# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧
discovery.zen.ping.multicast.enabled: false
# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测
discovery.zen.ping.unicast.hosts: ["192.168.10.2", "192.168.10.3", "192.168.10.4"]

设置数据恢复条件

  • 设置可以在集群重启的时候避免过多的分片交换,避免来回移动数据,消耗磁盘和带宽等资源
  • 它们是的关系,哪个条件先达到就开始恢复
# 满足几个节点时开始回复
gateway.expected_nodes: 10
# 满足时间开始数据回复
gateway.recover_after_time: 5m

指定主节点(2017年01月24日更新)

主节点设置node.master设置为true,其他节点全是false是,记住所有节点的discovery.zen.minimum_master_nodes: 1一定要改设置为1,不然集群组不起来