본문 바로가기
개발 지식/Kafka

[Kafka] 카프카 스트림즈(Kafka Streams) API

by 에르주 2021. 11. 24.
반응형

카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해서 개발되었지만 일련의 연속된 메시지인 스트림을 처리 하는데도 점차 사용 되었다.

이번 글에서는 카프카에서 제공하고 있는 스트림 API를 통해 스트림을 처리하는 방법에 대해 정리해보고자 한다.

1. Stream Processing and Batch Processing

Stream Processing은 데이터 흐름에 따라 처리 즉 데이터가 분석 시스템이나 프로그램에 도달하자 마자 처리하기 때문에 실시간 분석이라고 하며 

Batch Processing은 이미 저장된 된 데이터를 기반으로 분석이나 질의를 수행하고 특정 시간에 처리하는 특징이 있다.

 

Stream Processing의 장점은 다음과 같이 이야기 할 수 있다.

- 이벤트에 즉각적으로 반응한다. 이벤트 발생 후 조치까지 지연시간이 거의 없으며 분석은 항상 최신의 데이터를 반영한다.

- 정적 분석보다 더 많은 데이터를 분석한다. (저장 후 분석 하지 않는다.)

- 지속적으로 유입되는 데이터 분석에 최적화 된다. (Stream.. 이므로)

- 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있어 인프라에 독립적으로 수행할 수 있다.

 

 

2. Kafka Streams..?

카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리 이다.

 

특히 입력 Kafka 토픽을 출력 Kafka 토픽으로 변환하는 애플리케이션을 빌드하기 위한 라이브러리이다. (후 실습 진행)

ex)

기존 :Producer : ErJuer-Topic , Consumer : ErJuer-Topic

Kafka Streams 활용 : ErJuer-Topic-Input -> (Kafk aStreams API) ->ErJuer-Topic-output

 

 

추가.  타 스트림 처리 API와의 차이점

https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/

 

Introducing Kafka Streams: Stream Processing Made Simple | Confluent

Confluent is building the foundational platform for data in motion so any organization can innovate and win in a digital-first world.

www.confluent.io

 

스트림 프레임워크 Storm, Samza, Spark Streaming를 활용하기 위해서는 클러스터를 구축해야하며 해당 프레임워크 위에 핵심 앱을 구축해야 한다.

-> 즉 클러스터를 관리해야하며 다중화 리스스 배포의 효율 등 관리하는 포인트가 많아지게 된다. (MapReduce) : 쪼개고 합치다.

 

https://www.geeksforgeeks.org/mapreduce-architecture/, ㅉ

 

더 자세히 보자면

- 시스템 풀을 통해 많은 수의 임시 작업을 다중화하고 클러스터에서 리소스 배포를 효율적으로 예약해야 한다.

- 코드를 동적으로 패키징하고 구성, 라이브러리 및 기타 실행에 필요한 모든 것과 함께 코드를 실행할 시스템에 물리적으로 배포 해야한다.

- 프로세스를 관리하고 클러스터를 공유하는 작업 간의 격리를 보장해야한다.

 

이며 한마디로 줄이자면 다음과 같다.

나의 서비스, 나의 애플리케이션이 스트림 프레임워크에 종속된다.

 

 

하지만 카프카 스트림즈는 사용자 앱에서 라이브러리를 사용하고 원하는 만큼 앱 인스턴스를 시작하면 kafka는 이러한 인스턴스에 대해 작업을 분할하고 균형을 유지한다. (프레임워크가  필요없다.)

 

더 자세히 보자면

-  Kafka Streams는 스트림 처리 클러스터 없이 kafka와 나의 애플리케이션만 포함하는 완전히 내장된 라이브러리로 개발한다.

- 상태 테이블의 개념을 이벤트 스트림과 완전히 통합하고 이 두가지를 단일 개념 프레임워크에서 사용할 수 있도록 한다.

- 스트림 아키텍처에서 움직이는 조각의 총 수를 줄이기 위해 Kafka가 제공하는 핵심 추상화와 완전히 연결된 처리 모델을 제공한다.

 

 

https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ 카프카 스트림즈가 목표로 하는 수준

 

3. Kafka Stream Architecture

Kafka Streams Threads Model

 

Kafka Stream Architcture에 대해 정리하기 전에 Kafka Partition에 대해 다시 정리해보자면

 

A라는 토픽을 프로듀서가 카프카로 메시지 보내는 소요시간을 1초라고 생각하자.

A 토픽에 파티션이 존재하지 않으면 1초 -> 1초 -> 1초 -> 1초 총 4초가 걸리게 된다.

 

하지만 A 토픽에 파티션을 4개 만들게 되면 4 대의 프로듀서에서 A 토픽 메시지 발행시 1초 -> 1초 -> 1초 -> 1초 총 4초의 시간이 소요되는 것이 아닌 1초만에 전달 할 수 있다.

 

https://developer.confluent.io/learn-kafka/apache-kafka/partitions/?_ga=2.153975864.1010619977.1637764032-1352589646.1637764032&_gac=1.181521109.1637764040.CjwKCAiA4veMBhAMEiwAU4XRr0um66kOtxlcYxQoe1ABvvUbDrRnvoG5tMIOyneSPfryW7LTywC8tBoC9o0QAvD_BwE

 

 

Kafka Streams는 입력 스트림의 파티션 개수만큼 태스크를 생성하고 각 스트림의 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지이다.

사용자가 스레드의 개수를 지정할 수 있게 해주며 1개의 스레드는 1개 이상의 태스크를 처리 할 수 있다. 

위의 그림의 Kafka Architecture에서는 스레드 하나에 2개의 태스크가 있는 것을 확인 할 수 있다.

 

=> 즉 더 많은 스레드르 띄우거나 인스턴스를 생성 그리고 토폴로지를 복제 하여 파티션을 나누게 되면 병렬 처리를 수행 할 수 있다. 여러 개의 스트림에 대한 각 파티션을 나누는 것은 별도의 스트림즈 코디네이션 없이 카프카 코디네이션 방식을 사용한다.

 

 

Topology Model

또 카프카 스트림즈는 스파크 스트림이나 스톰과 같이 스트림 처리를 하는 프로세서들이 서로 연결되어 있는 형상 (토폴로지)를 만들어서 처리하는 API이다.


카프카 스트림즈 실습하기 (Mac 기준)

1. Mac -Maven 설정

  • /usr/local 디렉토리에 /apache-mvn 디렉토리 생성 (mkdir /apache-mvn)
  • apache-mvn 다운로드 (https://maven.apache.org/download.cgi)
  • apache-mvn 디렉토리에 압축 해제
  • bash_profile에 환경 설정 (sudo vim ~/.bash_profile)

sudo vim ~/.bash_profile

2. 프로젝트 생성

streams-quickstart-java maven 프로젝트 다운받기 (명령어 생성)

https://search.maven.org/artifact/org.apache.kafka/streams-quickstart-java/2.8.0/maven-archetype

 

Maven Central Repository Search

 

search.maven.org

mvn archetype:generate -DarchetypeGroupId=org.apache.kafka -DarchetypeArtifactId=streams-quickstart-java -DarchetypeVersion=2.8.1

mvn archetype:generate -DarchetypeGroupId=org.apache.kafka -DarchetypeArtifactId=streams-quickstart-java -DarchetypeVersion=2.8.1

 

3. 카프카 실습 메소드 작성 (PipeCom.java, LineSplit.java)

  3-1. PipeCom.java (다른 토픽간 메시지 스트림)

pipcom.java

  3-1-1. pipeCom 코드 

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafkastream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * In this example, we implement a simple LineSplit program using the high-level Streams DSL
 * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
 * and writes the messages as-is into a sink topic "streams-pipe-output".
 */
public class PipeCom {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-ErJuer");  // 카프카 클러스터 내의 스트림즈 애플리케이션을 구분 할 떄 사용하는 것으로 유일한 값이어야 한다.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 스트림즈 애플리케이션이 접근할 브로커의 아이피와 포트
        // 스트림즈 애플리케이션이 이후 데이터를 입력 받고 처리한 다음 다른 토픽에 저장할 때, 데이터를 잃어들이고 (Deserialization) 저장(Serialization) 시에 객체의 형식을 정하는 Serdes를 지정한다. 
        // 이 코드는 키에 문자열 타입을 받을 수 있는 Serdes.String()을 지정한다.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Long, Integer, Short, Float, Bouble, ByteBuffer등이 있다.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("ErJuer-input").to("ErJuer-output");

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1); // 메인 스레드와 새로운 스레드 간의 정보 전달

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {  // 사용자가 Ctrl + 눌렀을 때 반응할 메소
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            System.out.println("topology started");
            latch.await();  // 다른 스레드에서 notification을 생성하는 것을 기다림.
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

pipecom.java Running

 

  3-1-2. PipeCom Topologies 도면도 (그림)

kafka Streams 진행 방향

 

  3-1-3. 토픽 메시지 확인

ErJuer-Input
ErJuer-output

 => ErJuer-Input이라는 토픽으로 들어간 메시지가 ErJuer-output 토픽 consumer에서 확인 할 수 있다.

 

 


3-2. LineSplit.java (자바 띄어쓰기 분리 함수 적용)

LineSplit.java 

3-2-1. LineSplit 코드

package kafkastream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 스트림즈 애플리케이션이 접근할 브로커의 아이피와 포트
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String,String> source = builder.stream("ErJuer-plaintext-input");
        source.flatMapValues(o -> Arrays.asList(o.split("\\W+"))).to("ErJuer-linesplit-output"); // 함수형 인터페이스이므로 람더형식으로 구현



        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        final CountDownLatch latch = new CountDownLatch(1); // 메인 스레드와 새로운 스레드 간의 정보 전달

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {  // 사용자가 Ctrl + 눌렀을 때 반응할 메소
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            System.out.println("topology started");
            latch.await();  // 다른 스레드에서 notification을 생성하는 것을 기다림.
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
 }

LineSplit.java&nbsp; Run console

 

3-2-2. 도면도

 

3-2-3. 토픽 메시지 확인

ErJuer-plainText-input
ErJuer-linesplit-output

 

=> ErJuer-plaintext-input 토픽으로 주어진 메시지가 띄어쓰기 기준으로 나뉘어지며 그 메시지 값들을 ErJuer-linesplit-output 토픽의 consumer에서 확인 할 수 있다.

 

끝.

반응형

댓글