RAG 기반 AI 주식 투자 Agent 5편 - FastAPI, PostgreSQL, pgvector로 실제 서비스 구조 만들기
이제 아키텍처를 코드 구조로 내려봅니다. FastAPI API 계층, PostgreSQL/pgvector 스키마, Redis 캐시, 비동기 작업 큐, 분석 실행 흐름, 핵심 모듈 분리까지 실제 개발 가능한 서비스 구조를 설명합니다.
이제는 설계를 코드 구조로 내려야 한다
앞선 글에서 아키텍처와 workflow를 정리했다면, 이제는 실제로 개발 가능한 구조로 내려야 합니다.
초기 구현 기준으로 추천하는 구조는 아래와 같습니다.
- API 서버: FastAPI
- 메타/시계열 저장: PostgreSQL
- 벡터 검색: pgvector
- 캐시와 작업 상태: Redis
- 비동기 배치: worker process
핵심은 모든 것을 한 파일에 넣지 않고, 도메인별로 분리하는 것입니다.
권장 디렉터리 구조
예시:
app/
├─ api/
│ ├─ routes/
│ │ ├─ analysis.py
│ │ ├─ screening.py
│ │ ├─ portfolio.py
│ │ └─ paper_trading.py
├─ core/
│ ├─ config.py
│ ├─ db.py
│ └─ logging.py
├─ models/
│ ├─ symbol.py
│ ├─ market_data.py
│ ├─ document.py
│ ├─ portfolio.py
│ └─ analysis.py
├─ services/
│ ├─ market_data_service.py
│ ├─ retrieval_service.py
│ ├─ agent_service.py
│ ├─ risk_service.py
│ └─ paper_trading_service.py
├─ agents/
│ ├─ router.py
│ ├─ screener.py
│ ├─ analyst.py
│ ├─ portfolio_agent.py
│ └─ response_composer.py
├─ workers/
│ ├─ news_ingestion.py
│ ├─ filing_ingestion.py
│ ├─ embedding_jobs.py
│ └─ daily_research_jobs.py
└─ schemas/
├─ analysis.py
├─ screening.py
└─ portfolio.py
API는 어떤 단위로 나눌까
초기에는 너무 많은 엔드포인트보다 핵심 use case 위주가 좋습니다.
예:
POST /analysis/stockPOST /analysis/screenPOST /portfolio/evaluatePOST /paper-trading/proposalsGET /analysis/{run_id}
이렇게 use case 중심으로 자르면 UI와 연동하기도 쉽습니다.
분석 요청 흐름 예시
POST /analysis/stock
입력:
{
"symbol": "NVDA",
"question": "최근 30일 기준 신규 진입 리스크를 분석해줘",
"portfolio_id": "pf_001"
}
내부 처리:
- API route가 요청 검증
agent_service호출- Router가 질문 유형 판별
market_data_service가 가격 요약 반환retrieval_service가 뉴스/공시 context 검색risk_service가 포트폴리오 룰 평가- Composer가 응답 생성
- 결과 저장 후 응답 반환
PostgreSQL 스키마는 어떻게 잡을까
초기 핵심 테이블 예:
create table symbol (
id bigserial primary key,
ticker text not null unique,
name text not null,
sector text,
industry text,
market text
);
create table news_article (
id bigserial primary key,
symbol_id bigint references symbol(id),
title text not null,
body text not null,
source text,
published_at timestamptz not null
);
create table filing_document (
id bigserial primary key,
symbol_id bigint references symbol(id),
doc_type text not null,
title text not null,
body text not null,
filed_at timestamptz not null
);
여기서 중요한 것은 시계열 정합성입니다. published_at, filed_at, event_time 같은 필드는 매우 중요합니다.
pgvector는 어떻게 붙일까
문서 chunk용 테이블 예:
create table document_chunk (
id bigserial primary key,
symbol_id bigint references symbol(id),
source_type text not null,
source_document_id bigint not null,
chunk_text text not null,
embedding vector(1536),
published_at timestamptz,
metadata jsonb default '{}'::jsonb
);
검색 시에는 단순 similarity 외에도 symbol_id, source_type, published_at 필터를 같이 걸어야 합니다.
Retrieval service는 어떤 인터페이스가 좋을까
예:
class RetrievalService:
async def search_symbol_context(
self,
symbol: str,
query: str,
days: int = 30,
top_k: int = 5,
) -> list[dict]:
...
이 인터페이스가 너무 많은 내부 구현을 노출하지 않는 편이 좋습니다.
반환 예:
[
{
"source_type": "news",
"title": "NVIDIA raises data center guidance",
"published_at": "2026-04-17T08:20:00Z",
"score": 0.84,
"chunk_text": "..."
}
]
Agent service는 오케스트레이션 계층이다
중심 서비스 예시:
class StockAnalysisAgentService:
async def run(self, symbol: str, question: str, portfolio_id: str | None):
query = self.router.parse(question)
price_summary = await self.market_data.get_price_summary(symbol)
contexts = await self.retrieval.search_symbol_context(symbol, question)
risk = await self.risk.evaluate(symbol, portfolio_id)
return await self.composer.compose(
symbol=symbol,
query=query,
price_summary=price_summary,
contexts=contexts,
risk=risk,
)
이 계층이 있으면 API route가 얇아지고, 나중에 worker나 batch job에서도 같은 분석 로직을 재사용할 수 있습니다.
비동기 작업은 어디서 쓰나
이 시스템에서 동기 처리와 비동기 처리를 구분해야 합니다.
동기 처리:
- 사용자 분석 요청
- 포트폴리오 점검
비동기 처리:
- 뉴스 수집
- 공시 수집
- transcript 적재
- 임베딩 생성
- 일일 리서치 리포트 생성
즉, 데이터 파이프라인과 사용자 질의 경로를 분리해야 합니다.
Redis는 캐시만이 아니라 상태 저장에도 쓸 수 있다
예:
- 인기 종목 최근 분석 결과 캐시
- analysis run 상태 저장
- 중복 요청 방지
- worker job 상태 추적
예시:
analysis:run:{id}
analysis:latest:NVDA
screening:daily:semiconductor
이렇게 해두면 재계산 비용과 응답 시간을 줄일 수 있습니다.
Paper trading과 연결되는 구조
분석 결과가 바로 주문으로 가지 않도록 별도 proposal 테이블을 두는 편이 좋습니다.
예:
analysis_runtrade_proposalproposal_approvalpaper_order
이 구조가 있으면 분석과 실행 사이에 사람 승인 단계를 자연스럽게 둘 수 있습니다.
마무리
RAG 기반 투자 Agent를 실제 서비스로 만들 때 중요한 건 화려한 프레임워크보다 계층 분리입니다.
- 데이터 적재와 사용자 질의를 분리하고
- API, 오케스트레이션, 리스크 엔진을 나누고
- 문서 검색은 symbol/time 필터와 함께 다루고
- 분석과 주문 사이에 승인 계층을 둬야 합니다
다음 글에서는 이 시스템을 운영 단계로 올리기 위해 paper trading, 모니터링, 실패 대응, 사람 승인 workflow, 안전장치를 어떻게 설계할지 정리해보겠습니다.