데이터 스트리밍: 데이터 과학자를 위한 가이드

대부분의 데이터 과학은 이미 발생한 사건을 반영하는 정적 데이터셋으로 작업하는 형태로 이루어집니다. 하지만 실제 세계의 데이터는 연속적입니다. 클릭, 거래, 센서 업데이트, 리뷰 등은 모두 배치 형태가 아닌 실시간으로 발생합니다.
이러한 유형의 데이터로 작업하려면 다른 접근 방식이 필요합니다.
이 가이드는 데이터 과학 관점에서 데이터. 스트리밍을 소개합니다. 데이터 스트리밍이 무엇인지, 왜 중요한지, 그리고 Apache Kafka, Apache Flink 및 PyFlink와 같은 도구를 사용하여 실시간 파이프라인을 구축하는 방법을 설명합니다. 또한 이상 탐지 및 GPT-4와 같은 기초 모델을 사용한 리뷰 분석과 같은 예제를 살펴보겠습니다.
데이터 스트리밍이란 무엇이며 데이터 과학자는 왜 관심을 가져야 할까요?
데이터 스트리밍은 데이터가 생성될 때 연속적으로 처리하는 관행입니다. 데이터 배치가 도착할 때까지 기다리는 대신, 각 이벤트, 클릭, 메시지 또는 업데이트를 수신 즉시 처리합니다.
이러한 변화가 중요한 이유는 더 많은 데이터가 실시간으로 생성되고 있기 때문입니다. 예를 들어:
- 웹사이트를 탐색하는 사용자의 모든 클릭, 스크롤 또는 검색은 신호입니다.
- 결제 처리기가 의심스러운 거래를 즉시 표시합니다.
- 배송 트럭 집단이 매초마다 GPS 및 센서 데이터를 전송합니다.
- 챗봇 에이전트가 사용자의 실시간 피드백에 따라 동작을 조정합니다.
데이터 과학자에게 이는 새로운 기회를 제공합니다. 현재 상황에 반응하는 모델을 구축할 수 있고, 즉시 통찰력을 제공하는 분석을 배포할 수 있으며, 시기적절하고 맥락이 풍부한 데이터에 의존하는 AI 시스템을 지원할 수 있습니다.
스트리밍은 배치 처리를 대체하는 것이 아니라 보완하는 것입니다. 지연 시간이 짧고, 빈도가 높거나, 연속 모니터링이 중요한 경우에 사용하세요. 장기 트렌드 모델링이나 대규모 과거 데이터 학습에는 배치 처리가 여전히 효과적입니다.
그렇다 하더라도 스트리밍은 배치가 단순히 할 수 없는 일들을 가능하게 합니다. ML 워크플로우에서 영향력이 큰 세 가지 영역이 있습니다:
- 온라인 예측: 몇 시간마다 배치로 추천이나 예측을 생성하는 대신 실시간으로 생성할 수 있습니다. 예를 들어, 전자상거래 사이트는 사용자가 현재 탐색 중인 내용을 기반으로 실시간 할인을 계산할 수 있습니다.
- 실시간 모니터링: 배치 작업이 모델 문제를 표시할 때까지 기다리는 대신 라이브 데이터에서 메트릭을 계산하여 탐지 및 응답 시간을 단축할 수 있습니다. 이는 새 모델 배포 후 특히 가치가 있습니다.
스트리밍 시스템은 또한 상태 유지 처리를 가능하게 하여 동일한 데이터를 반복해서 재처리할 필요가 없습니다.

예를 들어, 사용자의 지난 30분 동안의 평균 장바구니 가치를 계산하는 경우 매번 처음부터 시작할 필요가 없습니다. 시스템이 이전 상태를 기억합니다. 이는 서버리스 함수나 임시 배치 작업과 같은 무상태 컴퓨팅보다 빠르고, 저렴하며, 더 잘 확장됩니다.
스트리밍의 핵심 도구: Apache Kafka와 Apache Flink
두 가지 기술이 대부분의 실시간 데이터 시스템의 기반이 되었습니다:
- Apache Kafka는 분산 이벤트 스트리밍 플랫폼입니다. 이벤트 스트림을 게시하고 구독하고, 안정적으로 저장하고, 도착 시 처리할 수 있습니다. 실시간으로 시스템 간에 데이터를 이동하는 백본입니다.
- Apache Flink는 스트림 처리 엔진입니다. 연속적이고 무제한 데이터를 처리하도록 설계되었으며 저수준 이벤트 처리와 고수준 분석을 모두 지원합니다. Flink는 정확히 한 번의 보장과 낮은 지연 시간으로 초당 수백만 개의 이벤트를 처리할 수 있습니다.
Kafka와 Flink는 함께 현대적인 스트리밍 아키텍처의 핵심을 형성합니다. Kafka는 데이터 수집 및 전송을 처리하고, Flink는 계산 및 분석을 처리합니다.

일반적인 스트리밍 파이프라인에서 Kafka는 업스트림 시스템(웹사이트나 IoT 장치 등)에서 데이터를 수집하고, Flink는 분석이나 모델 추론을 실행하기 위해 이러한 토픽을 구독합니다. 처리된 데이터는 새로운 Kafka 토픽, 데이터베이스 또는 대시보드로 전송됩니다.
스트리밍이 이미 가치를 제공하는 분야
세계에서 가장 데이터 중심적인 많은 기업들이 이미 스트리밍에 의존하여 실시간으로 고객 경험을 제공하고 있습니다.
- Netflix는 개인화 엔진의 백본으로 Kafka를 사용합니다. 모든 클릭, 일시 중지 및 재생은 각 사용자가 다음에 볼 내용을 개선하는 데 도움이 되는 Kafka Streams로 피드됩니다.
- Pinterest 또한 추천 엔진에 Kafka와 스트림 처리를 활용합니다. 사용자가 콘텐츠와 상호 작용할 때 최근 작업은 실시간으로 처리되어 몇 초 내에 새롭고 관련성 있는 핀을 제공할 수 있습니다. 상태 유지 처리를 통해 Pinterest는 세션과 작업 전반에 걸쳐 컨텍스트를 추적하고 단순히 고립된 이벤트에 반응하지 않습니다.
이는 단지 두 가지 예시일 뿐이지만, 이 패턴은 광범위하게 적용됩니다.
PyFlink 소개: Python으로 스트림 처리하기
Flink는 강력한 도구이지만, 데이터 엔지니어링 세계의 많은 도구처럼 Java와 Scala로 작성되었습니다. 여기서 PyFlink가 등장합니다. PyFlink는 Apache Flink의 Python API로, 데이터 과학자와 Python 개발자가 접근할 수 있게 해줍니다.
PyFlink를 사용하면 Python 생태계를 떠나지 않고도 강력한 스트리밍 파이프라인을 구축할 수 있습니다.
Flink는 두 가지 주요 API를 제공합니다:
- Table API: 구조화된 데이터 작업에 이상적인 높은 수준의 SQL과 유사한 추상화입니다. 선언적인 방식으로 스트림을 필터링하고, 집계하고, 결합하는 데 적합합니다.
- DataStream API: 이벤트 처리 및 처리에 대한 세부적인 제어를 제공하는 낮은 수준의 더 유연한 API입니다. 사용자 정의 로직을 적용하거나 ML 모델과 통합해야 할 때 가장 적합합니다.
데이터 과학자에게 이러한 유연성은 핵심입니다. 고객 클릭 스트림을 제품 메타데이터와 결합하기 위해 Table API를 사용할 수 있습니다. 또는 DataStream API를 사용하여 센서 데이터에 실시간으로 이상 탐지 모델을 적용할 수 있습니다.
Flink는 특히 AI 사용 사례에 적합합니다. 실시간 추론, 모니터링 및 피드백 루프에 중요한 낮은 지연 시간과 높은 내결함성으로 대량의 데이터를 처리할 수 있기 때문입니다. 정확히 한 번의 보장, 동적 확장 및 기본 상태 관리를 지원하므로 AI 파이프라인을 구축하는 데 있어 강력한 선택입니다.
금융, 전자상거래, 제조 및 물류와 같은 산업은 이미 Flink에 의존하여 사기 탐지 및 추천 엔진에서 예측 유지보수 및 공급망 경보에 이르기까지 실시간 의사 결정 시스템을 제공합니다.

DataStream API를 사용한 간단한 이상 탐지 예제를 살펴보겠습니다.
예제: PyFlink 및 Kafka를 사용한 실시간 이상 탐지
공장에서 센서 데이터를 스트리밍하는 Kafka 토픽이 있다고 가정해 보겠습니다. 각 메시지에는 device_id
, timestamp
및 temperature reading
이 포함되어 있습니다. 우리는 실시간으로 온도 이상을 감지하고 플래그 표시된 이벤트를 알림이나 추가 분석을 위해 다른 Kafka 토픽에 기록하려고 합니다.
다음은 Kafka에서 읽고, 온도 이상을 표시하고, 다운스트림 토픽에 기록하는 최소한의 PyFlink 파이프라인입니다.
먼저, 간단한 이상 탐지 함수를 정의합니다:
def detectanomalies(eventstr):
try:
event = cleanandparse(event_str)
temperature = float(event.get('temperature', 0))
if temperature > 100: # simple threshold for demo
event['anomaly'] = True
return json.dumps(event)
except Exception:
pass
return None
그런 다음 스트리밍 환경을 설정합니다:
env = StreamExecutionEnvironment.getexecutionenvironment()
env.set_parallelism(1)
env.setruntimemode(RuntimeExecutionMode.STREAMING)
들어오는 이벤트를 읽기 위한 Kafka 소스를 생성합니다:
source = KafkaSource.builder() \
.setbootstrapservers(confluent_config.get("bootstrap.servers")
) \
.settopics("sensordata") \
.setproperties(confluentconfig) \
.setstartingoffsets(KafkaOffsetsInitializer.earliest()) \
.setvalueonly_deserializer(SimpleStringSchema()) \
.build()
ds = env.fromsource(source, WatermarkStrategy.formonotonous_timestamps(), "Kafka Source")
플래그 표시된 이상을 위한 Kafka 싱크를 정의합니다:
sink = KafkaSink.builder() \
.setbootstrapservers(confluent_config.get("bootstrap.servers")
) \
.setrecordserializer(
KafkaRecordSerializationSchema.builder()
.settopic("sensoranomalies")
.setvalueserialization_schema(SimpleStringSchema())
.build()
) \
.setdeliveryguarantee(DeliveryGuarantee.ATLEASTONCE) \
.setproperty("security.protocol", confluentconfig.get("security.protocol")) \
.setproperty("sasl.mechanism", confluentconfig.get("sasl.mechanism")) \
.setproperty("sasl.jaas.config", confluentconfig.get("sasl.jaas.config")) \
.build()
변환을 적용하고 이상을 싱크로 보냅니다:
anomaliesstream = ds.map(detectanomalies, output_type=Types.STRING()) \
.filter(lambda x: x is not None)
anomaliesstream.sinkto(sink)
env.execute()
이 과정에서 일어나는 일
이 파이프라인은 각 이벤트가 도착할 때 처리하고, 실시간으로 이상을 감지하며, 그것들을 다운스트림으로 라우팅합니다. 위의 예제는 매우 간단하지만, 적절한 ML 모델(예: scikit-learn을 사용한 이상 탐지)을 스왑하거나, 상태를 추가하거나, 출력을 모니터링 시스템으로 라우팅하는 것은 모두 이와 동일한 패턴을 기반으로 합니다.
더 복잡한 다른 예제를 살펴보겠습니다.
예제: GPT-4를 사용한 제품 리뷰의 실시간 주제 분석
고객이 말하는 내용을 규모에 맞게 실시간으로 이해하는 것은 항상 어려운 문제였습니다. 감정이나 주제 분석에 대한 전통적인 접근 방식은 배치 파이프라인에 의존합니다: 리뷰 수집, 정리, 오프라인에서 모델 실행, 그리고 보고서 생성. 통찰력이 표면화되는 시점에는 이미 그 순간이 지나갔습니다.
그러나 스트리밍 인프라와 GPT-4와 같은 모델을 사용하면 다르게 접근할 수 있습니다. 리뷰가 도착하는 대로 분석하고, 즉시 주제를 추출하며, 실시간 대시보드나 알림을 위해 분류할 수 있습니다.
실시간이 중요한 이유는 다음과 같습니다:
- 더 빠른 피드백 루프: 제품 문제나 고객 고통점이 발생하는 즉시 포착합니다.
- 더 스마트한 개인화: 최신 피드백을 기반으로 추천을 조정합니다.
고객 리뷰가 Kafka 토픽으로 푸시되고 있다고 가정해 보겠습니다. 각 리뷰를 읽고, GPT-4를 통해 주제를 추출하며, 구조화된 결과를 다른 스트림이나 분석 저장소에 기록하는 PyFlink 파이프라인을 구축할 수 있습니다. 또한 집계된 주제를 처리하여 실시간으로 트렌드를 식별할 수 있습니다.
1단계: GPT-4로 주제 추출하기
먼저 Kafka 제품 리뷰 이벤트에서 주제를 추출하는 함수를 정의해 보겠습니다:
def extractthemes(reviewtext):
try:
client = OpenAI(apikey=openai_key)
chat_completion = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": f"""Identify key themes in this product review: {review_text}
Return a JSON array of themes"""}
]
)
content = chat_completion.choices[0].message.content
logging.info(f"OpenAI API call successful: {content}")
return json.loads(chat_completion.choices[0].message.content)
except Exception as e:
print(f"Error in extract_themes: {e}")
return []
def processreviews(eventstr):
try:
review = cleanandparse(event_str)
reviewtext = review.get('reviewtext', '')
review['themes'] = extractthemes(reviewtext)
return json.dumps(review)
except Exception:
pass
return None
Kafka 소스와 싱크 설정 보일러플레이트는 건너뛰겠습니다. 간단히 말해, productreviews 토픽에서 리뷰를 소비하고, processreviews 함수를 적용하여 주제를 추출한 다음, 강화된 출력을 productreviewswith_themes라는 새로운 토픽에 기록합니다.
themesstream = ds.map(processreviews, output_type=Types.STRING()) \
.filter(lambda x: x is not None)
themesstream.sinkto(sink)
2단계: 실시간으로 주제 분석하기
이제 리뷰가 들어올 때 주제가 추출되어 productreviewswith_themes 토픽에 기록되고 있으므로, 카운트를 집계하여 실시간으로 분석할 수 있습니다. 이를 통해 고객이 피드백을 남길 때 어떤 주제가 트렌드가 되고 있는지 추적할 수 있습니다.
themes_source = KafkaSource.builder()...
dsthemes = env.fromsource(themessource, WatermarkStrategy.formonotonous_timestamps(), "Kafka Themes Source")
Process reviews and count themes
themecounts = dsthemes.map(cleanandparse, outputtype=Types.MAP(Types.STRING(), Types.OBJECTARRAY(Types.BYTE()))) \
.filter(lambda x: x is not None and "themes" in x) \
.flatmap(lambda review: [(theme.lower(), 1) for theme in review["themes"]], outputtype=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda x: x[0]) \
.sum(1)
Print results
theme_counts.print()
이것이 중요한 이유
역사적으로, 주제 분석은 수동 태깅, 규칙 기반 NLP 파이프라인 또는 잠재 디리클레 할당(LDA)과 같은 사후 주제 모델링이 필요했습니다. 이러한 접근 방식은 규모에 맞게 확장되지 않고, 미묘한 차이를 다루는 데 어려움이 있으며, 즉시성이 부족합니다.
이제 GPT-4 및 PyFlink와 같은 도구를 사용하면 주관적이고 비구조화된 데이터를 실시간으로 분석할 수 있으며, 배치 작업을 기다리거나 처음부터 사용자 정의 NLP 모델을 구축할 필요가 없습니다.
이는 스트리밍 데이터와 기초 모델에 의해 구동되는 더 스마트한 고객 피드백 루프, 실시간 제품 모니터링 및 더 풍부한 분석을 위한 문을 열어줍니다.
마무리
데이터 스트리밍은 데이터 작업 방식을 변화시킵니다. 데이터 과학자에게 이는 가치가 창출되는 곳, 즉 데이터가 생성되는 순간과 결정이 내려지는 순간에 더 가까이 다가갈 수 있는 기회입니다.
실시간으로 작동하는 모델과 분석을 구축하고 사용함으로써, 더 이상 무슨 일이 일어났는지 설명하기 위해 어제의 데이터를 기다릴 필요가 없습니다. 다음에 일어날 일을 형성하는 데 도움을 주고 있는 것입니다. 이는 강력한 위치로, 당신의 작업을 고객, 운영 및 수익에 더 직접적으로 연결합니다.
Kafka, Flink, PyFlink와 같은 도구와 GPT-4와 같은 기초 모델의 등장으로 스트리밍 데이터 작업의 장벽은 그 어느 때보다 낮아졌습니다. 모델을 라이브 데이터에 더 가깝게 가져오는 것은 비즈니스에 당신의 영향력을 더 가깝게 가져오는 것을 의미합니다.