Replication Between DCs Through Kafka (KafkaMirror)
Replicating Kafka clusters between several DCs into a centralized Data Lake? Easy peasy! Or so I thought. This is the story of the harsh reality :)
When the bills, the bandwidth, and the management overhead started to grow, it became clear that the original setup was a bad idea and would only cause more problems down the road.
So we migrated to what we have today: each DC runs its own Kafka and ZooKeeper cluster that serves it internally.
Through KafkaMirror, we replicate everything into a centralized data lake, where we put the Hadoop cluster to work.
The catch is that KafkaMirror, as far as I understood it, works great out of the box and should never fail or give you trouble… :/
Well, it has given me every kind of trouble. The last one took me 3 days of intense effort, checking each setting one by one, until I found the damn issue (a silly thing, but one of those you take for granted and that turn out not to be what you assumed).
How it works
KafkaMirror is simply a consumer with an embedded producer: it connects as a consumer to the source Kafka you choose and uses the producer to write the messages into the local Kafka.
Because it is a consumer, it can parallelize over Kafka partitions: each consumer thread takes its own partitions, so a single KafkaMirror running 5 threads can consume 5 partitions at once.
Add to that the ability to run several mirrors on several Kafka servers, and you get both high throughput (since we read from several partitions at once) and failover (since several KafkaMirror instances read from the same source Kafka at the same time).
All the offset information (the position of the last consumed message), as well as the membership of the mirrors, is managed through consumer groups.
With the oldConsumer, this information is stored in ZooKeeper as znodes. With the newConsumer, it is stored directly in the Kafka brokers (we will come back to this later, as it was the root of our problem).
When the KafkaMirror instances belong to the same consumer group, the load is distributed among all its members: several Kafka servers running KafkaMirror coordinate with each other and split up which partitions of the source topics each one consumes.
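To make the load distribution more concrete, here is a simplified sketch of round-robin assignment (the consumer config further down sets partition.assignment.strategy=roundrobin). It is not Kafka's actual assignor. It assumes every member subscribes to all the listed partitions and deals them out in turn over the sorted member IDs. The function name, the member IDs (km1-0 = first thread of mirror 1, and so on) and the partitions are hypothetical.

```bash
#!/usr/bin/env bash
# Simplified round-robin assignment (illustration, NOT Kafka's real assignor).
# Assumes every member subscribes to every listed topic-partition.

# assign_round_robin "<member ids>" "<topic-partitions>"
# Prints one "member -> topic-partition" line per partition.
assign_round_robin() {
  local -a raw members partitions
  read -r -a raw <<< "$1"
  read -r -a partitions <<< "$2"
  if [ "${#raw[@]}" -eq 0 ]; then
    echo "no members" >&2
    return 1
  fi
  # Sort member ids so every member would compute the same assignment
  read -r -a members <<< "$(printf '%s\n' "${raw[@]}" | LC_ALL=C sort | tr '\n' ' ')"
  local n=${#members[@]} i
  for i in "${!partitions[@]}"; do
    # Partition number i goes to member number (i mod n)
    printf '%s -> %s\n' "${members[i % n]}" "${partitions[i]}"
  done
}

# Hypothetical example: 2 mirrors x 2 threads, 5 partitions of topic1
assign_round_robin "km2-0 km1-0 km1-1 km2-1" \
  "topic1-0 topic1-1 topic1-2 topic1-3 topic1-4"
```

With four threads and five partitions, the fifth partition wraps around to the first member. A thread with no partition left to take sits idle, so the partition count caps the useful parallelism.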
The truth is that the system works well, very well… until it stops working.
Our problem
Our Kafka topology is simple: each DC around the world has a Kafka+ZooKeeper cluster that feeds the microservices; we'll call these PROx.
Then there is the local Data Lake, which we use for testing, BI, development, etc. We call it DWdev-1 (yes, we really racked our brains over that name). It connects as a consumer to the PROx clusters and injects the messages into the Kafka in DWdev-1. Then there is DWpro-1, the big one: this is where we run all the BI, reports, etc. for every environment, and it's a full rack of iron.
Once the data is in the destination Kafka, in DWdev-1 and DWpro-1, we use KafkaConnect to parse it and make it look pretty before putting it into Hive. KafkaConnect reads the messages like any other consumer, parses them, and writes them into HDFS in ORC format so Hive can read them with the best performance (after coming back from the DataWorks Summit in Berlin, I'm no longer so sure about that last part).
The problem arises because KafkaConnect ONLY supports the newConsumer, which is the low-level consumer that connects to the Kafka brokers, reads the info
Bloody __consumer_offsets
Basically, Kafka stores everything about topics, messages, and so on in a data folder. In our case, it's a RAID without LVM mounted at /kafka-data. All topic data lives there until it expires.
When a broker comes to life, it registers with ZooKeeper, telling it its broker ID, hostname, port, and a kind of GUID, and gives it a kiss on the forehead. This information also ends up in __consumer_offsets, which is stored in the /kafka-data folder.
If, for whatever reason, the broker ID changes, it gets updated in ZooKeeper's znode but not in __consumer_offsets :( So the oldConsumer keeps working, while the newConsumer keeps waiting for the brokers with the previously assigned IDs to send it the data. Even if the hostname is the same, it doesn't work. It simply doesn't know what to do: it doesn't fail, it doesn't time out, it doesn't throw an exception, nothing.
How did I discover this? Following this StackOverflow thread, I ran the newConsumer with --partition 0, which forces it to fetch everything that partition contains. That way it doesn't have to wait for the metadata: you hand it over yourself. Also, if you try to read the messages in __consumer_offsets, it says there is no leader for that topic (waaat?!?!??!)
So you go to ZooKeeper and look at who the leaders of the __consumer_offsets partitions are. It tells you that the servers have IDs:
- 1001
- 1002
- 1003
However, the brokers znode says that the servers have IDs:
- 1010
- 1011
- 1012
In short, the newConsumer was waiting to get partitions from servers 1001, 1002, and 1003, while the oldConsumer knew that the leaders were servers 1010, 1011, and 1012.
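The check I ended up doing by hand comes down to a set difference: leader IDs referenced by __consumer_offsets that no registered broker has. Here is a minimal sketch. It assumes you have already collected both lists, for example with zookeeper-shell from /brokers/ids and from the "leader" field in /brokers/topics/__consumer_offsets/partitions/<N>/state. The function name is hypothetical.

```bash
#!/usr/bin/env bash
# stale_leader_ids "<leader ids>" "<registered broker ids>"
# Prints, one per line, the leader IDs that no registered broker has.
stale_leader_ids() {
  local -a leaders registered
  read -r -a leaders <<< "$1"
  read -r -a registered <<< "$2"
  # comm -23 keeps lines that appear only in the first sorted list;
  # the C locale is used for both sort and comm so their ordering matches
  LC_ALL=C comm -23 \
    <(printf '%s\n' "${leaders[@]}" | LC_ALL=C sort -u) \
    <(printf '%s\n' "${registered[@]}" | LC_ALL=C sort -u)
}

# The IDs from our incident: __consumer_offsets leaders vs. /brokers/ids
stale="$(stale_leader_ids "1001 1002 1003" "1010 1011 1012")"
if [ -n "$stale" ]; then
  # $stale is unquoted on purpose, to print the IDs on a single line
  echo "Leaders with no registered broker:" $stale
fi
```

Any non-empty output means the newConsumer is waiting on brokers that no longer exist under those IDs.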
The solution? Physically delete __consumer_offsets from /kafka-data and restart Kafka. Worked like a charm.
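On disk, the topic is not a single folder. Kafka keeps one directory per partition inside the data dir, named __consumer_offsets-0, __consumer_offsets-1, and so on. Below is a more cautious sketch of that cleanup. It assumes the broker is already stopped. Instead of deleting the directories, it moves them to a backup folder, and it only lists them unless you pass apply. The function name and backup path are hypothetical. Keep in mind that this throws away the offsets the newConsumer committed to Kafka, so those groups fall back to auto.offset.reset.

```bash
#!/usr/bin/env bash
# backup_offsets_dirs DATA_DIR BACKUP_DIR [apply]
# Lists the __consumer_offsets-<N> partition directories found in DATA_DIR.
# With "apply" as the third argument it moves them into BACKUP_DIR
# (instead of deleting them). Run it only while the Kafka broker is stopped.
backup_offsets_dirs() {
  local data_dir="$1" backup_dir="$2" mode="${3:-dry-run}" d
  local -a dirs=()
  for d in "$data_dir"/__consumer_offsets-*; do
    # An unmatched glob stays literal, so keep only real directories
    if [ -d "$d" ]; then
      dirs+=("$d")
    fi
  done
  if [ "${#dirs[@]}" -eq 0 ]; then
    echo "no __consumer_offsets partitions found in $data_dir" >&2
    return 1
  fi
  if [ "$mode" = "apply" ]; then
    mkdir -p -- "$backup_dir" && mv -- "${dirs[@]}" "$backup_dir"/
  else
    printf 'would move: %s\n' "${dirs[@]}"
  fi
}

# Dry run first, then apply (hypothetical backup path):
#   backup_offsets_dirs /kafka-data /root/consumer_offsets.bak
#   backup_offsets_dirs /kafka-data /root/consumer_offsets.bak apply
```

Keeping the backup means you can put the directories back if the restart does not fix things.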
How to set up replication and not die trying
After this little detour explaining why we set up replication, let's get into how I set it up. An important point is running KafkaMirror as a service. It's nothing more than a shell script you pass parameters to, which is where daemonize comes in: basically, it daemonizes anything.
KafkaMirror also needs a consumer configuration to read from the source Kafka and a producer configuration to write to the destination. What I did is call a wrapper that runs daemonize, which in turn runs the MirrorMaker shell script with the consumer/producer parameters.
Install daemonize:
yum install daemonize -y
Create the folders for the daemon (PID, log, and lock) and set whatever permissions you see fit:
mkdir -p /opt/kafkaMirror/daemon; chown -Rvf kafka:kafka /opt/kafkaMirror; chmod -Rvf 700 /opt/kafkaMirror
Copy over the consumer configuration (this is the configuration I've found to be a good starting point):
# consumer (from remote kafka >>> DWdev-1)
#################################
#OLD method
zookeeper.connect=ZOOKEEPER:PORT
zookeeper.session.timeout.ms=10000
zookeeper.sync.time.ms=5000
rebalance.max.retries=10
auto.offset.reset=largest
consumer.timeout.ms=5000
exclude.internal.topics=true
fetch.message.max.bytes=10000000
rebalance.backoff.ms=10000
socket.receive.buffer.bytes=33554432
partition.assignment.strategy=roundrobin
#To commit on zookeeper and kafkabrokers (only takes effect together with offsets.storage=kafka)
dual.commit.enabled=true
############
#NEW method
#bootstrap.servers=KAFKABROKER:PORT
#
#When there is no committed offset, start from the latest one (instead of the earliest one)
#auto.offset.reset=latest
#
#heartbeat.interval.ms=6000
#session.timeout.ms=10000
#
#auto commit offsets (the newConsumer stores them in the kafka brokers, not in zookeeper)
#enable.auto.commit=true
#exclude.internal.topics=true
#request.timeout.ms=20000
#
#Leader metadata refresh
#metadata.max.age.ms=120000
#reconnect.backoff.max.ms=5000
#retry.backoff.ms=1000
############
#STANDARD config
client.id=HOSTNAME
group.id=DWdev1
Copy over the producer configuration:
#KafkaMirror Producer
bootstrap.servers=KAFKABROKERS:PORT
connect.timeout.ms=20000
#(Disabled due to the kafka version: should be enabled on 0.10.2 (we are on 0.10.1))
#producer.type=async
#key.class=kafka.serializer.DefaultEncoder
#(https://engineering.salesforce.com/mirrormaker-performance-tuning-63afaed12c21)
compression.type=gzip
linger.ms=15000
batch.size=50000
buffer.memory=2000000000
max.request.size=1000000
#avoid data loss
#https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_kafka-component-guide/content/avoiding-data-loss.html
block.on.buffer.full=true
acks=-1
max.in.flight.requests.per.connection=1
retries=5
Copy over the wrapper, one per service if, like me, you bring data from several environments into a central one:
wrapper_.sh
#!/bin/bash
#Nicolas Tobias twitter: @nicolastobias_ ## March-2017
#Simple Wrapper
################################################################################
daemonize -c /opt/kafkaMirror/daemon \
-e /opt/kafkaMirror/daemon/<KM-SERVICE>.error.log \
-o /opt/kafkaMirror/daemon/<KM-SERVICE>.output.log \
-p /opt/kafkaMirror/daemon/<KM-SERVICE>.pid \
-l /opt/kafkaMirror/daemon/<KM-SERVICE>.lock /usr/hdp/current/kafka-broker/bin/kafka-mirror-maker.sh \
--consumer.config /opt/kafkaMirror/<ENV>-consumer.cfg \
--producer.config /opt/kafkaMirror/<ENV>-producer.cfg \
--whitelist="topic1,topic2,..."
Set ownership and permissions for the kafka user:
chown -Rvf kafka:kafka /opt/kafkaMirror; chmod -Rvf 700 /opt/kafkaMirror
Enable the service in systemd, with one file like this per service in /usr/lib/systemd/system/<SERVICE>.service:
[Unit]
Description=<DESCRIPTION>
[Service]
Type=forking
ExecStart=/opt/kafkaMirror/daemon/<WRAPPER SCRIPT>
[Install]
WantedBy=multi-user.target
Set the correct 0755 permissions on the wrapper.
Reload the systemd daemon:
systemctl daemon-reload
Enable and start the service:
systemctl enable <SERVICE>; systemctl start <SERVICE>
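Since you need one wrapper and one unit file per service, generating them avoids copy-paste mistakes. Here is a minimal sketch. It assumes the same paths and daemonize flags as the wrapper above, one <ENV>-consumer.cfg/<ENV>-producer.cfg pair per environment, and a kafkaMirror-<ENV> naming scheme. The function name and the naming scheme are hypothetical. It writes into a directory you choose, so you can review the files before copying them to /opt/kafkaMirror/daemon and /usr/lib/systemd/system.

```bash
#!/usr/bin/env bash
# gen_mirror_service ENV WHITELIST OUT_DIR
# Writes OUT_DIR/kafkaMirror-ENV.sh (daemonize wrapper) and
# OUT_DIR/kafkaMirror-ENV.service (systemd unit) for one environment.
gen_mirror_service() {
  local envname="$1" whitelist="$2" out="$3"
  local name="kafkaMirror-$envname" base=/opt/kafkaMirror
  mkdir -p "$out"

  # Wrapper: same daemonize call as above, placeholders filled in
  # (\\ in this unquoted heredoc writes a literal backslash)
  cat > "$out/$name.sh" <<EOF
#!/bin/bash
daemonize -c $base/daemon \\
  -e $base/daemon/$name.error.log \\
  -o $base/daemon/$name.output.log \\
  -p $base/daemon/$name.pid \\
  -l $base/daemon/$name.lock /usr/hdp/current/kafka-broker/bin/kafka-mirror-maker.sh \\
  --consumer.config $base/$envname-consumer.cfg \\
  --producer.config $base/$envname-producer.cfg \\
  --whitelist="$whitelist"
EOF
  chmod 0755 "$out/$name.sh"

  # Unit file pointing at the wrapper's final location
  cat > "$out/$name.service" <<EOF
[Unit]
Description=KafkaMirror $envname
[Service]
Type=forking
ExecStart=$base/daemon/$name.sh
[Install]
WantedBy=multi-user.target
EOF
}

# Hypothetical example: generate the files for a "pro2" environment
out="$(mktemp -d)"
gen_mirror_service pro2 "topic1,topic2" "$out"
ls "$out"
```

Once the files are copied into place, the daemon-reload, enable and start steps above apply unchanged.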
If everything went well, note this: even when nothing is wrong, daemonize treats KafkaMirror's console output as errors and sends it to the .error.log file, so look there for any messages.
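Because normal console output ends up in .error.log, it helps to filter that file. Here is a rough sketch. It assumes a log4j layout where each line carries its level as a separate word (INFO, WARN, ERROR...) and that stack traces name an ...Exception class. The function names and the patterns are my own assumptions, so adjust them to what your log actually contains.

```bash
#!/usr/bin/env bash
# real_errors LOGFILE
# Prints, with line numbers, only the lines that look like real problems:
# level ERROR/FATAL or an exception class name. Exits 1 (like grep) if none.
real_errors() {
  grep -nE 'ERROR|FATAL|Exception' -- "$1"
}

# level_summary LOGFILE
# Counts lines per log level, taking the first field that is exactly a level.
level_summary() {
  awk '{ for (i = 1; i <= NF; i++)
           if ($i ~ /^(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)$/) { n[$i]++; break } }
       END { for (l in n) print l, n[l] }' "$1" | LC_ALL=C sort
}

# Hypothetical path following the wrapper's naming scheme:
#   level_summary /opt/kafkaMirror/daemon/kafkaMirror-pro2.error.log
#   real_errors   /opt/kafkaMirror/daemon/kafkaMirror-pro2.error.log
```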
Done!
With this, you should have KafkaMirror running as a service, and systemctl status will show you its state.
<< CORP >>> root@hdp-dw-1-nn-1:/home/nicolast# systemctl status kafkaMirror-pro2
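Besides systemctl status, the pid file that daemonize writes (the -p option in the wrapper) allows a quick liveness check, for example from cron or a monitoring probe. Here is a minimal sketch. It assumes the file holds just the process ID, and the function name is hypothetical. kill -0 only tests that the process exists; it does not tell you whether the mirror is actually moving data.

```bash
#!/usr/bin/env bash
# mirror_alive PIDFILE
# Returns 0 if the process whose PID is stored in PIDFILE (the file written by
# daemonize -p) still exists, 1 otherwise. Run it as root or as the user that
# owns the process: kill -0 also fails when we lack permission to signal it.
mirror_alive() {
  local pidfile="$1" pid
  [ -r "$pidfile" ] || return 1
  pid="$(head -n 1 -- "$pidfile" | tr -d '[:space:]')"
  # Reject empty or non-numeric content before calling kill
  case "$pid" in
    '' | *[!0-9]*) return 1 ;;
  esac
  # Signal 0 only checks that the process exists; nothing is delivered
  kill -0 "$pid" 2>/dev/null
}

# Hypothetical path following the wrapper's naming scheme:
#   mirror_alive /opt/kafkaMirror/daemon/kafkaMirror-pro2.pid && echo up || echo down
```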
Ansible playbook
I could put it on GitHub, but I haven't had time yet, so for now, if you want it, here it is zipped.
Take a look at vars to set the variables to your liking :)