Skip to main content

Documentation Index

Fetch the complete documentation index at: https://newscatcherinc-docs.mintlify.dev/docs/llms.txt

Use this file to discover all available pages before exploring further.

The Python SDK provides access to the CatchAll API from Python applications, with support for both synchronous and asynchronous operations.

Installation

pip install newscatcher-catchall-sdk

Quickstart

Get started with CatchAll in three steps:
1

Initialize the client

from newscatcher_catchall import CatchAllApi

# Build the synchronous client; replace the placeholder with your own API key.
client = CatchAllApi(
    api_key="YOUR_API_KEY",
)
2

Create a job

# Submit the job and keep its identifier for the status and results calls.
job = client.jobs.create_job(query="AI company acquisitions", limit=10)
job_id = job.job_id
3

Wait and retrieve results

import time

POLL_INTERVAL_SECONDS = 60
# Upper bound on the total wait so a stuck job cannot block the script forever.
MAX_WAIT_SECONDS = 60 * 60

# Poll for completion, stopping on failure or timeout instead of looping forever
deadline = time.monotonic() + MAX_WAIT_SECONDS
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    if status.status == "failed":
        raise RuntimeError(f"Job {job_id} failed")
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Job {job_id} did not complete within {MAX_WAIT_SECONDS} seconds"
        )
    time.sleep(POLL_INTERVAL_SECONDS)

# Get results
results = client.jobs.get_job_results(job_id)
print(f"Found {results.valid_records} valid records")
Jobs are processed asynchronously and typically complete in 10-15 minutes. See the Quickstart for a complete walkthrough.

Working with jobs

Get suggested validators, enrichments, and date ranges before creating a job:
import json
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

# Request suggestions for the query before creating a job.
suggestions = client.jobs.initialize(
    query="AI company acquisitions",
    context="Focus on deal size and acquiring company details",
)

# Convert the response model to a plain dict, then pretty-print it as JSON.
# default=str stringifies any value json cannot serialise natively.
suggestions_dict = suggestions.model_dump()
print(json.dumps(suggestions_dict, indent=2, default=str))
To learn more, see the Initialize endpoint.
import time

from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError

POLL_INTERVAL_SECONDS = 60

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create job with custom enrichments
    job = client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
        limit=10,
        enrichments=[
            {
                "name": "acquirer_company",
                "description": "Extract the acquiring company name",
                "type": "company"
            },
            {
                "name": "deal_value",
                "description": "Extract acquisition price if mentioned",
                "type": "number"
            }
        ]
    )
    job_id = job.job_id
    print(f"Job created: {job_id}")

    # Poll with early results access
    while True:
        status = client.jobs.get_job_status(job_id)

        # A failed job never reaches "completed"; stop instead of polling forever
        if status.status == "failed":
            raise RuntimeError(f"Job {job_id} failed")

        if status.status in ["enriching", "completed"]:
            results = client.jobs.get_job_results(job_id)
            if results.valid_records is not None:
                print(f"Progress: {results.valid_records} valid records")

            if status.status == "completed":
                break

        time.sleep(POLL_INTERVAL_SECONDS)

    # Continue if needed
    # valid_records can be None (hence the check in the loop), so treat None as 0
    if (results.valid_records or 0) >= 10:
        client.jobs.continue_job(job_id=job_id, new_limit=50)

        while True:
            status = client.jobs.get_job_status(job_id)
            if status.status == "completed":
                break
            if status.status == "failed":
                raise RuntimeError(f"Job {job_id} failed")
            time.sleep(POLL_INTERVAL_SECONDS)

        results = client.jobs.get_job_results(job_id)

    # Display results
    print(f"\nFinal: {results.valid_records} valid records")
    for record in results.all_records:
        print(f"  {record.record_title}")

except ApiError as e:
    # ApiError carries the status code and body used to report the failure
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")

Working with monitors

Automate recurring queries with scheduled execution.
Create a monitor from a completed job:
# Webhook settings passed to the monitor.
webhook_config = {
    "url": "https://your-endpoint.com/webhook",
    "method": "POST",
    "headers": {"Authorization": "Bearer YOUR_TOKEN"},
}

# Create the monitor from the reference job with a daily schedule.
monitor = client.monitors.create_monitor(
    reference_job_id=job_id,
    schedule="every day at 12 PM UTC",
    webhook=webhook_config,
)
print(f"Monitor created: {monitor.monitor_id}")
Monitors require a minimum 24-hour interval between executions. Learn more in the Monitors documentation.
from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create monitor from completed job
    job_id = "af7a26d6-cf0b-458c-a6ed-4b6318c74da3"

    initial_webhook = {
        "url": "https://your-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"},
    }
    monitor = client.monitors.create_monitor(
        reference_job_id=job_id,
        schedule="every day at 12 PM UTC",
        webhook=initial_webhook,
    )
    monitor_id = monitor.monitor_id
    print(f"Monitor created: {monitor_id}")

    # Update webhook
    replacement_webhook = {
        "url": "https://new-endpoint.com/webhook",
        "method": "POST",
    }
    client.monitors.update_monitor(
        monitor_id=monitor_id,
        webhook=replacement_webhook,
    )

    # List all monitors
    all_monitors = client.monitors.list_monitors()
    for entry in all_monitors.monitors:
        if entry.enabled:
            status = "active"
        else:
            status = "paused"
        print(f"{entry.monitor_id}: {status}")

    # Control execution
    client.monitors.disable_monitor(monitor_id)
    client.monitors.enable_monitor(monitor_id)

    # List execution history
    jobs = client.monitors.list_monitor_jobs(monitor_id=monitor_id, sort="desc")
    print(f"\nMonitor executed {jobs.total_jobs} jobs")
    for job in jobs.jobs:
        print(f"  Job {job.job_id}: {job.start_date} to {job.end_date}")

    # Get aggregated results
    results = client.monitors.pull_monitor_results(monitor_id)
    print(f"\nCollected {results.records} total records")
    for record in results.all_records:
        print(f"  {record.record_title}")
        print(f"  Added: {record.added_on}")

except ApiError as err:
    # Report the status code and body carried by the API error
    print(f"Status: {err.status_code}")
    print(f"Error: {err.body}")
Company search lets you track specific companies across jobs. Create entities, group them into a dataset, then connect the dataset to any job to get per-company relevance scores in results.
from newscatcher_catchall import (
    CatchAllApi,
    AdditionalAttributes,
    CompanyAttributes,
)

client = CatchAllApi(api_key="YOUR_API_KEY")

# Company-specific details: domain, alternative names, and key persons.
company_attributes = CompanyAttributes(
    domain="newscatcherapi.com",
    alternative_names=["NC", "NewsCatcher API"],
    key_persons=["Artem Bugara", "Maksym Sugonyaka"],
)
additional_attributes = AdditionalAttributes(
    company_attributes=company_attributes,
)

# Create the company entity and keep its id for later use.
entity = client.entities.create_entity(
    name="NewsCatcher",
    entity_type="company",
    description="AI-powered news data provider",
    additional_attributes=additional_attributes,
)
entity_id = entity.id
For a full step-by-step walkthrough including batch entity creation and the JSON API path, see Company Watchlist.

Async usage

Use the async client for non-blocking API calls:
import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60


async def main():
    """Create a job, wait for it to finish, and print the valid record count.

    Raises:
        RuntimeError: If the job reports a "failed" status while polling.
    """
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")

    job = await client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
    )

    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        # A failed job never reaches "completed"; stop instead of polling forever
        if status.status == "failed":
            raise RuntimeError(f"Job {job.job_id} failed")
        # asyncio.sleep yields to the event loop between polls
        await asyncio.sleep(POLL_INTERVAL_SECONDS)

    results = await client.jobs.get_job_results(job.job_id)
    print(f"Found {results.valid_records} records")


asyncio.run(main())

Error handling

Handle API errors with structured exception handling:
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="AI company acquisitions")
except ApiError as err:
    # Print the status code and body carried by the API error
    print(f"Status: {err.status_code}")
    print(f"Error: {err.body}")

Advanced usage

Pagination

Retrieve large result sets page by page:
page = 1
while True:
    # Fetch one page of up to 100 records
    results = client.jobs.get_job_results(job_id=job_id, page=page, page_size=100)

    print(f"Page {results.page}/{results.total_pages}")

    for item in results.all_records:
        print(f"  {item.record_title}")

    # Advance while pages remain; otherwise the last page has been printed
    if results.page < results.total_pages:
        page += 1
        continue
    break

Timeouts

Configure custom timeouts at client or request level:
# Client-level timeout (presumably in seconds; confirm against the SDK reference)
client = CatchAllApi(api_key="YOUR_API_KEY", timeout=30.0)

Retries

Configure automatic retry behavior for failed requests:
# Set max_retries on the client to configure automatic retries for failed requests
client = CatchAllApi(api_key="YOUR_API_KEY", max_retries=3)

Resources