파일 데이터를 실시간으로 Azure Event Hub로 전송하기위한 Fluentd 파이프라인 구성 방법 및 주의사항을 다뤄보겠습니다.

사전 지식

본 파이프라인 이해를 위해서 필요한 사전지식을 아래에 정리했으니 기본적인 사항을 숙지하고 구성하시기 바랍니다.

더보기

AEH(Azure Event Hubs)란?

Azure Event Hubs는 빅데이터의 실시간 스트리밍 처리를 위한 Azure 클라우드 제품 중 하나입니다. AEH는 Apache Kafka의 Managed service라고 볼 수 있습니다. 실제로 카프카 프로토콜을 지원하여 동일한 방식으로 인터페이스 가능하며 이 외의 다른 연동 방법도 지원합니다.

AEH SAS(Shared Access Signature)

AEH는 기본적으로 신뢰 가능한 통신을 수행합니다. 이를 위해 수신자는 유효 자격을 증명해야 하며, 이에 필요한 Key를 제공하여 증명하는 방식이 SAS(Shared Access Signature)입니다.

Fluentd 플러그인

Fluentd는 사용자가 원하는 기능을 추가할 수 있도록 플러그인 방식을 채택하고 있습니다. Source(입력), Filter(처리), Output(출력) 구조에서 각 단계에 원하는 플러그인을 적용할 수 있으며, 플러그인을 직접 개발도 가능하도록 되어있습니다. Fluentd는 출력 플러그인으로 Kafka 플러그인, AEH 플러그인을 지원하고 있습니다.

여기서 AEH 플러그인은 공식 지원이 아닌 개인이 개발한 것이므로 본문에서는 공식 오픈소스인 Kafka 플러그인을 통해 구성해보겠습니다.

Fluentd 설치하기

설치는 서버 환경에 따라 다르므로 모든 케이스를 커버하지 않겠습니다. 여기서 작성하는 것은 CentOS 7+ 기준입니다. 설치 방법은 다양하지만 그 중 td-agent를 통해 설치해보겠습니다. (fluentd의 배포판 패키지)

# td-agent 설치 (root 계정 혹은 sudo 권한 필요)
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

# systemd에서 로그파일 경로 변경
vi /usr/lib/systemd/system/td-agent.service
# Environment=TD_AGENT_LOG_FILE 항목을 원하는 경로로 변경
systemctl daemon-reload

# td-agent 설정 변경
vi /etc/td-agent/td-agent.conf

# td-agent 재기동
systemctl restart td-agent

Fluentd 설정하기

Fluentd 설정은 /etc/td-agent 경로에 있는 td-agent.conf 파일에서 할 수 있습니다. 아래는 기본적인 File to AEH 예시입니다. 주요 필드에 대한 설명은 주석으로 달았습니다.

<source>
  @type tail
  # 수집하기위한 파일의 경로를 지정합니다
  path /path/to/target/*.log
  
  # 현재까지 읽어들인 파일의 경로, 오프셋, inode 정보를 저장하는 파일입니다
  pos_file /path/to/data/pos/stream.default.pos
  
  # 프로세스 재기동이 없다면 pos 파일은 증분으로 커지기만 하는데, 이를 주기적으로 압축 정리하기위한 주기를 설정합니다
  pos_file_compaction_interval 24h
  follow_inodes true
  tag logs.stream.default

  <parse>
    @type regexp
    
    # 정규표현식을 통해 원하는 필드를 추출합니다. 예제는 파이프(|) 문자를 기준으로 logid, data 필드를 읽어들입니다
    expression /^(?<logid>[^\|].*?)\|(?<data>.*?)$/
    
    # 파싱에 성공하면 각 필드에 해당하는 값의 타입을 지정해줍니다
    types logid:string,data:string
  </parse>
</source>

<match logs.stream.**>
  @type kafka2

  # 브로커 정보에 이벤트허브 주소를 넣어줍니다. 9093 포트를 입력해야 합니다
  brokers <your-namespace>.servicebus.windows.net:9093

  # 브로커에 produce할 토픽 정보를 지정하거나, 이를 지정하지 않은 경우 기본값으로 전송할 default 토픽 명을 지정해줍니다
  topic_key logid
  default_topic default-message
 
  <format>
    @type json
  </format>

  # 버퍼 사이즈에 따라 카프카로 배치 전송하는 메세지 사이즈가 결정됩니다
  <buffer logid>
    @type file
    path /path/to/data/buffer/kafka
    
    # 버퍼에서 flush된 chunk는 곧바로 출력 queue로 들어가는데, flush를 수행할 주기를 설정합니다
    flush_interval 3s
    chunk_limit_size 800KB
  </buffer>

  # 카프카 전송이 실패했을 때 해당 메세지를 어디로 전달할지 지정합니다
  <secondary>
    @type file
    path /path/to/data/failed/records
  </secondary>
 
  # ruby-kafka producer options
  max_send_retries 1
  required_acks -1
 
  ssl_ca_certs_from_system true

  # Azure 포탈의 이벤트허브 탭에서 아래 정보를 확인하여 넣어줍니다
  username "$ConnectionString"
  password "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName={SHARED.ACCESS.KEY.NAME};SharedAccessKey={SHARED.ACCESS.KEY}"
</match>

주의사항

아래는 연동 과정 및 운영에서 발생할 수 있는 이슈와 해결 방안입니다.

메세지 전송 시 Could not connect to broker 에러

AEH 브로커로 메세지 전송 시 실패 및 전송을 반복하면서 아래와 같은 에러 로그를 확인할 수 있습니다.

Could not connect to broker your-namespace.servicebus.windows.net:9093 (node_id=0): Connection error Errno::ECONNRESET: Connection reset by peer

주요 원인 및 해결 방안은 아래와 같습니다.

  • 연결 설정이 잘못된 경우
    • AEH 연결이 잘 되는지 Kafkacat 명령어를 통해 확인해볼 수 있습니다.
    • 만약 연결이 잘 된다면 아래의 경우를 의심해야 합니다.
  • AEH 전송 Limit 초과한 경우
    • Azure Event Hubs 내부 Limit에 따른 전송 한계를 초과한 경우입니다. AEH는 단일 producer초당 1MB 이상의 메세지를 전송한 경우 강제로 연결을 거부하게 됩니다. 이 때는 버퍼 사이즈를 낮춰서 배치 전송 당 메세지 사이즈를 줄여야 합니다.
    • 카프카로 전송할 데이터의 양이 초당 1MB를 넘어갈 경우 멀티프로세스 워커를 통한 멀티 producer를 고려해야 합니다.

Fluentd에서의 Kafka 전송 실패 처리 로직

Fluentd는 output에서 전송에 실패할 경우 자동으로 재전송을 수행하도록 retry 로직을 내장하고 있습니다. retry 로직은 기본적으로 시도 횟수 상한, 시도에 대한 interval을 파라미터로 가지고 있으며, interval은 1초부터 매 횟수마다 2배씩 증가하도록 되어있습니다.

따라서 처음 전송 후 실패하게 되면 1초 뒤 재전송을 시도하고, 이에 실패하면 다시 2초 뒤에 재전송, 실패 시 4초 뒤 재전송 하는 방식으로 늘어나게 됩니다. 이는 무한정 재시도 하도록 옵션을 주지 않는 이상 최대 17회까지 재전송을 시도하게 되어있습니다. 이러한 값들은 전부 설정 파일에서 옵션 값으로 변경 가능합니다.

source 데이터의 증가 속도나 서버 내 잔여 디스크 사이즈 등 환경적인 부분을 고려해서 적절한 값을 선택하여 설정하는 것이 운영 상 중요할 수 있습니다.