admin 发布的文章

基于ELK的日志统计系统实践

总结下近期的日志系统

目前的架构如图:
请输入图片描述

部分说明


Logstash

主要用来做数据收集和传输, 至于为什么选择它, 很尴尬的说因为早期版本只有 elk, 所以也没怎么特别针对它优化。目前我们使用了三台 Logstash 节点, 一台用于收集数据(主要是UDP)传输到 Kafka 中, 这里要提醒一下, Logstash 的性能并不是很理想, 后面也会考虑其他替代方式, 如 hangout Flume 等。

Kafka

用 Kafka 做缓存, 主要是基于两个考虑。一是穷, 没那么大内存, Kafka 是存储硬盘的, 效率也挺好。二是 Kafka 支持多重消费。其它的自动支持分布式, 对日志友好, 部署简单等等。

Elasticsearch

这是重头戏, 在这过程中也摔过很多跟头, 过程就不多说了, 直接说我们现在做的一些策略。

  • 分集群。针对不同业务, 不同场景, 分多个集群。比如用于离线数据分析的集群, 心跳数据集群以及给业务端用于实时搜索的集群, 保证一个集群异常后互不影响, 也方便针对性做集群扩展。
  • 冷热分离。这个我有专门写了篇博文来介绍( 传送门 ), 这里就不再赘述。
  • 关闭索引。是的,你没有看错。尤其是日志数据, 一般短期内的数据还会被用于数据统计分析, 历史数据再被翻阅的可能性比较小, 由于倒排词典的索引常驻内存,无法 GC, 尤其随着日积月累索引越来越多, 内存捉襟见肘(主要是穷), 动辄 gc 几十秒, 告警个不停, 或者直接 gg 了, 索引我们做了定时关闭历史索引, 只保留近期一段时间的索引开启着, 如果需要查询历史数据再单独开启。
  • 其它基于配置的一些小优化, 如 slowlog, fielddata.cache 啊, 网上都有。

Kibana, Grafana

这俩都是纯做数据展示的, 初期仅有 Kibana, 针对不同集群分别配置多个 Kibana, 主要用于日志查询, 开发调试跟踪, 这里推荐一个 Kibana 的插件 Sense, 用于调试 Elasticsearch 特别好使, 支持代码提示补全, 个人觉得比 Head 里的查询好使。又多了一个 Grafana 主要也是两方面考虑, 一是 Kibana 不支持权限分配管理, 尤其是 Dashboard 做数据维度展示的时候, 任何一个浏览者都可以随意改动, 用我们运营童鞋的话来说"不敢乱点", 第二个考虑就是因为老板不喜欢, "用不惯", 囧~~~。 Grafana 支持权限分配管理, 数据源支持也很丰富, 不同 Elasticsearch 集群的数据在同一个页面展示, 尤其后期我们可能也会考虑 influxdb, 可以更好的结合 Grafana。

Elasticsearch Monitor

主要用于监控 Elasticsearch 节点健康, 很简单的逻辑, 直接去访问所有节点ip:9200, 1s 不返回结果就立即告警。

告警

先说下为什么不选用 Elastalert, 一是我们想和 Elasticsearch 接偶, 不想在它身上绑太多挂件。目前比较简单, 是直接访问 Kafka, 根据日志里定义的级别, 自己写告警规则和通知, 更灵活一点, 后面可能会考虑平台化。

Spark Streaming

实时分析, 基于 Elasticsearch 的分析已经不能满足某些业务的需求, 这一块还在试水, 后面再来分析。

Mysql to Elasticsearch

主要是基于 Elasticsearch 的全文搜索和分词功能, 同步 MySQL 的数据到 Elasticsearch 中, 给业务端提供搜索服务。由于我们数据库是在云上, 服务商不给 binlog, 早期选用的是 Elasticsearch-jdbc (注意对应elasticsearch版本), 后来同步的表增加了之后, 这玩意儿莫名就会把 Elasticsaerch 整挂了(原因没查到), 现在使用的是 Logstash 的同步插件。这里主要的问题是更新数据和删除数据, 查询出来的语句设置好唯一主键对应 Elasticsarch 里的 _id, 后面根据数据表里的时间字段增量塞到 Elasticsearch中时, _id字段相同的数据会覆盖, 达到更新的目的。 删除主要是靠软删除, 目前还没啥好办法。 最后值得一提的是, MySQL 同步 Elasticsearch 需要注意数据的敏感性, 指定需要同步的字段, 不必要的字段就别同步了, 你懂得。

API 搜索

看各编程语言的插件了, 我使用的 elasticsearch-php 这货, 先在 Kibana 的 Sense 插件里手工调试好后, 再移植到代码里写 DSL 语句。

python脚本实现Elasticsearch的冷热分离

起因

出于成本考虑,es 集群内节点机器配置有好有差,为了实现当天和前一天数据写在 ssd 节点上,历史数据迁移到普通节点,使用 python 写了一个小脚本,凌晨定时迁移分片,供参考. es 官方有类似 tags 的实现方式,等我实践后再来记录下

配置

启动脚本前必须修改下面两项配置,否则分片刚迁移过去,就又被均衡回来了

# 禁止集群自动分配,使主要分片或副本的分布失效
cluster.routing.allocation.disable_allocation: true 
# 新增加的分片才可以参与分配
cluster.routing.allocation.enable: new_primaries

热更新配置:

curl -XPUT {ip}:9200/_cluster/settings -d'{"transient":{"cluster.routing.allocation.disable_allocation": true, "cluster.routing.allocation.enable" : "new_primaries"}}'

脚本


#!/usr/bin/env python
#  -*- coding:utf-8 -*-

'''
迁移分片:
    指定日期前的分片从ssd机器迁移到普通硬盘机器
    当天新建索引分片从普通硬盘机器迁移到ssd
机器分布:
    ssd机器: es1, es2
    普通硬盘机器: es3, es4
'''

import urllib2
import json
import re
import time
import random
import datetime

es_address = 'http://x.x.x.x'

def http_get(url):
    response = urllib2.urlopen(url)
    return response.read()

def http_post(url, jsonData):
    jdata = json.dumps(jsonData)
    req = urllib2.Request(url, jdata)
    response = urllib2.urlopen(req)
    return response.read()

def move_shard(index, shard, fromNode, toNode):
    url = es_address + '/_cluster/reroute'
    try:
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : toNode
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
    except urllib2.HTTPError as e:
        toNode = "es2" if toNode=="es1" else "es1"
        jsonData = {
            "commands" : [
                {
                    "move" : {
                        "index" : index,
                        "shard" : shard,
                        "from_node" : fromNode,
                        "to_node" : "es2"
                    }
                }
            ]
        }
        resp = http_post(url, jsonData)
        return resp
    except Exception as e:
        raise e

if __name__ == '__main__':
    keepDay = 1
    today = day = time.strftime('%Y%m%d',time.localtime(time.time()))
    dt = datetime.datetime.now() - datetime.timedelta(days=keepDay)
    day = dt.strftime("%Y%m%d")
    print ">>>>>>>>>>>>>", day, "<<<<<<<<<<<<<"
    fromNodeList = ["es4", "es5"]
    toNodeList = ["es1", "es2"]
    resp = http_get(es_address + '/_cat/shards')
    shardList = resp.split("\n")
    for shard in shardList:
        try:
            info = re.split(r'\s*', shard)
            index = info[0]
            dateObj = re.match(r'(.*)-(\d+)[\.|-](\d+)[\.|-](\d+)', index)
            if (dateObj == None):
                continue
            date = dateObj.group(2) + dateObj.group(3) + dateObj.group(4)
            if (date==today and info[7] in toNodeList):
                move_shard(index, info[1], info[7], random.choice(fromNodeList))
                print "move today: ", shard
            if (info[3]=='UNASSIGNED' or date>=day or info[7] not in fromNodeList):
                continue

            toNode = random.choice(toNodeList)
            move_shard(index, info[1], info[7], toNode)
            print info[7], '------------------->', toNode, shard
        except urllib2.HTTPError as e:
            print e, shard
        except Exception as e:
            print '-----------------------------', shard, e
            raise e

        if "gb" in info[5]:
            sleepSeconds = 600 if info[5]>"2gb" else 300
        else:
            sleepSeconds = random.randint(10, 15)
        print 'sleep %d seconds' % sleepSeconds
        time.sleep(sleepSeconds)

Elasticsearch查询实现类似MySQL的in/not in

别问我为什么要单独一篇文章写这个,我会告诉你这问题困扰我很久了?搞定后的酸爽谁试谁知道

数据说明

假如es里有index为lion_db_vdoid, type为t_vdoid
有两条记录,字段vdoid分别为1和2

实现IN

查询vdoid字段包含2,3的数据

  "query": {
    "terms": {
      "vdoid": [
        "2", "3"
      ]
    }
  }

mysql_in

实现NOT IN

查询vdoid字段不等于2,3的数据

  "query": {
    "bool": {
      "must_not": [
        {
          "terms": {
            "vdoid": [
              "2", "3"
            ]
          }
        }
      ]
    }
  }

mysql_not_in

启发

灵活使用must_not,结合其他系统参数,达到不可思议的结果

处理Elasticsearch集群yellow和red状态

RED

原因

red表示不是所有的主分片都可用,通常时由于某个索引的住分片为分片unassigned,只要找出这个索引的分片,手工分配即可

处理

  • 官方文档的详细说明
  • 通过curl GET http://{ESIP}:9200/_cluster/health?level=indices查看所有索引信息,查看下是哪个索引的status是red导致了集群都red了(集群索引多的时候一眼看不出来,直接把结果拷出来,搜red关键字就跟踪到索引和分片了)
  • 如果这个索引不是很重要,直接delete掉省事,集群状态马上恢复green完事~
  • 通过reroute强制分配该分片(见下文)

YELLOW

原因

yellow表示所有主分片可用,但不是所有副本分片都可用,最常见的情景是单节点时,由于es默认是有1个副本,主分片和副本不能在同一个节点上,所以副本就是未分配unassigned

处理

  • 过滤查看所有未分配索引的方式, curl -s "http://10.19.22.142:9200/_cat/shards" | grep UNASSIGNED结果如下,第一列表示索引名,第二列表示分片编号,第三列p是主分片,r是副本
curl -s "http://{ESIP}:9200/_cat/shards" | grep UNASSIGNED
eslog1                 3 p UNASSIGNED
eslog1                 3 r UNASSIGNED
eslog1                 1 p UNASSIGNED
eslog1                 1 r UNASSIGNED

分配分片

知道哪个索引的哪个分片就开始手动修复,通过reroute的allocate分配

curl -XPOST '{ESIP}:9200/_cluster/reroute' -d '{
    "commands" : [ {
          "allocate" : {
              "index" : "eslog1",
              "shard" : 4,
              "node" : "es1",
              "allow_primary" : true
          }
        }
    ]
}'

分配时可能遇到的坑,需要注意的地方


  • 分配副本时必须要带参数"allow_primary" : true, 不然会报错
  • 当集群中es版本不同时,如果这个未分配的分片是高版本生成的,不能分配到低版本节点上,反过来低版本的分片可以分配给高版本,如果遇到了,只要升级低版本节点的ES版本即可
  • (升级ES版本详见官方详细文档,我是ubuntu系统apt安装的,直接apt-get install elasticsearch升级的,elasticsearch.yml配置文件没变不用修改,但是/usr/share/elasticsearch/bin/elasticsearch文件中有个内存配置ES_HEAP_SIZE=6G需要再手动加一下&重启es)

用Go写PHP小扩展

问:为什么是小扩展
答:因为我还没在生产环境上测试过~~囧~

环境

  • ubuntu 16.04
  • go1.6.3 linux/amd64
  • php-go
  • php7.0.9

php-go安装

参考说明文档

示例

示例模块php-go/example/hello.go中比较全的写出了大部分示例(虽然没多少注释~)

# 编译示例模块hello.so
cd cd $GOPATH/src/github.com/kitech/php-go
make
ls -lh php-go/hello.so

tips: make时可能会出现make: /usr/bin/php-config:命令未找到,手动修改下Makefile的第10行PHPCFG={yourPath}

将php-go/hello.so拷贝到你php的extension目录下,添加php.ini,php -m |grep hello能看到hello扩展没报错就ok了,重启nginx就可以在php里测试了

编写自己的小Demo

  • go
// php-go/example/jw.go
package main

import (
    "github.com/kitech/php-go/phpgo"
)

type Jw struct {
}

func NewJw() *Jw {
    return &Jw{}
}

func (j *Jw) Test(m int, n int) int {
    return m + n
}

func main() {
    panic("wtf")
}

func Jw_hello() string {
    return "Aaaa"
}

func init() {
    phpgo.InitExtension("jw", "0.1")
    phpgo.AddFunc("jw_hello", Jw_hello)
    phpgo.AddClass("Jw", NewJw)
}
  • 编译

    • 先修改php-go/Makefile,添加编译模块

      all:
      go build -v -buildmode=c-shared -o jw.so examples/jw.go
      clean:
      rm -f jw.so
- 编译
cd cd $GOPATH/src/github.com/kitech/php-go
make
ls -lh php-go/jw.so

sudo cp $GOPATH/src/github.com/kitech/php-go/jw.so  $PHP_PATH/lib/php/extensions/no-debug-non-zts-20151012
echo "jw.so" >> $PHP_PATH/etc/php.ini
sudo service nginx restart

- 写php测试模块

php-ext-result

Elk使用笔记(坑)(2017-02-17更新)

主要记录使用过程终于到的一些坑和需要注意的地方,有些坑想不起来了,以后再完善补上

elk版本


  • elasticsearch: 2.4.1
  • kibana: 4.6.3
  • logstash: 2.4.1

问题


Elasticseasrch报错,Request Timeout after 30000ms,log中无日志

一般重启elasticsearch即可,重启后节点状态状态Elasticsearch is still initializing the kibana index,这时候是elasticsearch在恢复数据(恢复数据时间可以自定义,具体看我另外一篇关于集群配置的介绍传送门),再等待几秒再观察节点状态

service elasticsearch restart 

Elasticseasrch查询list大于10000条报错,Result window is too large

貌似是2.x才有的问题,修改max_result_window默认配置,调大默认值

curl -XPUT http://{ip:port}/{mapping}/_settings -d '{ "index" : { "max_result_window" : 500000}}'

Elasticsearch服务service方式启动fail

让人着急的是日志没有信息,这时候大多是与内存有关,毕竟Es是个能吃内存的货。去安装目录bin下执行启动脚本,报Cannot allocate memory`不能分配内存。
解决:修改脚本bin/elasticsearch,注释或调小之前加的内存配置,具体看我之前写的一篇内存优化,就不往这迁移了(主要是懒)

Elasticsearch-PHP报错Elasticsearch\Common\Exceptions\NoNodesAvailableException No alive nodes found in your cluster

首先是觉得集群节点挂了,检查集群,检查节点,包括elk都正常,坑了我一圈,发现是Elasticsearch-PHP包的配置问题,官方文档中标明hosts可以写ip:port,ip,domains:port,domains的格式,结果现实并不是这样,因为我es绑定了内网地址,外网要访问,我用Nginx的domains 80端口将外网请求代理转发到内网的9200端口上,然后在Elasticsearch-PHP中就省去了80,只写了一个hosts上绑定的domains。所以只要在配置中把端口带上就好了domains:80,还没来得及看源码,应该扩展包检测没有配置端口就默认指定了9200端口,所以才连接不上

(2017年02月17日更新) 手动迁移副本,迁移成功后,原有节点的分片又跑了

例索引有两个分片1,2分别在两个节点A,B上,将分片1迁移到节点B的时候,虽然成功了,但是节点B上原有的分片2却自动移到了节点A上,因为没有开启这个配置项

curl -XPUT ip:9200/_cluster/settings -d'{
    "transient":{
        "cluster.routing.allocation.disable_allocation":true
    }
}'

优化


统一大小写

  • 操作: 在logstash中将所有字符串数据转换为小写,查询dsl语句中统一使用小写
  • 原因: kibana中查询貌似是不区分大小写的,然而有些查询聚合语句系统默认是区分了大小写,比如aggsterms.exclude
  • 执行: 应该在es的mapping template模板中设置也可以,暂还没找到具体执行办法,有知道的看官跪求告知
# logstash conf
filter{
    # 如果有json转换配置需要放置在json前
    mutate {
            lowercase => "message"
    }
}

常用命令


查询 curl -GET {command}
增改 curl -PUT {command}
删除 curl -DELETE {command}
url末尾加?pretty会自动美化结果的json数据,方便命令行下查看,例如:curl -GET "http://localhost:9200/_nodes/stats?pretty"查看节点状态

  • 查询集群状态(强烈推荐使用es的head插件管理集群)
/_cluster/health?pretty
  • 查看es节点状态
/_nodes/stats
  • 添加索引别名
/index1/_alias/index_alias
*/_alias/index_alias
  • es清理缓存(发现没什么卵用,理论上清理后第二次查询和第一次查询的速度一样慢,结果并不是,怀疑没清掉)
curl -XPOST "localhost:9200/_cache/clear"
  • es设置mapping模板
curl -XPUT 'http://{ip}:{port}/_template/eslog?pretty' -d@/tmp/mylog.template.json

扩展资料

  • logstash和kafka的整合传送
  • elkkafka的配置字段部分说明 传送
  • elasticsearch配置文件说明 传送
  • elasticsearch数据导出导入,有个elasticdump工具,还没有试验过,我推荐一个网友分享的python脚本,自己根据需求改一下就好了,elasticdump安装npm install elasticdump -g, 使用说明

Elasticsearch集群优化笔记

设置集群和节点名称

  • 设置集群名称,es默认集群名称都是elasticsearchg,是防止同网段内其他es机器自动加入了该集群
  • 设置节点名称,是方便节点管理时识别,不设置的话es每次重启会重新自动生成一串字符
cluster.name: elasticsearch-product
node.name: es-node1

集群使用单播方式通信

  • es默认是多播方式,网段内有相同集群名称的节点自动加入集群,安全上不可靠,尤其是公有机房
  • 多数云机房关闭了多播,只能使用单播咯
# 设置这个参数来保证集群中的节点可以知道其它N个有master资格的节点.默认为1,对于大的集群来说,可以设置大一点的值,常用设置是N/2+1
discovery.zen.minimum_master_nodes: 2
# 探查的超时时间,默认3秒,提高一点以应对网络不好的时候,防止脑裂
discovery.zen.ping.timeout: 3s
# 当多播不可用或者集群跨网段的时候集群通信还是用单播吧
discovery.zen.ping.multicast.enabled: false
# 这是一个集群中的主节点的初始列表,当节点(主节点或者数据节点)启动时使用这个列表进行探测
discovery.zen.ping.unicast.hosts: ["192.168.10.2", "192.168.10.3", "192.168.10.4"]

设置数据恢复条件

  • 设置可以在集群重启的时候避免过多的分片交换,避免来回移动数据,消耗磁盘和带宽等资源
  • 它们是的关系,哪个条件先达到就开始恢复
# 满足几个节点时开始回复
gateway.expected_nodes: 10
# 满足时间开始数据回复
gateway.recover_after_time: 5m

指定主节点(2017年01月24日更新)

主节点设置node.master设置为true,其他节点全是false是,记住所有节点的discovery.zen.minimum_master_nodes: 1一定要改设置为1,不然集群组不起来

Elasticsearch之out_of_memory_error:Java heap space错误

环境

  • Ubuntu 14.04
  • ElasticSearch 2.4.1
  • CPU 4核i5
  • 内存 4G

背景

  • 使用elasticsearch-php执行elasticsearch搜索,报错内存溢出如下:
[Elasticsearch\Common\Exceptions\ServerErrorResponseException]  
  out_of_memory_error: Java heap space                            

[Elasticsearch\Common\Exceptions\ServerErrorResponseException]                                                                                                                         
  {"error":{"root_cause":[{"type":"out_of_memory_error","reason":"Java heap space"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query","grouped":t  
  rue,"failed_shards":[{"shard":0,"index":"api-dev-2016-11-18","node":"zuUJrHUXRDWm1RX_D-0OjQ","reason":{"type":"out_of_memory_error","reason":"Java heap space"}}]},"status":500}

解决

修改elasticsearch安装目录下的bin/elasticsearch,添加

ES_HEAP_SIZE=5G

或者(Xms表示最小内存,Xmx表示最大内存)

ES_JAVA_OPTS="-Xms5g -Xmx5g"

重启elasticsearch,执行ps -ef |grep elasticsearch查看是否生效,找到-Xms5G -Xmx5G字段表示修改已生效

补充

物理内存的限制,也会导致这个内存溢出的报错,我遇到的就是这种情况,单条100W+数据的索引,本机只有4G内存,分到es上也没多少,改再大也没用,尴尬~~~

建议多查看节点状态

curl "localhost:9200/_nodes/stats"

jvm.mem.heap_used_percent如果长期在75以上,就是内存不足,该调大调大,该加内存加内存

参考elasticsearch官网说明 走你

特别建议:为了预防问题发生,都建议根据机器配置调整下集群中每台机器这个参数值,es默认512m-1g数据量大点就不够用了

Hive查询非Group By字段

示例表结构和数据:

hive> desc test2;
OK
id                      int                                         
value                   string                                      
Time taken: 0.024 seconds, Fetched: 2 row(s)

hive> select * from test2;
OK
1    a
1    b
2    c
3    d
Time taken: 0.042 seconds, Fetched: 4 row(s)


如下SQL语句在MySQL中是比较常见的写法,但是在Hive中缺不行:

select id, value from test2 group by id;

在Hive中执行会报错:
FAILED: SemanticException [Error 10025]: Line 1:10 Expression not in GROUP BY key 'value'
当使用group by字句,select语句,只能包含group by包含的列。当然,在select语句,可以有多个聚合函数(例如count)

-- 聚合函数是可以的
select id, count(*) from test2 group by id;

解决办法


  • 第一种方式: 妥协。一般group by后还要查非分组字段,如果业务上这个字段也是相同的,将这个字段也加入到group by中
select id,value from test2 group by id,value;
  • 第二种方式:collect_set()
hive> select id,collect_set(value) from test2 group by id;

1    ["b","a"]
2    ["c"]
3    ["d"]

更神奇的来了:

hive> select id, collect_set(value)[0] from test2 group by id;

1    a
2    c
3    d

炸裂,有没有,惊呼: 这样也可以....

PHP+Hadoop实现数据统计分析

记一次完全独立完成的统计分析系统的搭建过程,主要用到了PHP+Hadoop+Hive+Thrift+Mysql实现

流程图

安装


Hadoop安装: http://www.powerxing.com/install-hadoop/
Hadoop集群配置: http://www.powerxing.com/install-hadoop-cluster/
Hive安装: https://chu888chu888.gitbooks.io/hadoopstudy/content/Content/8/chapter0807.html

安装具体教程请看上面链接,本地测试只用了单机配置,集群配置(后面的flume用到)看上面的详细链接, 因为之前没有接触过java的相关,这里说下遇到的几个问题.

  • Hadoop和Hive的1.x和2.x版本要对应
  • JAVA/Hadoop相关的环境变量配置,习惯了PHP的童鞋在这块可能容易忽略
  • 启动Hadoop提示Starting namenodes on [],namenodes为空,是因为没有指定ip或端口,修改hadoop/core-site.xml如下
<configuration>
<property>
<name>dfs.namenode.rpc-address</name>
<value>127.0.0.0:9001</value>
</property>
</configuration>
  • 安装完成后输入jps可以查看到NameNode,DataNode等

上报和接收


  • swoole和workerman都有简单版本实现的数据监控,包括上报,接收,存储,展示, 主要使用udp上传(swoole版本已升级为tcp长连接),redis缓存,文件持久化,highcharts展示,可以作为思路参考
    swoole-statistics : https://github.com/smalleyes/statistics

workerman-statistics : https://github.com/walkor/workerman-statistics

  • 本例使用swoole提供的接口实现UDP传输,因为上报数据是一定程度可以容错,所以选择UDP效率优先
  • 接收数据临时存储在Redis中,每隔几分钟刷到文件中存储,文件名按模块和时间分割存储,字段|分割(后面与hive对应)

数据转存


创建Hive数据表

  • 根据文件数据格式编写Hive数据表, TERMINATED BY字段与前面文件字段分隔符想对应
  • 对表按日期分区PARTITIONED BY
CREATE TABLE login (
    time int comment '登陆时间', 
    type string comment '类型,email,username,qq等', 
    device string comment '登陆设备,pc,android,ios', 
    ip string comment '登陆ip', 
    uid int comment '用户id', 
    is_old int comment '是否老用户'
) 
PARTITIONED BY (
     `date` string COMMENT 'date'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';
  • 定时(Crontab)创建hadoop分区
hive -e "use web_stat; alter table login add if not exists partition (date='${web_stat_day}')"

转存

  • Flume监听文件目录,将数据传输到能访问Hdfs集群的服务器上,这里传输到了224机器的7000端口
#agent3表示代理名称 login
agent3.sources=source1
agent3.sinks=sink1
agent3.channels=channel1

#配置source1
agent3.sources.source1.type=spooldir
agent3.sources.source1.spoolDir=/data/releases/stat/Data/10001/
agent3.sources.source1.channels=channel1
agent3.sources.source1.fileHeader = false

#配置sink1
agent3.sinks.sink1.type=avro
agent3.sinks.sink1.hostname=192.168.23.224
agent3.sinks.sink1.port=7000
agent3.sinks.sink1.channel=channel1


#配置channel1
agent3.channels.channel1.type=file
agent3.channels.channel1.checkpointDir=/data/flume_data/checkpoint_login
agent3.channels.channel1.dataDirs=/data/flume_data/channelData_login
  • 启动flume
# 加到supervisor守护进程
/home/flume/bin/flume-ng agent -n agent3 -c /home/flume/conf/ -f /home/flume/conf/statistics/login_flume.conf  -Dflume.root.logger=info,console
  • 224机器监听7000端口,将数据写到hdfs集群
#agent1表示代理名称
agent4.sources=source1
agent4.sinks=sink1
agent4.channels=channel1


#配置source1
agent4.sources.source1.type=avro
agent4.sources.source1.bind=192.168.23.224
agent4.sources.source1.port=7000
agent4.sources.source1.channels=channel1

#配置sink1
agent4.sinks.sink1.type=hdfs
agent4.sinks.sink1.hdfs.path=hdfs://hdfs/umr-ubvzlf/uhiveubnhq5/warehouse/web_stat.db/login/date\=%Y-%m-%d
agent4.sinks.sink1.hdfs.fileType=DataStream
agent4.sinks.sink1.hdfs.filePrefix=buffer_census_
agent4.sinks.sink1.hdfs.writeFormat=TEXT
agent4.sinks.sink1.hdfs.rollInterval=30
agent4.sinks.sink1.hdfs.inUsePrefix = .
agent4.sinks.sink1.hdfs.rollSize=536870912
agent4.sinks.sink1.hdfs.useLocalTimeStamp = true
agent4.sinks.sink1.hdfs.rollCount=0
agent4.sinks.sink1.channel=channel1


#配置channel1
agent4.channels.channel1.type=file
agent4.channels.channel1.checkpointDir=/data/flume_data/login_checkpoint
agent4.channels.channel1.dataDirs=/data/flume_data/login_channelData
  • 启动
# 加到supervisor守护进程
/usr/local/flume/bin/flume-ng agent -n agent4 -c /usr/local/flume/conf/ -f /usr/local/flume/conf/statistics/login_flume.conf -Dflume.root.logger=info,console

清洗数据


通过Thrift的PHP扩展包调用Hive,编写类SQL的HQL转换为MapReduce任务读取计算HDFS里的数据, 将结果存储在MySQL中
php-thrift-client下载地址: https://github.com/garamon/php-thrift-hive-client

define('THRIFT_HIVE' , ROOT .'/libs/thrift');
$GLOBALS['THRIFT_ROOT'] = THRIFT_HIVE . '/lib';
require_once $GLOBALS['THRIFT_ROOT'] . '/packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php';
require_once THRIFT_HIVE . '/ThriftHiveClientEx.php';

$transport = new \TSocket('127.0.0.1', 10000);
$transport->setSendTimeout(600 * 1000);
$transport->setRecvTimeout(600 * 1000);
$this->client = new \ThriftHiveClientEx(new \TBinaryProtocol($transport));
$this->client->open();
$this->client->execute("show databases");
$result = $this->client->fetchAll();
var_dump($result);
$this->client->close();
select * from login limit 5;
// php处理
$count = 0;
foreach ($queryResult as $row) {
  $count ++;
}
  • 一次性转换为MapReduce,利用Hadoop的计算能力
select type,count(*) from login group by type;  // 这样就用到了
  • 建表使用了PARTITIONED BY分区断言后,查询就可以利用分区剪枝(input pruning)的特性,但是断言字段必须离where关键字最近才能被利用上
// 如前面的login表使用到了date分区断言,这里就得把date条件放在第一位
select count(*) from login where date='2016-08-23' and is_old=1;
  • Hive中不支持等值连表,如下
select * from dual a,dual b where a.key = b.key;

应写为:

select * from dual a join dual b on a.key = b.key;
  • Hive中不支持insert,而且逻辑上也不允许,应为hadoop是我们用来做大数据分析,而不应该作为业务细分数据

数据报表展示


这一步就简单了,读取MySQL数据,使用highcharts等工具做各种展示,也可以用crontab定时执行php脚本发送日报,周报等等

后续更新

最近看一些资料和别人沟通发现,清洗数据这一步完全不用php,可以专注于HQL实现清洗逻辑,将结果保存在hadoop中,再用Sqoop将hadoop数据和MySQL数据同步。即简化了流程,免去mysql手工插入,又做到了数据更实时,为二次清洗逻辑的连表HQL做了铺垫

Redis 内存优化案例

Redis的配置文件中有这么两项配置:

hash-max-ziplist-entries 512
hash-max-ziplist-value 64

其中的ziplist代表数据结构,是一种数据压缩方式,作用是减少内存的使用空间

在某个阀值范围内,hashtable会使用ziplist,对数据进行压缩,超出阀值后,会自动转为使用正常的hashmap结构

上面这两项就是定义这个阀值

hash-max-ziplist-entries 512

hashtable中的条目数量在512以下时,使用ziplist

hash-max-ziplist-value 64

hashtable中每个key/value的长度都小于64字节时,使用ziplist

以上2个条件中,任意一个条件超过设置值时,就不再使用ziplist了

案例

之前在网上看过一个案例,介绍了Instagram使用这项配置的实践经验

Instagram是一个超大型的图片类应用,他们有一个需求:
根据图片ID得到作者ID

最简单的实现方式就是使用string类型
图片ID为KEY,作者ID为VALUE,一条一条的set/get

经测试,图片量为3亿时,一共需要20G左右的内存

经过一些优化后,效果不明显,他们便向Redis的一个开发者咨询解决方案

得到的建议是:

对数据进行分段,使用hash结构

因为hash结构在一定数据量下会进行压缩存储,可以节约很多内存

经过反复测试,在他们的环境下,entries阀值为1000时最合适,超过的话,CPU的压力较大

所以,数据分段的方式为:

1000条数据为一段,放在一个hash表中

例如 图片ID为1234888

他在一个hash表中,这个hash表的key为 1234,里面有1000个field,其中就包括了 888,值为其作者ID

请输入图片描述

取1234888的作者ID时,就是取得key为1234的hashtable中field为888的值

经过测试,使用这种方式后,内存的使用量降到了5G左右,效果非常明显

类似的配置项还有:
list-max-ziplistzset-max-ziplist