Portfolio Monitoring at Scale
Analyst teams that cover hundreds of companies on a watchlist face a tradeoff when automating daily research:
- Run GenSearch on every name every day → guaranteed coverage, but credit cost scales linearly with the watchlist (10 credits × 500 companies = 5,000 credits/day on Auto mode).
- Run GenSearch on a sampled subset → cheaper, but you miss material updates on the names that weren't sampled that day.
- Run Deep Research across 50+ companies at a time → fewer runs, but tokens are wasted if none of the companies have updates, and a single Deep Research run may not be enough if several companies have extensive updates. Post-processing also has to split the combined output back into per-company updates.
This pattern uses the Document Search API as a cheap gate in front of GenSearch Auto mode. A company consumes GenSearch credits only on days when new documents have actually been published since its last analysis, enough to clear a configurable threshold. Because most companies have no new content on most days, total credit usage should come in lower than running GenSearch on every name daily.
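The tradeoff is simple arithmetic. Below is a minimal sketch of the daily comparison, assuming the 10-credit Auto-mode figure from the list above. The credit cost of the Document Search gate is left as a parameter because this section does not state it. `daily_credits` is a hypothetical helper, and the "40 triggered companies" in the usage line is a made-up example.

```python
def daily_credits(watchlist_size: int, triggered: int,
                  gensearch_credits: int = 10, gate_credits: int = 0) -> tuple[int, int]:
    """Return (always_on, gated) credit spend for one daily run.

    gensearch_credits: per-company Auto-mode cost (10 in the example above).
    gate_credits: cost of the bulk Document Search gate (an assumption; set it
    to whatever your account is actually charged).
    """
    if not 0 <= triggered <= watchlist_size:
        raise ValueError("triggered must be between 0 and watchlist_size")
    always_on = watchlist_size * gensearch_credits
    gated = gate_credits + triggered * gensearch_credits
    return always_on, gated


# Hypothetical day: 40 of 500 names crossed the threshold.
print(daily_credits(500, 40))  # (5000, 400)
```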
How it works
SearchFilter.companies.ids accepts an array, so one paginated Document Search covers every ticker
on the watchlist. The per-company threshold check happens in memory against the response. Only the
companies that clear the threshold consume GenSearch credits.
Prerequisites
| Requirement | Where to set up |
|---|---|
| Service account | A service account or user with API access. See Authentication. |
| Watchlist | A saved watchlist on the calling user's account. Look it up with user { watchlists { id name } } — see Utility APIs. |
| State storage | Anywhere you can read/write a small JSON blob — a file, S3 object, or row in your warehouse. Examples below use a local JSON file. |
If you run this for many end users from one integration, mint an OBO token for each user before running the loop. Credits and audit logs are then attributed to that user, not the service account.
Shared request setup
Authenticate once as described in Authentication. Every step snippet
below takes the resulting access_token and builds its request headers with this single helper — so
the auth details live in one place rather than being repeated in each call:
```python
import os

import requests


def headers(access_token):
    return {
        "x-api-key": os.environ["ALPHASENSE_API_KEY"],
        "clientid": os.environ["ALPHASENSE_CLIENT_ID"],
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
```
Step 1: Load the watchlist
Resolve the watchlist ID to its company list. The user.watchlists query returns every watchlist
belonging to the calling user, including each watchlist's companies with primaryTickerCode — the
value the Document Search and GenSearch APIs accept. Filter to the watchlist you want client-side.
```graphql
query Watchlists {
  user {
    watchlists {
      id
      name
      companies {
        id
        name
        primaryTickerCode
      }
    }
  }
}
```
```python
def load_watchlist(access_token, watchlist_id):
    query = """
    query Watchlists {
      user {
        watchlists {
          id
          name
          companies { name primaryTickerCode }
        }
      }
    }
    """
    response = requests.post(
        "https://api.alpha-sense.com/gql",
        headers=headers(access_token),
        json={"query": query},
        timeout=30,
    )
    response.raise_for_status()
    for wl in response.json()["data"]["user"]["watchlists"]:
        # Compare as strings: GraphQL may return the ID as a string even if you pass an int.
        if str(wl["id"]) == str(watchlist_id):
            # Skip companies without a ticker; the search APIs key on primaryTickerCode.
            return [c for c in wl["companies"] if c.get("primaryTickerCode")]
    raise RuntimeError(f"Watchlist {watchlist_id} not found.")
```
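Because the filtering happens client-side, it can be pulled out as a pure function and tested without calling the API. Below is a minimal sketch that assumes the response shape of the query above. `select_watchlist` is a hypothetical helper name, and the sample data is made up. Unlike `load_watchlist`, it also reports companies that have no primaryTickerCode instead of dropping them silently. It compares IDs as strings in case the API serializes them that way.

```python
from typing import Any


def select_watchlist(watchlists: list[dict[str, Any]], watchlist_id) -> tuple[list[dict], list[str]]:
    """Return (companies with a ticker, names of companies without one) for one watchlist.

    `watchlists` is assumed to be data["user"]["watchlists"] from the query above.
    """
    for wl in watchlists:
        # Compare as strings so an int from config matches a string GraphQL ID.
        if str(wl["id"]) == str(watchlist_id):
            companies = wl.get("companies") or []
            with_ticker = [c for c in companies if c.get("primaryTickerCode")]
            missing = [c.get("name", "?") for c in companies if not c.get("primaryTickerCode")]
            return with_ticker, missing
    raise LookupError(f"Watchlist {watchlist_id} not found.")


# Made-up sample payload for illustration.
sample = [
    {"id": "42", "name": "Core", "companies": [
        {"name": "Apple", "primaryTickerCode": "AAPL"},
        {"name": "Private Co", "primaryTickerCode": None},
    ]},
]
kept, missing = select_watchlist(sample, 42)
print([c["primaryTickerCode"] for c in kept], missing)  # ['AAPL'] ['Private Co']
```

Logging `missing` on each run makes it obvious when a name on the watchlist is silently excluded from monitoring.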
Step 2: Maintain per-company state
For each company, remember the releasedAt of the most recent document already analyzed (epoch
ms — the format Document Search returns). A new document "counts" only if its releasedAt is
greater than this cutoff. A flat JSON file keyed by ticker is enough:
```json
{
  "AAPL": {"last_seen_release_at": 1749...},
  "MSFT": {"last_seen_release_at": 1749...},
  "NVDA": {"last_seen_release_at": 1749...}
}
```
```python
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

STATE_PATH = Path("state.json")
DEFAULT_LOOKBACK_DAYS = 7


def load_state():
    if STATE_PATH.exists():
        return json.loads(STATE_PATH.read_text())
    return {}


def save_state(state):
    STATE_PATH.write_text(json.dumps(state, indent=2))


def cutoff_for(state, ticker):
    """Epoch-ms cutoff for this ticker. Fresh tickers fall back to DEFAULT_LOOKBACK_DAYS."""
    entry = state.get(ticker)
    if entry and entry.get("last_seen_release_at"):
        return int(entry["last_seen_release_at"])
    fallback = datetime.now(timezone.utc) - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    return int(fallback.timestamp() * 1000)
```
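Why track last_seen_release_at instead of last_run_at? A missed cron run doesn't lose coverage: the next run still asks "what's new since the last doc I analyzed for AAPL?" regardless of how many days have passed. With last_run_at, a skipped day creates a gap. The cutoff also advances only after an analysis, so a company that stays below the threshold keeps accumulating new documents across days until it clears it.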
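The local-file `save_state` overwrites state.json in place. A crash mid-write can therefore leave a truncated file, which resets every cutoff to the lookback default on the next run. Below is a minimal sketch of an atomic variant for the local-file case. It writes to a temporary file in the same directory, then swaps the file in with `os.replace`, which is atomic on POSIX systems when both paths are on the same filesystem. `save_state_atomic` is a hypothetical name. S3 or warehouse storage would need its own approach.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state_atomic(state: dict, path: Path) -> None:
    """Write state to a temp file beside `path`, then atomically swap it in."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(state, tmp, indent=2)
            tmp.flush()
            os.fsync(tmp.fileno())  # make sure the bytes reach disk before the swap
        os.replace(tmp_name, path)  # atomic rename on POSIX (same filesystem)
    except BaseException:
        # Remove the temp file if anything failed before the swap, then re-raise.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

To use it, call `save_state_atomic(state, STATE_PATH)` wherever the loop currently calls `save_state(state)`.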
Step 3: Bulk Document Search across the watchlist
Pass every ticker in filter.companies.ids and set the date window to the oldest cutoff across
the watchlist. One paginated query returns every document anyone might care about today. Then bucket
the response by ticker in memory.
```graphql
query NewDocuments($filter: SearchFilter!, $cursor: String) {
  search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
    cursor
    documents {
      id
      releasedAt
      companies {
        primaryTickerCode
      }
    }
  }
}
```
```python
from collections import defaultdict
from datetime import datetime, timezone


def fetch_new_documents(access_token, tickers, since_epoch_ms):
    """Paginated Document Search across every ticker. Returns the raw doc list."""
    # The date filter is day-granular; exact releasedAt cutoffs are applied in memory.
    since_date = datetime.fromtimestamp(since_epoch_ms / 1000, tz=timezone.utc).date().isoformat()
    today = datetime.now(timezone.utc).date().isoformat()
    query = """
    query NewDocuments($filter: SearchFilter!, $cursor: String) {
      search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
        cursor
        documents {
          id
          releasedAt
          companies { primaryTickerCode }
        }
      }
    }
    """
    base_filter = {
        "companies": {"ids": tickers},
        "date": {"customRange": {"from": since_date, "to": today}},
    }
    docs, cursor = [], None
    while True:
        response = requests.post(
            "https://api.alpha-sense.com/gql",
            headers=headers(access_token),
            json={"query": query, "variables": {"filter": base_filter, "cursor": cursor}},
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()["data"]["search"]
        docs.extend(result["documents"])
        cursor = result.get("cursor")
        if not cursor:  # no cursor means this was the last page
            return docs


def group_by_ticker(docs):
    """Bucket docs by their company tickers; one doc can land in several buckets."""
    grouped = defaultdict(list)
    for doc in docs:
        for company in doc.get("companies") or []:
            ticker = company.get("primaryTickerCode")
            if ticker:
                grouped[ticker].append(doc)
    return grouped
```
For each company, the "new docs since last analysis" are just
[d for d in grouped[ticker] if d["releasedAt"] > cutoff_for(state, ticker)], and the count is the
length of that list. No additional API calls are needed.
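Applied across the whole watchlist, the gate is a loop over the grouped response. Below is a minimal sketch. The hypothetical `tickers_to_analyze` takes the grouped docs (as returned by `group_by_ticker`) and a per-ticker cutoff map, which is the same shape `main()` builds in the end-to-end script. The `releasedAt` values in the usage example are toy numbers. The date filter is day-granular, so the strict `>` comparison is what drops documents from the cutoff day that were already analyzed.

```python
def tickers_to_analyze(grouped: dict[str, list[dict]], cutoffs: dict[str, int],
                       threshold: int) -> dict[str, list[dict]]:
    """Map each ticker that clears `threshold` to its docs released after its cutoff."""
    selected = {}
    for ticker, cutoff_ms in cutoffs.items():
        # Strict ">" skips docs already covered by the last analysis.
        new_docs = [d for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff_ms]
        if len(new_docs) >= threshold:
            selected[ticker] = new_docs
    return selected


grouped = {
    "AAPL": [{"id": "a1", "releasedAt": 300}, {"id": "a2", "releasedAt": 200},
             {"id": "a3", "releasedAt": 100}],
    "MSFT": [{"id": "m1", "releasedAt": 250}],
}
cutoffs = {"AAPL": 100, "MSFT": 0, "NVDA": 0}  # NVDA had no docs at all
print({t: len(docs) for t, docs in tickers_to_analyze(grouped, cutoffs, 2).items()})
# {'AAPL': 2}
```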
raise_for_status() only catches HTTP failures. A GraphQL request can return HTTP 200 with
{"data": null, "errors": [...]} (expired token, oversized request, schema violation). The
end-to-end script below wraps every request in a small post_graphql helper that surfaces those
errors before they cause a NoneType crash on the next line.
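A large watchlist may push the bulk query into an oversized-request error. If that happens, one option is to split the tickers into batches, run `fetch_new_documents` once per batch, and concatenate the results. Below is a minimal sketch. The batch size of 100 is an arbitrary assumption, not a documented limit, and `chunked` and `fetch_in_batches` are hypothetical helpers. A document that mentions companies in two different batches comes back twice, so the sketch de-duplicates by document `id` before grouping. Without that step, `group_by_ticker` would double-count it.

```python
from typing import Callable, Iterator


def chunked(items: list[str], size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_in_batches(fetch: Callable[[list[str]], list[dict]], tickers: list[str],
                     batch_size: int = 100) -> list[dict]:
    """Call `fetch` once per ticker batch and de-duplicate docs by id."""
    seen, docs = set(), []
    for batch in chunked(tickers, batch_size):
        for doc in fetch(batch):
            if doc["id"] not in seen:  # same doc can match tickers in two batches
                seen.add(doc["id"])
                docs.append(doc)
    return docs
```

In the script, this would be called as `fetch_in_batches(lambda batch: fetch_new_documents(access_token, batch, oldest), tickers)`.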
Step 4: Decide whether to run GenSearch
A simple integer threshold against the per-company doc count is enough to start. Tune it once you've watched a few days of output.
```python
NEW_DOC_THRESHOLD = 3

new_docs = [d for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff_for(state, ticker)]
if len(new_docs) >= NEW_DOC_THRESHOLD:
    ...  # run GenSearch for this company (Steps 5 and 6)
```
Common variations:
- Source-weighted threshold: weight earnings, SEC filings, and broker research more heavily than news wires. Add filters.types.ids to the bulk Document Search, or check doc.types per group.
- Always-on tier: designate a small "always analyze" list that runs daily regardless of new doc count. Loop over those first; gate the rest of the watchlist.
- Cooldown: skip a company for N days after its most recent analysis to absorb doc trickle-in.
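The three variations combine into a single decision function. Below is a minimal sketch that assumes each new document has already been mapped to one type label string. This section does not show the real shape of doc.types, so the labels and the weight table are illustrative placeholders, not AlphaSense type IDs. `weighted_score` and `should_analyze` are hypothetical helpers. Tracking `last_analyzed` would need an extra field in the state file, which the original state schema does not have.

```python
from datetime import date
from typing import Optional

# Illustrative weights; the labels are placeholders, not AlphaSense type IDs.
TYPE_WEIGHTS = {"earnings": 3.0, "filing": 2.0, "broker_research": 2.0, "news": 0.5}


def weighted_score(doc_labels: list[str], weights: dict[str, float] = TYPE_WEIGHTS,
                   default: float = 1.0) -> float:
    """Sum per-document weights; unknown labels count as `default`."""
    return sum(weights.get(label, default) for label in doc_labels)


def should_analyze(ticker: str, doc_labels: list[str], today: date,
                   last_analyzed: Optional[date], threshold: float = 3.0,
                   always_on: frozenset[str] = frozenset(), cooldown_days: int = 0) -> bool:
    """Combine the three variations: always-on tier, cooldown, weighted threshold."""
    if ticker in always_on:
        return True  # always-on names run daily regardless of doc count
    # Cooldown: skip while fewer than `cooldown_days` days have passed since the last run.
    if last_analyzed is not None and (today - last_analyzed).days < cooldown_days:
        return False
    return weighted_score(doc_labels) >= threshold


labels = ["news", "news", "earnings"]  # hypothetical labels for one company's new docs
print(weighted_score(labels))  # 4.0
print(should_analyze("AAPL", labels, date(2025, 1, 10),
                     last_analyzed=date(2025, 1, 9), cooldown_days=3))  # False
```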
Step 5: Run GenSearch on the companies that crossed the threshold
Use Auto mode — the recommended default for scheduled jobs. Reuse the same company + date range that
the bulk Document Search just confirmed has new content. since_iso_date comes from the company's
cutoff (cutoff_for(state, ticker) converted to YYYY-MM-DD).
```python
from datetime import datetime, timezone


def start_gensearch(access_token, ticker, company_name, since_iso_date):
    today = datetime.now(timezone.utc).date().isoformat()
    mutation = """
    mutation GenSearchAuto($input: GenSearchInput!) {
      genSearch { auto(input: $input) { id } }
    }
    """
    variables = {
        "input": {
            "prompt": (
                f"What changed for {company_name} ({ticker}) between "
                f"{since_iso_date} and {today}? Focus on material events, "
                f"earnings updates, analyst revisions, and management commentary."
            ),
            "filters": {
                "companies": {"include": [ticker]},
                "date": {"customRange": {"from": since_iso_date, "to": today}},
            },
        }
    }
    response = requests.post(
        "https://api.alpha-sense.com/gql",
        headers=headers(access_token),
        json={"query": mutation, "variables": variables},
        timeout=30,
    )
    response.raise_for_status()
    # The returned ID is the conversation to poll in Step 6.
    return response.json()["data"]["genSearch"]["auto"]["id"]
```
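Step 5 needs the epoch-ms cutoff as a YYYY-MM-DD string. Below is a minimal sketch of that conversion. The helper name `epoch_ms_to_iso_date` is hypothetical, and it mirrors the inline expression the end-to-end script uses. Truncating to a date widens the GenSearch window to the start of the cutoff day in UTC. As a result, the report may also cover documents from earlier that day that were already analyzed.

```python
from datetime import datetime, timezone


def epoch_ms_to_iso_date(epoch_ms: int) -> str:
    """Convert an epoch-millisecond cutoff to a UTC YYYY-MM-DD string."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).date().isoformat()


print(epoch_ms_to_iso_date(0))           # 1970-01-01
print(epoch_ms_to_iso_date(86_400_000))  # 1970-01-02
```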
Step 6: Poll for results and advance the cutoff
Poll until progress reaches 1.0, then save the markdown report. After a successful analysis,
advance the company's last_seen_release_at to the max releasedAt of the docs that triggered
this run — that's what makes the next run incremental.
```python
state[ticker] = {"last_seen_release_at": max(d["releasedAt"] for d in new_docs)}
save_state(state)
```
```python
import time


def poll_until_done(access_token, conversation_id, interval=5, timeout=600):
    query = """
    query Poll($conversationId: String!) {
      genSearch {
        conversation(id: $conversationId) {
          markdown
          progress
          error { code }
        }
      }
    }
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.post(
            "https://api.alpha-sense.com/gql",
            headers=headers(access_token),
            json={"query": query, "variables": {"conversationId": conversation_id}},
            timeout=60,
        )
        response.raise_for_status()
        conv = response.json()["data"]["genSearch"]["conversation"]
        if conv.get("error"):
            raise RuntimeError(f"GenSearch error: {conv['error']['code']}")
        if conv["progress"] >= 1.0:
            return conv["markdown"]
        time.sleep(interval)
    raise TimeoutError(f"GenSearch {conversation_id} did not finish within {timeout}s.")
```
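`poll_until_done` checks every 5 seconds for up to 10 minutes. When many companies clear the threshold on the same day, a capped exponential backoff cuts the number of status requests for long-running conversations. Below is a minimal sketch of that swapped-in technique. `fetch` is any callable that returns the conversation dict, for example a wrapper around the Poll query above. The delay values are arbitrary assumptions. Passing `sleep` and `clock` in as parameters lets the loop be tested without real waiting.

```python
import time
from typing import Callable


def poll_with_backoff(fetch: Callable[[], dict], initial: float = 2.0, factor: float = 2.0,
                      max_interval: float = 30.0, timeout: float = 600.0,
                      sleep: Callable[[float], None] = time.sleep,
                      clock: Callable[[], float] = time.monotonic) -> str:
    """Poll `fetch` until progress reaches 1.0, multiplying the wait up to `max_interval`."""
    deadline = clock() + timeout
    interval = initial
    while clock() < deadline:
        conv = fetch()
        if conv.get("error"):
            raise RuntimeError(f"GenSearch error: {conv['error']['code']}")
        if conv["progress"] >= 1.0:
            return conv["markdown"]
        # Never sleep past the deadline.
        sleep(min(interval, max(0.0, deadline - clock())))
        interval = min(interval * factor, max_interval)
    raise TimeoutError(f"Conversation did not finish within {timeout}s.")
```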
Complete end-to-end script
The script below stitches the steps together. Run it on a daily schedule (cron, GitHub Actions, Airflow, etc.). Each run checks every company with one paginated Document Search query and spends GenSearch credits only on the companies that have accumulated enough new content to clear the threshold.
```python
#!/usr/bin/env python3
"""Daily watchlist monitoring: one bulk Document Search gates GenSearch."""
import json
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests

GRAPHQL_URL = "https://api.alpha-sense.com/gql"
AUTH_URL = "https://api.alpha-sense.com/auth"
# Kept as a string and compared with str(): GraphQL may serialize IDs as strings.
WATCHLIST_ID = os.environ["WATCHLIST_ID"]
NEW_DOC_THRESHOLD = int(os.getenv("NEW_DOC_THRESHOLD", "3"))
DEFAULT_LOOKBACK_DAYS = int(os.getenv("DEFAULT_LOOKBACK_DAYS", "7"))
STATE_PATH = Path(os.getenv("STATE_PATH", "state.json"))
REPORTS_DIR = Path(os.getenv("REPORTS_DIR", "reports"))


def headers(access_token):
    return {
        "x-api-key": os.environ["ALPHASENSE_API_KEY"],
        "clientid": os.environ["ALPHASENSE_CLIENT_ID"],
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }


def post_graphql(access_token, query, variables=None, timeout=30):
    """POST a GraphQL request, surfacing both HTTP and GraphQL errors."""
    response = requests.post(
        GRAPHQL_URL,
        headers=headers(access_token),
        json={"query": query, "variables": variables or {}},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()
    if payload.get("errors"):
        raise RuntimeError(f"GraphQL error: {payload['errors'][0].get('message')}")
    return payload["data"]


def authenticate():
    response = requests.post(
        AUTH_URL,
        headers={
            "x-api-key": os.environ["ALPHASENSE_API_KEY"],
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "password",
            "username": os.environ["ALPHASENSE_EMAIL"],
            "password": os.environ["ALPHASENSE_PASSWORD"],
            "client_id": os.environ["ALPHASENSE_CLIENT_ID"],
            "client_secret": os.environ["ALPHASENSE_CLIENT_SECRET"],
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def load_watchlist(access_token, watchlist_id):
    query = """
    query Watchlists {
      user {
        watchlists {
          id
          name
          companies { name primaryTickerCode }
        }
      }
    }
    """
    data = post_graphql(access_token, query)
    for wl in data["user"]["watchlists"]:
        if str(wl["id"]) == str(watchlist_id):
            return [c for c in wl["companies"] if c.get("primaryTickerCode")]
    raise RuntimeError(f"Watchlist {watchlist_id} not found.")


def load_state():
    return json.loads(STATE_PATH.read_text()) if STATE_PATH.exists() else {}


def save_state(state):
    STATE_PATH.write_text(json.dumps(state, indent=2))


def cutoff_for(state, ticker):
    """Epoch-ms cutoff for this ticker."""
    entry = state.get(ticker)
    if entry and entry.get("last_seen_release_at"):
        return int(entry["last_seen_release_at"])
    fallback = datetime.now(timezone.utc) - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    return int(fallback.timestamp() * 1000)


def fetch_new_documents(access_token, tickers, since_epoch_ms):
    """Paginated Document Search across every ticker."""
    since_date = datetime.fromtimestamp(since_epoch_ms / 1000, tz=timezone.utc).date().isoformat()
    today = datetime.now(timezone.utc).date().isoformat()
    query = """
    query NewDocuments($filter: SearchFilter!, $cursor: String) {
      search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
        cursor
        documents {
          id
          releasedAt
          companies { primaryTickerCode }
        }
      }
    }
    """
    base_filter = {
        "companies": {"ids": tickers},
        "date": {"customRange": {"from": since_date, "to": today}},
    }
    docs, cursor = [], None
    while True:
        result = post_graphql(access_token, query, {"filter": base_filter, "cursor": cursor})["search"]
        docs.extend(result["documents"])
        cursor = result.get("cursor")
        if not cursor:
            return docs


def group_by_ticker(docs):
    grouped = defaultdict(list)
    for doc in docs:
        for company in doc.get("companies") or []:
            ticker = company.get("primaryTickerCode")
            if ticker:
                grouped[ticker].append(doc)
    return grouped


def start_gensearch(access_token, ticker, company_name, since_iso_date):
    today = datetime.now(timezone.utc).date().isoformat()
    mutation = """
    mutation GenSearchAuto($input: GenSearchInput!) {
      genSearch { auto(input: $input) { id } }
    }
    """
    variables = {
        "input": {
            "prompt": (
                f"What changed for {company_name} ({ticker}) between "
                f"{since_iso_date} and {today}? Focus on material events, "
                f"earnings updates, analyst revisions, and management commentary."
            ),
            "filters": {
                "companies": {"include": [ticker]},
                "date": {"customRange": {"from": since_iso_date, "to": today}},
            },
        }
    }
    return post_graphql(access_token, mutation, variables)["genSearch"]["auto"]["id"]


def poll_until_done(access_token, conversation_id, interval=5, timeout=600):
    query = """
    query Poll($conversationId: String!) {
      genSearch {
        conversation(id: $conversationId) {
          markdown
          progress
          error { code }
        }
      }
    }
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        conv = post_graphql(
            access_token, query, {"conversationId": conversation_id}, timeout=60,
        )["genSearch"]["conversation"]
        if conv.get("error"):
            raise RuntimeError(f"GenSearch error: {conv['error']['code']}")
        if conv["progress"] >= 1.0:
            return conv["markdown"]
        time.sleep(interval)
    raise TimeoutError(f"GenSearch {conversation_id} did not finish within {timeout}s.")


def main():
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    access_token = authenticate()
    state = load_state()
    companies = load_watchlist(access_token, WATCHLIST_ID)
    print(f"Loaded {len(companies)} companies from watchlist {WATCHLIST_ID}.")
    if not companies:
        return  # Nothing to search; also avoids min() on an empty sequence below.

    # One cutoff per ticker; the bulk search window starts at the oldest one.
    cutoffs = {c["primaryTickerCode"]: cutoff_for(state, c["primaryTickerCode"]) for c in companies}
    tickers = list(cutoffs.keys())
    oldest = min(cutoffs.values())
    oldest_date = datetime.fromtimestamp(oldest / 1000, tz=timezone.utc).date().isoformat()

    all_docs = fetch_new_documents(access_token, tickers, oldest)
    grouped = group_by_ticker(all_docs)
    print(f"Bulk Document Search since {oldest_date}: {len(all_docs)} docs across "
          f"{len(grouped)} companies.")

    today = datetime.now(timezone.utc).date().isoformat()
    analyzed = skipped = 0
    for company in companies:
        ticker = company["primaryTickerCode"]
        name = company["name"]
        cutoff_ms = cutoffs[ticker]
        # In-memory gate: only docs released after this ticker's own cutoff count.
        new_docs = [d for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff_ms]
        if len(new_docs) < NEW_DOC_THRESHOLD:
            if new_docs:
                print(f"  {ticker:8s} {len(new_docs):>4} new docs, skip")
            skipped += 1
            continue

        since_date = datetime.fromtimestamp(cutoff_ms / 1000, tz=timezone.utc).date().isoformat()
        print(f"  {ticker:8s} {len(new_docs):>4} new docs since {since_date}, analyzing")
        conversation_id = start_gensearch(access_token, ticker, name, since_date)
        markdown = poll_until_done(access_token, conversation_id)
        (REPORTS_DIR / f"{today}_{ticker}.md").write_text(markdown)

        # Advance the cutoff to the newest doc in this run; incremental across missed days.
        state[ticker] = {"last_seen_release_at": max(d["releasedAt"] for d in new_docs)}
        save_state(state)
        analyzed += 1

    print(f"\nDone. Analyzed {analyzed} companies; skipped {skipped}.")


if __name__ == "__main__":
    main()
```
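As written, `main()` stops at the first GenSearch error or timeout. Companies later in the loop then wait for the next run, although nothing is lost because their cutoffs are untouched. Below is a minimal sketch of isolating failures per company instead. `run_isolated` is a hypothetical helper, and `analyze` is a hypothetical callable that wraps the start, poll, and save steps for one ticker. The helper returns which tickers succeeded and which failed, so the run can be reported on or alerted on. The `except` clause is deliberately broad so that HTTP, GraphQL, and timeout errors all land in `failed`.

```python
from typing import Callable


def run_isolated(tickers: list[str],
                 analyze: Callable[[str], None]) -> tuple[list[str], dict[str, str]]:
    """Run `analyze` for each ticker; record failures instead of aborting the loop."""
    succeeded: list[str] = []
    failed: dict[str, str] = {}
    for ticker in tickers:
        try:
            analyze(ticker)
        except Exception as exc:  # broad on purpose: one bad company shouldn't stop the run
            # Assuming analyze saves state only after success, this ticker is retried next run.
            failed[ticker] = str(exc)
        else:
            succeeded.append(ticker)
    return succeeded, failed
```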
Related resources
- Document Search API: the gating call used in Step 3.
- GenSearch Modes and Inputs: full reference for auto mode, filters, and follow-ups.
- Credits & Rate Limits: how credits are metered across modes.
- On Behalf Of Requests: run this loop per end user when one integration serves many AlphaSense users.