Notice
Recent Posts
Recent Comments
Link
«   2025/05   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
Archives
Today
Total
관리 메뉴

더비창고방

MirrorMaker 연동/기동 방법 본문

STUDY

MirrorMaker 연동/기동 방법

moongzeee 2022. 12. 20. 17:37

[목차]

  1. 개요
    1.1 MirrorMaker란?
  2. MirrorMaker 연동
    2.1 MirrorMaker Instance 생성
    2.2 Consumer API 설정
    2.3 Producer API 설정
  3. MirrorMaker 기동
    3.1 MirrorMaker 실행
    3.2 Mirroring monitoring

 

1. 개요

1.1 MirrorMaker란?

서로 다른 두개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션으로,

MirrorMaker의 각 consumer는 source kafka로 부터 할당 된 토픽과 파티션들의 데이터를 읽은 후 공유되는 producer를 사용해, 그 데이터를 target kafka 클러스터에 전송한다.

2. MirrorMaker 연동

2.1 MirrorMaker Instance 생성

EC2내 아래 명령어를 실행하여 Kafka를 설치한다.

sudo yum install -y java-1.8.0-openjdk-devel.x86_64
wget <https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz>
tar -xvf kafka_2.12-2.6.2.tgz

2.2 Consumer API 설정

<step1> Source Kafka 내 토픽 연동을 위한 SSL/SASL 인증 설정

** Source Kafka가 SSL/SASL 인증을 통해 연동해야하면 아래 설정 필수

MirrorMaker 인스턴스에 source kafka의 cert 파일을 /var/ssl/private
경로로 옮긴다.

SASL 인증을 위한 consumer conf 파일을 kafka/conf 폴더 하위에 생성한다.

— consumer-jaas.conf

bootstrap.servers=SASL_PLAINTEXT://{source kafka bootstrap-server}
group.id={consumer group id}

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \\
required username="username" password="password";

<step2> Source Kafka 내 토픽 정보 확인

** Target Kafka 내 토픽 생성시 필요한 정보 (토픽명/파티션 수)를 조회하는 과정

아래 명령어를 실행하여 mirroring대상 topic의 정보 (토픽명, 파티션 수)를 조회한다.

[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-topics.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--topic {topic_name} \\
--describe \\
--command-config config/consumer-jaas.conf

** source kafka에서 토픽에 대한 describe 권한이 없는경우, 아래와 같이 topic 정보를 조회한다.

토픽 명칭 조회
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-topics.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--list \\
--command-config config/consumer-jaas.conf

토픽 파티션 수 조회
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-consumer-groups.sh \\
--boostrap-server {source kafka bootstrap-server} \\
--command-config config/consumer-jaas.conf \\
--group {group id} \\
--describe

<step3> Source Kafka Consumer api 동작 check

아래 명령어를 실행하여 topic의 메세지가 출력되면, Consumer api가 정상적으로 동작하는 것이다.

[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-console-consumer.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--topic {topic name} \\
--consumer.config config/consumer-jaas.conf

<step4> MirrorMaker consumer properties 설정

** Source Kafka → MirrorMaker topic을 pull 하기위한 consumer properties 설정

Kafka/config 폴더 내 아래와 같이 Source kafka에서 데이터를 pull하기 위한 configure 설정한 ‘mm-consumer.properties’ 파일을 생성한다.

[mm-consumer.properties]

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    <http://www.apache.org/licenses/LICENSE-2.0>
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers={source kafka bootstrap server}

# consumer group id
group.id={consuemr group id}
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
enable.auto.commit=false
auto.offset.reset=latest

#seralize 
#kafka record key value json format string
key.deserializer=org.apache.kafka.commom.serialization.StringDeserializer
value.deserializer=org.apahce.kafka.common.serialization.StringDeserializer

#sasl - kafka .
# Plain user / password .
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\
username="username" password="password";
#ssl
# kafka server .
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=contentsds5756
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1vi 

2.3 Producer API 설정

<step1> Mirrormaker → Target Kafka SSL 설정

** MirrorMaker의 인스턴스내 JKS truststore를 생성하여, MirrorMaker와 Target Kafka 간의 SSL 통신설정을 한다.

아래 명령어를 실행해 kafka.client.truststore.jks 를 생성한다.

# cacerts -> kafka.client.truststore.jks 
[ec2-user@ip-XX-XX-X-XXX ~]$ cp /usr/lib/jvm/{JDKFolder}/jre/lib/security/cacerts \\
/tmp/kafka.client.truststore.jks

kafka/bin 폴더 내 아래 configure를 설정한 client.properties 파일을 생성한다.

[client.properties]

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

<step2> Target Kafka 내 토픽 생성

** 2.2 Consumer API 설정 <step2> 에서 확인한 Source Kafka 토픽정보 (토픽명, 파티션 수) 와 같은 토픽을 생성한다.

아래 명령어를 실행해 Target Kafka내 토픽 생성한다.

[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-topics.sh \\
--create \\
--boostrap-server {target kafka bootstrap server} \\
--topic {topic name} \\
--replication-factor  1 \\
--partitions {number of partitions} \\
--config retention.ms=259200000

<step3> MirrorMaker producer properties 설정

** MirrorMaker → Target Kafka 토픽을 push 하기위한 producer properties 설정

Kafka/config 폴더 내 아래와 같이 Target kafka에서 데이터를 push하기 위한 configure 설정한 ‘mm-producer.properties’ 파일을 생성한다.

[mm-producer.properties]

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ..
bootstrap.servers={target kafka bootstrap server}
# broker number (acks parameter default value is -1. if not edit acks, it might be occured replicator lackage error )
acks=1
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

## Avoiding Data loss
max.in.flight.requests.per.connection=1
#retries=Int.Max_Value
#max.block.ms=Long.MAX_VALUE

#ssl
#kafka server
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

3. MirrorMaker 기동

3.1 MirrorMaker 실행

<step1> Kafka Heap설정을 위한 Memory Setting을 한다.

[/etc/profile]

export KAFKA_HEAP_OPTS='-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC \\
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 \\
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'

<step2> MirrorMaker 실행

아래 명령어를 실행하여 MirrorMaker를 기동한다.

[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-mirror-maker.sh \\
--consumer.config config/mm-consumer.properties \\
--producer.config config/mm-producer.properties \\ 
--whitelist {topic name} \\
--num.streams={setting num}

<step3> 토픽 mirroring 확인

consumer console을 이용해 Source Kafka → Target Kafka 로 토픽이 정상적으로 mirroring 되는지 확인한다.

[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-console-consumer.sh \\
--bootstrap-server {target kafka bootstrap server} \\
--topic {topic name}

위 명령어를 실행하여 토픽의 메세지 내용이 출력되면, Source Kafka → Target Kafka 로 토픽이 정상적으로 mirroring 된 것이다.

3.2 Mirroring monitoring

mirroring 하는 인스턴스의 consumer api lag을 확인하여 MirrorMaker의 consumer 상태를 모니터링 한다.

[lag_monitor.sh]

while true; do
echo -e -n "$(date)\\t"
echo -e -n "{mirroring topic name}\\t"
echo -e $(/home/ec2-user/kafka_2.12-2.6.2/bin/kafka-consumer-groups.sh \\
				--bootstrap-server {source kafka bootstrap server} \\
				--group {consumer group id} --describe \\
				--command-config {mirror maker consumer properties path} \\
				--describe 2> /dev/null | grep {mirroring topic name}
				| sed 's/\\s\\+/\\t/g' | cut -f 6 | xargs)
sleep 1
done

consumer 명령어와 timestamp를 조합한 lag모니터링 shell script를 통해 lag의 값이 일정한 주기로 감소하는지 모니터링한다.