Filebeat-Kafka-Nifi로 데이터 스트림 5분 안에 구성해보기
이번 포스트에서는 Filebeat, Kafka, Nifi를 docker-compose로 구성하고, 데이터 스트림을 생성하는 과정을 진행해보겠습니다. 튜토리얼 방식으로, 개인 로컬 맥에서 간단하게 따라서 진행해주시면 됩니다.
데이터는 Filebeat -> Kafka -> Nifi -> Kafka 순서로 흘러가는 구조입니다. 데이터 소스로 Apache 웹 로그를 사용해 실제 환경과 같이 일정 속도로 흘러가도록 구성하고, 실시간으로 데이터 스트림이 동작하는 모습을 확인해보겠습니다.
Tested on:
- MacOS v10.14.6
- docker v19.03
- docker-compose v1.24
구성하기
- 적당한 workspace로 이동한 뒤, Filebeat에 제공하기 위한 데이터 소스로 weblogs.log 파일을 생성합니다. 일단 빈 파일로 생성하고, 추후에 데이터를 추가할 예정입니다.
touch weblogs.log
- filebeat.yml 파일을 생성하고 아래 내용을 추가합니다.
filebeat: inputs: - type: log paths: - /data/weblogs.log config: modules: path: ${path.config}/modules.d/*.yml reload.enabled: false output: kafka: hosts: ['kafka:29092'] topic: 'filebeats-weblogs' partition: round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000
- docker-compose.yml 파일을 생성하고 아래 내용을 추가합니다.
version: '3' services: filebeat: image: docker.elastic.co/beats/filebeat:7.4.0 volumes: - ./weblogs.log:/data/weblogs.log - ./filebeat.yml:/usr/share/filebeat/filebeat.yml depends_on: - kafka zookeeper: image: confluentinc/cp-zookeeper:5.3.2 ports: - 2181:2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:5.3.2 depends_on: - zookeeper ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 nifi: image: xemuliam/nifi:1.9 ports: - 8080:8080 - 8081:8081 - 8443:8443
- docker 컨테이너를 실행합니다.
docker-compose up -d
- 브라우저에서 nifi에 접속합니다. 접속까지 다소 오래 걸릴 수 있습니다. (브라우저 > localhost:8080/nifi 접속)
- ConsumerKafka 프로세서를 추가하고, PROPERTIES 탭의 설정 값을 아래와 같이 변경합니다. (Processor 드래그 > Filter에 kafka 검색 > ADD 클릭)
- Kafka Brokers: kafka:29092
- Topic Name(s): filebeats-weblogs
- Group ID: filebeats-weblogs-consumer
- EvaluateJsonPath 프로세서를 추가하고, PROPERTIES 탭에서 우측 더하기 버튼으로 아래 설정 값을 추가합니다.
- Property Name: message
- Value: $.message
- PublishKafka 프로세서를 추가하고, PROPERTIES 탭의 설정 값을 아래와 같이 변경합니다.
- Kafka Brokers: kafka:29092
- Topic Name: weblogs
- 각 프로세서를 드래그해서 연결합니다. EvaulatedJsonPath에서 PublishKafka로 연결할 때의 관계는 matched로 설정합니다. (ConsumerKafka > EvaluateJsonPath > PublishKafka 순서)
- EvaluateJsonPath 프로세서와 PublishKafka 프로세서의 Automatically Terminate Relationships 옵션을 각각 아래와 같이 체크하고 저장해줍니다.
- EvaluateJsonPath: failure, unmatched
- PublishKafka: failure, success
- 각 프로세서를 시작해줍니다.
데이터 스트림 확인
- 이제 데이터 소스를 실제와 같이 구현하기 위해 아래 명령어를 실행합니다. 해당 명령어는 fake log 생성기인 flog를 이용해 1초에 한 번씩 10개 row를
weblogs.log
파일에 더해주는 명령입니다.while true; do docker run -it --rm mingrammer/flog:0.3.2 -n 10 >> weblogs.log; sleep 1; done
- 실제로 생성된 메세지를 확인하기 위해 터미널 탭을 2개 실행한 후, kafka 인스턴스에 접속합니다.
docker-compose exec kafka bash
- 각 탭에서 두 개의 consumer를 통해 실제로 메세지가 잘 흘러가고 있는지 확인합니다.
kafka-console-consumer --bootstrap-server localhost:9092 --topic weblogs
kafka-console-consumer --bootstrap-server localhost:9092 --topic filebeats-weblogs
- nifi의 대시보드에서 kafka 프로세서 간 데이터 흐름을 확인합니다.
'프로그래밍' 카테고리의 다른 글
자바스크립트 개요 (0) | 2021.02.13 |
---|---|
클라이언트 설치 없이 카프카 브로커와 통신하기 - Kafkacat (0) | 2020.11.29 |
Strimzi를 활용한 kafka 클러스터 구성하기 - 설치 (0) | 2019.08.04 |
MS Azure로 갈아타기 - 5. Circle CI로 CI/CD 구성하기 (0) | 2019.02.06 |
MS Azure로 갈아타기 - 4. React 어플리케이션 만들기 (0) | 2019.01.30 |