
Portfolio Monitoring at Scale

Analyst teams that cover hundreds of companies on a watchlist face a tradeoff when automating daily research:

  • Run GenSearch on every name every day → guaranteed coverage, but credit cost scales linearly with the watchlist (10 credits × 500 companies = 5,000 credits/day on Auto mode).
  • Run GenSearch on a sampled subset → cheaper, but you miss material updates on the names that weren't sampled that day.
  • Run Deep Research across 50+ companies at a time → fewer runs, but tokens are wasted if no company has updates, a single run may not be enough if several companies have extensive updates, and post-processing has to split the report back into per-company updates.

This pattern uses the Document Search API as a cheap gate in front of GenSearch Auto mode. A company only consumes GenSearch credits on days when new documents have actually been published since its last analysis; for most companies on most days, none have. Total credit usage should therefore be well below the cost of running GenSearch on every name daily.
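To make the tradeoff concrete, here is a rough back-of-the-envelope sketch. The 10 credits per run and the 500-company watchlist come from the example above; the 8% trigger rate is a purely hypothetical figure, and the cost of the Document Search gate itself is not modeled.

```python
def daily_credits(watchlist_size, credits_per_run=10, trigger_rate=1.0):
    """Expected GenSearch credits per day.

    trigger_rate is the fraction of companies that clear the gate on a given
    day (1.0 means no gate: every company runs every day).
    """
    return watchlist_size * credits_per_run * trigger_rate


ungated = daily_credits(500)                   # every name, every day
gated = daily_credits(500, trigger_rate=0.08)  # hypothetical: 8% of names trigger
print(f"ungated: {ungated:.0f} credits/day, gated: {gated:.0f} credits/day")
```

Your real trigger rate depends on the watchlist and on the threshold you choose in Step 4, so measure it over a few days before budgeting.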

How it works

Figure: four-step portfolio monitoring workflow (load watchlist, bulk Document Search, filter by threshold, GenSearch Auto).

SearchFilter.companies.ids accepts an array, so one paginated Document Search covers every ticker on the watchlist. The per-company threshold check happens in memory against the response. Only the companies that clear the threshold consume GenSearch credits.

Prerequisites

Requirement | Where to set up
Service account | A service account or user with API access. See Authentication.
Watchlist | A saved watchlist on the calling user's account. Look it up with user { watchlists { id name } } (see Utility APIs).
State storage | Anywhere you can read/write a small JSON blob: a file, S3 object, or row in your warehouse. Examples below use a local JSON file.

On Behalf Of

If you run this for many end users from one integration, mint an OBO token for each user before running the loop. Credits and audit logs are then attributed to that user, not the service account.

Shared request setup

Authenticate once as described in Authentication. Every step snippet below takes the resulting access_token and builds its request headers with this single helper — so the auth details live in one place rather than being repeated in each call:

import os

import requests


def headers(access_token):
    """Build the request headers shared by every GraphQL call."""
    return {
        "x-api-key": os.environ["ALPHASENSE_API_KEY"],
        "clientid": os.environ["ALPHASENSE_CLIENT_ID"],
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

Step 1: Load the watchlist

Resolve the watchlist ID to its company list. The user.watchlists query returns every watchlist belonging to the calling user, including each watchlist's companies with primaryTickerCode — the value the Document Search and GenSearch APIs accept. Filter to the watchlist you want client-side.

query Watchlists {
  user {
    watchlists {
      id
      name
      companies {
        id
        name
        primaryTickerCode
      }
    }
  }
}

def load_watchlist(access_token, watchlist_id):
    """Return the companies (with a ticker) on the given watchlist."""
    query = """
    query Watchlists {
      user {
        watchlists {
          id
          name
          companies { name primaryTickerCode }
        }
      }
    }
    """
    response = requests.post(
        "https://api.alpha-sense.com/gql",
        headers=headers(access_token),
        json={"query": query},
        timeout=30,
    )
    response.raise_for_status()

    for wl in response.json()["data"]["user"]["watchlists"]:
        # Compare as strings so an int ID still matches an ID returned as a string.
        if str(wl["id"]) == str(watchlist_id):
            # Drop companies without a ticker; later steps key everything by ticker.
            return [c for c in wl["companies"] if c.get("primaryTickerCode")]
    raise RuntimeError(f"Watchlist {watchlist_id} not found.")

Step 2: Maintain per-company state

For each company, remember the releasedAt of the most recent document already analyzed (epoch ms — the format Document Search returns). A new document "counts" only if its releasedAt is greater than this cutoff. A flat JSON file keyed by ticker is enough:

state.json
{
  "AAPL": {"last_seen_release_at": 1749...},
  "MSFT": {"last_seen_release_at": 1749...},
  "NVDA": {"last_seen_release_at": 1749...}
}

import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

STATE_PATH = Path("state.json")
DEFAULT_LOOKBACK_DAYS = 7


def load_state():
    if STATE_PATH.exists():
        return json.loads(STATE_PATH.read_text())
    return {}


def save_state(state):
    STATE_PATH.write_text(json.dumps(state, indent=2))


def cutoff_for(state, ticker):
    """Epoch-ms cutoff for this ticker. Fresh tickers fall back to DEFAULT_LOOKBACK_DAYS."""
    entry = state.get(ticker)
    if entry and entry.get("last_seen_release_at"):
        return int(entry["last_seen_release_at"])
    fallback = datetime.now(timezone.utc) - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    return int(fallback.timestamp() * 1000)

Why a per-doc cutoff instead of last_run_at

A missed cron run doesn't lose coverage. The next run still asks "what's new since the last doc I analyzed for AAPL," regardless of how many days passed. With last_run_at, a skipped day creates a gap.
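Because the stored cutoff is the only thing that makes runs incremental, a crash halfway through writing state.json would be costly. One possible hardening, sketched below, is to write to a temporary file and swap it into place. The name save_state_atomic is hypothetical and not part of the workflow above; it applies only to the local-file option, not to S3 or a warehouse row.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state_atomic(state, path):
    """Write state to a temp file in the same directory, then swap it into place."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(
        dir=str(path.parent), prefix=path.name, suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(state, handle, indent=2)
        # os.replace swaps the file in one step when both paths share a filesystem.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temp file if the write or the swap failed.
        os.unlink(tmp_name)
        raise
```

A reader of state.json then sees either the old file or the new one, never a half-written mix.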

Step 3: Bulk Document Search across the watchlist

Pass every ticker in filter.companies.ids and set the date window to the oldest cutoff across the watchlist. One paginated query returns every document anyone might care about today. Then bucket the response by ticker in memory.

query NewDocuments($filter: SearchFilter!, $cursor: String) {
  search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
    cursor
    documents {
      id
      releasedAt
      companies {
        primaryTickerCode
      }
    }
  }
}

from collections import defaultdict


def fetch_new_documents(access_token, tickers, since_epoch_ms):
    """Paginated Document Search across every ticker. Returns the raw doc list."""
    # The date filter is day-granular, so convert the epoch-ms cutoff to a UTC date.
    since_date = datetime.fromtimestamp(since_epoch_ms / 1000, tz=timezone.utc).date().isoformat()
    today = datetime.now(timezone.utc).date().isoformat()
    query = """
    query NewDocuments($filter: SearchFilter!, $cursor: String) {
      search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
        cursor
        documents {
          id
          releasedAt
          companies { primaryTickerCode }
        }
      }
    }
    """
    base_filter = {
        "companies": {"ids": tickers},
        "date": {"customRange": {"from": since_date, "to": today}},
    }
    docs, cursor = [], None
    # Follow the cursor until the API stops returning one.
    while True:
        response = requests.post(
            "https://api.alpha-sense.com/gql",
            headers=headers(access_token),
            json={"query": query, "variables": {"filter": base_filter, "cursor": cursor}},
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()["data"]["search"]
        docs.extend(result["documents"])
        cursor = result.get("cursor")
        if not cursor:
            return docs


def group_by_ticker(docs):
    """Bucket docs by their company tickers; one doc can land in several buckets."""
    grouped = defaultdict(list)
    for doc in docs:
        for company in doc.get("companies") or []:
            ticker = company.get("primaryTickerCode")
            if ticker:
                grouped[ticker].append(doc)
    return grouped

For each company, the "new docs since last analysis" list is just [d for d in grouped[ticker] if d["releasedAt"] > cutoff_for(state, ticker)], and the count is its length. No additional API calls are needed.
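The in-memory step can be tried without any API access. The sketch below uses made-up documents shaped like the Document Search response (the IDs, timestamps, and cutoffs are all hypothetical) and inlines the same bucketing logic as group_by_ticker.

```python
from collections import defaultdict

# Hypothetical sample data shaped like the Document Search response above.
docs = [
    {"id": "d1", "releasedAt": 1_000, "companies": [{"primaryTickerCode": "AAPL"}]},
    {"id": "d2", "releasedAt": 2_000, "companies": [{"primaryTickerCode": "AAPL"},
                                                    {"primaryTickerCode": "MSFT"}]},
    {"id": "d3", "releasedAt": 3_000, "companies": [{"primaryTickerCode": "MSFT"}]},
]
cutoffs = {"AAPL": 1_500, "MSFT": 500, "NVDA": 2_500}

# The bulk query's date window starts at the oldest cutoff on the watchlist.
oldest_cutoff = min(cutoffs.values())

# Bucket by ticker; a doc tagged with two companies lands in both buckets.
grouped = defaultdict(list)
for doc in docs:
    for company in doc.get("companies") or []:
        grouped[company["primaryTickerCode"]].append(doc)

# Apply each company's own cutoff in memory.
new_counts = {
    ticker: sum(1 for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff)
    for ticker, cutoff in cutoffs.items()
}
print(oldest_cutoff, new_counts)
```

Note that the bulk window is deliberately wider than most companies need; the per-company cutoff is what discards documents a company has already had analyzed.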

Surface GraphQL errors

raise_for_status() only catches HTTP failures. A GraphQL request can return HTTP 200 with {"data": null, "errors": [...]} (expired token, oversized request, schema violation). The end-to-end script below wraps every request in a small post_graphql helper that surfaces those errors before they cause a NoneType crash on the next line.
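As a network-free illustration of that check, the sketch below operates on an already-decoded response body. The name unwrap_graphql is hypothetical; it assumes only the standard GraphQL response shape with top-level data and errors keys.

```python
def unwrap_graphql(payload):
    """Return payload["data"], raising if the response carries GraphQL errors."""
    errors = payload.get("errors")
    if errors:
        # Join every message so multi-error responses are not truncated.
        messages = "; ".join(str(e.get("message", e)) for e in errors)
        raise RuntimeError(f"GraphQL error: {messages}")
    if payload.get("data") is None:
        raise RuntimeError("GraphQL response contained no data.")
    return payload["data"]
```

Calling it right after response.json() turns a confusing NoneType failure several lines later into an error that names the actual cause.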

Step 4: Decide whether to run GenSearch

A simple integer threshold against the per-company doc count is enough to start. Tune it once you've watched a few days of output.

NEW_DOC_THRESHOLD = 3

# Documents released after this company's cutoff.
new_docs = [d for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff_for(state, ticker)]
if len(new_docs) >= NEW_DOC_THRESHOLD:
    ...  # run GenSearch for this company (Step 5)

Common variations:

  • Source-weighted threshold — weight earnings, SEC filings, and broker research more than news wires. Add filters.types.ids to the bulk Document Search, or check doc.types per group.
  • Always-on tier — designate a small "always analyze" list that runs daily regardless of new doc count. Loop those first; gate the rest of the watchlist.
  • Cooldown — skip a company for N days after its most recent analysis to absorb doc trickle-in.
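The three variations can be combined into a single gate function. The sketch below is one possible shape, not a prescribed one: the weights, the type labels, the "type" key on each document, and the last_analyzed_ms timestamp (which you would have to add to your state) are all assumptions for illustration.

```python
DAY_MS = 24 * 60 * 60 * 1000

# Hypothetical weights and labels; real values depend on what doc.types returns.
TYPE_WEIGHTS = {"earnings": 3, "sec_filing": 3, "broker_research": 2, "news": 1}
ALWAYS_ON = {"AAPL"}  # hypothetical always-analyze tier
COOLDOWN_DAYS = 2
SCORE_THRESHOLD = 3


def should_analyze(ticker, new_docs, last_analyzed_ms, now_ms):
    """Combine always-on tier, cooldown, and source weighting into one decision."""
    if ticker in ALWAYS_ON:
        return True  # runs daily regardless of new doc count
    in_cooldown = (
        last_analyzed_ms is not None
        and now_ms - last_analyzed_ms < COOLDOWN_DAYS * DAY_MS
    )
    if in_cooldown:
        return False  # let trickle-in docs accumulate
    # Unknown document types count as 1, the same as a news wire.
    score = sum(TYPE_WEIGHTS.get(doc.get("type"), 1) for doc in new_docs)
    return score >= SCORE_THRESHOLD
```

With these example weights, a single earnings document triggers a run on its own, while two news items do not.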

Step 5: Run GenSearch on the companies that crossed the threshold

Use Auto mode — the recommended default for scheduled jobs. Reuse the same company + date range that the bulk Document Search just confirmed has new content. since_iso_date comes from the company's cutoff (cutoff_for(state, ticker) converted to YYYY-MM-DD).
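The cutoff-to-date conversion mentioned above can be a one-line helper. The name epoch_ms_to_iso_date is hypothetical; the conversion itself matches what fetch_new_documents does for the bulk query.

```python
from datetime import datetime, timezone


def epoch_ms_to_iso_date(epoch_ms):
    """Convert an epoch-millisecond timestamp to a UTC YYYY-MM-DD string."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).date().isoformat()


print(epoch_ms_to_iso_date(0))  # 1970-01-01
```

Because the result is day-granular, the GenSearch window starts at midnight UTC on the cutoff day, so it can include documents from earlier that same day that were already analyzed.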

def start_gensearch(access_token, ticker, company_name, since_iso_date):
    """Start a GenSearch Auto run for one company and return its conversation ID."""
    today = datetime.now(timezone.utc).date().isoformat()
    mutation = """
    mutation GenSearchAuto($input: GenSearchInput!) {
      genSearch { auto(input: $input) { id } }
    }
    """
    variables = {
        "input": {
            "prompt": (
                f"What changed for {company_name} ({ticker}) between "
                f"{since_iso_date} and {today}? Focus on material events, "
                f"earnings updates, analyst revisions, and management commentary."
            ),
            # Same company and date range the bulk Document Search just confirmed.
            "filters": {
                "companies": {"include": [ticker]},
                "date": {"customRange": {"from": since_iso_date, "to": today}},
            },
        }
    }
    response = requests.post(
        "https://api.alpha-sense.com/gql",
        headers=headers(access_token),
        json={"query": mutation, "variables": variables},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["data"]["genSearch"]["auto"]["id"]

Step 6: Poll for results and advance the cutoff

Poll until progress reaches 1.0, then save the markdown report. After a successful analysis, advance the company's last_seen_release_at to the max releasedAt of the docs that triggered this run — that's what makes the next run incremental.

state[ticker] = {"last_seen_release_at": max(d["releasedAt"] for d in new_docs)}
save_state(state)

import time


def poll_until_done(access_token, conversation_id, interval=5, timeout=600):
    """Poll a GenSearch conversation until it finishes; return the markdown report."""
    query = """
    query Poll($conversationId: String!) {
      genSearch {
        conversation(id: $conversationId) {
          markdown
          progress
          error { code }
        }
      }
    }
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.post(
            "https://api.alpha-sense.com/gql",
            headers=headers(access_token),
            json={"query": query, "variables": {"conversationId": conversation_id}},
            timeout=60,
        )
        response.raise_for_status()
        conv = response.json()["data"]["genSearch"]["conversation"]
        if conv.get("error"):
            raise RuntimeError(f"GenSearch error: {conv['error']['code']}")
        # Treat a missing progress value as "not started yet".
        if (conv.get("progress") or 0) >= 1.0:
            return conv["markdown"]
        time.sleep(interval)
    raise TimeoutError(f"GenSearch {conversation_id} did not finish within {timeout}s.")
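To see why advancing the cutoff to the newest analyzed document makes the next run incremental, here is a small offline simulation of two consecutive runs. It is only a sketch: select_and_advance is a hypothetical helper, the documents are made up, and the threshold check and the GenSearch call are left out for brevity.

```python
def select_and_advance(state, ticker, docs, default_cutoff=0):
    """Return docs newer than the stored cutoff and move the cutoff forward."""
    cutoff = state.get(ticker, {}).get("last_seen_release_at", default_cutoff)
    new_docs = [d for d in docs if d["releasedAt"] > cutoff]
    if new_docs:
        state[ticker] = {"last_seen_release_at": max(d["releasedAt"] for d in new_docs)}
    return new_docs


state = {}
day1 = [{"id": "a", "releasedAt": 100}, {"id": "b", "releasedAt": 200}]
day2 = day1 + [{"id": "c", "releasedAt": 300}]  # the search window still covers a and b

first = select_and_advance(state, "AAPL", day1)   # both docs are new
second = select_and_advance(state, "AAPL", day2)  # only doc "c" is new
print([d["id"] for d in first], [d["id"] for d in second], state)
```

The second run sees all three documents in the search response but only counts the one released after the stored cutoff.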

Complete end-to-end script

The script below stitches the steps together. Run it on a daily schedule (cron, GitHub Actions, Airflow, etc.). Each run touches every company with one cheap Document Search call and only spends GenSearch credits on the ones that have actually accumulated new content.

portfolio_monitor.py
#!/usr/bin/env python3
"""Daily watchlist monitoring — one bulk Document Search gates GenSearch."""

import json
import os
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests

GRAPHQL_URL = "https://api.alpha-sense.com/gql"
AUTH_URL = "https://api.alpha-sense.com/auth"

WATCHLIST_ID = int(os.environ["WATCHLIST_ID"])
NEW_DOC_THRESHOLD = int(os.getenv("NEW_DOC_THRESHOLD", "3"))
DEFAULT_LOOKBACK_DAYS = int(os.getenv("DEFAULT_LOOKBACK_DAYS", "7"))
STATE_PATH = Path(os.getenv("STATE_PATH", "state.json"))
REPORTS_DIR = Path(os.getenv("REPORTS_DIR", "reports"))


def headers(access_token):
    return {
        "x-api-key": os.environ["ALPHASENSE_API_KEY"],
        "clientid": os.environ["ALPHASENSE_CLIENT_ID"],
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }


def post_graphql(access_token, query, variables=None, timeout=30):
    """POST a GraphQL request, surfacing both HTTP and GraphQL errors."""
    response = requests.post(
        GRAPHQL_URL,
        headers=headers(access_token),
        json={"query": query, "variables": variables or {}},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()
    if payload.get("errors"):
        raise RuntimeError(f"GraphQL error: {payload['errors'][0].get('message')}")
    return payload["data"]


def authenticate():
    response = requests.post(
        AUTH_URL,
        headers={
            "x-api-key": os.environ["ALPHASENSE_API_KEY"],
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "password",
            "username": os.environ["ALPHASENSE_EMAIL"],
            "password": os.environ["ALPHASENSE_PASSWORD"],
            "client_id": os.environ["ALPHASENSE_CLIENT_ID"],
            "client_secret": os.environ["ALPHASENSE_CLIENT_SECRET"],
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def load_watchlist(access_token, watchlist_id):
    query = """
    query Watchlists {
      user {
        watchlists {
          id
          name
          companies { name primaryTickerCode }
        }
      }
    }
    """
    data = post_graphql(access_token, query)
    for wl in data["user"]["watchlists"]:
        # Compare as strings so the int WATCHLIST_ID still matches an ID returned as a string.
        if str(wl["id"]) == str(watchlist_id):
            return [c for c in wl["companies"] if c.get("primaryTickerCode")]
    raise RuntimeError(f"Watchlist {watchlist_id} not found.")


def load_state():
    return json.loads(STATE_PATH.read_text()) if STATE_PATH.exists() else {}


def save_state(state):
    STATE_PATH.write_text(json.dumps(state, indent=2))


def cutoff_for(state, ticker):
    """Epoch-ms cutoff for this ticker."""
    entry = state.get(ticker)
    if entry and entry.get("last_seen_release_at"):
        return int(entry["last_seen_release_at"])
    fallback = datetime.now(timezone.utc) - timedelta(days=DEFAULT_LOOKBACK_DAYS)
    return int(fallback.timestamp() * 1000)


def fetch_new_documents(access_token, tickers, since_epoch_ms):
    """Paginated Document Search across every ticker."""
    since_date = datetime.fromtimestamp(since_epoch_ms / 1000, tz=timezone.utc).date().isoformat()
    today = datetime.now(timezone.utc).date().isoformat()
    query = """
    query NewDocuments($filter: SearchFilter!, $cursor: String) {
      search(filter: $filter, limit: 100, sorting: {field: DATE, direction: DESC}, cursor: $cursor) {
        cursor
        documents {
          id
          releasedAt
          companies { primaryTickerCode }
        }
      }
    }
    """
    base_filter = {
        "companies": {"ids": tickers},
        "date": {"customRange": {"from": since_date, "to": today}},
    }
    docs, cursor = [], None
    # Follow the cursor until the API stops returning one.
    while True:
        result = post_graphql(access_token, query, {"filter": base_filter, "cursor": cursor})["search"]
        docs.extend(result["documents"])
        cursor = result.get("cursor")
        if not cursor:
            return docs


def group_by_ticker(docs):
    """Bucket docs by their company tickers; one doc can land in several buckets."""
    grouped = defaultdict(list)
    for doc in docs:
        for company in doc.get("companies") or []:
            ticker = company.get("primaryTickerCode")
            if ticker:
                grouped[ticker].append(doc)
    return grouped


def start_gensearch(access_token, ticker, company_name, since_iso_date):
    today = datetime.now(timezone.utc).date().isoformat()
    mutation = """
    mutation GenSearchAuto($input: GenSearchInput!) {
      genSearch { auto(input: $input) { id } }
    }
    """
    variables = {
        "input": {
            "prompt": (
                f"What changed for {company_name} ({ticker}) between "
                f"{since_iso_date} and {today}? Focus on material events, "
                f"earnings updates, analyst revisions, and management commentary."
            ),
            "filters": {
                "companies": {"include": [ticker]},
                "date": {"customRange": {"from": since_iso_date, "to": today}},
            },
        }
    }
    return post_graphql(access_token, mutation, variables)["genSearch"]["auto"]["id"]


def poll_until_done(access_token, conversation_id, interval=5, timeout=600):
    query = """
    query Poll($conversationId: String!) {
      genSearch {
        conversation(id: $conversationId) {
          markdown
          progress
          error { code }
        }
      }
    }
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        conv = post_graphql(
            access_token, query, {"conversationId": conversation_id}, timeout=60,
        )["genSearch"]["conversation"]
        if conv.get("error"):
            raise RuntimeError(f"GenSearch error: {conv['error']['code']}")
        # Treat a missing progress value as "not started yet".
        if (conv.get("progress") or 0) >= 1.0:
            return conv["markdown"]
        time.sleep(interval)
    raise TimeoutError(f"GenSearch {conversation_id} did not finish within {timeout}s.")


def main():
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    access_token = authenticate()
    state = load_state()
    companies = load_watchlist(access_token, WATCHLIST_ID)
    print(f"Loaded {len(companies)} companies from watchlist {WATCHLIST_ID}.")

    cutoffs = {c["primaryTickerCode"]: cutoff_for(state, c["primaryTickerCode"]) for c in companies}
    tickers = list(cutoffs.keys())

    # One bulk search from the oldest cutoff covers every company on the watchlist.
    oldest = min(cutoffs.values())
    oldest_date = datetime.fromtimestamp(oldest / 1000, tz=timezone.utc).date().isoformat()
    all_docs = fetch_new_documents(access_token, tickers, oldest)
    grouped = group_by_ticker(all_docs)
    print(f"Bulk Document Search since {oldest_date}: {len(all_docs)} docs across "
          f"{len(grouped)} companies.")

    today = datetime.now(timezone.utc).date().isoformat()
    analyzed = skipped = 0

    for company in companies:
        ticker = company["primaryTickerCode"]
        name = company["name"]
        cutoff_ms = cutoffs[ticker]
        new_docs = [d for d in grouped.get(ticker, []) if d["releasedAt"] > cutoff_ms]

        # Gate: below the threshold, leave the cutoff alone so docs keep accumulating.
        if len(new_docs) < NEW_DOC_THRESHOLD:
            if new_docs:
                print(f"  {ticker:8s} {len(new_docs):>4} new docs — skip")
            skipped += 1
            continue

        since_date = datetime.fromtimestamp(cutoff_ms / 1000, tz=timezone.utc).date().isoformat()
        print(f"  {ticker:8s} {len(new_docs):>4} new docs since {since_date} — analyzing")
        conversation_id = start_gensearch(access_token, ticker, name, since_date)
        markdown = poll_until_done(access_token, conversation_id)

        (REPORTS_DIR / f"{today}_{ticker}.md").write_text(markdown)
        # Advance the cutoff to the newest doc in this run — incremental across missed days.
        state[ticker] = {"last_seen_release_at": max(d["releasedAt"] for d in new_docs)}
        save_state(state)
        analyzed += 1

    print(f"\nDone. Analyzed {analyzed} companies; skipped {skipped}.")


if __name__ == "__main__":
    main()