Break project into multiple files

Regis David Souza Mesquita 2025-03-02 03:22:35 +00:00
parent 1f5b333dc6
commit 353c69294a
15 changed files with 687 additions and 702 deletions

.gitignore (vendored) — 3 lines changed

@@ -1,2 +1,5 @@
cache/
__init__.py
*.pyc
.DS_Store
.aider*

Makefile — new file, 13 lines

@@ -0,0 +1,13 @@
.PHONY: test run serve clean

test:
	python -m unittest discover -s tests

run:
	python -m vibe.main --generate --prompt "Your interests here" --max-articles 5 --output summary.mp3

serve:
	python -m vibe.main --serve

clean:
	rm -rf cache

README.md — 128 lines changed

@@ -1,103 +1,83 @@
# vibe: Article Summarization & TTS Pipeline

vibe is a Python-based pipeline that automatically fetches the latest Computer Science research articles from arXiv, filters them for relevance using a language model (LLM), converts article PDFs to Markdown with Docling, generates narrative summaries, and synthesizes those summaries into an MP3 audio file using a text-to-speech (TTS) system. It is ideal for listening to curated research summaries on the go, or for integrating the process into a larger system via its API.

This repository has been refactored into a modular structure for improved maintainability.

## Features

- **Fetch Articles:** Retrieves the latest Computer Science articles from arXiv.
- **Cache Mechanism:** Caches article metadata and converted content to speed up subsequent requests.
- **Relevance Filtering:** Uses an LLM to filter articles based on user-provided interests.
- **PDF Conversion:** Converts PDF articles to Markdown format using Docling.
- **Summarization:** Generates a fluid, narrative-style summary for each relevant article with the help of an LLM.
- **Text-to-Speech:** Converts the final narrative summary into an MP3 file using KPipeline.
- **Flask API:** Exposes the functionality via a RESTful endpoint for dynamic requests.
- **CLI and Server Modes:** Run the pipeline as a one-off CLI command or as a continuously running Flask server.

## Why Use vibe?

- **Stay Updated:** Automatically curate and summarize the latest research articles so you can keep up with advancements in your field.
- **Hands-Free Listening:** Enjoy audio summaries during your commute or while multitasking.
- **Automated Workflow:** Seamlessly integrate multiple processing steps, from fetching and filtering to summarization and TTS.
- **Flexible Deployment:** Use the CLI mode for quick summaries or deploy the Flask API for integration with other systems.

## Project Structure

- **vibe/** - Main package containing all modules:
  - `config.py` - Configuration, constants, and cache setup.
  - `fetcher.py` - Module to fetch articles from arXiv.
  - `filter.py` - Module for relevance filtering using an LLM.
  - `rerank.py` - Module to rerank articles.
  - `converter.py` - Module to convert PDFs to Markdown.
  - `summarizer.py` - Module to generate article summaries.
  - `tts.py` - Module for text-to-speech conversion.
  - `orchestrator.py` - Orchestrates the complete pipeline.
  - `server.py` - Flask server exposing a REST API.
  - `main.py` - CLI entry point.
- **tests/** - Contains unit tests.
- **requirements.txt** - Python package requirements.
- **Makefile** - Makefile to run common tasks.

## Installation

1. **Clone the repository:**
   ```bash
   git clone <repository_url>
   cd <repository_directory>
   ```
2. **Install dependencies** (Python 3.x required):
   ```bash
   pip install -r requirements.txt
   ```

## Usage

### CLI Mode

Run the pipeline once to generate an MP3 summary file:

```bash
python -m vibe.main --generate --prompt "Your interests and context here" --max-articles 5 --output summary.mp3
```

This command fetches the latest articles from arXiv, filters and ranks them based on your specified interests, generates narrative summaries, and converts the final summary into the requested MP3 file.

### Server Mode

Run vibe as a Flask server:

```bash
python -m vibe.main --serve
```

Once the server is running, send a POST request to the `/process` endpoint with a JSON payload:

```bash
curl -X POST http://127.0.0.1:5000/process \
  -H "Content-Type: application/json" \
  -d '{"user_info": "Your interests here", "max_articles": 5, "new_only": false}'
```

The server processes the articles, generates an MP3 summary, and returns the file as a downloadable response.

## Running Tests

The project includes basic tests to verify that modules are working as expected. To run them, execute:

```bash
make test
```

or

```bash
python -m unittest discover -s tests
```

## Makefile Commands

- `make test` - Run the unit tests.
- `make run` - Run the application in CLI mode (you can modify the command inside the Makefile).
- `make serve` - Run the Flask server.
- `make clean` - Clean up temporary files (e.g., remove the cache directory).

## Environment Variables

The following environment variables can be set to customize the behavior of vibe:

- `ARXIV_URL`: The URL used to fetch the latest arXiv articles. Defaults to `https://arxiv.org/list/cs/new`.
- `LLM_URL`: The URL for the language model endpoint. Defaults to `http://127.0.0.1:4000/v1/chat/completions` (a litellm instance).
- `MODEL_NAME`: The model name to be used by the LLM. Defaults to `mistral-small-latest`.

Note that using the `mistral-small` model through their cloud service typically costs a few cents per run and completes the summarization process in around 4 minutes. It is also possible to run vibe with local LLMs (such as Qwen 2.5 14B or mistral-small), although these local runs may take up to an hour.

## Dependencies

The project relies on several key libraries:

- Flask
- requests
- beautifulsoup4
- soundfile
- docling
- kokoro

## Contributing

Contributions are welcome! Feel free to fork this repository and submit pull requests with improvements or bug fixes.

## License

This project is licensed under the MIT License.

## Acknowledgments

Thanks to the developers of [Docling](https://github.com/docling) and [Kokoro](https://github.com/kokoro) as well as the maintainers of BeautifulSoup and Flask for providing great tools that made this project possible.
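The `/process` endpoint can also be driven from Python instead of curl. A minimal client sketch, assuming only what the curl example above shows (endpoint path, port, and JSON field names); `build_payload` and `request_summary` are hypothetical helper names, not part of vibe:

```python
import json
import urllib.request

def build_payload(user_info, max_articles=5, new_only=False):
    # Mirrors the JSON body accepted by the /process endpoint
    return {"user_info": user_info, "max_articles": max_articles, "new_only": new_only}

def request_summary(base_url="http://127.0.0.1:5000", outfile="summary.mp3"):
    # POST the payload and save the returned MP3 response body to disk
    body = json.dumps(build_payload("Your interests here")).encode("utf-8")
    req = urllib.request.Request(
        f"{base_url}/process",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp, open(outfile, "wb") as f:
        f.write(resp.read())
```

Only the standard library is used, so this works without installing the project's dependencies on the client side.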

tests/test_vibe.py — new file, 103 lines

@@ -0,0 +1,103 @@
import unittest
from unittest.mock import patch, MagicMock

# Import modules from the vibe package
from vibe.fetcher import fetch_arxiv_list
from vibe.filter import batch_relevance_filter
from vibe.rerank import rerank_articles
from vibe.converter import fetch_and_convert_article
from vibe.summarizer import generate_article_summary
from vibe.orchestrator import process_articles


class TestVibeModules(unittest.TestCase):
    @patch("vibe.fetcher.requests.get")
    def test_fetch_arxiv_list(self, mock_get):
        # Set up a fake response for arXiv HTML
        fake_html = """
        <html>
        <body>
        <dl>
        <dt><a title="Abstract">arXiv:1234.5678</a> <a title="Download PDF" href="/pdf/1234.5678.pdf"></a></dt>
        <dd>
        <div class="list-title">Title: Test Article</div>
        <p class="mathjax">This is a test abstract.</p>
        </dd>
        </dl>
        </body>
        </html>
        """
        mock_get.return_value.status_code = 200
        mock_get.return_value.text = fake_html
        articles = fetch_arxiv_list(force_refresh=True, arxiv_url="http://fakeurl")
        self.assertEqual(len(articles), 1)
        self.assertEqual(articles[0]["id"], "arXiv:1234.5678")

    @patch("vibe.filter.requests.post")
    def test_batch_relevance_filter(self, mock_post):
        # Simulate LLM response
        fake_response = MagicMock()
        fake_response.status_code = 200
        fake_response.json.return_value = {
            "choices": [{"message": {"content": '{"arXiv:1234.5678": "yes"}'}}]
        }
        mock_post.return_value = fake_response
        articles = [{"id": "arXiv:1234.5678", "title": "Test", "abstract": "Test abstract"}]
        relevant_ids = batch_relevance_filter(articles, "dummy user")
        self.assertIn("arXiv:1234.5678", relevant_ids)

    @patch("vibe.rerank.requests.post")
    def test_rerank_articles(self, mock_post):
        fake_response = MagicMock()
        fake_response.status_code = 200
        fake_response.json.return_value = {
            "choices": [{"message": {"content": '{"ranking": ["arXiv:1234.5678"]}'}}]
        }
        mock_post.return_value = fake_response
        articles = [{"id": "arXiv:1234.5678", "title": "Test", "abstract": "Test abstract"}]
        ranked = rerank_articles(articles, "dummy user")
        self.assertEqual(ranked[0]["id"], "arXiv:1234.5678")

    @patch("vibe.converter.requests.get")
    def test_fetch_and_convert_article(self, mock_get):
        # This test simulates a failure to download a PDF
        article = {"id": "arXiv:1234.5678", "pdf_url": "http://fakepdf", "title": "Test", "abstract": "Test abstract"}
        mock_get.return_value.status_code = 404
        content = fetch_and_convert_article(article)
        self.assertEqual(content, "")

    @patch("vibe.summarizer.requests.post")
    def test_generate_article_summary(self, mock_post):
        fake_response = MagicMock()
        fake_response.status_code = 200
        fake_response.json.return_value = {
            "choices": [{"message": {"content": "Summary text"}}]
        }
        mock_post.return_value = fake_response
        summary = generate_article_summary(
            {"id": "arXiv:1234.5678", "title": "Test"}, "content", "dummy user"
        )
        self.assertEqual(summary, "Summary text")

    @patch("vibe.orchestrator.fetch_arxiv_list")
    @patch("vibe.orchestrator.batch_relevance_filter")
    @patch("vibe.orchestrator.rerank_articles")
    @patch("vibe.orchestrator.fetch_and_convert_article")
    @patch("vibe.orchestrator.generate_article_summary")
    def test_process_articles(self, mock_summary, mock_convert, mock_rerank, mock_filter, mock_fetch):
        # Set up mocks for the orchestrator pipeline
        mock_fetch.return_value = [{
            "id": "arXiv:1234.5678",
            "title": "Test Article",
            "abstract": "Test abstract",
            "pdf_url": "http://fakepdf",
        }]
        mock_filter.return_value = {"arXiv:1234.5678"}
        mock_rerank.return_value = [{
            "id": "arXiv:1234.5678",
            "title": "Test Article",
            "abstract": "Test abstract",
            "pdf_url": "http://fakepdf",
        }]
        mock_convert.return_value = "Converted content"
        mock_summary.return_value = "Final summary"
        summary = process_articles("dummy user", max_articles=1)
        self.assertIn("Final summary", summary)


if __name__ == "__main__":
    unittest.main()
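The filter and rerank tests above stub the LLM reply as raw text containing a JSON object. The extraction approach used throughout the codebase, pulling the first `{...}` span out of a possibly chatty response before parsing, can be sketched in isolation (`extract_json` is a hypothetical helper name, not a vibe function):

```python
import json
import re

def extract_json(text):
    # Grab the outermost {...} span from an LLM reply and parse it;
    # returns None if no JSON object is present or it fails to parse.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return None
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
```

For example, `extract_json('Sure! {"arXiv:1234.5678": "yes"}')` returns the dict even though the model added a preamble, which is exactly the failure mode the prompts in this project try to forbid.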

vibe.py — 628 lines removed

@@ -1,628 +0,0 @@
#!/usr/bin/env python3
import os
import json
import requests
import subprocess
from datetime import datetime
import tempfile
import logging
import concurrent.futures
import re

from bs4 import BeautifulSoup

# --- Docling Imports ---
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.datamodel.base_models import InputFormat
from docling_core.types.doc import ImageRefMode

# --- Kokoro & TTS Imports ---
from kokoro import KPipeline
import soundfile as sf

# --- Flask Imports ---
from flask import Flask, send_file, request, jsonify

# --- Logging Configuration ---
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# --- Cache Setup ---
CACHE_DIR = "cache"
ARXIV_CACHE_FILE = os.path.join(CACHE_DIR, "arxiv_list.json")
ARTICLES_CACHE_DIR = os.path.join(CACHE_DIR, "articles")

if not os.path.exists(CACHE_DIR):
    os.makedirs(CACHE_DIR)
    logger.debug("Created cache directory: %s", CACHE_DIR)
if not os.path.exists(ARTICLES_CACHE_DIR):
    os.makedirs(ARTICLES_CACHE_DIR)
    logger.debug("Created articles cache directory: %s", ARTICLES_CACHE_DIR)

# --- Instantiate Docling Converter ---
logger.debug("Instantiating Docling converter with PDF options.")
pdf_options = PdfFormatOption(
    pipeline_options=PdfPipelineOptions(generate_picture_images=True)
)
doc_converter = DocumentConverter(format_options={InputFormat.PDF: pdf_options})

DEFAULT_ARXIV_URL = os.environ.get("ARXIV_URL", "https://arxiv.org/list/cs/new")
DEFAULT_LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:4000/v1/chat/completions")
DEFAULT_MODEL_NAME = os.environ.get("MODEL_NAME", "mistral-small-latest")


# --- Module: Fetcher ---
def fetch_arxiv_list(force_refresh=False, arxiv_url=DEFAULT_ARXIV_URL):
    """
    Fetches the latest CS articles from arXiv. If a cache exists, reads from it
    unless force_refresh is True. Otherwise, parses the arXiv page, extracts
    article metadata, and caches it.
    """
    logger.debug("Checking for cached arXiv list at %s", ARXIV_CACHE_FILE)
    if not force_refresh and os.path.exists(ARXIV_CACHE_FILE):
        logger.info("Cache found for arXiv list. Loading from cache.")
        with open(ARXIV_CACHE_FILE, "r", encoding="utf-8") as f:
            articles = json.load(f)
        logger.debug("Loaded %d articles from cache.", len(articles))
        return articles
    url = arxiv_url
    logger.info("Fetching arXiv page from %s", url)
    response = requests.get(url)
    if response.status_code != 200:
        logger.error(
            "Failed to fetch arXiv page. Status code: %d", response.status_code
        )
        raise Exception("Failed to fetch arXiv page.")
    logger.debug("Parsing arXiv HTML content.")
    soup = BeautifulSoup(response.text, "html.parser")
    articles = []
    dl = soup.find("dl")
    if not dl:
        logger.error("No article list found on arXiv page.")
        raise Exception("No article list found on arXiv page.")
    dts = dl.find_all("dt")
    dds = dl.find_all("dd")
    logger.debug("Found %d dt tags and %d dd tags.", len(dts), len(dds))
    for dt, dd in zip(dts, dds):
        id_link = dt.find("a", title="Abstract")
        if not id_link:
            logger.debug("Skipping an article with no abstract link.")
            continue
        article_id = id_link.text.strip()
        pdf_link = dt.find("a", title="Download PDF")
        pdf_url = "https://arxiv.org" + pdf_link["href"] if pdf_link else None
        title_div = dd.find("div", class_="list-title")
        title = (
            title_div.text.replace("Title:", "").strip() if title_div else "No title"
        )
        abstract_div = dd.find("p", class_="mathjax")
        abstract = abstract_div.text.strip() if abstract_div else "No abstract"
        articles.append(
            {
                "id": article_id,
                "title": title,
                "abstract": abstract,
                "pdf_url": pdf_url,
            }
        )
        logger.debug("Parsed article: %s", article_id)
    with open(ARXIV_CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(articles, f)
    logger.info("Cached %d articles to %s", len(articles), ARXIV_CACHE_FILE)
    return articles
# --- Module: Batched Relevance Filter (Parallelized) ---
def batch_relevance_filter(
    articles,
    user_info,
    batch_size=50,
    llm_url=DEFAULT_LLM_URL,
    model_name=DEFAULT_MODEL_NAME,
):
    """
    Sends articles to the LLM in batches to check their relevance.
    Expects a JSON response mapping article IDs to "yes" or "no".
    This version parallelizes the batched requests.
    """
    relevant_article_ids = set()
    url = llm_url
    logger.info("Starting batched relevance check for %d articles.", len(articles))

    def process_batch(batch):
        local_relevant_ids = set()
        prompt_lines = [f"User info: {user_info}\n"]
        prompt_lines.append(
            "For each of the following articles, determine if it is relevant to the user. "
            "Respond in JSON format where the keys are the article IDs and the values are "
            "'yes' or 'no'. Do not add any preamble or any other text; your response will be "
            "parsed by a JSON parser immediately. Your answer must start with valid JSON: "
            "the first character must be a {, with no surrounding text."
        )
        for article in batch:
            prompt_lines.append(
                f"Article ID: {article['id']}\nTitle: {article['title']}\nAbstract: {article['abstract']}\n"
            )
        prompt = "\n".join(prompt_lines)
        payload = {
            "model": model_name,
            "messages": [{"role": "user", "content": prompt}],
        }
        try:
            response = requests.post(url, json=payload)
            if response.status_code != 200:
                logger.error(
                    "LLM batched relevance check failed for batch starting with article '%s' with status code: %d",
                    batch[0]["id"],
                    response.status_code,
                )
                return local_relevant_ids
            data = response.json()
            text_response = data["choices"][0]["message"]["content"].strip()
            try:
                match = re.search(r"\{.*\}", text_response, re.DOTALL)
                if not match:
                    raise ValueError("No valid JSON object found in response")
                json_str = match.group(0)
                logger.debug("Batch response: %s", json_str[:200])
                result = json.loads(json_str)
                for article_id, verdict in result.items():
                    if isinstance(verdict, str) and verdict.lower().strip() == "yes":
                        local_relevant_ids.add(article_id)
            except Exception as e:
                logger.exception("Failed to parse JSON from LLM response: %s", e)
            return local_relevant_ids
        except Exception as e:
            logger.exception("Error during batched relevance check: %s", e)
            return local_relevant_ids

    batches = [
        articles[i : i + batch_size] for i in range(0, len(articles), batch_size)
    ]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(process_batch, batch) for batch in batches]
        for future in concurrent.futures.as_completed(futures):
            relevant_article_ids.update(future.result())
    logger.info(
        "Batched relevance check complete. %d articles marked as relevant.",
        len(relevant_article_ids),
    )
    return relevant_article_ids


# --- Module: Rerank Articles (Improved JSON extraction) ---
def rerank_articles(
    articles, user_info, llm_url=DEFAULT_LLM_URL, model_name=DEFAULT_MODEL_NAME
):
    """
    Calls the LLM to reorder the articles by importance. Returns the reordered list.
    Expects a JSON response with a 'ranking' key pointing to a list of article IDs,
    ordered from most relevant to least relevant.
    """
    if not articles:
        return []
    url = llm_url
    logger.info("Starting rerank for %d articles.", len(articles))
    prompt_lines = [
        f"User info: {user_info}\n",
        'Please rank the following articles from most relevant to least relevant. '
        'Return your answer as valid JSON in the format: { "ranking": [ "id1", "id2", ... ] }.',
    ]
    for article in articles:
        prompt_lines.append(
            f"Article ID: {article['id']}\nTitle: {article['title']}\nAbstract: {article['abstract']}\n"
        )
    prompt = "\n".join(prompt_lines)
    payload = {"model": model_name, "messages": [{"role": "user", "content": prompt}]}
    try:
        response = requests.post(url, json=payload)
        if response.status_code != 200:
            logger.error(
                "LLM reranking request failed with status code: %d",
                response.status_code,
            )
            return articles  # fallback: return original order
        data = response.json()
        text_response = data["choices"][0]["message"]["content"].strip()
        match = re.search(r"\{.*\}", text_response, re.DOTALL)
        if not match:
            logger.error("No valid JSON found in rerank response.")
            return articles
        json_str = match.group(0)
        rerank_result = json.loads(json_str)
        ranking_list = rerank_result.get("ranking", [])
        # Create a map for quick lookup
        article_map = {a["id"]: a for a in articles}
        reordered = []
        for art_id in ranking_list:
            if art_id in article_map:
                reordered.append(article_map[art_id])
        # Add any articles not mentioned in the ranking_list, to preserve them at the end
        remaining = [a for a in articles if a["id"] not in ranking_list]
        reordered.extend(remaining)
        return reordered
    except Exception as e:
        logger.exception("Error during rerank: %s", e)
        return articles


# --- Module: Document Converter ---
def fetch_and_convert_article(article):
    """
    Checks for a cached conversion of the article.
    If absent, downloads the PDF, converts it using Docling,
    caches the Markdown text, and returns it.
    """
    safe_id = article["id"].replace(":", "_")
    cache_file = os.path.join(ARTICLES_CACHE_DIR, f"{safe_id}.txt")
    logger.debug("Checking for cached conversion of article '%s'.", article["id"])
    if os.path.exists(cache_file):
        logger.info("Found cached conversion for article '%s'.", article["id"])
        with open(cache_file, "r", encoding="utf-8") as f:
            return f.read()
    if not article["pdf_url"]:
        logger.error("No PDF URL for article '%s'. Skipping conversion.", article["id"])
        return ""
    logger.info(
        "Downloading PDF for article '%s' from %s", article["id"], article["pdf_url"]
    )
    response = requests.get(article["pdf_url"])
    if response.status_code != 200:
        logger.error("Failed to download PDF for article '%s'.", article["id"])
        return ""
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_pdf:
        tmp_pdf.write(response.content)
        tmp_pdf_path = tmp_pdf.name
    logger.debug("PDF saved temporarily at %s", tmp_pdf_path)
    try:
        logger.info("Converting PDF for article '%s' using Docling.", article["id"])
        conv_result = doc_converter.convert(source=tmp_pdf_path)
        converted_text = conv_result.document.export_to_markdown()
        with open(cache_file, "w", encoding="utf-8") as f:
            f.write(converted_text)
        logger.info(
            "Conversion successful for article '%s'. Cached output.", article["id"]
        )
        return converted_text
    except Exception as e:
        logger.exception("Conversion failed for article '%s': %s", article["id"], e)
        return ""
    finally:
        if os.path.exists(tmp_pdf_path):
            os.unlink(tmp_pdf_path)
            logger.debug("Temporary PDF file %s removed.", tmp_pdf_path)
# --- Module: Summarizer (Parallelizable) ---
def generate_article_summary(
    article, content, user_info, llm_url=DEFAULT_LLM_URL, model_name=DEFAULT_MODEL_NAME
):
    """
    Generates a fluid, narrative summary for the article using the LLM.
    The summary starts with a connecting phrase like 'And now, {article title}'.
    """
    url = llm_url
    prompt = (
        f"User info: {user_info}\n\n"
        f"Please summarize the following article titled '{article['title']}' in a fluid narrative prose style without lists or visual cues. "
        f"Begin the summary with a connecting segment like 'And now, Article: {article['title']}'.\n\n"
        f"Article Content:\n{content}"
    )
    payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": prompt}],
    }
    logger.info("Generating summary for article '%s'.", article["id"])
    try:
        response = requests.post(url, json=payload)
        if response.status_code != 200:
            logger.error(
                "LLM summarization failed for article '%s'. Status code: %d",
                article["id"],
                response.status_code,
            )
            return ""
        data = response.json()
        summary = data["choices"][0]["message"]["content"].strip()
        logger.debug("Summary for article '%s': %s", article["id"], summary[:100])
        return summary
    except Exception as e:
        logger.exception("Error summarizing article '%s': %s", article["id"], e)
        return ""


# --- Module: TTS Converter ---
def text_to_speech(text, output_mp3):
    """
    Converts the provided text to speech using KPipeline.
    A temporary WAV file is generated and then converted to MP3 using ffmpeg.
    """
    logger.info("Starting text-to-speech conversion.")
    pipeline = KPipeline(lang_code="a")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
        temp_wav_path = tmp_wav.name
    logger.debug("Temporary WAV file created at %s", temp_wav_path)
    try:
        generator = pipeline(text, voice="af_bella", speed=1, split_pattern=r"\n+")
        with sf.SoundFile(temp_wav_path, "w", 24000, channels=1) as f:
            for chunk_index, (_, _, audio) in enumerate(generator):
                logger.debug("Writing audio chunk %d to WAV file.", chunk_index)
                f.write(audio)
        logger.info("WAV file generated. Converting to MP3 with ffmpeg.")
        subprocess.run(["ffmpeg", "-y", "-i", temp_wav_path, output_mp3], check=True)
        logger.info("MP3 file created at %s", output_mp3)
    finally:
        if os.path.exists(temp_wav_path):
            os.unlink(temp_wav_path)
            logger.debug("Temporary WAV file %s removed.", temp_wav_path)


# --- Orchestrator: Process Articles (Parallelizing summarization) ---
def process_articles(
    user_info,
    arxiv_url=DEFAULT_ARXIV_URL,
    llm_url=DEFAULT_LLM_URL,
    model_name=DEFAULT_MODEL_NAME,
    max_articles=5,
    new_only=False,
):
    """
    Executes the full pipeline:
    1. Fetch arXiv articles (cached if available, unless new_only=True).
    2. If new_only, filter out articles that have already been cached as .txt files.
    3. Batch-check relevance via LLM (parallelized).
    4. Re-rank articles by importance using the LLM.
    5. Select the top `max_articles`.
    6. For each selected article, download and convert the PDF to Markdown (sequential).
    7. Generate a narrative summary for each article (parallelized if not cached).
    8. Combine all summaries into a final narrative.
    """
    logger.info("Starting article processing pipeline.")
    # Step 1: fetch articles with potential force_refresh
    articles = fetch_arxiv_list(force_refresh=new_only, arxiv_url=arxiv_url)
    logger.info("Total articles fetched: %d", len(articles))
    # Step 2: if new_only is True, filter out articles older than the most recent cached article
    if new_only:
        cached_articles = [
            f[:-4] for f in os.listdir(ARTICLES_CACHE_DIR) if f.endswith(".txt")
        ]
        if cached_articles:

            def parse_id(id_str):
                if id_str.lower().startswith("ar"):
                    id_str = id_str[6:]
                parts = id_str.split(".")
                return (int(parts[0][:2]), int(parts[0][2:]), int(parts[1]))

            most_recent = max(cached_articles, key=parse_id)
            articles = [
                article
                for article in articles
                if parse_id(article["id"]) > parse_id(most_recent)
            ]
            logger.info(
                "After filtering by most recent article id %s, %d articles remain.",
                most_recent,
                len(articles),
            )
        else:
            logger.info(
                "No cached articles found, proceeding with all fetched articles."
            )
    # Step 3: batch relevance check (parallelized)
    relevant_ids = batch_relevance_filter(
        articles, user_info, llm_url=llm_url, model_name=model_name
    )
    relevant_articles = [
        article for article in articles if article["id"] in relevant_ids
    ]
    logger.info(
        "Found %d relevant articles out of %d.", len(relevant_articles), len(articles)
    )
    # Step 4: rerank
    reranked_articles = rerank_articles(
        relevant_articles, user_info, llm_url=llm_url, model_name=model_name
    )
    # Step 5: select top max_articles
    final_candidates = reranked_articles[:max_articles]
    # Step 6: convert PDFs sequentially
    articles_with_content = []
    for article in final_candidates:
        content = fetch_and_convert_article(article)
        if content:
            articles_with_content.append((article, content))
        else:
            logger.warning("No content obtained for article '%s'.", article["id"])
    # Step 7: generate summaries in parallel
    summaries = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_article = {
            executor.submit(
                generate_article_summary,
                article,
                content,
                user_info,
                llm_url,
                model_name,
            ): article
            for article, content in articles_with_content
        }
        for future in concurrent.futures.as_completed(future_to_article):
            article = future_to_article[future]
            try:
                summary = future.result()
                if summary:
                    summaries.append(summary)
                else:
                    logger.warning(
                        "No summary generated for article '%s'.", article["id"]
                    )
            except Exception as e:
                logger.exception(
                    "Error generating summary for article '%s': %s", article["id"], e
                )
    # Step 8: combine summaries
    final_summary = "\n\n".join(summaries) + " "
    final_summary += f"\n\nThanks for listening to the report. Generated on {datetime.now().strftime('%B %d, %Y at %I:%M %p')} by vibe.py"
    logger.info(
        "Final summary generated with length %d characters.", len(final_summary)
    )
    return final_summary
# --- Flask Application ---
app = Flask(__name__)


@app.route("/process", methods=["POST"])
def process_endpoint():
    """
    Expects JSON with a 'user_info' field.
    Optionally accepts 'max_articles' (default 5) and 'new_only' (boolean).
    Runs the complete pipeline and returns the final MP3 file.
    """
    data = request.get_json()
    user_info = data.get("user_info", "")
    if not user_info:
        logger.error("user_info not provided in request.")
        return jsonify({"error": "user_info not provided"}), 400
    max_articles = data.get("max_articles", 5)
    new_only = data.get("new_only", False)
    logger.info(
        "Processing request with user_info: %s, max_articles: %s, new_only: %s",
        user_info,
        max_articles,
        new_only,
    )
    final_summary = process_articles(
        user_info,
        arxiv_url=DEFAULT_ARXIV_URL,
        llm_url=DEFAULT_LLM_URL,
        model_name=DEFAULT_MODEL_NAME,
        max_articles=max_articles,
        new_only=new_only,
    )
    if not final_summary.strip():
        logger.error("No summaries generated.")
        return jsonify({"error": "No summaries generated."}), 500
    output_mp3 = os.path.join(CACHE_DIR, "final_output.mp3")
    try:
        text_to_speech(final_summary, output_mp3)
    except Exception as e:
        logger.exception("TTS conversion failed: %s", e)
        return jsonify({"error": f"TTS conversion failed: {e}"}), 500
    logger.info("Process complete. Returning MP3 file.")
    return send_file(output_mp3, as_attachment=True)


# --- Main ---
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="vibe: Article Summarization & TTS Pipeline"
    )
    parser.add_argument("--serve", action="store_true", help="Run as a Flask server.")
    parser.add_argument(
        "--generate",
        action="store_true",
        help="Run the pipeline once, generate a summary MP3, then exit.",
    )
    parser.add_argument(
        "--prompt",
        type=str,
        default="",
        help="User info (interests, context) for LLM filtering & summaries.",
    )
    parser.add_argument(
        "--max-articles",
        type=int,
        default=5,
        help="Maximum articles to process in the pipeline.",
    )
    parser.add_argument(
        "--new-only",
        action="store_true",
        help="If set, only process articles newer than cached.",
    )
    parser.add_argument(
        "--arxiv-url",
        type=str,
        default=DEFAULT_ARXIV_URL,
        help="URL for fetching arXiv articles.",
    )
    parser.add_argument(
        "--llm-url", type=str, default=DEFAULT_LLM_URL, help="URL of the LLM endpoint."
    )
    parser.add_argument(
        "--model-name",
        type=str,
        default=DEFAULT_MODEL_NAME,
        help="Name of model to pass to the LLM endpoint.",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="final_output.mp3",
        help="Output path for the generated MP3 file.",
    )
    args = parser.parse_args()
    if args.serve:
        logger.info("Starting Flask application in verbose mode.")
        app.run(debug=True)
    elif args.generate:
        # Run the pipeline directly and produce an MP3 file
        logger.info("Running pipeline in CLI mode.")
        user_info = args.prompt
        final_summary = process_articles(
            user_info=user_info,
            arxiv_url=args.arxiv_url,
            llm_url=args.llm_url,
            model_name=args.model_name,
            max_articles=args.max_articles,
            new_only=args.new_only,
        )
        if not final_summary.strip():
            logger.error("No summaries generated.")
            exit(1)
        output_mp3 = args.output
        try:
            text_to_speech(final_summary, output_mp3)
            logger.info(f"Generated MP3 at: {output_mp3}")
        except Exception as e:
            logger.exception("TTS conversion failed: %s", e)
            exit(1)
    else:
        # Default to Flask server if neither flag is set
        logger.info("No --serve or --generate specified; running Flask by default.")
        app.run(debug=True)
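The core reordering step inside `rerank_articles` is easy to get subtly wrong: the LLM's ranking must win, but articles the model forgot to mention must not be dropped. A sketch of just that logic, isolated from the HTTP and JSON-parsing code (`apply_ranking` is a hypothetical name for illustration):

```python
def apply_ranking(articles, ranking):
    # Reorder `articles` (dicts with an "id" key) to follow `ranking`, a list
    # of ids from most to least relevant. Ids the ranking omits are preserved
    # at the end in their original relative order; unknown ids are ignored.
    by_id = {a["id"]: a for a in articles}
    reordered = [by_id[art_id] for art_id in ranking if art_id in by_id]
    ranked_ids = set(ranking)
    reordered.extend(a for a in articles if a["id"] not in ranked_ids)
    return reordered
```

This mirrors the fallback behavior in the source: a ranking that names no known ids degrades gracefully to the original order.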

vibe/config.py — new file, 22 lines

@@ -0,0 +1,22 @@
import os
import logging
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
CACHE_DIR = "cache"
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
logger.debug("Created cache directory: %s", CACHE_DIR)
ARXIV_CACHE_FILE = os.path.join(CACHE_DIR, "arxiv_list.json")
ARTICLES_CACHE_DIR = os.path.join(CACHE_DIR, "articles")
if not os.path.exists(ARTICLES_CACHE_DIR):
os.makedirs(ARTICLES_CACHE_DIR)
logger.debug("Created articles cache directory: %s", ARTICLES_CACHE_DIR)
DEFAULT_ARXIV_URL = os.environ.get("ARXIV_URL", "https://arxiv.org/list/cs/new")
DEFAULT_LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:4000/v1/chat/completions")
DEFAULT_MODEL_NAME = os.environ.get("MODEL_NAME", "mistral-small-latest")
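The endpoint and model defaults above follow a simple environment-first pattern; a minimal sketch of the same lookup (the `EXAMPLE_*` variable names are hypothetical, used only for illustration):

```python
import os

def setting(env_name, default):
    # Environment variable wins; otherwise fall back to the hard-coded default.
    return os.environ.get(env_name, default)

os.environ["EXAMPLE_LLM_URL"] = "http://127.0.0.1:4000/v1/chat/completions"
assert setting("EXAMPLE_LLM_URL", "fallback") == "http://127.0.0.1:4000/v1/chat/completions"
assert setting("EXAMPLE_UNSET", "fallback") == "fallback"
```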

vibe/converter.py Normal file
@@ -0,0 +1,60 @@
import os
import tempfile
import requests
import logging
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.datamodel.base_models import InputFormat
from .config import ARTICLES_CACHE_DIR
logger = logging.getLogger(__name__)
pdf_options = PdfFormatOption(pipeline_options=PdfPipelineOptions(generate_picture_images=True))
doc_converter = DocumentConverter(format_options={InputFormat.PDF: pdf_options})
def fetch_and_convert_article(article):
"""
Checks for a cached conversion of the article.
If absent, downloads the PDF, converts it using Docling,
caches the Markdown text, and returns it.
"""
safe_id = article["id"].replace(":", "_")
cache_file = os.path.join(ARTICLES_CACHE_DIR, f"{safe_id}.txt")
logger.debug("Checking for cached conversion of article '%s'.", article["id"])
if os.path.exists(cache_file):
logger.info("Found cached conversion for article '%s'.", article["id"])
with open(cache_file, "r", encoding="utf-8") as f:
return f.read()
if not article["pdf_url"]:
logger.error("No PDF URL for article '%s'. Skipping conversion.", article["id"])
return ""
logger.info("Downloading PDF for article '%s' from %s", article["id"], article["pdf_url"])
    response = requests.get(article["pdf_url"], timeout=60)
if response.status_code != 200:
logger.error("Failed to download PDF for article '%s'.", article["id"])
return ""
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp_pdf:
tmp_pdf.write(response.content)
tmp_pdf_path = tmp_pdf.name
logger.debug("PDF saved temporarily at %s", tmp_pdf_path)
try:
logger.info("Converting PDF for article '%s' using Docling.", article["id"])
conv_result = doc_converter.convert(source=tmp_pdf_path)
converted_text = conv_result.document.export_to_markdown()
with open(cache_file, "w", encoding="utf-8") as f:
f.write(converted_text)
logger.info("Conversion successful for article '%s'. Cached output.", article["id"])
return converted_text
except Exception as e:
logger.exception("Conversion failed for article '%s': %s", article["id"], e)
return ""
finally:
if os.path.exists(tmp_pdf_path):
os.unlink(tmp_pdf_path)
logger.debug("Temporary PDF file %s removed.", tmp_pdf_path)
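Conversion results are cached per article under `ARTICLES_CACHE_DIR`, keyed by the arXiv id with `:` swapped for `_`; a minimal sketch of that key derivation (the id and directory here are hypothetical, for illustration only):

```python
import os

def cache_path(article_id, cache_dir=os.path.join("cache", "articles")):
    # "arXiv:2403.01234" -> cache/articles/arXiv_2403.01234.txt
    safe_id = article_id.replace(":", "_")
    return os.path.join(cache_dir, f"{safe_id}.txt")

assert cache_path("arXiv:2403.01234") == os.path.join("cache", "articles", "arXiv_2403.01234.txt")
```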

vibe/fetcher.py Normal file
@@ -0,0 +1,71 @@
import os
import json
import requests
from bs4 import BeautifulSoup
import logging
from .config import ARXIV_CACHE_FILE
logger = logging.getLogger(__name__)
def fetch_arxiv_list(force_refresh=False, arxiv_url=None):
"""
Fetches the latest CS articles from arXiv. If a cache exists, reads from it
unless force_refresh is True. Otherwise, parses the arXiv page, extracts
article metadata, and caches it.
"""
if arxiv_url is None:
from .config import DEFAULT_ARXIV_URL
arxiv_url = DEFAULT_ARXIV_URL
    logger.debug("Checking for cached arXiv list at %s", ARXIV_CACHE_FILE)
if not force_refresh and os.path.exists(ARXIV_CACHE_FILE):
logger.info("Cache found for arXiv list. Loading from cache.")
with open(ARXIV_CACHE_FILE, "r", encoding="utf-8") as f:
articles = json.load(f)
logger.debug("Loaded %d articles from cache.", len(articles))
return articles
logger.info("Fetching arXiv page from %s", arxiv_url)
    response = requests.get(arxiv_url, timeout=30)
if response.status_code != 200:
logger.error("Failed to fetch arXiv page. Status code: %d", response.status_code)
raise Exception("Failed to fetch arXiv page.")
logger.debug("Parsing arXiv HTML content.")
soup = BeautifulSoup(response.text, "html.parser")
articles = []
dl = soup.find("dl")
if not dl:
logger.error("No article list found on arXiv page.")
raise Exception("No article list found on arXiv page.")
dts = dl.find_all("dt")
dds = dl.find_all("dd")
logger.debug("Found %d dt tags and %d dd tags.", len(dts), len(dds))
for dt, dd in zip(dts, dds):
id_link = dt.find("a", title="Abstract")
if not id_link:
logger.debug("Skipping an article with no abstract link.")
continue
article_id = id_link.text.strip()
pdf_link = dt.find("a", title="Download PDF")
pdf_url = "https://arxiv.org" + pdf_link["href"] if pdf_link else None
title_div = dd.find("div", class_="list-title")
title = title_div.text.replace("Title:", "").strip() if title_div else "No title"
abstract_div = dd.find("p", class_="mathjax")
abstract = abstract_div.text.strip() if abstract_div else "No abstract"
articles.append({
"id": article_id,
"title": title,
"abstract": abstract,
"pdf_url": pdf_url,
})
logger.debug("Parsed article: %s", article_id)
with open(ARXIV_CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(articles, f)
logger.info("Cached %d articles to %s", len(articles), ARXIV_CACHE_FILE)
return articles
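Each parsed entry is a plain dict that round-trips through the JSON cache file; a sketch of one cached entry and the round-trip (the id and URL values are made up for illustration):

```python
import json
import os
import tempfile

# Shape of one cached entry, mirroring the dicts built in fetch_arxiv_list.
article = {
    "id": "arXiv:2403.01234",
    "title": "Example Title",
    "abstract": "Example abstract.",
    "pdf_url": "https://arxiv.org/pdf/2403.01234",
}

with tempfile.TemporaryDirectory() as d:
    cache_file = os.path.join(d, "arxiv_list.json")
    with open(cache_file, "w", encoding="utf-8") as f:
        json.dump([article], f)
    with open(cache_file, "r", encoding="utf-8") as f:
        loaded = json.load(f)

assert loaded == [article]
```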

vibe/filter.py Normal file
@@ -0,0 +1,69 @@
import json
import re
import requests
import logging
import concurrent.futures
logger = logging.getLogger(__name__)
def batch_relevance_filter(articles, user_info, batch_size=50, llm_url=None, model_name=None):
"""
Sends articles to the LLM in batches to check their relevance.
Expects a JSON response mapping article IDs to "yes" or "no".
This version parallelizes the batched requests.
"""
if llm_url is None or model_name is None:
from .config import DEFAULT_LLM_URL, DEFAULT_MODEL_NAME
llm_url = llm_url or DEFAULT_LLM_URL
model_name = model_name or DEFAULT_MODEL_NAME
relevant_article_ids = set()
logger.info("Starting batched relevance check for %d articles.", len(articles))
def process_batch(batch):
local_relevant_ids = set()
prompt_lines = [f"User info: {user_info}\n"]
prompt_lines.append(
"For each of the following articles, determine if it is relevant to the user. Respond in JSON format with keys as the article IDs and values as 'yes' or 'no'. Do not add any extra text; the response must start with a '{'."
)
for article in batch:
prompt_lines.append(
f"Article ID: {article['id']}\nTitle: {article['title']}\nAbstract: {article['abstract']}\n"
)
prompt = "\n".join(prompt_lines)
payload = {
"model": model_name,
"messages": [{"role": "user", "content": prompt}],
}
try:
response = requests.post(llm_url, json=payload)
if response.status_code != 200:
logger.error("LLM batched relevance check failed for batch starting with article '%s' with status code: %d", batch[0]["id"], response.status_code)
return local_relevant_ids
data = response.json()
text_response = data["choices"][0]["message"]["content"].strip()
try:
match = re.search(r"\{.*\}", text_response, re.DOTALL)
if not match:
raise ValueError("No valid JSON object found in response")
json_str = match.group(0)
logger.debug("Batch response: %s", json_str[:200])
result = json.loads(json_str)
for article_id, verdict in result.items():
if isinstance(verdict, str) and verdict.lower().strip() == "yes":
local_relevant_ids.add(article_id)
except Exception as e:
logger.exception("Failed to parse JSON from LLM response: %s", e)
return local_relevant_ids
except Exception as e:
logger.exception("Error during batched relevance check: %s", e)
return local_relevant_ids
batches = [articles[i: i + batch_size] for i in range(0, len(articles), batch_size)]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(process_batch, batch) for batch in batches]
for future in concurrent.futures.as_completed(futures):
relevant_article_ids.update(future.result())
logger.info("Batched relevance check complete. %d articles marked as relevant.", len(relevant_article_ids))
return relevant_article_ids
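The prompt asks for a bare JSON object, but models often wrap it in prose anyway, so the code extracts the first `{...}` span before parsing; a minimal sketch of that extraction and verdict handling:

```python
import json
import re

def extract_verdicts(text_response):
    # Grab the outermost {...} block even if the model added prose around it.
    match = re.search(r"\{.*\}", text_response, re.DOTALL)
    if not match:
        raise ValueError("No valid JSON object found in response")
    return json.loads(match.group(0))

reply = 'Sure! Here you go:\n{"arXiv:2403.01234": "yes", "arXiv:2403.05678": "no"}'
verdicts = extract_verdicts(reply)
relevant = {aid for aid, v in verdicts.items() if v.lower().strip() == "yes"}
assert relevant == {"arXiv:2403.01234"}
```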

vibe/main.py Normal file
@@ -0,0 +1,49 @@
import argparse
import logging
from vibe.orchestrator import process_articles
from vibe.tts import text_to_speech
from vibe.config import DEFAULT_ARXIV_URL, DEFAULT_LLM_URL, DEFAULT_MODEL_NAME
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser(description="vibe: Article Summarization & TTS Pipeline")
parser.add_argument("--serve", action="store_true", help="Run as a Flask server.")
parser.add_argument("--generate", action="store_true", help="Run the pipeline once and generate a summary MP3, then exit.")
parser.add_argument("--prompt", type=str, default="", help="User info for LLM filtering & summaries.")
parser.add_argument("--max-articles", type=int, default=5, help="Maximum articles to process in the pipeline.")
parser.add_argument("--new-only", action="store_true", help="Only process articles newer than cached.")
parser.add_argument("--arxiv-url", type=str, default=DEFAULT_ARXIV_URL, help="URL for fetching arXiv articles.")
parser.add_argument("--llm-url", type=str, default=DEFAULT_LLM_URL, help="URL of the LLM endpoint.")
parser.add_argument("--model-name", type=str, default=DEFAULT_MODEL_NAME, help="Name of model to pass to the LLM endpoint.")
parser.add_argument("--output", type=str, default="final_output.mp3", help="Output path for the generated MP3 file.")
args = parser.parse_args()
if args.serve:
from vibe.server import app
logger.info("Starting Flask server.")
app.run(debug=True)
elif args.generate:
logger.info("Running pipeline in CLI mode.")
user_info = args.prompt
final_summary = process_articles(user_info, arxiv_url=args.arxiv_url, llm_url=args.llm_url, model_name=args.model_name, max_articles=args.max_articles, new_only=args.new_only)
if not final_summary.strip():
logger.error("No summaries generated.")
exit(1)
try:
text_to_speech(final_summary, args.output)
logger.info(f"Generated MP3 at: {args.output}")
except Exception as e:
logger.exception("TTS conversion failed: %s", e)
exit(1)
else:
logger.info("No mode specified; defaulting to Flask server.")
from vibe.server import app
app.run(debug=True)
if __name__ == "__main__":
main()
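A sketch of how the flags above parse, using a stdlib `argparse` mirror of a subset of the CLI (not the actual entry point):

```python
import argparse

# Minimal mirror of the vibe CLI flags, for illustration only.
parser = argparse.ArgumentParser()
parser.add_argument("--generate", action="store_true")
parser.add_argument("--max-articles", type=int, default=5)
parser.add_argument("--output", type=str, default="final_output.mp3")

args = parser.parse_args(["--generate", "--max-articles", "3"])
assert args.generate is True
assert args.max_articles == 3
assert args.output == "final_output.mp3"  # default when the flag is omitted
```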

vibe/orchestrator.py Normal file
@@ -0,0 +1,80 @@
import os
import logging
import concurrent.futures
from datetime import datetime
from .config import ARTICLES_CACHE_DIR
from .fetcher import fetch_arxiv_list
from .filter import batch_relevance_filter
from .rerank import rerank_articles
from .converter import fetch_and_convert_article
from .summarizer import generate_article_summary
from .tts import text_to_speech
logger = logging.getLogger(__name__)
def process_articles(user_info, arxiv_url=None, llm_url=None, model_name=None, max_articles=5, new_only=False):
"""
Executes the full pipeline:
1. Fetch arXiv articles.
2. Optionally filter out articles older than cached ones if new_only is True.
3. Batch-check relevance via LLM.
4. Rerank articles.
5. Select top max_articles.
6. Convert PDFs to Markdown.
7. Generate narrative summaries.
8. Combine summaries into a final narrative.
"""
articles = fetch_arxiv_list(force_refresh=new_only, arxiv_url=arxiv_url)
logger.info("Total articles fetched: %d", len(articles))
if new_only:
cached_articles = [f[:-4] for f in os.listdir(ARTICLES_CACHE_DIR) if f.endswith(".txt")]
if cached_articles:
            def parse_id(id_str):
                # Strip the 6-char "arXiv:" (or cached "arXiv_") prefix, then
                # split "2403.01234" into (year, month, sequence) ints.
                if id_str.lower().startswith("arxiv"):
                    id_str = id_str[6:]
                parts = id_str.split(".")
                return (int(parts[0][:2]), int(parts[0][2:]), int(parts[1]))
most_recent = max(cached_articles, key=parse_id)
articles = [article for article in articles if parse_id(article["id"]) > parse_id(most_recent)]
logger.info("After filtering by most recent article id %s, %d articles remain.", most_recent, len(articles))
else:
logger.info("No cached articles found, proceeding with all fetched articles.")
relevant_ids = batch_relevance_filter(articles, user_info, llm_url=llm_url, model_name=model_name)
relevant_articles = [article for article in articles if article["id"] in relevant_ids]
logger.info("Found %d relevant articles out of %d.", len(relevant_articles), len(articles))
reranked_articles = rerank_articles(relevant_articles, user_info, llm_url=llm_url, model_name=model_name)
final_candidates = reranked_articles[:max_articles]
articles_with_content = []
for article in final_candidates:
content = fetch_and_convert_article(article)
if content:
articles_with_content.append((article, content))
else:
logger.warning("No content obtained for article '%s'.", article["id"])
summaries = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_article = {
executor.submit(generate_article_summary, article, content, user_info, llm_url, model_name): article
for article, content in articles_with_content
}
for future in concurrent.futures.as_completed(future_to_article):
article = future_to_article[future]
try:
summary = future.result()
if summary:
summaries.append(summary)
else:
logger.warning("No summary generated for article '%s'.", article["id"])
except Exception as e:
logger.exception("Error generating summary for article '%s': %s", article["id"], e)
final_summary = "\n\n".join(summaries)
final_summary += f"\n\nThanks for listening to the report. Generated on {datetime.now().strftime('%B %d, %Y at %I:%M %p')} by vibe."
logger.info("Final summary generated with length %d characters.", len(final_summary))
return final_summary
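The `new_only` filter compares arXiv ids as `(year, month, sequence)` tuples, so plain tuple ordering decides recency; a sketch of that comparison (the ids are hypothetical):

```python
def parse_id(id_str):
    # "arXiv:2403.01234" -> (24, 3, 1234); tuples then compare element-wise.
    if id_str.lower().startswith("arxiv"):
        id_str = id_str[6:]
    major, minor = id_str.split(".")
    return (int(major[:2]), int(major[2:]), int(minor))

assert parse_id("arXiv:2403.01234") == (24, 3, 1234)
# A later month sorts after any sequence number from an earlier month.
assert parse_id("arXiv:2404.00001") > parse_id("arXiv:2403.99999")
```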

vibe/rerank.py Normal file
@@ -0,0 +1,54 @@
import json
import re
import requests
import logging
logger = logging.getLogger(__name__)
def rerank_articles(articles, user_info, llm_url=None, model_name=None):
"""
Calls the LLM to reorder the articles by importance. Returns the reordered list.
Expects a JSON response with a 'ranking' key pointing to a list of article IDs.
"""
if not articles:
return []
if llm_url is None or model_name is None:
from .config import DEFAULT_LLM_URL, DEFAULT_MODEL_NAME
llm_url = llm_url or DEFAULT_LLM_URL
model_name = model_name or DEFAULT_MODEL_NAME
logger.info("Starting rerank for %d articles.", len(articles))
prompt_lines = [
f"User info: {user_info}\n",
'Please rank the following articles from most relevant to least relevant. Return your answer as valid JSON in the format: { "ranking": [ "id1", "id2", ... ] }.',
]
for article in articles:
prompt_lines.append(
f"Article ID: {article['id']}\nTitle: {article['title']}\nAbstract: {article['abstract']}\n"
)
prompt = "\n".join(prompt_lines)
payload = {"model": model_name, "messages": [{"role": "user", "content": prompt}]}
try:
response = requests.post(llm_url, json=payload)
if response.status_code != 200:
logger.error("LLM reranking request failed with status code: %d", response.status_code)
return articles
data = response.json()
text_response = data["choices"][0]["message"]["content"].strip()
match = re.search(r"\{.*\}", text_response, re.DOTALL)
if not match:
logger.error("No valid JSON found in rerank response.")
return articles
json_str = match.group(0)
rerank_result = json.loads(json_str)
ranking_list = rerank_result.get("ranking", [])
article_map = {a["id"]: a for a in articles}
reordered = [article_map[art_id] for art_id in ranking_list if art_id in article_map]
remaining = [a for a in articles if a["id"] not in ranking_list]
reordered.extend(remaining)
return reordered
except Exception as e:
logger.exception("Error during rerank: %s", e)
return articles
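Reordering keeps the LLM's ranking where possible and appends anything the model omitted, so a partial or noisy ranking never drops articles; a minimal sketch of that fallback logic:

```python
def apply_ranking(articles, ranking_list):
    # Keep the LLM's order for ids it ranked; append anything it missed.
    article_map = {a["id"]: a for a in articles}
    reordered = [article_map[i] for i in ranking_list if i in article_map]
    reordered += [a for a in articles if a["id"] not in set(ranking_list)]
    return reordered

arts = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
assert [a["id"] for a in apply_ranking(arts, ["c", "a"])] == ["c", "a", "b"]
# Unknown ids in the ranking are ignored rather than raising.
assert [a["id"] for a in apply_ranking(arts, ["x", "b"])] == ["b", "a", "c"]
```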

vibe/server.py Normal file
@@ -0,0 +1,38 @@
from flask import Flask, send_file, request, jsonify
import logging
from .orchestrator import process_articles
from .config import CACHE_DIR
logger = logging.getLogger(__name__)
app = Flask(__name__)
@app.route("/process", methods=["POST"])
def process_endpoint():
    data = request.get_json(silent=True) or {}
user_info = data.get("user_info", "")
if not user_info:
logger.error("user_info not provided in request.")
return jsonify({"error": "user_info not provided"}), 400
max_articles = data.get("max_articles", 5)
new_only = data.get("new_only", False)
logger.info("Processing request with user_info: %s, max_articles: %s, new_only: %s", user_info, max_articles, new_only)
final_summary = process_articles(user_info, max_articles=max_articles, new_only=new_only)
if not final_summary.strip():
logger.error("No summaries generated.")
return jsonify({"error": "No summaries generated."}), 500
output_mp3 = f"{CACHE_DIR}/final_output.mp3"
try:
from .tts import text_to_speech
text_to_speech(final_summary, output_mp3)
except Exception as e:
logger.exception("TTS conversion failed: %s", e)
return jsonify({"error": f"TTS conversion failed: {e}"}), 500
logger.info("Process complete. Returning MP3 file.")
return send_file(output_mp3, as_attachment=True)
if __name__ == "__main__":
app.run(debug=True)
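The `/process` endpoint expects a JSON body with `user_info` (required) plus optional `max_articles` and `new_only`; a sketch of that request contract, validated the way the handler above does (pure stdlib, no Flask, so it is illustrative only):

```python
import json

def validate_request(body_json):
    # Mirrors the endpoint's checks: user_info required, sensible defaults.
    data = json.loads(body_json)
    user_info = data.get("user_info", "")
    if not user_info:
        return None, ({"error": "user_info not provided"}, 400)
    return {
        "user_info": user_info,
        "max_articles": data.get("max_articles", 5),
        "new_only": data.get("new_only", False),
    }, None

params, err = validate_request('{"user_info": "I like ML systems papers"}')
assert err is None and params["max_articles"] == 5
_, err = validate_request('{}')
assert err == ({"error": "user_info not provided"}, 400)
```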

vibe/summarizer.py Normal file
@@ -0,0 +1,38 @@
import requests
import logging
from .config import DEFAULT_LLM_URL, DEFAULT_MODEL_NAME
logger = logging.getLogger(__name__)
def generate_article_summary(article, content, user_info, llm_url=None, model_name=None):
"""
Generates a fluid, narrative summary for the article using the LLM.
The summary starts with a connecting phrase.
"""
    llm_url = llm_url or DEFAULT_LLM_URL
    model_name = model_name or DEFAULT_MODEL_NAME
prompt = (
f"User info: {user_info}\n\n"
f"Please summarize the following article titled '{article['title']}' in a fluid narrative prose style without lists or visual cues. "
f"Begin the summary with a connecting segment like 'And now, Article: {article['title']}'.\n\n"
f"Article Content:\n{content}"
)
payload = {
"model": model_name,
"messages": [{"role": "user", "content": prompt}],
}
logger.info("Generating summary for article '%s'.", article["id"])
try:
response = requests.post(llm_url, json=payload)
if response.status_code != 200:
logger.error("LLM summarization failed for article '%s'. Status code: %d", article["id"], response.status_code)
return ""
data = response.json()
summary = data["choices"][0]["message"]["content"].strip()
logger.debug("Summary for article '%s': %s", article["id"], summary[:100])
return summary
except Exception as e:
logger.exception("Error summarizing article '%s': %s", article["id"], e)
return ""

vibe/tts.py Normal file
@@ -0,0 +1,33 @@
import os
import subprocess
import tempfile
import logging
import soundfile as sf
from kokoro import KPipeline
logger = logging.getLogger(__name__)
def text_to_speech(text, output_mp3):
"""
Converts the provided text to speech using KPipeline.
Generates a temporary WAV file and converts it to MP3 using ffmpeg.
"""
logger.info("Starting text-to-speech conversion.")
pipeline = KPipeline(lang_code="a")
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
temp_wav_path = tmp_wav.name
logger.debug("Temporary WAV file created at %s", temp_wav_path)
try:
generator = pipeline(text, voice="af_bella", speed=1, split_pattern=r"\n+")
with sf.SoundFile(temp_wav_path, "w", 24000, channels=1) as f:
for chunk_index, (_, _, audio) in enumerate(generator):
logger.debug("Writing audio chunk %d to WAV file.", chunk_index)
f.write(audio)
logger.info("WAV file generated. Converting to MP3 with ffmpeg.")
subprocess.run(["ffmpeg", "-y", "-i", temp_wav_path, output_mp3], check=True)
logger.info("MP3 file created at %s", output_mp3)
finally:
if os.path.exists(temp_wav_path):
os.unlink(temp_wav_path)
logger.debug("Temporary WAV file %s removed.", temp_wav_path)