June 2017

Kafka: Migrating Topics

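After expanding Kafka from a single machine to a cluster, some of the large topics on the original machine need to be rebalanced onto the new broker.
The old Kafka broker has broker id 0; the new machine's broker id is 1.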

Migration

Step 1

Create move.json, which lists the topics to move:

{
    "topics": [{
        "topic": "test2"
    }],
    "version": 1
}
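When several large topics have to move, move.json simply lists more than one entry under "topics". The sketch below builds the file from the command line; the helper name write_move_json and the second topic test3 are made up for illustration. It does no JSON escaping, which should be fine because Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-'.

```shell
#!/bin/sh
# Hypothetical helper: write a topics-to-move file for kafka-reassign-partitions.sh.
# Usage: write_move_json OUTPUT_FILE TOPIC [TOPIC...]
write_move_json() {
  out=$1
  shift
  {
    printf '{"topics": ['
    sep=''
    for t in "$@"; do
      # No escaping: Kafka topic names cannot contain quotes or backslashes
      printf '%s{"topic": "%s"}' "$sep" "$t"
      sep=', '
    done
    printf '], "version": 1}\n'
  } > "$out"
}

# test2 is the topic from this article; test3 is a made-up second topic
write_move_json move.json test2 test3
cat move.json
# prints: {"topics": [{"topic": "test2"}, {"topic": "test3"}], "version": 1}
```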

Step 2

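Generate the reassignment plan (a JSON document). --generate only prints a proposal; nothing is moved yet.

Note on --broker-list: I wanted every partition to move to the new machine, so I listed only 1. To spread the partitions across both brokers, pass "0,1" instead.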

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file move.json --broker-list "1" --generate

Output:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test2","partition":2,"replicas":[1]},{"topic":"test2","partition":15,"replicas":[1]},{"topic":"test2","partition":6,"replicas":[1]},{"topic":"test2","partition":12,"replicas":[1]},{"topic":"test2","partition":7,"replicas":[1]},{"topic":"test2","partition":10,"replicas":[1]},{"topic":"test2","partition":13,"replicas":[1]},{"topic":"test2","partition":9,"replicas":[1]},{"topic":"test2","partition":3,"replicas":[1]},{"topic":"test2","partition":5,"replicas":[1]},{"topic":"test2","partition":1,"replicas":[1]},{"topic":"test2","partition":0,"replicas":[1]},{"topic":"test2","partition":8,"replicas":[1]},{"topic":"test2","partition":4,"replicas":[1]},{"topic":"test2","partition":11,"replicas":[1]},{"topic":"test2","partition":14,"replicas":[1]}]}

Step 3

Copy the second JSON from the output (the one under "Proposed partition reassignment configuration") into a new file, reassignment.json, then run:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute
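Copying the JSON by hand is easy to get wrong when there are many partitions. A minimal sketch, assuming the --generate output was redirected to a file (generate.out, a hypothetical name) and has the layout shown in step 2, with each JSON document on its own line starting with {"version". It also keeps the first JSON, the current assignment, as rollback.json (also a hypothetical name): passing that file to --execute later should move the partitions back. The sample input is shortened to a single partition.

```shell
#!/bin/sh
# Assumed: the --generate output was saved first, e.g.
#   bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#     --topics-to-move-json-file move.json --broker-list "1" --generate > generate.out
split_generate_output() {
  in=$1
  # 1st JSON line: current assignment, kept for a possible rollback
  grep '^{"version"' "$in" | sed -n '1p' > rollback.json
  # 2nd JSON line: proposed assignment, the input for --execute
  grep '^{"version"' "$in" | sed -n '2p' > reassignment.json
  # Fail if either document was missing
  [ -s rollback.json ] && [ -s reassignment.json ]
}

# Shortened sample in the same layout as the output in step 2
cat > generate.out <<'EOF'
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test2","partition":0,"replicas":[0]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test2","partition":0,"replicas":[1]}]}
EOF

split_generate_output generate.out && cat reassignment.json
```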

Step 4

Check the topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test2
Topic:test2    PartitionCount:16       ReplicationFactor:2     Configs:
        Topic: test2   Partition: 0    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 1    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 2    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 3    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 4    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 5    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 6    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 7    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 8    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 9    Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 10   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 11   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 12   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 13   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 14   Leader: 0       Replicas: 1,0   Isr: 0
        Topic: test2   Partition: 15   Leader: 0       Replicas: 1,0   Isr: 0

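If the topic is large, the migration takes a while. In the meantime each partition's Replicas list contains both brokers (1,0), while Isr still shows only 0 because broker 1 is still catching up. Check again once the migration has finished: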

Topic:test2    PartitionCount:16       ReplicationFactor:1     Configs:
        Topic: test2   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 4    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 6    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 7    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 8    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 10   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 11   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 12   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 13   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 14   Leader: 1       Replicas: 1     Isr: 1
        Topic: test2   Partition: 15   Leader: 1       Replicas: 1     Isr: 1
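Instead of re-running --describe and reading it by eye, completion can be checked in a script. A sketch with a hypothetical helper, all_replicas_on: it reads kafka-topics.sh --describe output on stdin and succeeds only if every partition line has both Replicas and Isr equal to the target list. It compares the lists as plain strings, which suits a single-broker target like this one; with several target brokers the Isr order can differ from the Replicas order. kafka-reassign-partitions.sh also has a --verify option, used with the same --reassignment-json-file, that reports per partition whether the reassignment has completed.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if every "Partition:" line shows
# "Replicas: <target>" and "Isr: <target>" (target like "1").
all_replicas_on() {
  target=$1
  awk -v t="$target" '
    /Partition:/ {
      seen++
      r = ""; s = ""
      for (i = 1; i <= NF; i++) {
        if ($i == "Replicas:") r = $(i + 1)
        if ($i == "Isr:") s = $(i + 1)
      }
      if (r != t || s != t) bad++
    }
    END { if (seen > 0 && bad == 0) exit 0; exit 1 }'
}

# Live cluster (not run here):
#   bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test2 | all_replicas_on 1

# Sample lines taken from the describe output above
sample_done='        Topic: test2   Partition: 0    Leader: 1       Replicas: 1     Isr: 1'
sample_moving='        Topic: test2   Partition: 0    Leader: 0       Replicas: 1,0   Isr: 0'
printf '%s\n' "$sample_done" | all_replicas_on 1 && echo "finished"
printf '%s\n' "$sample_moving" | all_replicas_on 1 || echo "still moving"
```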

Reference: http://blog.csdn.net/louisliaoxh/article/details/51605146

Notes on Expanding Kafka and ZooKeeper from a Single Machine to a Cluster (with the Fix for a Small Problem)

Kafka version: kafka_2.11-0.10.1.0
ZooKeeper: the one bundled with Kafka
Old machine IP: x.x.x.x; new machine IP: y.y.y.y

Background

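Kafka had been running on a single machine. It needed more capacity, so I added a second machine and expanded both ZooKeeper and Kafka into clusters.
The Ansible installation script is attached at the end.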

ZooKeeper

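The old machine is server 0 and the new one is server 1. It is mostly a matter of changing the configuration; once ZooKeeper is restarted with it, the nodes join the ensemble on their own.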

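# On the new machine, write its ZooKeeper server id
echo 1 > /data/zookeeper/myid

# On the old machine, write id 0 into its own dataDir (assumed here to be the same path);
# a standalone ZooKeeper may have been running without a myid file
echo 0 > /data/zookeeper/myid

# On both machines, add the ensemble members to zookeeper.properties
server.0=x.x.x.x:2888:3888
server.1=y.y.y.y:2888:3888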
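For reference, a sketch of a complete zookeeper.properties for the two-node ensemble, plus the matching myid. Assumptions, not taken from this article: dataDir=/data/zookeeper (matching the myid path above), clientPort=2181, and tickTime/initLimit/syncLimit values borrowed from ZooKeeper's sample config. As far as I know, ZooKeeper 3.4 refuses to start with server.N entries but no initLimit/syncLimit, and Kafka's bundled zookeeper.properties does not set them. The helper name write_zk_config is hypothetical, and the directories are parameters so the sketch can run against a scratch directory.

```shell
#!/bin/sh
# Hypothetical helper: write zookeeper.properties and myid for one ensemble member.
# On the real machines: CONF_DIR=/usr/local/kafka/config, DATA_DIR=/data/zookeeper
write_zk_config() {
  conf_dir=$1 data_dir=$2 my_id=$3
  mkdir -p "$conf_dir" "$data_dir"
  cat > "$conf_dir/zookeeper.properties" <<EOF
dataDir=$data_dir
clientPort=2181
maxClientCnxns=0
# Assumed timing values; ensemble mode needs initLimit and syncLimit
tickTime=2000
initLimit=10
syncLimit=5
server.0=x.x.x.x:2888:3888
server.1=y.y.y.y:2888:3888
EOF
  # myid must match this host's server.N line
  echo "$my_id" > "$data_dir/myid"
}

# Old machine: write_zk_config /usr/local/kafka/config /data/zookeeper 0
# New machine: write_zk_config /usr/local/kafka/config /data/zookeeper 1
tmp=$(mktemp -d)
write_zk_config "$tmp/config" "$tmp/zookeeper" 1
cat "$tmp/config/zookeeper.properties"
```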

Kafka

Kafka brokers need ids as well. On the new machine, change the following in server.properties:

broker.id=1
advertised.listeners=PLAINTEXT://y.y.y.y:9092
zookeeper.connect=x.x.x.x:2181,y.y.y.y:2181
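These three changes can also be scripted. A sketch using a hypothetical helper, set_prop, that replaces an existing key=value line (commented out or not) or appends one if the key is missing; it writes through a temporary file instead of sed -i so it behaves the same with GNU and BSD sed. Values containing |, & or \ would need escaping first. The sample file stands in for the real /usr/local/kafka/config/server.properties.

```shell
#!/bin/sh
# Hypothetical helper: set key=value in a .properties file, replacing an
# existing (possibly commented-out) line or appending a new one.
set_prop() {
  file=$1 key=$2 value=$3
  if grep -Eq "^#?${key}=" "$file"; then
    sed -E "s|^#?${key}=.*|${key}=${value}|" "$file" > "$file.tmp" &&
      mv "$file.tmp" "$file"
  else
    echo "${key}=${value}" >> "$file"
  fi
}

# Stand-in for server.properties on the new machine
conf=$(mktemp)
printf 'broker.id=0\n#advertised.listeners=PLAINTEXT://your.host.name:9092\nzookeeper.connect=localhost:2181\n' > "$conf"

set_prop "$conf" broker.id 1
set_prop "$conf" advertised.listeners PLAINTEXT://y.y.y.y:9092
set_prop "$conf" zookeeper.connect x.x.x.x:2181,y.y.y.y:2181
cat "$conf"
```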

Check that it worked; both broker ids (0 and 1) should be listed:

bin/zookeeper-shell.sh localhost:2181
ls /brokers/ids
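The same check can be run non-interactively: zookeeper-shell.sh should also accept the command after the connect string, and it prints the child list as [0, 1]. A sketch with a hypothetical helper, brokers_registered, that reads that output on stdin and succeeds only if every given broker id is present. The sample input is just the list line, not a full captured session.

```shell
#!/bin/sh
# Hypothetical helper: read zookeeper-shell output on stdin and check that
# each expected broker id appears in the last "[...]" line.
brokers_registered() {
  # "[0, 1]" -> "0,1"
  ids=$(grep '^\[' | tail -n 1 | sed 's/[][ ]//g')
  for want in "$@"; do
    case ",$ids," in
      *",$want,"*) ;;
      *) echo "broker $want not registered (got: $ids)" >&2; return 1 ;;
    esac
  done
}

# Live cluster (not run here):
#   bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids | brokers_registered 0 1
printf '[0, 1]\n' | brokers_registered 0 1 && echo "brokers 0 and 1 are registered"
```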

Problems encountered

  • Kafka threw an error on startup:
[2017-06-13 10:07:24,025] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.NoClassDefFoundError: Could not initialize class kafka.network.RequestChannel$
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:114)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
        at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
        at kafka.network.Processor.run(SocketServer.scala:417)
        at java.lang.Thread.run(Thread.java:748)

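After a round of searching on Baidu, some people said it was caused by mismatched Java minor versions, others said the old Kafka machine had to be stopped before starting the new one. I didn't think the cause was that superficial. Out of ideas, I still went looking for the old Java installation package, couldn't find it, and gave up on that route. In the end, simply adding a hosts entry fixed it: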

# On the new machine, add to /etc/hosts (e.g. vim /etc/hosts)
y.y.y.y y-y-y-y

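A friend later explained that being able to ping the machine is not enough: many services on Linux resolve the machine's own hostname first. Lesson learned. In this case the failing class, kafka.network.RequestChannel, appears to call InetAddress.getLocalHost() while it initializes (at least in Kafka 0.10.x); if the hostname does not resolve, that initialization throws, and the JVM then reports every later use of the class as the "Could not initialize class" NoClassDefFoundError above.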
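A slightly safer form of this fix appends the entry only when the hostname is not already listed, so it can be rerun (for example from Ansible). A sketch, assuming y-y-y-y is what `hostname` prints on the new machine; ensure_host_entry is a hypothetical helper, and the hosts file path is a parameter so it can be tried on a copy first. Running `getent hosts "$(hostname)"` before and after shows whether the hostname resolves.

```shell
#!/bin/sh
# Hypothetical helper: append "IP HOSTNAME" to a hosts file unless the
# hostname already appears as a name on a non-comment line.
ensure_host_entry() {
  hosts_file=$1 ip=$2 name=$3
  if awk -v n="$name" '
       /^[[:space:]]*#/ { next }
       { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }
       END { exit (found ? 0 : 1) }' "$hosts_file"; then
    return 0
  fi
  printf '%s %s\n' "$ip" "$name" >> "$hosts_file"
}

# On the real machine (as root): ensure_host_entry /etc/hosts y.y.y.y "$(hostname)"
hosts=$(mktemp)
printf '127.0.0.1 localhost\n' > "$hosts"
ensure_host_entry "$hosts" y.y.y.y y-y-y-y
ensure_host_entry "$hosts" y.y.y.y y-y-y-y   # second call changes nothing
cat "$hosts"
```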

Ansible deployment script

# tasks/main.yml
---
- name: Check whether Kafka is already installed
  stat:
    path: /usr/local/kafka
  register: kafka

- name: Unpack the installation package
  unarchive:
    src: src/kafka_2.11-0.10.1.0.tgz
    dest: /usr/local/
  when: not kafka.stat.exists

- name: Rename the directory
  command: mv /usr/local/kafka_2.11-0.10.1.0 /usr/local/kafka
  args:
    creates: /usr/local/kafka
  when: not kafka.stat.exists

- name: Create the ZooKeeper data and Kafka log directories
  file:
    path: "{{ item }}"
    state: directory
  with_items:
    - /data/zookeeper
    - /data/log/kafka

# myid is hard-coded to 1, so this role targets the new machine (y.y.y.y);
# the old machine is server 0
- name: Write the ZooKeeper myid
  copy:
    content: "1\n"
    dest: /data/zookeeper/myid

- name: Copy the Kafka and ZooKeeper config files
  template:
    src: "{{ item }}"
    dest: "/usr/local/kafka/config/{{ item }}"
  with_items:
    - 'server.properties'
    - 'zookeeper.properties'

- name: Copy the supervisor config files
  template:
    src: "{{ item }}"
    dest: "/data/supervisor/conf.d/{{ item }}"
  with_items:
    - 'kafka.conf'
    - 'zookeeper.conf'

# Reloads supervisor's config and starts the newly added programs
- name: Update supervisor
  command: supervisorctl update
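The role's kafka.conf and zookeeper.conf templates are not shown. A sketch of what they might contain, written as a shell function so it can be tried against a scratch directory; the program names, priorities and restart options are assumptions, only the script and config paths come from the playbook. Both start scripts run without -daemon so supervisor manages the foreground processes. A lower supervisor priority starts first, but supervisor does not wait for ZooKeeper to be ready, so autorestart covers a Kafka start that fails the first time.

```shell
#!/bin/sh
# Sketch: write supervisor program definitions for ZooKeeper and Kafka.
# Assumed: Kafka unpacked to /usr/local/kafka, as in the playbook.
write_supervisor_confs() {
  conf_d=$1
  mkdir -p "$conf_d"
  cat > "$conf_d/zookeeper.conf" <<'EOF'
[program:zookeeper]
command=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
priority=100
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
EOF
  cat > "$conf_d/kafka.conf" <<'EOF'
[program:kafka]
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
priority=200
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
EOF
}

# The real target would be /data/supervisor/conf.d; a scratch dir is used here
confd=$(mktemp -d)
write_supervisor_confs "$confd"
ls "$confd"
```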