TestForge Blog
← 전체 포스트

RAG 기반 AI 주식 투자 Agent 5편 - FastAPI, PostgreSQL, pgvector로 실제 서비스 구조 만들기

이제 아키텍처를 코드 구조로 내려봅니다. FastAPI API 계층, PostgreSQL/pgvector 스키마, Redis 캐시, 비동기 작업 큐, 분석 실행 흐름, 핵심 모듈 분리까지 실제 개발 가능한 서비스 구조를 설명합니다.

TestForge Team ·

이제는 설계를 코드 구조로 내려야 한다

앞선 글에서 아키텍처와 workflow를 정리했다면, 이제는 실제로 개발 가능한 구조로 내려야 합니다.

초기 구현 기준으로 추천하는 구조는 아래와 같습니다.

  • API 서버: FastAPI
  • 메타/시계열 저장: PostgreSQL
  • 벡터 검색: pgvector
  • 캐시와 작업 상태: Redis
  • 비동기 배치: worker process

핵심은 모든 것을 한 파일에 넣지 않고, 도메인별로 분리하는 것입니다.

권장 디렉터리 구조

예시:

app/
├─ api/
│  ├─ routes/
│  │  ├─ analysis.py
│  │  ├─ screening.py
│  │  ├─ portfolio.py
│  │  └─ paper_trading.py
├─ core/
│  ├─ config.py
│  ├─ db.py
│  └─ logging.py
├─ models/
│  ├─ symbol.py
│  ├─ market_data.py
│  ├─ document.py
│  ├─ portfolio.py
│  └─ analysis.py
├─ services/
│  ├─ market_data_service.py
│  ├─ retrieval_service.py
│  ├─ agent_service.py
│  ├─ risk_service.py
│  └─ paper_trading_service.py
├─ agents/
│  ├─ router.py
│  ├─ screener.py
│  ├─ analyst.py
│  ├─ portfolio_agent.py
│  └─ response_composer.py
├─ workers/
│  ├─ news_ingestion.py
│  ├─ filing_ingestion.py
│  ├─ embedding_jobs.py
│  └─ daily_research_jobs.py
└─ schemas/
   ├─ analysis.py
   ├─ screening.py
   └─ portfolio.py

API는 어떤 단위로 나눌까

초기에는 너무 많은 엔드포인트보다 핵심 use case 위주가 좋습니다.

예:

  • POST /analysis/stock
  • POST /analysis/screen
  • POST /portfolio/evaluate
  • POST /paper-trading/proposals
  • GET /analysis/{run_id}

이렇게 use case 중심으로 자르면 UI와 연동하기도 쉽습니다.

분석 요청 흐름 예시

POST /analysis/stock

입력:

{
  "symbol": "NVDA",
  "question": "최근 30일 기준 신규 진입 리스크를 분석해줘",
  "portfolio_id": "pf_001"
}

내부 처리:

  1. API route가 요청 검증
  2. agent_service 호출
  3. Router가 질문 유형 판별
  4. market_data_service가 가격 요약 반환
  5. retrieval_service가 뉴스/공시 context 검색
  6. risk_service가 포트폴리오 룰 평가
  7. Composer가 응답 생성
  8. 결과 저장 후 응답 반환

PostgreSQL 스키마는 어떻게 잡을까

초기 핵심 테이블 예:

create table symbol (
  id bigserial primary key,
  ticker text not null unique,
  name text not null,
  sector text,
  industry text,
  market text
);

create table news_article (
  id bigserial primary key,
  symbol_id bigint references symbol(id),
  title text not null,
  body text not null,
  source text,
  published_at timestamptz not null
);

create table filing_document (
  id bigserial primary key,
  symbol_id bigint references symbol(id),
  doc_type text not null,
  title text not null,
  body text not null,
  filed_at timestamptz not null
);

여기서 중요한 것은 시계열 정합성입니다. published_at, filed_at, event_time 같은 필드는 매우 중요합니다.

pgvector는 어떻게 붙일까

문서 chunk용 테이블 예:

create table document_chunk (
  id bigserial primary key,
  symbol_id bigint references symbol(id),
  source_type text not null,
  source_document_id bigint not null,
  chunk_text text not null,
  embedding vector(1536),
  published_at timestamptz,
  metadata jsonb default '{}'::jsonb
);

검색 시에는 단순 similarity 외에도 symbol_id, source_type, published_at 필터를 같이 걸어야 합니다.

Retrieval service는 어떤 인터페이스가 좋을까

예:

class RetrievalService:
    async def search_symbol_context(
        self,
        symbol: str,
        query: str,
        days: int = 30,
        top_k: int = 5,
    ) -> list[dict]:
        ...

이 인터페이스가 너무 많은 내부 구현을 노출하지 않는 편이 좋습니다.

반환 예:

[
  {
    "source_type": "news",
    "title": "NVIDIA raises data center guidance",
    "published_at": "2026-04-17T08:20:00Z",
    "score": 0.84,
    "chunk_text": "..."
  }
]

Agent service는 오케스트레이션 계층이다

중심 서비스 예시:

class StockAnalysisAgentService:
    async def run(self, symbol: str, question: str, portfolio_id: str | None):
        query = self.router.parse(question)
        price_summary = await self.market_data.get_price_summary(symbol)
        contexts = await self.retrieval.search_symbol_context(symbol, question)
        risk = await self.risk.evaluate(symbol, portfolio_id)
        return await self.composer.compose(
            symbol=symbol,
            query=query,
            price_summary=price_summary,
            contexts=contexts,
            risk=risk,
        )

이 계층이 있으면 API route가 얇아지고, 나중에 worker나 batch job에서도 같은 분석 로직을 재사용할 수 있습니다.

비동기 작업은 어디서 쓰나

이 시스템에서 동기 처리와 비동기 처리를 구분해야 합니다.

동기 처리:

  • 사용자 분석 요청
  • 포트폴리오 점검

비동기 처리:

  • 뉴스 수집
  • 공시 수집
  • transcript 적재
  • 임베딩 생성
  • 일일 리서치 리포트 생성

즉, 데이터 파이프라인과 사용자 질의 경로를 분리해야 합니다.

Redis는 캐시만이 아니라 상태 저장에도 쓸 수 있다

예:

  • 인기 종목 최근 분석 결과 캐시
  • analysis run 상태 저장
  • 중복 요청 방지
  • worker job 상태 추적

예시:

analysis:run:{id}
analysis:latest:NVDA
screening:daily:semiconductor

이렇게 해두면 재계산 비용과 응답 시간을 줄일 수 있습니다.

Paper trading과 연결되는 구조

분석 결과가 바로 주문으로 가지 않도록 별도 proposal 테이블을 두는 편이 좋습니다.

예:

  • analysis_run
  • trade_proposal
  • proposal_approval
  • paper_order

이 구조가 있으면 분석과 실행 사이에 사람 승인 단계를 자연스럽게 둘 수 있습니다.

마무리

RAG 기반 투자 Agent를 실제 서비스로 만들 때 중요한 건 화려한 프레임워크보다 계층 분리입니다.

  • 데이터 적재와 사용자 질의를 분리하고
  • API, 오케스트레이션, 리스크 엔진을 나누고
  • 문서 검색은 symbol/time 필터와 함께 다루고
  • 분석과 주문 사이에 승인 계층을 둬야 합니다

다음 글에서는 이 시스템을 운영 단계로 올리기 위해 paper trading, 모니터링, 실패 대응, 사람 승인 workflow, 안전장치를 어떻게 설계할지 정리해보겠습니다.