#!/usr/bin/env python3
"""Export PostgreSQL requirements to YAML."""
import asyncio
import json
import os
from pathlib import Path
import yaml
from psycopg_pool import AsyncConnectionPool
from psycopg.rows import dict_row
async def export_to_yaml(database_url: str, output_path: Path, workspace_id: str = None):
pool = AsyncConnectionPool(conninfo=database_url, min_size=1, max_size=3)
await pool.open()
query = "SELECT * FROM requirements"
params = []
if workspace_id:
query += " WHERE workspace_id = %s"
params = [workspace_id]
async with pool.connection() as conn:
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(query, params or None)
rows = await cur.fetchall()
output_path.mkdir(parents=True, exist_ok=True)
for row in rows:
req_dir = output_path / row['id']
req_dir.mkdir(exist_ok=True)
# Convert JSONB fields
req_data = {
'requirement': {
'id': row['id'],
'title': row['title'],
'type': row['type'],
'classification': row['classification'],
'priority': row['priority'],
'status': row['status'],
'version': row.get('version', '1.0'),
'description': row.get('description', ''),
'business_rationale': row.get('business_rationale'),
'detailed_requirements': json.loads(row['detailed_requirements'] or '[]'),
'success_criteria': json.loads(row['success_criteria'] or '[]'),
'acceptance_criteria': json.loads(row['acceptance_criteria'] or '[]'),
'relationships': json.loads(row['relationships'] or '{}'),
'stakeholders': json.loads(row['stakeholders'] or 'null'),
'metadata': json.loads(row['metadata'] or '{}'),
}
}
with open(req_dir / 'definition.yml', 'w') as f:
yaml.dump(req_data, f, default_flow_style=False, allow_unicode=True)
print(f"Exported: {row['id']}")
await pool.close()
print(f"Exported {len(rows)} requirements to {output_path}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--database-url', default=os.getenv('DATABASE_URL'))
parser.add_argument('--output-path', default='./requirements_export')
parser.add_argument('--workspace-id')
args = parser.parse_args()
asyncio.run(export_to_yaml(
args.database_url,
Path(args.output_path),
args.workspace_id
))