# Streaming

Stream GenSearch results in real time with GraphQL subscriptions over multipart/mixed responses. Unlike traditional request-response patterns, streaming delivers results incrementally as they are generated, so your application can display partial results immediately instead of waiting for the complete response.
## Key Concepts

Streaming relies on the GraphQL subscription protocol over HTTP using multipart responses. The critical header that enables this behavior is:

```
Accept: multipart/mixed;subscriptionSpec=1.0
```

When the server receives a request with this header, it responds with a `multipart/mixed` content type. The response body contains multiple parts separated by a boundary string, each part delivering a JSON payload with incremental data (referred to as "deltas").
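To make the framing concrete, here is a minimal sketch that splits a multipart body on its boundary and extracts the JSON payloads. The sample body and the `graphql` boundary string are fabricated for illustration; the server's actual part headers and payload shapes may differ.

```python
import json

# A simplified multipart/mixed body for illustration (boundary "graphql").
raw = (
    "--graphql\r\n"
    "Content-Type: application/json\r\n"
    "\r\n"
    '{"payload": {"data": {"genSearchFast": {"deltas": []}}}}\r\n'
    "--graphql\r\n"
    "Content-Type: application/json\r\n"
    "\r\n"
    '{"payload": {"data": {"genSearchFast": {"deltas": '
    '[{"__typename": "GenSearchResponseMarkdownAppend", "value": "Hello"}]}}}}\r\n'
    "--graphql--\r\n"
)

payloads = []
for part in raw.split("--graphql"):
    part = part.strip()
    if not part or part == "--":  # skip the preamble and the closing "--" marker
        continue
    # Each part is headers, a blank line, then the JSON body
    _, _, body = part.partition("\r\n\r\n")
    payloads.append(json.loads(body.strip()))

print(len(payloads))  # 2
```

Real clients must additionally handle parts that arrive split across network chunks, which is what the buffering logic in the full examples below does.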
## Subscription Query

The `genSearchFast` subscription streams conversation deltas as the response is generated. Each delta represents a discrete update to the conversation state.

```graphql
subscription Conversation($input: GenSearchInput!) {
  genSearchFast(input: $input) {
    deltas {
      __typename
      ... on GenSearchConversationCreate {
        conversation {
          error {
            code
          }
          id
          markdown
          progress
        }
      }
      ... on GenSearchResponseMarkdownAppend {
        value
      }
      ... on GenSearchResponseMarkdownSet {
        value
      }
    }
  }
}
```
## Delta Types

As the response streams in, you will receive deltas of different types. Your client must inspect the `__typename` field on each delta and handle it accordingly.

| Delta Type | Description | Action |
|---|---|---|
| `GenSearchConversationCreate` | Initial response with conversation metadata | Set the initial markdown and store the conversation `id` |
| `GenSearchResponseMarkdownAppend` | Incremental text addition | Append `value` to the existing markdown |
| `GenSearchResponseMarkdownSet` | Complete response replacement | Replace the entire markdown with `value` |
A typical stream begins with a `GenSearchConversationCreate` delta, followed by a series of `GenSearchResponseMarkdownAppend` deltas, and may conclude with a `GenSearchResponseMarkdownSet` delta containing the final assembled response.
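The handling rules above can be sketched as a small reducer that folds a sequence of deltas into the final markdown. The delta dicts below are fabricated examples matching the shapes described in the table, not captured server output.

```python
def apply_deltas(deltas):
    """Fold a sequence of GenSearch deltas into (conversation_id, markdown)."""
    markdown = ""
    conversation_id = None
    for delta in deltas:
        typename = delta["__typename"]
        if typename == "GenSearchConversationCreate":
            conversation_id = delta["conversation"]["id"]
            markdown = delta["conversation"].get("markdown") or ""
        elif typename == "GenSearchResponseMarkdownAppend":
            markdown += delta["value"]
        elif typename == "GenSearchResponseMarkdownSet":
            markdown = delta["value"]
    return conversation_id, markdown

# Fabricated deltas following the typical sequence described above.
cid, text = apply_deltas([
    {"__typename": "GenSearchConversationCreate",
     "conversation": {"id": "abc123", "markdown": ""}},
    {"__typename": "GenSearchResponseMarkdownAppend", "value": "AI adoption "},
    {"__typename": "GenSearchResponseMarkdownAppend", "value": "is accelerating."},
    {"__typename": "GenSearchResponseMarkdownSet",
     "value": "AI adoption is accelerating."},
])
print(cid, text)  # abc123 AI adoption is accelerating.
```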
## Required Headers

Every streaming request must include the following headers:

| Header | Value |
|---|---|
| `x-api-key` | Your API key |
| `clientid` | Your client ID |
| `Authorization` | `Bearer {token}` (your access token) |
| `Content-Type` | `application/json` |
| `Accept` | `multipart/mixed;subscriptionSpec=1.0` |
## Implementation
### Python

```python
import os
import json

import requests

API_KEY = os.environ["ALPHASENSE_API_KEY"]
CLIENT_ID = os.environ["ALPHASENSE_CLIENT_ID"]
CLIENT_SECRET = os.environ["ALPHASENSE_CLIENT_SECRET"]
EMAIL = os.environ["ALPHASENSE_EMAIL"]
PASSWORD = os.environ["ALPHASENSE_PASSWORD"]
ENDPOINT = "https://api.alpha-sense.com/gql"

# Authenticate to obtain a bearer token (see Authentication guide)
auth_response = requests.post(
    "https://api.alpha-sense.com/auth",
    headers={
        "x-api-key": API_KEY,
        "Content-Type": "application/x-www-form-urlencoded",
    },
    data={
        "grant_type": "password",
        "username": EMAIL,
        "password": PASSWORD,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
    },
)
auth_response.raise_for_status()
ACCESS_TOKEN = auth_response.json()["access_token"]

SUBSCRIPTION_QUERY = """
subscription Conversation($input: GenSearchInput!) {
  genSearchFast(input: $input) {
    deltas {
      __typename
      ... on GenSearchConversationCreate {
        conversation {
          error { code }
          id
          markdown
          progress
        }
      }
      ... on GenSearchResponseMarkdownAppend {
        value
      }
      ... on GenSearchResponseMarkdownSet {
        value
      }
    }
  }
}
"""

headers = {
    "x-api-key": API_KEY,
    "clientid": CLIENT_ID,
    "Authorization": f"Bearer {ACCESS_TOKEN}",
    "Content-Type": "application/json",
    "Accept": "multipart/mixed;subscriptionSpec=1.0",
}

payload = {
    "query": SUBSCRIPTION_QUERY,
    "variables": {
        "input": {
            "prompt": "What are the latest trends in AI?",
        }
    },
}


def parse_multipart_stream(response):
    """Parse a multipart/mixed streaming response and yield JSON payloads."""
    # Extract the boundary from the Content-Type header
    content_type = response.headers.get("Content-Type", "")
    boundary = None
    for part in content_type.split(";"):
        part = part.strip()
        if part.startswith("boundary="):
            boundary = part.split("=", 1)[1].strip('"')
            break
    if not boundary:
        raise ValueError("No boundary found in Content-Type header")
    boundary_marker = f"--{boundary}".encode()
    buffer = b""
    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        buffer += chunk
        # Split the buffer on boundary markers and process complete parts
        while boundary_marker in buffer:
            part, buffer = buffer.split(boundary_marker, 1)
            if not part.strip():
                continue
            # Each part has headers and a JSON body separated by a blank line
            part_str = part.decode("utf-8", errors="replace").strip()
            if "\r\n\r\n" in part_str:
                _, body = part_str.split("\r\n\r\n", 1)
            elif "\n\n" in part_str:
                _, body = part_str.split("\n\n", 1)
            else:
                continue
            body = body.strip()
            if not body:
                continue
            try:
                yield json.loads(body)
            except json.JSONDecodeError:
                continue


def stream_conversation(query_text):
    """Stream a GenSearch conversation and assemble the markdown response."""
    payload["variables"]["input"]["prompt"] = query_text
    response = requests.post(
        ENDPOINT,
        headers=headers,
        json=payload,
        stream=True,
    )
    response.raise_for_status()
    markdown = ""
    conversation_id = None
    for part in parse_multipart_stream(response):
        # Each multipart payload wraps the subscription data
        deltas = (
            part.get("payload", {})
            .get("data", {})
            .get("genSearchFast", {})
            .get("deltas", [])
        )
        for delta in deltas:
            typename = delta.get("__typename")
            if typename == "GenSearchConversationCreate":
                conversation = delta.get("conversation", {})
                conversation_id = conversation.get("id")
                markdown = conversation.get("markdown", "")
                error = conversation.get("error")
                if error:
                    print(f"Error: {error.get('code')}")
                    return
                print(f"Conversation started: {conversation_id}")
            elif typename == "GenSearchResponseMarkdownAppend":
                value = delta.get("value", "")
                markdown += value
                print(value, end="", flush=True)
            elif typename == "GenSearchResponseMarkdownSet":
                markdown = delta.get("value", "")
                print("\n[Full response replaced]")
    print("\n\nStreaming complete.")
    print(f"Conversation ID: {conversation_id}")
    return markdown


if __name__ == "__main__":
    result = stream_conversation("What are the latest trends in AI?")
```
### JavaScript

```javascript
const API_KEY = process.env.ALPHASENSE_API_KEY
const CLIENT_ID = process.env.ALPHASENSE_CLIENT_ID
const ENDPOINT = 'https://api.alpha-sense.com/gql'

// Authenticate to obtain a bearer token (see Authentication guide)
const authResponse = await fetch('https://api.alpha-sense.com/auth', {
  method: 'POST',
  headers: {
    'x-api-key': API_KEY,
    'Content-Type': 'application/x-www-form-urlencoded',
  },
  body: new URLSearchParams({
    grant_type: 'password',
    username: process.env.ALPHASENSE_EMAIL,
    password: process.env.ALPHASENSE_PASSWORD,
    client_id: CLIENT_ID,
    client_secret: process.env.ALPHASENSE_CLIENT_SECRET,
  }),
})
const {access_token: ACCESS_TOKEN} = await authResponse.json()

const SUBSCRIPTION_QUERY = `
subscription Conversation($input: GenSearchInput!) {
  genSearchFast(input: $input) {
    deltas {
      __typename
      ... on GenSearchConversationCreate {
        conversation {
          error { code }
          id
          markdown
          progress
        }
      }
      ... on GenSearchResponseMarkdownAppend {
        value
      }
      ... on GenSearchResponseMarkdownSet {
        value
      }
    }
  }
}
`

async function streamConversation(queryText) {
  const response = await fetch(ENDPOINT, {
    method: 'POST',
    headers: {
      'x-api-key': API_KEY,
      clientid: CLIENT_ID,
      Authorization: `Bearer ${ACCESS_TOKEN}`,
      'Content-Type': 'application/json',
      Accept: 'multipart/mixed;subscriptionSpec=1.0',
    },
    body: JSON.stringify({
      query: SUBSCRIPTION_QUERY,
      variables: {
        input: {
          prompt: queryText,
        },
      },
    }),
  })
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`)
  }

  // Extract the boundary from the Content-Type header
  const contentType = response.headers.get('Content-Type') || ''
  const boundaryMatch = contentType.match(/boundary=("?)([^";]+)\1/)
  if (!boundaryMatch) {
    throw new Error('No boundary found in Content-Type header')
  }
  const boundary = boundaryMatch[2]
  const boundaryMarker = `--${boundary}`

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''
  let markdown = ''
  let conversationId = null

  while (true) {
    const {done, value} = await reader.read()
    if (done) break
    buffer += decoder.decode(value, {stream: true})
    // Process complete parts separated by the boundary
    while (buffer.includes(boundaryMarker)) {
      const boundaryIndex = buffer.indexOf(boundaryMarker)
      const part = buffer.slice(0, boundaryIndex).trim()
      buffer = buffer.slice(boundaryIndex + boundaryMarker.length)
      if (!part) continue
      // Split headers from the body (separated by a blank line)
      const separatorIndex =
        part.indexOf('\r\n\r\n') !== -1 ? part.indexOf('\r\n\r\n') : part.indexOf('\n\n')
      if (separatorIndex === -1) continue
      const body = part.slice(separatorIndex).trim()
      if (!body) continue
      let payload
      try {
        payload = JSON.parse(body)
      } catch {
        continue
      }
      const deltas = payload?.payload?.data?.genSearchFast?.deltas || []
      for (const delta of deltas) {
        switch (delta.__typename) {
          case 'GenSearchConversationCreate': {
            const conversation = delta.conversation || {}
            conversationId = conversation.id
            markdown = conversation.markdown || ''
            if (conversation.error) {
              console.error('Error:', conversation.error.code)
              return
            }
            console.log('Conversation started:', conversationId)
            break
          }
          case 'GenSearchResponseMarkdownAppend': {
            const value = delta.value || ''
            markdown += value
            process.stdout.write(value)
            break
          }
          case 'GenSearchResponseMarkdownSet': {
            markdown = delta.value || ''
            console.log('\n[Full response replaced]')
            break
          }
        }
      }
    }
  }

  console.log('\n\nStreaming complete.')
  console.log('Conversation ID:', conversationId)
  return markdown
}

await streamConversation('What are the latest trends in AI?')
```
### cURL

```shell
curl --request POST 'https://api.alpha-sense.com/gql' \
  --header "x-api-key: ${ALPHASENSE_API_KEY}" \
  --header "clientid: ${ALPHASENSE_CLIENT_ID}" \
  --header "Authorization: Bearer ${ACCESS_TOKEN}" \
  --header 'Content-Type: application/json' \
  --header 'Accept: multipart/mixed;subscriptionSpec=1.0' \
  --data-raw '{
    "query": "subscription Conversation($input: GenSearchInput!) { genSearchFast(input: $input) { deltas { __typename ... on GenSearchConversationCreate { conversation { error { code } id markdown progress } } ... on GenSearchResponseMarkdownAppend { value } ... on GenSearchResponseMarkdownSet { value } } } }",
    "variables": {
      "input": {
        "prompt": "What are the latest trends in AI?"
      }
    }
  }'
```
The cURL response will be a raw multipart/mixed stream. Each part is separated by a
boundary string and contains a JSON payload. In a production integration you should use a
language-specific client (such as the Python or JavaScript examples above) to parse the multipart
stream programmatically.
## Streaming with Filters

Filters, AskInDoc, and web search work identically in streaming mode: only the `variables.input` object changes. The subscription query and delta parsing remain the same.
### Python

```python
payload = {
    "query": SUBSCRIPTION_QUERY,
    "variables": {
        "input": {
            "prompt": "What is the outlook for cloud infrastructure spending?",
            "filters": {
                "companies": {"include": ["AMZN", "MSFT", "GOOGL"]},
                "date": {"preset": "LAST_12_MONTHS"},
                "sources": {"ids": ["31019"]}
            }
        }
    },
}
```
### JavaScript

```javascript
const payload = {
  query: SUBSCRIPTION_QUERY,
  variables: {
    input: {
      prompt: "What is the outlook for cloud infrastructure spending?",
      filters: {
        companies: { include: ["AMZN", "MSFT", "GOOGL"] },
        date: { preset: "LAST_12_MONTHS" },
        sources: { ids: ["31019"] },
      },
    },
  },
};
```
See GenSearch Modes — Search Filters for the full filter reference.
## Streaming vs. Polling
Depending on your use case, you may prefer streaming or polling. The table below summarizes the trade-offs.
| Criteria | Streaming | Polling |
|---|---|---|
| Latency | Low -- results appear as soon as they are generated | Higher -- you must wait for each poll interval |
| Implementation complexity | Moderate -- requires multipart response parsing | Low -- standard request-response cycle |
| User experience | Excellent -- progressive display of results | Acceptable -- results appear all at once |
| Network efficiency | Single long-lived connection | Multiple round-trip requests |
| Error recovery | Requires reconnection logic | Naturally retries on next poll |
| Best for | Chat interfaces, real-time dashboards | Background jobs, batch processing |
Choose streaming when you need immediate, incremental feedback (e.g., a conversational UI where text appears as it is generated). Choose polling when you need simplicity or are running background tasks where latency is less critical.
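Since streaming requires its own reconnection logic, a retry wrapper with exponential backoff is a common pattern. The sketch below is an assumption-laden illustration: `stream_fn` stands in for a streaming call such as the `stream_conversation` function from the Python example, and the retry limits are arbitrary defaults, not API requirements.

```python
import time


def stream_with_retry(stream_fn, prompt, max_attempts=3, base_delay=1.0):
    """Call a streaming function, retrying with exponential backoff on failure."""
    for attempt in range(max_attempts):
        try:
            return stream_fn(prompt)
        except Exception as exc:  # in practice, catch your HTTP client's errors
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Stream failed ({exc}); retrying in {delay:.2f}s...")
            time.sleep(delay)


# Usage with a stand-in function that fails twice, then succeeds:
calls = {"n": 0}

def flaky(prompt):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("dropped")
    return f"result for {prompt!r}"

print(stream_with_retry(flaky, "trends", base_delay=0.01))
```

Note that retrying a partially consumed stream restarts the conversation from the beginning; if you need to resume instead, you would have to track the conversation `id` and re-query its state.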
Streaming subscriptions currently support `genSearchFast` only. The `auto`, `thinkLonger`, and `deepResearch` modes do not support streaming subscriptions; use polling for those modes instead.
- GenSearch Modes: Learn about the different GenSearch operation modes and when to use each one at GenSearch Modes.
- Response Parsing: For detailed guidance on handling and rendering the markdown content returned in deltas, see Response Parsing.
- Utility APIs: Look up filter values (source IDs, GICS codes, company tickers, etc.) at Utility APIs.