Building a News Sentiment Broker
Building a News Sentiment Broker
I wanted to monitor news sentiment for stocks in real-time, but I didn't want to manually read through hundreds of articles every day. So I built a distributed system that scrapes news from Google Alerts, extracts structured information using LLMs, and indexes everything for semantic search.
Note: The original project wasn't actually about stock data - I'm using stocks here as a stand-in to protect other people's secrets. The system architecture and approach are similar, just with different questions and scoring criteria.
System Architecture
The system is built as a pipeline of microservices connected by Celery task queues:
Google Alerts RSS Feeds
↓
[1] Scraper Service
↓
URLs + Metadata
↓
[2] Page Parser Service
↓
Extracted Text + Title
↓
[3] Question Answer Service
↓
Structured Answers + Relevance Score
↓
[4] Vector Indexer Service
↓
Text Embeddings
↓
[5] Page Saver Service
↓
Elasticsearch Index
Stage 1: Scraping
The scraper polls Google Alerts RSS feeds for a list of stock tickers (AAPL, TSLA, NVDA, etc.). For each feed, it extracts URLs from entries and sends them to the parser queue with metadata like the ticker symbol and publication date.
for query, feed_url in feeds.items():
feed = feedparser.parse(feed_url)
for entry in feed.get("entries", []):
url = entry["links"][0]["href"].partition("url=")[-1].partition("&ct")[0]
payload = {
"updated": entry.get("updated"),
"title": entry.get("title"),
"published": entry.get("published"),
"query": query,
}
app.send_task('page_parser.parse_page',
args=[{"real_url": url, "index": INDEX_NAME, "payload": payload}],
queue='stage_page_parser')
I run the scraper in a loop, checking feeds every 15 seconds and then sleeping for 2 minutes before the next cycle. This keeps the system from hammering Google's servers while still catching new articles quickly.
Stage 2: Parsing
The page parser downloads HTML, extracts text using newspaper3k, and handles different content types (HTML, plain text, JSON, etc.). It also checks Elasticsearch to skip duplicates before processing.
┌─────────────────────────────────┐
│ Raw HTML Page │
│ │
│ <title>...</title> │
│ <p>Article content...</p> │
│ <div>More content...</div> │
└─────────────────────────────────┘
↓ newspaper3k
┌─────────────────────────────────┐
│ Parsed Article │
│ • Title │
│ • Text content │
│ • Authors │
│ • Publish date │
│ • Keywords │
└─────────────────────────────────┘
The parser uses BeautifulSoup to extract structured content from HTML, focusing on paragraphs, headings, and code blocks. It also runs NLP extraction to get keywords and summaries, which helps with later relevance scoring.
Stage 3: Question Answering
This is where it gets interesting. I use an LLM (deepseek-r1:7b via Ollama) to answer structured questions about each article. The questions help filter for relevance and measure sentiment:
- Is this positive news for the company?
- Is this negative news for the company?
- Did this event occur in the past week?
- Could this news cause a significant stock price movement?
user_prompt = f"""
Today's date is {today}
CONTEXT:
Title: "{data["title"]}"
Publish Date: "{data["publish_date"]}"
Text: "{text}"
INPUT:
* Is this positive news for the company?
* Is this negative news for the company?
* Did this event occur in the past week?
* Could this news cause a significant stock price movement?"""
response = client.chat(
messages=[{'role': 'user', 'content': user_prompt}],
model='deepseek-r1:7b',
format=Answers.model_json_schema(),
)
answers = Answers.model_validate_json(response.message.content)
I use Pydantic models to enforce structured output, which makes it easy to parse the LLM responses. The model returns a JSON object with boolean and string fields that I can use for filtering.
The relevance score is calculated from the answers:
score = int(answers.is_this_positive_news_for_the_company) + \
int(answers.is_this_negative_news_for_the_company) + \
int(answers.did_this_event_occur_in_the_past_week) + \
int(answers.could_this_news_cause_a_significant_stock_price_movement)
score = score / 4
This gives me a score between 0 and 1 that I can use to filter out irrelevant articles. I only keep articles that meet certain criteria (recent news with clear positive or negative sentiment that could impact stock prices).
Stage 4: Vector Indexing
Before saving to Elasticsearch, I generate text embeddings using sentence-transformers (all-MiniLM-L6-v2). These 384-dimensional vectors enable semantic search - I can find articles similar to a query even if they don't share exact keywords.
model = SentenceTransformer("all-MiniLM-L6-v2")
data["text_vector"] = model.encode(data["text"]).tolist()
The embeddings are stored in Elasticsearch as dense vectors, which lets me do cosine similarity searches later.
Stage 5: Storage
Everything gets saved to Elasticsearch with the vector embeddings, metadata, and structured answers. I use Elasticsearch's bulk API for efficient indexing, and I check for duplicates before inserting to avoid processing the same article twice.
mappings = {
"properties": {
"text_vector": {
"type": "dense_vector",
"dims": 384,
"index": "true",
"similarity": "cosine",
}
}
}
The Distributed Setup
Each service runs in its own Docker container with Celery workers. This makes it easy to scale horizontally - I can spin up more parser workers if the queue gets backed up, or add more QA workers if the LLM calls are slow.
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Scraper │────▶│ Parser │────▶│ QA │
│ (Celery) │ │ (Celery) │ │ (Celery) │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Indexer │
│ (Celery) │
└─────────────┘
│
▼
┌───────────────┐
│ Elasticsearch │
└───────────────┘
The queues are separate (stage_page_parser, stage_answer_questions, stage_create_vector, stage_page_saver), which gives me fine-grained control over which workers handle which tasks.
Challenges
Duplicate detection: The same article can appear in multiple feeds or get scraped multiple times. I check the URL against Elasticsearch before processing, but I also handle duplicate URLs gracefully in each stage.
LLM reliability: The question-answering step is the slowest and most variable. I use structured output to reduce parsing errors, but I still need to handle cases where the LLM returns invalid JSON or misses the format.
Rate limiting: Google Alerts has rate limits, and I don't want to get blocked. I added random sleep intervals between requests and batch processing to spread the load.
Content extraction: Not all websites are easy to parse. Some use JavaScript-heavy frontends, some have paywalls, some return errors. I use newspaper3k as a fallback, but some articles still fail to extract properly.
What I Learned
This project taught me a lot about building reliable distributed systems:
-
Celery is great for pipelines: The task queue abstraction makes it easy to build multi-stage processing pipelines where each stage can scale independently.
-
Structured LLM output is crucial: Using Pydantic models with JSON schema validation catches most parsing errors early and makes the system more reliable.
-
Vector embeddings enable semantic search: Being able to find articles by meaning rather than exact keyword matches is powerful. I can search for "earnings beat" and find articles about "strong quarterly results" or "revenue surprise".
-
Deduplication matters: News articles get republished and syndicated across sites. Checking URLs before processing saves a lot of compute and storage.
-
Error handling is important: Each service needs to handle failures gracefully. If the LLM is down, the QA service should skip articles and retry later, not crash the entire pipeline.
The system processes thousands of articles per day and lets me quickly find relevant news about specific stocks. It's also a good example of how modern LLMs can be used for structured information extraction, not just chat.