The Python SDK provides access to the CatchAll API from Python applications, with support for both synchronous and asynchronous operations.

Installation

pip install newscatcher-catchall-sdk

Quickstart

Get started with CatchAll in three steps:
1. Initialize the client

from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

2. Create a job

job = client.jobs.create_job(
    query="AI company acquisitions",
    limit=10,
)
job_id = job.job_id

3. Wait and retrieve results

import time

POLL_INTERVAL_SECONDS = 60

# Poll for completion
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    time.sleep(POLL_INTERVAL_SECONDS)

# Get results
results = client.jobs.get_job_results(job_id)
print(f"Found {results.valid_records} valid records")

Jobs process asynchronously and typically complete in 10-15 minutes. See the Quickstart for a complete walkthrough.
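The loop above polls until completion with no upper bound. A production variant might add a wall-clock deadline and bail out on a terminal failure state; a minimal sketch (the "failed" status value is an assumption here, so verify it against the job status reference):

import time

POLL_INTERVAL_SECONDS = 60
DEADLINE_SECONDS = 30 * 60  # give up after 30 minutes

start = time.monotonic()
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    if status.status == "failed":  # assumed terminal status; check the API reference
        raise RuntimeError(f"Job {job_id} failed")
    if time.monotonic() - start > DEADLINE_SECONDS:
        raise TimeoutError(f"Job {job_id} did not complete within the deadline")
    time.sleep(POLL_INTERVAL_SECONDS)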

Working with jobs

Get suggested validators, enrichments, and date ranges before creating a job:
import json
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

suggestions = client.jobs.initialize(
    query="AI company acquisitions",
    context="Focus on deal size and acquiring company details"
)

print(json.dumps(suggestions.model_dump(), indent=2, default=str))

To learn more, see the Initialize endpoint.

The end-to-end example below creates a job with custom enrichments, polls for progress with early access to results, and continues the job to collect more records:

import time

from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError

POLL_INTERVAL_SECONDS = 60

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create job with custom enrichments
    job = client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
        limit=10,
        enrichments=[
            {
                "name": "acquirer_company",
                "description": "Extract the acquiring company name",
                "type": "company"
            },
            {
                "name": "deal_value",
                "description": "Extract acquisition price if mentioned",
                "type": "number"
            }
        ]
    )
    job_id = job.job_id
    print(f"Job created: {job_id}")

    # Poll with early results access
    while True:
        status = client.jobs.get_job_status(job_id)

        if status.status in ["enriching", "completed"]:
            results = client.jobs.get_job_results(job_id)
            if results.valid_records is not None:
                print(f"Progress: {results.valid_records} valid records")

            if status.status == "completed":
                break

        time.sleep(POLL_INTERVAL_SECONDS)

    # Continue the job with a higher limit if the first batch hit it
    if results.valid_records is not None and results.valid_records >= 10:
        client.jobs.continue_job(job_id=job_id, new_limit=50)
        
        while True:
            status = client.jobs.get_job_status(job_id)
            if status.status == "completed":
                break
            time.sleep(POLL_INTERVAL_SECONDS)
        
        results = client.jobs.get_job_results(job_id)

    # Display results
    print(f"\nFinal: {results.valid_records} valid records")
    for record in results.all_records:
        print(f"  {record.record_title}")

except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")

Working with monitors

Automate recurring queries with scheduled execution.
Create a monitor from a completed job:
monitor = client.monitors.create_monitor(
    reference_job_id=job_id,
    schedule="every day at 12 PM UTC",
    webhook={
        "url": "https://your-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"},
    },
)
print(f"Monitor created: {monitor.monitor_id}")

Monitors require a minimum 24-hour interval between executions. Learn more in the Monitors documentation.

A fuller workflow: create a monitor, update its webhook, list all monitors, pause and resume execution, review execution history, and pull aggregated results:

from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create monitor from completed job
    job_id = "af7a26d6-cf0b-458c-a6ed-4b6318c74da3"
    
    monitor = client.monitors.create_monitor(
        reference_job_id=job_id,
        schedule="every day at 12 PM UTC",
        webhook={
            "url": "https://your-endpoint.com/webhook",
            "method": "POST",
            "headers": {"Authorization": "Bearer YOUR_TOKEN"},
        },
    )
    monitor_id = monitor.monitor_id
    print(f"Monitor created: {monitor_id}")

    # Update webhook
    client.monitors.update_monitor(
        monitor_id=monitor_id,
        webhook={
            "url": "https://new-endpoint.com/webhook",
            "method": "POST",
        },
    )

    # List all monitors
    all_monitors = client.monitors.list_monitors()
    for m in all_monitors.monitors:
        status = "active" if m.enabled else "paused"
        print(f"{m.monitor_id}: {status}")

    # Control execution
    client.monitors.disable_monitor(monitor_id)
    client.monitors.enable_monitor(monitor_id)

    # List execution history
    jobs = client.monitors.list_monitor_jobs(
        monitor_id=monitor_id,
        sort="desc",
    )
    print(f"\nMonitor executed {jobs.total_jobs} jobs")
    for job in jobs.jobs:
        print(f"  Job {job.job_id}: {job.start_date} to {job.end_date}")

    # Get aggregated results
    results = client.monitors.pull_monitor_results(monitor_id)
    print(f"\nCollected {results.records} total records")
    for record in results.all_records:
        print(f"  {record.record_title}")
        print(f"  Added: {record.added_on}")

except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")

Async usage

Use the async client for non-blocking API calls:
import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60

async def main():
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")
    
    job = await client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
    )
    
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
    
    results = await client.jobs.get_job_results(job.job_id)
    print(f"Found {results.valid_records} records")

asyncio.run(main())
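
The async client makes it straightforward to drive several jobs concurrently. A sketch using asyncio.gather (the query strings are illustrative):

import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60

async def run_job(client, query):
    # Create a job, poll it to completion, and return its results
    job = await client.jobs.create_job(query=query)
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
    return await client.jobs.get_job_results(job.job_id)

async def main():
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")
    queries = ["AI company acquisitions", "semiconductor export controls"]
    all_results = await asyncio.gather(*(run_job(client, q) for q in queries))
    for query, results in zip(queries, all_results):
        print(f"{query}: {results.valid_records} valid records")

asyncio.run(main())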

Error handling

Handle API errors with structured exception handling:
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="AI company acquisitions")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")

Advanced usage

Pagination

Retrieve large result sets page by page:
page = 1
while True:
    results = client.jobs.get_job_results(
        job_id=job_id,
        page=page,
        page_size=100,
    )

    print(f"Page {results.page}/{results.total_pages}")

    for record in results.all_records:
        print(f"  {record.record_title}")

    if results.page >= results.total_pages:
        break
    page += 1
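
To avoid repeating the loop, you can wrap pagination in a generator that yields records one at a time (a convenience sketch, not part of the SDK):

def iter_records(client, job_id, page_size=100):
    """Yield every record in a job's results, fetching pages lazily."""
    page = 1
    while True:
        results = client.jobs.get_job_results(
            job_id=job_id, page=page, page_size=page_size
        )
        yield from results.all_records
        if results.page >= results.total_pages:
            return
        page += 1

for record in iter_records(client, job_id):
    print(record.record_title)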

Timeouts

Configure custom timeouts at client or request level:
client = CatchAllApi(
    api_key="YOUR_API_KEY",
    timeout=30.0,
)
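
The client-level timeout applies to every request. If the SDK follows the common Fern-generated request_options pattern (an assumption; check the SDK reference), you can also override it per call:

results = client.jobs.get_job_results(
    job_id,
    request_options={"timeout_in_seconds": 120},  # assumed Fern-style per-request option
)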

Retries

Configure automatic retry behavior for failed requests:
client = CatchAllApi(
    api_key="YOUR_API_KEY",
    max_retries=3,
)
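
As with timeouts, a per-request override may be available via the same assumed request_options pattern:

job = client.jobs.create_job(
    query="AI company acquisitions",
    request_options={"max_retries": 5},  # assumed Fern-style per-request option
)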

Resources