Installation
- pip: pip install newscatcher-catchall-sdk
- poetry: poetry add newscatcher-catchall-sdk
- pipenv: pipenv install newscatcher-catchall-sdk
Quickstart
Get started with CatchAll in three steps.
1. Initialize the client
from newscatcher_catchall import CatchAllApi
client = CatchAllApi(api_key="YOUR_API_KEY")
2. Create a job
job = client.jobs.create_job(
    query="AI company acquisitions",
    limit=10,
)
job_id = job.job_id
3. Wait and retrieve results
import time

POLL_INTERVAL_SECONDS = 60

# Poll for completion
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    time.sleep(POLL_INTERVAL_SECONDS)

# Get results
results = client.jobs.get_job_results(job_id)
print(f"Found {results.valid_records} valid records")
Jobs process asynchronously and typically complete in 10-15 minutes. See the
Quickstart for a complete walkthrough.
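Because a job can run for ten minutes or more, it can help to wrap the polling loop in a reusable helper with a timeout guard. A minimal sketch; wait_for_completion and its timeout value are illustrative, not part of the SDK:
import time

def wait_for_completion(client, job_id, poll_interval=60, timeout=1800):
    # Hypothetical helper: poll until the job completes or the timeout elapses
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = client.jobs.get_job_status(job_id)
        if status.status == "completed":
            return client.jobs.get_job_results(job_id)
        time.sleep(poll_interval)
    raise TimeoutError(f"Job {job_id} did not complete within {timeout}s")

results = wait_for_completion(client, job_id)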
Working with jobs
- Get suggestions
- Create and track
- Continue jobs
- Early results
- List jobs
Get suggested validators, enrichments, and date ranges before creating a job:
import json
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

suggestions = client.jobs.initialize(
    query="AI company acquisitions",
    context="Focus on deal size and acquiring company details",
)
print(json.dumps(suggestions.model_dump(), indent=2, default=str))
Show suggestions response
{
  "validators": [
    {
      "name": "is_acquisition_event",
      "description": "true if article describes a completed or announced acquisition",
      "type": "boolean"
    },
    {
      "name": "involves_ai_company",
      "description": "true if acquiring or acquired company is in AI sector",
      "type": "boolean"
    }
  ],
  "enrichments": [
    {
      "name": "acquirer_company",
      "description": "Extract the acquiring company name",
      "type": "company"
    },
    {
      "name": "acquired_company",
      "description": "Extract the acquired company name",
      "type": "company"
    },
    {
      "name": "deal_value",
      "description": "Extract acquisition price if mentioned",
      "type": "number"
    },
    {
      "name": "announcement_date",
      "description": "Extract date of announcement",
      "type": "date"
    },
    {
      "name": "acquirer_details",
      "description": "Extract details about the acquiring company",
      "type": "text"
    }
  ],
  "start_date": "2026-02-01T14:12:57.292205+00:00",
  "end_date": "2026-02-06T14:12:57.292205+00:00",
  "date_modification_message": [
    "No dates were provided; using a default window of 5 days (2026-02-01 to 2026-02-06)."
  ]
}
To learn more, see the Initialize endpoint.
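If the suggestions look right, you can reuse them when creating the job. A sketch, assuming create_job accepts plain dicts (as in the examples below), so the suggestion models are converted with model_dump:
# Reuse the suggested validators and enrichments for the job
# (converting models to dicts via model_dump is an assumption
# about the suggestions response models)
job = client.jobs.create_job(
    query="AI company acquisitions",
    context="Focus on deal size and acquiring company details",
    limit=10,
    validators=[v.model_dump() for v in suggestions.validators],
    enrichments=[e.model_dump() for e in suggestions.enrichments],
)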
Submit a query and track its progress:
import time

POLL_INTERVAL_SECONDS = 60

# Create job with custom validators and enrichments
job = client.jobs.create_job(
    query="AI company acquisitions",
    context="Focus on deal size and acquiring company details",
    limit=10,
    validators=[
        {
            "name": "is_acquisition_event",
            "description": "true if article describes a completed or announced acquisition",
            "type": "boolean",
        }
    ],
    enrichments=[
        {
            "name": "acquirer_company",
            "description": "Extract the acquiring company name",
            "type": "company",
        },
        {
            "name": "acquired_company",
            "description": "Extract the acquired company name",
            "type": "company",
        },
        {
            "name": "deal_value",
            "description": "Extract acquisition price if mentioned",
            "type": "number",
        },
    ],
)
print(f"Job created: {job.job_id}")

# Monitor progress
job_id = job.job_id
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    current_step = next((s for s in status.steps if not s.completed), None)
    if current_step:
        print(f"Step {current_step.order}/7: {current_step.status}")
    time.sleep(POLL_INTERVAL_SECONDS)

# Retrieve results
results = client.jobs.get_job_results(job_id)
print(f"\nFound {results.valid_records} valid records")
for record in results.all_records:
    print(f"  {record.record_title}")
Validators and enrichments are optional. If not provided, the system
generates them automatically based on your query.
Extend processing limits for completed jobs:
import time

POLL_INTERVAL_SECONDS = 60

# Continue job to process more records
continued = client.jobs.continue_job(
    job_id=job_id,
    new_limit=50,
)
print(f"Continued: {continued.previous_limit} -> {continued.new_limit} records")

# Wait for completion
while True:
    status = client.jobs.get_job_status(job_id)
    if status.status == "completed":
        break
    time.sleep(POLL_INTERVAL_SECONDS)

# Get final results
final_results = client.jobs.get_job_results(job_id)
print(f"Total: {final_results.valid_records} valid records")
Use the limit parameter when creating jobs to start with fewer records for quick testing. Continue the job if you need more records after reviewing initial results.
Retrieve partial results during the enriching stage:
import time

POLL_INTERVAL_SECONDS = 60

while True:
    status = client.jobs.get_job_status(job_id)
    if status.status in ["enriching", "completed"]:
        results = client.jobs.get_job_results(job_id)
        if results.valid_records is not None:
            print(f"Progress: {results.progress_validated}/{results.candidate_records} validated, "
                  f"{results.valid_records} valid")
    if status.status == "completed":
        break
    time.sleep(POLL_INTERVAL_SECONDS)
Retrieve all jobs created by your account:
jobs = client.jobs.get_user_jobs()
for job in jobs:
    print(f"Job {job.job_id}: {job.query} ({job.status})")
Complete example with all features
from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError
import time

POLL_INTERVAL_SECONDS = 60

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create job with custom enrichments
    job = client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
        limit=10,
        enrichments=[
            {
                "name": "acquirer_company",
                "description": "Extract the acquiring company name",
                "type": "company",
            },
            {
                "name": "deal_value",
                "description": "Extract acquisition price if mentioned",
                "type": "number",
            },
        ],
    )
    job_id = job.job_id
    print(f"Job created: {job_id}")

    # Poll with early results access
    while True:
        status = client.jobs.get_job_status(job_id)
        if status.status in ["enriching", "completed"]:
            results = client.jobs.get_job_results(job_id)
            if results.valid_records is not None:
                print(f"Progress: {results.valid_records} valid records")
        if status.status == "completed":
            break
        time.sleep(POLL_INTERVAL_SECONDS)

    # Continue if needed
    if results.valid_records >= 10:
        client.jobs.continue_job(job_id=job_id, new_limit=50)
        while True:
            status = client.jobs.get_job_status(job_id)
            if status.status == "completed":
                break
            time.sleep(POLL_INTERVAL_SECONDS)
        results = client.jobs.get_job_results(job_id)

    # Display results
    print(f"\nFinal: {results.valid_records} valid records")
    for record in results.all_records:
        print(f"  {record.record_title}")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
Working with monitors
Automate recurring queries with scheduled execution.
- Create monitor
- Update monitor
- Pause/Resume
- List monitors
- Retrieve results
Create a monitor from a completed job:
monitor = client.monitors.create_monitor(
    reference_job_id=job_id,
    schedule="every day at 12 PM UTC",
    webhook={
        "url": "https://your-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"},
    },
)
print(f"Monitor created: {monitor.monitor_id}")
Monitors require a minimum 24-hour interval between executions. Learn more in the
Monitors documentation.
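On each scheduled run, the monitor delivers results to the webhook you configured. A minimal receiver sketch; the framework choice and the payload shape are assumptions, so check the Monitors documentation for the actual schema:
from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()

@app.post("/webhook")
async def receive_monitor_results(request: Request, authorization: str = Header(None)):
    # Verify the Authorization header configured on the monitor
    if authorization != "Bearer YOUR_TOKEN":
        raise HTTPException(status_code=401)
    payload = await request.json()  # payload schema: see the Monitors documentation
    print(f"Received monitor delivery with keys: {list(payload.keys())}")
    return {"ok": True}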
Update webhook configuration for an existing monitor:
updated = client.monitors.update_monitor(
    monitor_id=monitor.monitor_id,
    webhook={
        "url": "https://new-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer NEW_TOKEN"},
    },
)
print(f"Monitor updated: {updated.status}")
Control monitor execution:
# Pause monitor
client.monitors.disable_monitor(monitor.monitor_id)
print("Monitor paused")
# Resume monitor
client.monitors.enable_monitor(monitor.monitor_id)
print("Monitor resumed")
Retrieve all monitors for your account:
monitors = client.monitors.list_monitors()
print(f"Total monitors: {monitors.total_monitors}")
for m in monitors.monitors:
    status = "active" if m.enabled else "paused"
    print(f"{m.monitor_id}: {m.reference_job_query} ({status})")
Access aggregated results from all monitor executions:
# List execution history
jobs = client.monitors.list_monitor_jobs(
    monitor_id=monitor.monitor_id,
    sort="desc",
)
print(f"Monitor executed {jobs.total_jobs} jobs")

# Get all collected records
results = client.monitors.pull_monitor_results(monitor.monitor_id)
print(f"Total records: {results.records}")
for record in results.all_records:
    print(f"  {record.record_title}")
    print(f"  Added: {record.added_on}")
    print(f"  Updated: {record.updated_on}")
Complete monitor example
from newscatcher_catchall import CatchAllApi
from newscatcher_catchall.core.api_error import ApiError

client = CatchAllApi(api_key="YOUR_API_KEY")

try:
    # Create monitor from completed job
    job_id = "af7a26d6-cf0b-458c-a6ed-4b6318c74da3"
    monitor = client.monitors.create_monitor(
        reference_job_id=job_id,
        schedule="every day at 12 PM UTC",
        webhook={
            "url": "https://your-endpoint.com/webhook",
            "method": "POST",
            "headers": {"Authorization": "Bearer YOUR_TOKEN"},
        },
    )
    monitor_id = monitor.monitor_id
    print(f"Monitor created: {monitor_id}")

    # Update webhook
    client.monitors.update_monitor(
        monitor_id=monitor_id,
        webhook={
            "url": "https://new-endpoint.com/webhook",
            "method": "POST",
        },
    )

    # List all monitors
    all_monitors = client.monitors.list_monitors()
    for m in all_monitors.monitors:
        status = "active" if m.enabled else "paused"
        print(f"{m.monitor_id}: {status}")

    # Control execution
    client.monitors.disable_monitor(monitor_id)
    client.monitors.enable_monitor(monitor_id)

    # List execution history
    jobs = client.monitors.list_monitor_jobs(
        monitor_id=monitor_id,
        sort="desc",
    )
    print(f"\nMonitor executed {jobs.total_jobs} jobs")
    for job in jobs.jobs:
        print(f"  Job {job.job_id}: {job.start_date} to {job.end_date}")

    # Get aggregated results
    results = client.monitors.pull_monitor_results(monitor_id)
    print(f"\nCollected {results.records} total records")
    for record in results.all_records:
        print(f"  {record.record_title}")
        print(f"  Added: {record.added_on}")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
Async usage
Use the async client for non-blocking API calls:
import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60

async def main():
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")
    job = await client.jobs.create_job(
        query="AI company acquisitions",
        context="Focus on deal size and acquiring company details",
    )
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
    results = await client.jobs.get_job_results(job.job_id)
    print(f"Found {results.valid_records} records")

asyncio.run(main())
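Because calls are non-blocking, the async client also lets you drive several jobs concurrently with asyncio.gather. A sketch; the run_job helper and the second query are illustrative:
import asyncio
from newscatcher_catchall import AsyncCatchAllApi

POLL_INTERVAL_SECONDS = 60

async def run_job(client, query):
    # Hypothetical helper: create one job and poll it to completion
    job = await client.jobs.create_job(query=query, limit=10)
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        if status.status == "completed":
            break
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
    return await client.jobs.get_job_results(job.job_id)

async def main():
    client = AsyncCatchAllApi(api_key="YOUR_API_KEY")
    # Both jobs poll concurrently instead of back to back
    results = await asyncio.gather(
        run_job(client, "AI company acquisitions"),
        run_job(client, "semiconductor supply deals"),  # illustrative second query
    )
    for r in results:
        print(f"{r.valid_records} valid records")

asyncio.run(main())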
Error handling
Handle API errors with structured exception handling:
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="AI company acquisitions")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
Advanced usage
Pagination
Retrieve large result sets page by page:
page = 1
while True:
    results = client.jobs.get_job_results(
        job_id=job_id,
        page=page,
        page_size=100,
    )
    print(f"Page {results.page}/{results.total_pages}")
    for record in results.all_records:
        print(f"  {record.record_title}")
    if results.page >= results.total_pages:
        break
    page += 1
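To iterate over records without tracking page state by hand, the loop can be wrapped in a generator. A sketch; iter_all_records is not part of the SDK:
def iter_all_records(client, job_id, page_size=100):
    # Hypothetical helper: yield every record across all result pages
    page = 1
    while True:
        results = client.jobs.get_job_results(
            job_id=job_id, page=page, page_size=page_size
        )
        yield from results.all_records
        if results.page >= results.total_pages:
            return
        page += 1

for record in iter_all_records(client, job_id):
    print(record.record_title)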
Timeouts
Configure custom timeouts at the client or request level.
Client-level:
client = CatchAllApi(
    api_key="YOUR_API_KEY",
    timeout=30.0,
)
Request-level:
client.jobs.create_job(
    query="AI company acquisitions",
    request_options={"timeout_in_seconds": 10},
)
Retries
Configure automatic retry behavior for failed requests.
Client-level:
client = CatchAllApi(
    api_key="YOUR_API_KEY",
    max_retries=3,
)
Request-level:
client.jobs.create_job(
    query="AI company acquisitions",
    request_options={"max_retries": 3},
)

