이번 포스트에서는 Filebeat, Kafka, Nifi를 docker-compose로 구성하고, 데이터 스트림을 생성하는 과정을 진행해보겠습니다. 튜토리얼 방식으로, 개인 로컬 맥에서 간단하게 따라서 진행해주시면 됩니다.

데이터는 Filebeat -> Kafka -> Nifi -> Kafka 순서로 흘러가는 구조입니다. 데이터 소스로 Apache 웹 로그를 사용해 실제 환경과 같이 일정 속도로 흘러가도록 구성하고, 실시간으로 데이터 스트림이 동작하는 모습을 확인해보겠습니다.

Tested on:

  • MacOS v10.14.6
  • docker v19.03
  • docker-compose v1.24

구성하기

    1. 적당한 workspace로 이동한 뒤, Filebeat에 제공하기 위한 데이터 소스로 weblogs.log 파일을 생성합니다. 일단 빈 파일로 생성하고, 추후에 데이터를 추가할 예정입니다.
      touch weblogs.log
    2. filebeat.yml 파일을 생성하고 아래 내용을 추가합니다.
      filebeat:
        inputs:
        - type: log
          paths:
            - /data/weblogs.log
        config:
          modules:
            path: ${path.config}/modules.d/*.yml
            reload.enabled: false
      
      output:
        kafka:
          hosts: ['kafka:29092']
          topic: 'filebeats-weblogs'
          partition:
            round_robin:
              reachable_only: false
          required_acks: 1
          compression: gzip
          max_message_bytes: 1000000
    3. docker-compose.yml 파일을 생성하고 아래 내용을 추가합니다.
      version: '3'
      
      services:
        filebeat:
          image: docker.elastic.co/beats/filebeat:7.4.0
          volumes:
            - ./weblogs.log:/data/weblogs.log
            - ./filebeat.yml:/usr/share/filebeat/filebeat.yml
          depends_on:
            - kafka
        zookeeper:
          image: confluentinc/cp-zookeeper:5.3.2
          ports:
            - 2181:2181
          environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000
        kafka:
          image: confluentinc/cp-kafka:5.3.2
          depends_on:
          - zookeeper
          ports:
            - 9092:9092
          environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        nifi:
          image: xemuliam/nifi:1.9
          ports:
            - 8080:8080
            - 8081:8081
            - 8443:8443
    4. docker 컨테이너를 실행합니다.
      docker-compose up -d
    5. 브라우저에서 nifi에 접속합니다. 접속까지 다소 오래 걸릴 수 있습니다. (브라우저 > localhost:8080/nifi 접속)

    1. ConsumerKafka 프로세서를 추가하고, PROPERTIES 탭의 설정 값을 아래와 같이 변경합니다. (Processor 드래그 > Filter에 kafka 검색 > ADD 클릭)
      • Kafka Brokers: kafka:29092
      • Topic Name(s): filebeats-weblogs
      • Group ID: filebeats-weblogs-consumer

    1. EvaluateJsonPath 프로세서를 추가하고, PROPERTIES 탭에서 우측 더하기 버튼으로 아래 설정 값을 추가합니다.
      • Property Name: message
      • Value: $.message

    1. PublishKafka 프로세서를 추가하고, PROPERTIES 탭의 설정 값을 아래와 같이 변경합니다.
      • Kafka Brokers: kafka:29092
      • Topic Name: weblogs

    1. 각 프로세서를 드래그해서 연결합니다. EvaulatedJsonPath에서 PublishKafka로 연결할 때의 관계는 matched로 설정합니다. (ConsumerKafka > EvaluateJsonPath > PublishKafka 순서)
    2. EvaluateJsonPath 프로세서와 PublishKafka 프로세서의 Automatically Terminate Relationships 옵션을 각각 아래와 같이 체크하고 저장해줍니다.
      • EvaluateJsonPath: failure, unmatched
      • PublishKafka: failure, success
    3. 각 프로세서를 시작해줍니다.

데이터 스트림 확인

    1. 이제 데이터 소스를 실제와 같이 구현하기 위해 아래 명령어를 실행합니다. 해당 명령어는 fake log 생성기인 flog를 이용해 1초에 한 번씩 10개 row를 weblogs.log 파일에 더해주는 명령입니다.
      while true; do docker run -it --rm mingrammer/flog:0.3.2 -n 10 >> weblogs.log; sleep 1; done
    2. 실제로 생성된 메세지를 확인하기 위해 터미널 탭을 2개 실행한 후, kafka 인스턴스에 접속합니다.
      docker-compose exec kafka bash
    3. 각 탭에서 두 개의 consumer를 통해 실제로 메세지가 잘 흘러가고 있는지 확인합니다.
      kafka-console-consumer --bootstrap-server localhost:9092 --topic weblogs
      kafka-console-consumer --bootstrap-server localhost:9092 --topic filebeats-weblogs
    4. nifi의 대시보드에서 kafka 프로세서 간 데이터 흐름을 확인합니다.