일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- Kibana
- kafka #streamdata-processing #mirrormaker
- kafka #streamdata-processing #kafka-consumer
- kafka #mirrormaker consumer lag
- Apache Druid #Apache Kafka #Apache Pinot #Real streaming
- Today
- Total
더비창고방
MirrorMaker 연동/기동 방법 본문
[목차]
- 개요
1.1 MirrorMaker란? - MirrorMaker 연동
2.1 MirrorMaker Instance 생성
2.2 Consumer API 설정
2.3 Producer API 설정 - MirrorMaker 기동
3.1 MirrorMaker 실행
3.2 Mirroring monitoring
1. 개요
1.1 MirrorMaker란?
서로 다른 두개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션으로,
MirrorMaker의 각 consumer는 source kafka로 부터 할당 된 토픽과 파티션들의 데이터를 읽은 후 공유되는 producer를 사용해, 그 데이터를 target kafka 클러스터에 전송한다.
2. MirrorMaker 연동
2.1 MirrorMaker Instance 생성
EC2내 아래 명령어를 실행하여 Kafka를 설치한다.
sudo yum install -y java-1.8.0-openjdk-devel.x86_64
wget <https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz>
tar -xvf kafka_2.12-2.6.2.tgz
2.2 Consumer API 설정
<step1> Source Kafka 내 토픽 연동을 위한 SSL/SASL 인증 설정
** Source Kafka가 SSL/SASL 인증을 통해 연동해야하면 아래 설정 필수
MirrorMaker 인스턴스에 source kafka의 cert 파일을 /var/ssl/private
경로로 옮긴다.
SASL 인증을 위한 consumer conf 파일을 kafka/conf 폴더 하위에 생성한다.
— consumer-jaas.conf
bootstrap.servers=SASL_PLAINTEXT://{source kafka bootstrap-server}
group.id={consumer group id}
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \\
required username="username" password="password";
<step2> Source Kafka 내 토픽 정보 확인
** Target Kafka 내 토픽 생성시 필요한 정보 (토픽명/파티션 수)를 조회하는 과정
아래 명령어를 실행하여 mirroring대상 topic의 정보 (토픽명, 파티션 수)를 조회한다.
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-topics.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--topic {topic_name} \\
--describe \\
--command-config config/consumer-jaas.conf
** source kafka에서 토픽에 대한 describe 권한이 없는경우, 아래와 같이 topic 정보를 조회한다.
토픽 명칭 조회
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-topics.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--list \\
--command-config config/consumer-jaas.conf
토픽 파티션 수 조회
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$./bin/kafka-consumer-groups.sh \\
--boostrap-server {source kafka bootstrap-server} \\
--command-config config/consumer-jaas.conf \\
--group {group id} \\
--describe
<step3> Source Kafka Consumer api 동작 check
아래 명령어를 실행하여 topic의 메세지가 출력되면, Consumer api가 정상적으로 동작하는 것이다.
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-console-consumer.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--topic {topic name} \\
--consumer.config config/consumer-jaas.conf
<step4> MirrorMaker consumer properties 설정
** Source Kafka → MirrorMaker topic을 pull 하기위한 consumer properties 설정
Kafka/config 폴더 내 아래와 같이 Source kafka에서 데이터를 pull하기 위한 configure 설정한 ‘mm-consumer.properties’ 파일을 생성한다.
[mm-consumer.properties]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers={source kafka bootstrap server}
# consumer group id
group.id={consuemr group id}
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
enable.auto.commit=false
auto.offset.reset=latest
#seralize
#kafka record key value json format string
key.deserializer=org.apache.kafka.commom.serialization.StringDeserializer
value.deserializer=org.apahce.kafka.common.serialization.StringDeserializer
#sasl - kafka .
# Plain user / password .
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\
username="username" password="password";
#ssl
# kafka server .
ssl.truststore.location=/var/ssl/private/kafka.client.truststore.jks
ssl.truststore.password=contentsds5756
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1vi
2.3 Producer API 설정
<step1> Mirrormaker → Target Kafka SSL 설정
** MirrorMaker의 인스턴스내 JKS truststore를 생성하여, MirrorMaker와 Target Kafka 간의 SSL 통신설정을 한다.
아래 명령어를 실행해 kafka.client.truststore.jks 를 생성한다.
# cacerts -> kafka.client.truststore.jks
[ec2-user@ip-XX-XX-X-XXX ~]$ cp /usr/lib/jvm/{JDKFolder}/jre/lib/security/cacerts \\
/tmp/kafka.client.truststore.jks
kafka/bin 폴더 내 아래 configure를 설정한 client.properties 파일을 생성한다.
[client.properties]
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
<step2> Target Kafka 내 토픽 생성
** 2.2 Consumer API 설정 <step2> 에서 확인한 Source Kafka 토픽정보 (토픽명, 파티션 수) 와 같은 토픽을 생성한다.
아래 명령어를 실행해 Target Kafka내 토픽 생성한다.
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-topics.sh \\
--create \\
--boostrap-server {target kafka bootstrap server} \\
--topic {topic name} \\
--replication-factor 1 \\
--partitions {number of partitions} \\
--config retention.ms=259200000
<step3> MirrorMaker producer properties 설정
** MirrorMaker → Target Kafka 토픽을 push 하기위한 producer properties 설정
Kafka/config 폴더 내 아래와 같이 Target kafka에서 데이터를 push하기 위한 configure 설정한 ‘mm-producer.properties’ 파일을 생성한다.
[mm-producer.properties]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.producer.ProducerConfig for more details
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ..
bootstrap.servers={target kafka bootstrap server}
# broker number (acks parameter default value is -1. if not edit acks, it might be occured replicator lackage error )
acks=1
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
## Avoiding Data loss
max.in.flight.requests.per.connection=1
#retries=Int.Max_Value
#max.block.ms=Long.MAX_VALUE
#ssl
#kafka server
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
3. MirrorMaker 기동
3.1 MirrorMaker 실행
<step1> Kafka Heap설정을 위한 Memory Setting을 한다.
[/etc/profile]
export KAFKA_HEAP_OPTS='-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC \\
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 \\
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80'
<step2> MirrorMaker 실행
아래 명령어를 실행하여 MirrorMaker를 기동한다.
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-mirror-maker.sh \\
--consumer.config config/mm-consumer.properties \\
--producer.config config/mm-producer.properties \\
--whitelist {topic name} \\
--num.streams={setting num}
<step3> 토픽 mirroring 확인
consumer console을 이용해 Source Kafka → Target Kafka 로 토픽이 정상적으로 mirroring 되는지 확인한다.
[ec2-user@ip-XX-XX-X-XXX kafka_2.12-2.6.2]$ ./bin/kafka-console-consumer.sh \\
--bootstrap-server {target kafka bootstrap server} \\
--topic {topic name}
위 명령어를 실행하여 토픽의 메세지 내용이 출력되면, Source Kafka → Target Kafka 로 토픽이 정상적으로 mirroring 된 것이다.
3.2 Mirroring monitoring
mirroring 하는 인스턴스의 consumer api lag을 확인하여 MirrorMaker의 consumer 상태를 모니터링 한다.
[lag_monitor.sh]
while true; do
echo -e -n "$(date)\\t"
echo -e -n "{mirroring topic name}\\t"
echo -e $(/home/ec2-user/kafka_2.12-2.6.2/bin/kafka-consumer-groups.sh \\
--bootstrap-server {source kafka bootstrap server} \\
--group {consumer group id} --describe \\
--command-config {mirror maker consumer properties path} \\
--describe 2> /dev/null | grep {mirroring topic name}
| sed 's/\\s\\+/\\t/g' | cut -f 6 | xargs)
sleep 1
done
consumer 명령어와 timestamp를 조합한 lag모니터링 shell script를 통해 lag의 값이 일정한 주기로 감소하는지 모니터링한다.
'STUDY' 카테고리의 다른 글
Apache Druid 를 활용한 실시간 데이터 처리 파이프 라인 구축 실습 (0) | 2023.01.18 |
---|---|
elasticsearch-kibana 사용자 인증 및 관리 (0) | 2022.12.20 |
Kibana 설치하기 (1) | 2022.12.20 |
MirrorMaker consumer lag 모니터링 (0) | 2022.12.20 |
kafka-consumer (0) | 2022.12.20 |