分类 工具 下的文章

HTTP/2 - 安装部署

本文讲述如何部署 HTTPS 和 HTTP2, 后续会再整理其他相关理论, 如 HTTP/2 的好处, HTTP/2 与 HTTPS 的关系, 从 HTTP/1.1 升级到 HTTP/2 有什么要注意的, HTTP/2 为什么不叫 HTTP/2.0

环境

Ubuntu 16.04, nginx 1.12.2(安装时需要 ssl 和 http_v2 模块)

获取 SSL 证书

获取证书有多种方式, 购买(赛门铁克,也可以在国内各大云服务厂商购买), 自签名, 免费的 FreeSSL

本文说明是使用的是免费的 FreeSSL

在 FreeSSL 上根据提示操作下载得到两个文件 full_chain.pem, private.key 就是我们后面要用到的

部署 HTTPS

部署 HTTP/2 必须要先部署 https, 这里用 nginx 配置 ssl 证书, 在 nginx.conf 中添加:

server
{
    # 默认是 listen 80
    listen 443 ssl;
    ...

    # ssl https
    ssl                  on;
    ssl_certificate      /usr/local/nginx/ssl/full_chain.pem;
    ssl_certificate_key  /usr/local/nginx/ssl/private.key;
    ...
}

如果强制 http 跳转到 https, 再自己在原有 server 里加 rewrite, 验证配置文件有效后重启 nginx, 就可以使用 https 访问站点了

# 验证 nginx 配置文件
sudo nginx -t
# reload
sudo service nginx reload

配置 HTTP/2 (Nginx)

查看 nginx 是否编译了 http_v2_module 模块

➜ sudo nginx -V
nginx version: nginx/1.13.9
built by gcc 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.9)
built with OpenSSL 1.0.2l  25 May 2017
TLS SNI support enabled
configure arguments: --user=www --group=www --prefix=/usr/local/nginx --with-http_stub_status_module --with-http_ssl_module --with-http_v2_module --with-http_gzip_static_module --with-ipv6 --with-http_sub_module --with-openssl=/home/jw/下载/lnmp1.4-full/src/openssl-1.0.2l

修改 nginx 配置文件, 在 https 配置里加 http2 关键字就可以了

server
{
    # 默认是 listen 80
    listen 443 ssl http2;
    ...

    # ssl https
    ssl                  on;
    ssl_certificate      /usr/local/nginx/ssl/full_chain.pem;
    ssl_certificate_key  /usr/local/nginx/ssl/private.key;
    ...
}

重启 nginx 就可以了, 更多关于 nginx http2 的详细配置

验证

建议安装 chrome 的扩展 HTTP/2 and SPDY indicator 方便查看, 当你浏览的网页是 HTTP/2 的时候, 闪电图标直接亮起
chrome-ext

点击图标,查看详细
chrome-detail

打开 chrome 的调试工具, 添加上 Protocol, 能看到当前域名已经变成 h2
chrome-h2-protocol

HTTP_PUSH

最新版本的 nginx-1.13.9 已经支持 HTTP/2 Server Push

es报错 es_rejected_execution_exception[status: 429]

描述

使用 go-mysql-elasticsearch 同步 mysql 数据到 elasticsearch 时, 由于量大出现一个错误, 如下:

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_room, type: room, id: 556314, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@16211f2a on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

time="2017-11-21T18:20:35+08:00" level=error msg="update index: prod_db_vdoid, type: vdoid, id: 6926660, status: 429, error: {\"type\":\"es_rejected_execution_exception\",\"reason\":\"rejected execution of org.elasticsearch.transport.TransportService$4@50d7ff2 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@6f50e0ac[Running, pool size = 8, active threads = 8, queued tasks = 50, completed tasks = 302613024]]\"}"

分析

从报错信息初步判断是并发量大, 可用的8个线程和50个长度的队列不够用了, 处理不过来

解决

参考官方文档, 注意我这是 2.4 版本的文档, 各位看官可以根据自己的 es 版本在右侧选择对应版本的文档, 在目录中依次点击Modules - Thread pool 看你使用的版本说明, 其实这块基本通用, 不同版本变化不大

elasticsearch.yml 文件末尾添加如下配置后重启es即可:

threadpool.bulk.type: fixed
threadpool.bulk.size: 8
threadpool.bulk.queue_size: 1000

其中:

  • type 是要配置的线程池类型

    • bulk 批量操作, 也就是我们上面报错里提示的
    • index 用于索引/删除操作
    • search 计数/搜索/建议操作
    • get 获取操作
    • snapshot 用于快照/恢复操作
    • refresh 用于刷新操作
  • size 线程数量, 一般设置为 cpu 核数
  • queue_size 等待线程处理的队列容量

其它

增加线程和队列容量是一种解决办法, 另外节点数量和分片的分布也是影响原因
以批量操作为例, 队列默认配置是50个容量, 如果3个节点都是8核, 那批量操作的并发最大是 50 3 8 = 1200, 如果分片都在同一台机器上, 那可能只有 400
所以, 增加节点数和均匀分布分片也很重要


Elasticsearch 添加权限管理

Elasticsearch 默认是没有权限管理的, 只要能 ping 通地址的地方就可以读写数据, 所以还是很危险的, 这里选择使用插件 shield 来实现

环境版本


  • Elasticsearch 2.4.4

插件安装


# 这是收费插件, 安装后免费使用一个月, 到期后集群功能不能用,但基本api不受影响
bin/plugin install license 
bin/plugin install shield
service elasticsearch restart

添加用户


# lion 是用户名, 可以改成自己想要的
bin/shield/esusers useradd lion -r admin
# 再输入两次密码即可

常用用户管理命令:

bin/shield/esusers  -h    # 查看帮助
bin/shield/esusers list    # 查看用户列表
bin/shield/esusers  passwd lion # 修改密码
bin/shield/esusers userdel lion    # 删除用户

在 cli 环境下操作 elasticsearch 加 - u 用户名

curl -u lion x.x.x.x:9200/_cat/indices?pretty
# 按提示输出密码

Kibana 配置


kibana 配置文件 KAFKA_PATH/config/kibana.yml 里添加帐号密码

elasticsearch.username: "lion"
elasticsearch.password: "xxxxxxx"

再重启 kibana

Logstash 配置


elasticsearch-output 里添加两项:

elasticsearch {
    hosts => ...
    # 添加下面
    user => "lion"
    password => "xxxxxxx"
}

Hangout 配置


Hangout 是携程团队用 java 开发的代替 logstash 的一个日志手机工具, 还未提供 http ssl 认证支持...

php-elasticsearch 配置


php-elasticsearch 里初始化 elasticsearch 连接一般使用这种方式

$client = ClientBuilder::create()->setHosts($hosts)->build();

修改 $hosts 这里

第一种方式 :

$hosts = [
    // This is effectively equal to: "https://username:password!#$?*abc@foo.com:9200/"
    [
        'host' => 'foo.com',
        'port' => '9200',
        'scheme' => 'https',
        'user' => 'username',
        'password' => 'password!#$?*abc'
    ],

    // This is equal to "http://localhost:9200/"
    [
        'host' => 'localhost',    // Only host is required
    ]
];

第二种方式, 简单, 推荐

$hosts = [
    'http://user:pass@localhost:9200',       // HTTP Basic Authentication
    'http://user2:pass2@other-host.com:9200' // Different credentials on different host
];

取消权限认证


如果设置后想取消, 光删除用户是没用的,需要卸载 shield 插件

bin/plugin remove shield

Kafka 迁移 Topics

Kafka 扩展为集群后, 需要把原单机上的部分大 topic 平衡到新 broker 上
旧 kafka 的 broker id 为 0, 新机器的是 1

迁移

第一步

创建 move.json

{
    "topics": [{
        "topic": "test2"
    }],
    "version": 1
}

第二步

生成迁移分配规则 json 文件

注意 broker-list 我想所有分区都迁移到新机器, 所以只写了 1, 可以写 0,1

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file move.json --broker-list "1" --generate

执行结果:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}

第三步

拷贝生成的 json 内容(第二段)到新文件 reassignment.json 中, 然后执行

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute

第四步

查看 topics

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test2
Topic:test2    PartitionCount:16       ReplicationFactor:2     Configs:
        Topic: test2   Partition: 0    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 1    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 3    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 4    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 5    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 6    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 7    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 8    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 9    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 10   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 11   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 12   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 13   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 14   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 15   Leader: 0       Replicas: 1,0   Isr: 0

如果 topics 比较大, 迁移需要一会儿, 这个时候 Replicas 是 broker 的 0,1 共有, 稍后一会儿迁移完成后再查看

Topic:test2    PartitionCount:16       ReplicationFactor:1     Configs:
        Topic: test2   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 4    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 6    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 7    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 8    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 10   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 11   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 12   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 13   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 14   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 15   Leader: 1       Replicas: 1     Isr: 1

参考文档: http://blog.csdn.net/louisliaoxh/article/details/51605146

Kafka和Zookeeper单机扩展为集群的笔记(内附一个小问题的解决)

kafka版本 kafka_2.11-0.10.1.0
我用的 zookeeper 是 kafka 自带的
旧机器ip: x.x.x.x 新机器 y.y.y.y

起因

之前 kafka 一直是单机跑, 由于容量需要扩容, 所以新增一台机器将 zookeeperkafka 都扩展为集群
文章最后附 ansible 的安装脚本

Zookeeper

旧机器的编号是 0, 新机器编号 1, 主要修改下配置就可以自动加入集群了

#新机器上添加 zookeeper 编号
echo 1 > /data/zookeeper/myid

#新旧机器都修改 zookeeper.properties 文件
server.0=x.x.x.x:2888:3888
server.1=y.y.y.y:2888:3888

Kafka

kafka 的 broker 也需要编号, 需要在新机器上修改的配置如下, server.properties 文件

broker.id=1
advertised.listeners=PLAINTEXT://y.y.y.y:9092
zookeeper.connect=x.x.x.x:2181,y.y.y.y:2181

查看是否成功

bin/zookeeper-shell.sh localhost:2181
ls /brokers/ids/

遇到的问题

  • Kafka 启动的时候报错
[2017-06-13 10:07:24,025] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.NoClassDefFoundError: Could not initialize class kafka.network.RequestChannel$
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:114)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
        at kafka.network.Processor.run(SocketServer.scala:417)
        at java.lang.Thread.run(Thread.java:748)

一番百度搜索后, 有人说是 java 的小版本号不一致问题, 有人说要停掉旧 kafka 机器再启动. 然而我觉得并不是这么肤浅的原因, 然后没招就去搜旧的 java 安装包没找到, 果断放弃了, 最终只要加下 hosts 就解决了

# 新机器上 vim /etc/hosts
y.y.y.y y-y-y-y 

后来请教朋友得到的解释是, 光 ping 是没用的,Linux 上的很多服务都会先解析主机名的, 好吧又涨见识了

Ansible 部署脚本

# tasks/main.yml
---
- name: 检测是否已安装
  stat:
    path: /usr/local/kafka
  register: kafka

- name: 解压安装包
  unarchive: src=src/kafka_2.11-0.10.1.0.tgz dest=/usr/local/
  when: kafka.stat.exists == False

- name: 重命名目录
  shell: "{{item}}"
  with_items:
    - "mv /usr/local/kafka_2.11-0.10.1.0 /usr/local/kafka"
  when: kafka.stat.exists == False

- name: 创建 kafka 目录
  file:
    path: "{{item}}"
    state: directory
  with_items:
    - /data/zookeeper
    - /data/log/kafka

- name: init
  shell: "echo 1 > /data/zookeeper/myid"

- name: 复制 kafka & zookeeper 配置文件
  template:
    src: "{{ item }}"
    dest: "/usr/local/kafka/config/{{ item }}"
  with_items:
    - 'server.properties'
    - 'zookeeper.properties'

- name: 复制 supervisor 配置文件
  template:
    src: "{{ item }}"
    dest: "/data/supervisor/conf.d/{{ item }}"
  with_items:
    - 'kafka.conf'
    - 'zookeeper.conf'

- name: 启动 supervisor
  shell: "supervisorctl update"

删除kafka的consumer和topics

谨慎操作
kafka 版本 0.10

删除 consumers

原因: 重置 offset, 或者是强迫症想清空不用的 consumer
操作:

# 进入控制台
bin/zookeeper-shell.sh localhost:2181
# 查看所有消费者
ls /consumers
# fuck
rmr /consumers/hangou
# 再查看 没了
ls /consumers

删除 topics

原因: 腾出空间
操作:

  • 物理删除数据
# 进入 server.properties 里配置的数据目录 log.dirs=/data/kafka-logs 
cd /data/kafka-logs/
# 删除对应 topic 目录, 配置了多少分区这就有多少目录
rm -rf lion_sql-log*
  • 删除 zookeeper 里的记录
# 进入控制台
bin/zookeeper-shell.sh localhost:2181
# 查看当前的所有 topics, 发现刚删除的 topics 还在这里躺着
ls /brokers/topics/
# fuck you
rmr /brokers/topics/lion_sql-log
# 再查看 没了
ls /brokers/topics

Elk使用笔记(坑)(2017-02-17更新)

主要记录使用过程终于到的一些坑和需要注意的地方,有些坑想不起来了,以后再完善补上

elk版本


  • elasticsearch: 2.4.1
  • kibana: 4.6.3
  • logstash: 2.4.1

问题


Elasticseasrch报错,Request Timeout after 30000ms,log中无日志

一般重启elasticsearch即可,重启后节点状态状态Elasticsearch is still initializing the kibana index,这时候是elasticsearch在恢复数据(恢复数据时间可以自定义,具体看我另外一篇关于集群配置的介绍传送门),再等待几秒再观察节点状态

service elasticsearch restart 

Elasticseasrch查询list大于10000条报错,Result window is too large

貌似是2.x才有的问题,修改max_result_window默认配置,调大默认值

curl -XPUT http://{ip:port}/{mapping}/_settings -d '{ "index" : { "max_result_window" : 500000}}'

Elasticsearch服务service方式启动fail

让人着急的是日志没有信息,这时候大多是与内存有关,毕竟Es是个能吃内存的货。去安装目录bin下执行启动脚本,报Cannot allocate memory`不能分配内存。
解决:修改脚本bin/elasticsearch,注释或调小之前加的内存配置,具体看我之前写的一篇内存优化,就不往这迁移了(主要是懒)

Elasticsearch-PHP报错Elasticsearch\Common\Exceptions\NoNodesAvailableException No alive nodes found in your cluster

首先是觉得集群节点挂了,检查集群,检查节点,包括elk都正常,坑了我一圈,发现是Elasticsearch-PHP包的配置问题,官方文档中标明hosts可以写ip:port,ip,domains:port,domains的格式,结果现实并不是这样,因为我es绑定了内网地址,外网要访问,我用Nginx的domains 80端口将外网请求代理转发到内网的9200端口上,然后在Elasticsearch-PHP中就省去了80,只写了一个hosts上绑定的domains。所以只要在配置中把端口带上就好了domains:80,还没来得及看源码,应该扩展包检测没有配置端口就默认指定了9200端口,所以才连接不上

(2017年02月17日更新) 手动迁移副本,迁移成功后,原有节点的分片又跑了

例索引有两个分片1,2分别在两个节点A,B上,将分片1迁移到节点B的时候,虽然成功了,但是节点B上原有的分片2却自动移到了节点A上,因为没有开启这个配置项

curl -XPUT ip:9200/_cluster/settings -d'{
    "transient":{
        "cluster.routing.allocation.disable_allocation":true
    }
}'

优化


统一大小写

  • 操作: 在logstash中将所有字符串数据转换为小写,查询dsl语句中统一使用小写
  • 原因: kibana中查询貌似是不区分大小写的,然而有些查询聚合语句系统默认是区分了大小写,比如aggsterms.exclude
  • 执行: 应该在es的mapping template模板中设置也可以,暂还没找到具体执行办法,有知道的看官跪求告知
# logstash conf
filter{
    # 如果有json转换配置需要放置在json前
    mutate {
            lowercase => "message"
    }
}

常用命令


查询 curl -GET {command}
增改 curl -PUT {command}
删除 curl -DELETE {command}
url末尾加?pretty会自动美化结果的json数据,方便命令行下查看,例如:curl -GET "http://localhost:9200/_nodes/stats?pretty"查看节点状态

  • 查询集群状态(强烈推荐使用es的head插件管理集群)
/_cluster/health?pretty
  • 查看es节点状态
/_nodes/stats
  • 添加索引别名
/index1/_alias/index_alias
*/_alias/index_alias
  • es清理缓存(发现没什么卵用,理论上清理后第二次查询和第一次查询的速度一样慢,结果并不是,怀疑没清掉)
curl -XPOST "localhost:9200/_cache/clear"
  • es设置mapping模板
curl -XPUT 'http://{ip}:{port}/_template/eslog?pretty' -d@/tmp/mylog.template.json

扩展资料

  • logstash和kafka的整合传送
  • elkkafka的配置字段部分说明 传送
  • elasticsearch配置文件说明 传送
  • elasticsearch数据导出导入,有个elasticdump工具,还没有试验过,我推荐一个网友分享的python脚本,自己根据需求改一下就好了,elasticdump安装npm install elasticdump -g, 使用说明

Elasticsearch之out_of_memory_error:Java heap space错误

环境

  • Ubuntu 14.04
  • ElasticSearch 2.4.1
  • CPU 4核i5
  • 内存 4G

背景

  • 使用elasticsearch-php执行elasticsearch搜索,报错内存溢出如下:
[Elasticsearch\Common\Exceptions\ServerErrorResponseException]  
  out_of_memory_error: Java heap space                            

[Elasticsearch\Common\Exceptions\ServerErrorResponseException]                                                                                                                         
  {"error":{"root_cause":[{"type":"out_of_memory_error","reason":"Java heap space"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":t  
  rue,"failed_shards":[{"shard":0,"index":"api-dev-2016-11-18","node":"zuUJrHUXRDWm1RX_D-0OjQ","reason":{"type":"out_of_memory_error","reason":"Java heap space"}}]},"status":500}

解决

修改elasticsearch安装目录下的bin/elasticsearch,添加

ES_HEAP_SIZE=5G

或者(Xms表示最小内存,Xmx表示最大内存)

ES_JAVA_OPTS="-Xms5g -Xmx5g"

重启elasticsearch,执行ps -ef |grep elasticsearch查看是否生效,找到-Xms5G -Xmx5G字段表示修改已生效

补充

物理内存的限制,也会导致这个内存溢出的报错,我遇到的就是这种情况,单条100W+数据的索引,本机只有4G内存,分到es上也没多少,改再大也没用,尴尬~~~

建议多查看节点状态

curl "localhost:9200/_nodes/stats"

jvm.mem.heap_used_percent如果长期在75以上,就是内存不足,该调大调大,该加内存加内存

参考elasticsearch官网说明 走你

特别建议:为了预防问题发生,都建议根据机器配置调整下集群中每台机器这个参数值,es默认512m-1g数据量大点就不够用了

Hive查询非Group By字段

示例表结构和数据:

hive> desc test2;
OK
id                      int                                         
value                   string                                      
Time taken: 0.024 seconds, Fetched: 2 row(s)

hive> select * from test2;
OK
1    a
1    b
2    c
3    d
Time taken: 0.042 seconds, Fetched: 4 row(s)


如下SQL语句在MySQL中是比较常见的写法,但是在Hive中缺不行:

select id, value from test2 group by id;

在Hive中执行会报错:
FAILED: SemanticException [Error 10025]: Line 1:10 Expression not in GROUP BY key 'value'
当使用group by字句,select语句,只能包含group by包含的列。当然,在select语句,可以有多个聚合函数(例如count)

-- 聚合函数是可以的
select id, count(*) from test2 group by id;

解决办法


  • 第一种方式: 妥协。一般group by后还要查非分组字段,如果业务上这个字段也是相同的,将这个字段也加入到group by中
select id,value from test2 group by id,value;
  • 第二种方式:collect_set()
hive> select id,collect_set(value) from test2 group by id;

1    ["b","a"]
2    ["c"]
3    ["d"]

更神奇的来了:

hive> select id, collect_set(value)[0] from test2 group by id;

1    a
2    c
3    d

炸裂,有没有,惊呼: 这样也可以....

PHP+Hadoop实现数据统计分析

记一次完全独立完成的统计分析系统的搭建过程,主要用到了PHP+Hadoop+Hive+Thrift+Mysql实现

流程图

安装


Hadoop安装: http://www.powerxing.com/install-hadoop/
Hadoop集群配置: http://www.powerxing.com/install-hadoop-cluster/
Hive安装: https://chu888chu888.gitbooks.io/hadoopstudy/content/Content/8/chapter0807.html

安装具体教程请看上面链接,本地测试只用了单机配置,集群配置(后面的flume用到)看上面的详细链接, 因为之前没有接触过java的相关,这里说下遇到的几个问题.

  • Hadoop和Hive的1.x和2.x版本要对应
  • JAVA/Hadoop相关的环境变量配置,习惯了PHP的童鞋在这块可能容易忽略
  • 启动Hadoop提示Starting namenodes on [],namenodes为空,是因为没有指定ip或端口,修改hadoop/core-site.xml如下
<configuration>
<property>
<name>dfs.namenode.rpc-address</name>
<value>127.0.0.0:9001</value>
</property>
</configuration>
  • 安装完成后输入jps可以查看到NameNode,DataNode等

上报和接收


  • swoole和workerman都有简单版本实现的数据监控,包括上报,接收,存储,展示, 主要使用udp上传(swoole版本已升级为tcp长连接),redis缓存,文件持久化,highcharts展示,可以作为思路参考
    swoole-statistics : https://github.com/smalleyes/statistics

workerman-statistics : https://github.com/walkor/workerman-statistics

  • 本例使用swoole提供的接口实现UDP传输,因为上报数据是一定程度可以容错,所以选择UDP效率优先
  • 接收数据临时存储在Redis中,每隔几分钟刷到文件中存储,文件名按模块和时间分割存储,字段|分割(后面与hive对应)

数据转存


创建Hive数据表

  • 根据文件数据格式编写Hive数据表, TERMINATED BY字段与前面文件字段分隔符想对应
  • 对表按日期分区PARTITIONED BY
CREATE TABLE login (
    time int comment '登陆时间', 
    type string comment '类型,email,username,qq等', 
    device string comment '登陆设备,pc,android,ios', 
    ip string comment '登陆ip', 
    uid int comment '用户id', 
    is_old int comment '是否老用户'
) 
PARTITIONED BY (
     `date` string COMMENT 'date'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
  • 定时(Crontab)创建hadoop分区
hive -e "use web_stat; alter table login add if not exists partition (date='${web_stat_day}')"

转存

  • Flume监听文件目录,将数据传输到能访问Hdfs集群的服务器上,这里传输到了224机器的7000端口
#agent3表示代理名称 login
agent3.sources=source1
agent3.sinks=sink1
agent3.channels=channel1

#配置source1
agent3.sources.source1.type=spooldir
agent3.sources.source1.spoolDir=/data/releases/stat/Data/10001/
agent3.sources.source1.channels=channel1
agent3.sources.source1.fileHeader = false

#配置sink1
agent3.sinks.sink1.type=avro
agent3.sinks.sink1.hostname=192.168.23.224
agent3.sinks.sink1.port=7000
agent3.sinks.sink1.channel=channel1


#配置channel1
agent3.channels.channel1.type=file
agent3.channels.channel1.checkpointDir=/data/flume_data/checkpoint_login
agent3.channels.channel1.dataDirs=/data/flume_data/channelData_login
  • 启动flume
# 加到supervisor守护进程
/home/flume/bin/flume-ng agent -n agent3 -c /home/flume/conf/ -f /home/flume/conf/statistics/login_flume.conf  -Dflume.root.logger=info,console
  • 224机器监听7000端口,将数据写到hdfs集群
#agent1表示代理名称
agent4.sources=source1
agent4.sinks=sink1
agent4.channels=channel1


#配置source1
agent4.sources.source1.type=avro
agent4.sources.source1.bind=192.168.23.224
agent4.sources.source1.port=7000
agent4.sources.source1.channels=channel1

#配置sink1
agent4.sinks.sink1.type=hdfs
agent4.sinks.sink1.hdfs.path=hdfs://hdfs/umr-ubvzlf/uhiveubnhq5/warehouse/web_stat.db/login/date\=%Y-%m-%d
agent4.sinks.sink1.hdfs.fileType=DataStream
agent4.sinks.sink1.hdfs.filePrefix=buffer_census_
agent4.sinks.sink1.hdfs.writeFormat=TEXT
agent4.sinks.sink1.hdfs.rollInterval=30
agent4.sinks.sink1.hdfs.inUsePrefix = .
agent4.sinks.sink1.hdfs.rollSize=536870912
agent4.sinks.sink1.hdfs.useLocalTimeStamp = true
agent4.sinks.sink1.hdfs.rollCount=0
agent4.sinks.sink1.channel=channel1


#配置channel1
agent4.channels.channel1.type=file
agent4.channels.channel1.checkpointDir=/data/flume_data/login_checkpoint
agent4.channels.channel1.dataDirs=/data/flume_data/login_channelData
  • 启动
# 加到supervisor守护进程
/usr/local/flume/bin/flume-ng agent -n agent4 -c /usr/local/flume/conf/ -f /usr/local/flume/conf/statistics/login_flume.conf -Dflume.root.logger=info,console

清洗数据


通过Thrift的PHP扩展包调用Hive,编写类SQL的HQL转换为MapReduce任务读取计算HDFS里的数据, 将结果存储在MySQL中
php-thrift-client下载地址: https://github.com/garamon/php-thrift-hive-client

define('THRIFT_HIVE' , ROOT .'/libs/thrift');
$GLOBALS['THRIFT_ROOT'] = THRIFT_HIVE . '/lib';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php';
require_once THRIFT_HIVE . '/ThriftHiveClientEx.php';

$transport = new \TSocket('127.0.0.1', 10000);
$transport->setSendTimeout(600 * 1000);
$transport->setRecvTimeout(600 * 1000);
$this->client = new \ThriftHiveClientEx(new \TBinaryProtocol($transport));
$this->client->open();
$this->client->execute("show databases");
$result = $this->client->fetchAll();
var_dump($result);
$this->client->close();
select * from login limit 5;
// php处理
$count = 0;
foreach ($queryResult as $row) {
  $count ++;
}
  • 一次性转换为MapReduce,利用Hadoop的计算能力
select type,count(*) from login group by type;  // 这样就用到了
  • 建表使用了PARTITIONED BY分区断言后,查询就可以利用分区剪枝(input pruning)的特性,但是断言字段必须离where关键字最近才能被利用上
// 如前面的login表使用到了date分区断言,这里就得把date条件放在第一位
select count(*) from login where date='2016-08-23' and is_old=1;
  • Hive中不支持等值连表,如下
select * from dual a,dual b where a.key = b.key;

应写为:

select * from dual a join dual b on a.key = b.key;
  • Hive中不支持insert,而且逻辑上也不允许,应为hadoop是我们用来做大数据分析,而不应该作为业务细分数据

数据报表展示


这一步就简单了,读取MySQL数据,使用highcharts等工具做各种展示,也可以用crontab定时执行php脚本发送日报,周报等等

后续更新

最近看一些资料和别人沟通发现,清洗数据这一步完全不用php,可以专注于HQL实现清洗逻辑,将结果保存在hadoop中,再用Sqoop将hadoop数据和MySQL数据同步。即简化了流程,免去mysql手工插入,又做到了数据更实时,为二次清洗逻辑的连表HQL做了铺垫