2018-02-21

Setting up Kafka on a Raspberry Pi cluster via Ansible




After installing Docker and Kubernetes on my RPi cluster, I wanted to go on, with Kafka.


Prerequisites



First of all, I assume that you have a RPi cluster already configured to work with Ansible. If not, please refer to my previous article on the subject.

I could have refer to online archives, but as you need Java 8 and as Oracles forces you to acknowledge their license, I decided to start with archives previously downloaded.
You need :
  1. Kafka 1.0.0
  2. Java 8 (recent update) for ARM
You will then have to fill the var section in the playbook with the values matching your environment.


Ansible Playbook



The goal of this playbook is to provide a way to learn about using Kafka in a cluster, without having to cope with the installation. Therefore, you can decide whether it suits your needs or not.
And if it's the case, you still can learn about the installation process by simply reading the playbook.

The playbook is here for information, but if it were to be modified, it would be on my github.

Zookeeper and Kafka cluster configuration



One thing to have in mind when reading the Ansible playbook is that my nodes have fixed IPs that are following each other.
[pi-cluster]
192.168.0.23[1:4]

[masters]
192.168.0.231

[slaves]
192.168.0.232
192.168.0.233
192.168.0.234
As you can see, my first node IP last number is 1. The second one is 2 and so on. I took that as a fact when I wrote my playbook. If you have a different strategy, you'll have to modify the playbook accordingly.
This playbook is not that hard, but there is a little tricky part : the configuration of the ZooKeeper and Kafka clusters : I had to compose the cluster configuration from the information available through the Ansible inventory (= Ansible hosts).

For instance, the zookeeper cluster config looks like :
server.1=192.168.0.231:2888:3888
server.2=192.168.0.232:2888:3888
server.3=192.168.0.233:2888:3888
server.4=192.168.0.234:2888:3888
To reach that result, here is what I used :
- name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
  lineinfile:
    path: "{{ zookeper_config }}"
    line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
    insertafter: EOF
  with_items:
    - "{{ groups['pi-cluster'] }}"
And regarding the Kafka config, it looks like :
192.168.0.231:2181,192.168.0.232:2181,192.168.0.233:2181,192.168.0.234:2181
That was done by a simple var declaration :
"{{ groups['pi-cluster'] | join(':2181,') }}:2181"


Idempotent, yes but how ?



One of Ansible great strengths is the fact that most of its command are idempotent. Yet, in certain cases, I did not use lineinfile because I wanted to keep the original files intact to be able to start the configuration all over again without having to go to the process of copying archives and installing them from scratch. Maybe there's a better way to do it.
If so, leave a comment or better, open a PR !

The playbook




---
- name: Set Kafka up on the RPi cluster
  hosts: pi-cluster
  remote_user: pi

  tasks:
    - block:
      - name: (INSTALL) Checking if Java is already installed
        stat:
          path: "/opt/{{ java_version }}"
        register: javadir
      - name: (INSTALL) Checking if Kafka is already installed
        stat:
          path: "/opt/{{ kafka_version }}"
        register: kafkadir
      - name: (INSTALL) Unarchiving Java and Kafka
        unarchive:
          src: "{{ item }}"
          dest: /opt
          owner: pi
          group: pi
          mode: 0755
        with_items:
          - "{{ java_installer_path }}"
          - "{{ kafka_installer_path }}"
        when: javadir.stat.exists == false or kafkadir.stat.exists == false
      - name: (INSTALL) Fixing permissions for Java (unarchive user/group modification does not work with that one)
        file:
          path: /opt/{{ java_version }}
          owner: pi
          group: pi
          mode: 0755
          recurse: yes
        when: javadir.stat.exists == false
      - name: (INSTALL) Adding symbolic link for Java
        file:
          src: "/opt/{{ java_version }}/bin/java"
          dest: /usr/bin/java
          owner: pi
          group: pi
          state: link
        when: javadir.stat.exists == false
      - name: (INSTALL) Removing Kafka "windows" directory
        file:
          path: "/opt/{{ kafka_version }}/bin/windows"
          state: absent
        when: kafkadir.stat.exists == false
      - name: (BACKUP) Checking if previous config backups already exist
        stat:
          path: "{{ item }}"
        register: backup
        with_items:
          - "{{ zookeper_config }}.original"
          - "{{ kafka_config }}.original"
      - debug:
          var: backup
      - name: (BACKUP) Making backup copies of the zookeper and kafka config files, if never been backed up before
        copy:
          src: "{{ item }}"
          dest: "{{ item }}.original"
          owner: pi
          group: pi
          mode: 0755
          remote_src: yes
        with_items:
          - "{{ zookeper_config }}"
          - "{{ kafka_config }}"
        when: backup.results[0].stat.exists == false
      - name: (BACKUP) Restoring original file to be truly idempotent
        copy:
          src: "{{ item }}.original"
          dest: "{{ item }}"
          remote_src: true
        with_items:
          - "{{ zookeper_config }}"
          - "{{ kafka_config }}"
        when: backup.results[0].stat.exists == true
      - name: (ZOOKEEPER CONFIG) Creating zookeeper work directory
        file:
          path: /var/zookeeper
          owner: pi
          group: pi
          state: directory
          mode: 0755
      - name: (ZOOKEEPER CONFIG) Replacing the default config which sets the zookeeper workdir under var
        lineinfile:
          path: "{{ zookeper_config }}"
          regexp: '^dataDir=.*$'
          line: 'dataDir={{ zookeeper_workdir }}'
      - name: (ZOOKEEPER CONFIG) Adding useful configuration
        lineinfile:
          path: "{{ zookeper_config }}"
          line: "{{ item }}"
          insertafter: EOF
        with_items:
          - "tickTime=2000"
          - "initLimit=10"
          - "syncLimit=5"
      - name: (ZOOKEEPER CONFIG) Adding cluster nodes to the zookeeper configuration
        lineinfile:
          path: "{{ zookeper_config }}"
          line: "server.{{ item[-1] }}={{ item + ':2888:3888' }}"
          insertafter: EOF
        with_items:
          - "{{ groups['pi-cluster'] }}"
      - name: (ZOOKEEPER CONFIG) Removing a previous idFile
        file:
          path: "{{ zookeeper_workdir }}/myid"
          state: absent
      - name: (ZOOKEEPER CONFIG) Creating zookeeper id file
        file:
          path: "{{ zookeeper_workdir }}/myid"
          state: touch
          owner: pi
          group: pi
          mode: 0755
      - name: (ZOOKEEPER CONFIG) Filling id file with respecting id
        lineinfile:
          path: "{{ zookeeper_workdir }}/myid"
          line: "{{ inventory_hostname[-1] }}"
          insertafter: EOF
      - name: (KAFKA CONFIG) Defining the broker ID
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^broker.id=.*$'
          line: 'broker.id={{ inventory_hostname[-1] }}'
      - name: (KAFKA CONFIG) Setting the listen address
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^#listeners=.*$'
          line: 'listeners=PLAINTEXT://{{ inventory_hostname }}:9092'
      - name: (KAFKA CONFIG) Setting the zookeeper cluster address
        lineinfile:
          path: "{{ kafka_config }}"
          regexp: '^zookeeper.connect=.*$'
          line: 'zookeeper.connect={{ zookeeper_cluster_address }}'
      - name: (STARTUP) Starting ZooKeeper
        shell: "nohup /opt/{{ kafka_version }}/bin/zookeeper-server-start.sh {{ zookeper_config }} &"
        async: 10
        poll: 0
      - name: (STARTUP) Starting Kafka
        shell: "nohup /opt/{{ kafka_version }}/bin/kafka-server-start.sh {{ kafka_config }} &"
        async: 10
        poll: 0

      become: true
      vars:
        installer_dir: "YourPathToTheDownloadedArchives"
        java_version: "jdk1.8.0_162"
        kafka_version: "kafka_2.11-1.0.0"
        java_installer_path: "{{ installer_dir }}/jdk-8u162-linux-arm32-vfp-hflt.tar.gz"
        kafka_installer_path: "{{ installer_dir }}/{{ kafka_version }}.tgz"
        zookeper_config: "/opt/{{ kafka_version }}/config/zookeeper.properties"
        kafka_config: "/opt/{{ kafka_version }}/config/server.properties"
        zookeeper_workdir: "/var/zookeeper"
        zookeeper_cluster_address: "{{ groups['pi-cluster'] | join(':2181,') }}:2181"

Then to run it, use the following command :
ansible-playbook nameOfYourFile.yml --ask-become-pass 
Then you will be prompted the password (that's what the option --ask-become-pass do) for issuing commands as root.

Testing the cluster


Now, it's time to check that everything went smoothly. To do so, I'm going to use the test command lines shipped with the distribution.

First, start a producer from any of the nodes (or even on another machine, as long as Kafka is installed) :
/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh \
           --broker-list 192.168.0.231:9092,192.168.0.232:9092,192.168.0.233:9092,192.168.0.234:9092 --topic testopic
This will get you a command prompt in which anything you type will be sent across the cluster.

Then, start as many consumers as you want and just observe what comes up in the terminal. (You can use something like Terminator to handle multi dynamic display, even if I never managed to get it working on my Mac)
/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh \
           --zookeeper 192.168.0.231 --topic testopic from-beginning
Then type something in the producer prompt, and it should be displayed on all consumer terminals.


Troubleshooting


But if something does not work, I strongly suggest that you refer to these commands :
echo dump | nc localhost 2181
/opt/kafka_2.11-1.0.0/bin/zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/1"
Source : https://stackoverflow.com/questions/46158296/kafka-broker-not-available-at-starting

References


No comments: