Webhook Endpoints - Real-Time Data Sharing with External Applications

Modern applications require real-time data synchronization and automated notifications. The External Data Provider worker enables webhook-style data sharing, allowing external applications to pull fresh data from your workflows on demand. This creates a simple yet powerful integration pattern for real-time data access.

This guide covers building webhook endpoints, notification systems, and real-time data synchronization patterns.

Webhook Integration Pattern

Real-Time Data Flow

Workflow Processing

  • Continuous data processing and analysis
  • Real-time signal generation and alerts
  • Automated report generation

External Data Provider

  • Creates stable API endpoints for data access
  • Supports frequent polling for real-time updates
  • Provides consistent data format for external consumption

External Application Integration

  • Polls endpoints for updates (see the minimal polling sketch after this list)
  • Triggers actions based on data changes
  • Synchronizes with external systems
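
In its simplest form, the external side of this pattern is a polling loop against the provider's endpoint. The sketch below is a minimal illustration, reusing the example endpoint URL from this guide and assuming the response wraps its payload in a `data` field as in the examples that follow; `handle_update` and the 30-second interval are placeholders for your own integration logic.

import time
import requests

# Example ApudFlow endpoint URL (replace with your own endpoint)
endpoint_url = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

def handle_update(payload):
    """Placeholder for your integration logic."""
    print("New data received:", payload)

def poll_endpoint(url, interval_seconds=30):
    """Repeatedly pull the latest workflow output and react to changes."""
    last_payload = None
    while True:
        try:
            response = requests.get(url, timeout=5)
            response.raise_for_status()
            payload = response.json()['data']
            if payload != last_payload:  # act only when the data actually changed
                handle_update(payload)
                last_payload = payload
        except requests.RequestException as e:
            print(f"Polling failed: {e}")
        time.sleep(interval_seconds)

poll_endpoint(endpoint_url)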

Notification System Example

Real-Time Alert Dashboard

Workflow Setup

  1. Add market data feeds (Twelve Data, Yahoo Finance)
  2. Include technical indicators (Support/Resistance, RSI)
  3. Set up condition checks for alert triggers
  4. Connect alerts to External Data Provider

Alert Data Structure

{
  "alerts": [
    {
      "type": "PRICE_BREAKOUT",
      "symbol": "AAPL",
      "message": "Price broke above resistance at $200",
      "severity": "HIGH",
      "timestamp": "2025-12-26T10:15:00Z",
      "current_price": 202.50,
      "breakout_level": 200.00
    },
    {
      "type": "VOLUME_SPIKE",
      "symbol": "TSLA",
      "message": "Unusual volume: 45M shares in 5 minutes",
      "severity": "MEDIUM",
      "timestamp": "2025-12-26T10:12:00Z",
      "volume": 45000000,
      "avg_volume": 25000000
    }
  ],
  "last_updated": "2025-12-26T10:15:00Z",
  "active_alerts_count": 2
}
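
Displaying alerts is only half of a notification system; the same endpoint can drive outbound pushes. The sketch below polls the alerts endpoint and forwards unseen HIGH-severity alerts to a chat webhook. The webhook URL, the message payload shape, and the timestamp-based de-duplication are illustrative assumptions, not part of the ApudFlow API.

import time
import requests

# ApudFlow alerts endpoint (same endpoint used by the dashboard below)
alerts_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

# Hypothetical incoming-webhook URL for your chat or paging tool
NOTIFY_WEBHOOK = "https://chat.example.com/hooks/your-webhook-id"

seen_timestamps = set()  # naive de-duplication by alert timestamp

def forward_high_severity_alerts():
    """Poll the endpoint and push unseen HIGH-severity alerts to the chat webhook."""
    response = requests.get(alerts_endpoint, timeout=5)
    response.raise_for_status()
    alerts = response.json()['data']['alerts']

    for alert in alerts:
        if alert['severity'] != 'HIGH' or alert['timestamp'] in seen_timestamps:
            continue
        seen_timestamps.add(alert['timestamp'])
        requests.post(
            NOTIFY_WEBHOOK,
            json={"text": f"[{alert['symbol']}] {alert['message']}"},  # payload shape depends on your notification service
            timeout=5
        )

while True:
    try:
        forward_high_severity_alerts()
    except requests.RequestException as e:
        print(f"Alert poll failed: {e}")
    time.sleep(10)  # poll every 10 seconds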

Dashboard Integration Code

import requests
from dash import Dash, html, dcc, Input, Output
import plotly.graph_objects as go

app = Dash(__name__)

# ApudFlow alerts endpoint
alerts_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

app.layout = html.Div([
    html.H1('Real-Time Trading Alerts'),
    dcc.Interval(id='interval-component', interval=5000, n_intervals=0),  # Update every 5 seconds
    html.Div(id='alerts-container'),
    dcc.Graph(id='alerts-chart')
])

@app.callback(
    [Output('alerts-container', 'children'), Output('alerts-chart', 'figure')],
    Input('interval-component', 'n_intervals')
)
def update_alerts(n):
    try:
        response = requests.get(alerts_endpoint, timeout=5)
        alerts_data = response.json()['data']

        # Create alert display
        alert_items = []
        for alert in alerts_data['alerts'][-10:]:  # Show last 10 alerts
            severity_color = {'HIGH': 'red', 'MEDIUM': 'orange', 'LOW': 'yellow'}
            alert_items.append(
                html.Div([
                    html.Span(f"{alert['timestamp']} - {alert['symbol']}: ", style={'fontWeight': 'bold'}),
                    html.Span(alert['message'], style={'color': severity_color.get(alert['severity'], 'black')})
                ], style={'margin': '10px', 'padding': '10px', 'border': '1px solid #ddd'})
            )

        # Create alerts timeline chart
        timestamps = [alert['timestamp'] for alert in alerts_data['alerts']]
        severities = [1 if alert['severity'] == 'HIGH' else 0.5 if alert['severity'] == 'MEDIUM' else 0.2
                      for alert in alerts_data['alerts']]

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=timestamps,
            y=severities,
            mode='markers+lines',
            name='Alert Severity',
            marker=dict(size=10, color=severities, colorscale='Reds', showscale=True)
        ))
        fig.update_layout(title='Alert Timeline', xaxis_title='Time', yaxis_title='Severity')

        return alert_items, fig

    except Exception as e:
        return [html.Div(f"Error loading alerts: {e}")], go.Figure()

if __name__ == '__main__':
    app.run(debug=True)

External System Synchronization

CRM Data Integration

Customer Data Workflow

  1. Process customer interaction data
  2. Generate customer insights with AI Data Analyzer
  3. Create personalized recommendations
  4. Expose insights via External Data Provider

Customer Insights Output

{
  "customer_id": "CUST_001",
  "insights": {
    "engagement_score": 8.5,
    "preferred_products": ["AAPL", "MSFT", "GOOGL"],
    "risk_profile": "moderate",
    "next_best_action": "upgrade_to_premium"
  },
  "recommendations": [
    {
      "type": "product_suggestion",
      "product": "AAPL_2026_CALLS",
      "confidence": 0.82,
      "reason": "Based on portfolio analysis and market trends"
    }
  ],
  "last_interaction": "2025-12-26T09:30:00Z"
}

CRM Synchronization Script

import json
import time
from datetime import datetime

import requests
import schedule

# ApudFlow customer insights endpoint
insights_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

# CRM API configuration
CRM_BASE_URL = "https://api.crm-system.com"
CRM_API_KEY = "your_crm_api_key"

def sync_customer_insights():
    """Sync customer insights from ApudFlow to the CRM system"""

    # Fetch latest insights
    response = requests.get(insights_endpoint)
    if response.status_code != 200:
        print(f"Failed to fetch insights: {response.status_code}")
        return

    insights_data = response.json()['data']

    # Update CRM with insights
    crm_payload = {
        "customer_id": insights_data['customer_id'],
        "custom_fields": {
            "engagement_score": insights_data['insights']['engagement_score'],
            "risk_profile": insights_data['insights']['risk_profile'],
            "preferred_products": json.dumps(insights_data['insights']['preferred_products']),
            "next_best_action": insights_data['insights']['next_best_action']
        },
        "notes": f"AI Insights updated at {datetime.now().isoformat()}"
    }

    crm_response = requests.put(
        f"{CRM_BASE_URL}/customers/{insights_data['customer_id']}",
        json=crm_payload,
        headers={"Authorization": f"Bearer {CRM_API_KEY}"}
    )

    if crm_response.status_code == 200:
        print(f"Successfully updated customer {insights_data['customer_id']} in CRM")

        # Log recommendations for sales team
        for rec in insights_data['recommendations']:
            log_recommendation(insights_data['customer_id'], rec)
    else:
        print(f"Failed to update CRM: {crm_response.status_code} - {crm_response.text}")

def log_recommendation(customer_id, recommendation):
    """Log product recommendations for sales follow-up"""

    log_payload = {
        "customer_id": customer_id,
        "recommendation_type": recommendation['type'],
        "product": recommendation['product'],
        "confidence": recommendation['confidence'],
        "reason": recommendation['reason'],
        "created_at": datetime.now().isoformat(),
        "status": "pending_review"
    }

    # Log to CRM or a separate recommendations system
    requests.post(
        f"{CRM_BASE_URL}/recommendations",
        json=log_payload,
        headers={"Authorization": f"Bearer {CRM_API_KEY}"}
    )

# Sync every 15 minutes
schedule.every(15).minutes.do(sync_customer_insights)

# Initial sync, then keep the scheduler running
sync_customer_insights()
while True:
    schedule.run_pending()
    time.sleep(1)

IoT and Sensor Data Integration

Real-Time Monitoring Dashboard

Sensor Data Workflow

  1. Collect sensor readings from external sources
  2. Apply statistical analysis and anomaly detection
  3. Generate alerts for threshold breaches
  4. Provide real-time data access via endpoints

Sensor Monitoring Output

{
  "sensor_readings": {
    "temperature": {
      "current": 75.2,
      "average": 72.8,
      "min": 68.5,
      "max": 78.9,
      "unit": "Fahrenheit"
    },
    "humidity": {
      "current": 45.5,
      "average": 42.1,
      "min": 35.2,
      "max": 52.3,
      "unit": "Percent"
    }
  },
  "alerts": [
    {
      "type": "THRESHOLD_BREACH",
      "sensor": "temperature",
      "message": "Temperature exceeded 75°F threshold",
      "value": 75.2,
      "threshold": 75.0,
      "severity": "WARNING"
    }
  ],
  "system_status": "NORMAL",
  "last_reading": "2025-12-26T11:45:00Z"
}

IoT Dashboard Integration

import time

import requests
import streamlit as st
import pandas as pd
import plotly.express as px

st.title("Real-Time Sensor Monitoring")

# ApudFlow sensor data endpoint
sensor_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

# Auto-refresh every 30 seconds
placeholder = st.empty()

while True:
    try:
        response = requests.get(sensor_endpoint, timeout=5)
        sensor_data = response.json()['data']

        with placeholder.container():
            # Current readings
            col1, col2 = st.columns(2)

            with col1:
                st.metric(
                    "Temperature",
                    f"{sensor_data['sensor_readings']['temperature']['current']}°F",
                    f"Avg: {sensor_data['sensor_readings']['temperature']['average']}°F"
                )

            with col2:
                st.metric(
                    "Humidity",
                    f"{sensor_data['sensor_readings']['humidity']['current']}%",
                    f"Avg: {sensor_data['sensor_readings']['humidity']['average']}%"
                )

            # Alerts
            if sensor_data['alerts']:
                st.error("Active Alerts:")
                for alert in sensor_data['alerts']:
                    st.write(f"⚠️ {alert['message']}")

            # Historical chart (assuming the endpoint provides history)
            # This would be extended to show time-series data (see the sketch after this block)

            st.write(f"Last updated: {sensor_data['last_reading']}")

    except Exception as e:
        st.error(f"Error fetching sensor data: {e}")

    time.sleep(30)  # Update every 30 seconds
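
To fill in that historical chart, readings can be accumulated client-side between refreshes. The sketch below keeps a rolling history in `st.session_state` and plots it with Plotly Express; the `history` key and the 200-point window are illustrative choices, and it assumes the endpoint only returns the latest reading.

import pandas as pd
import plotly.express as px
import streamlit as st

def record_and_plot_history(sensor_data):
    """Append the latest reading to a rolling in-memory history and plot it."""
    if 'history' not in st.session_state:
        st.session_state['history'] = []  # list of dicts, one per refresh

    st.session_state['history'].append({
        'time': sensor_data['last_reading'],
        'temperature': sensor_data['sensor_readings']['temperature']['current'],
        'humidity': sensor_data['sensor_readings']['humidity']['current']
    })
    st.session_state['history'] = st.session_state['history'][-200:]  # keep the last 200 points

    history_df = pd.DataFrame(st.session_state['history'])
    fig = px.line(history_df, x='time', y=['temperature', 'humidity'],
                  title='Sensor History (this session)')
    st.plotly_chart(fig, use_container_width=True)

Calling record_and_plot_history(sensor_data) in place of the historical-chart comment inside the refresh loop adds the time-series view without any changes to the workflow side.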

Advanced Webhook Patterns

Conditional Data Updates

Smart Update Logic

import time

import requests

def get_data_if_changed(endpoint_url, last_update=None):
    """Only fetch data if it has been updated since the last check.

    Assumes the endpoint returns a Last-Modified header and honors
    If-Modified-Since; if it does not, every call returns the full payload.
    """

    headers = {}
    if last_update:
        headers['If-Modified-Since'] = last_update

    response = requests.get(endpoint_url, headers=headers)

    if response.status_code == 304:  # Not Modified
        return None  # No changes

    data = response.json()
    return {
        'data': data,
        'last_modified': response.headers.get('Last-Modified')
    }

# Usage
last_update = None
while True:
    result = get_data_if_changed(endpoint_url, last_update)
    if result:
        process_data(result['data'])
        last_update = result['last_modified']
    time.sleep(60)  # Check every minute

Batch Processing Integration

Efficient Bulk Operations

import time

import requests

def process_batch_updates(endpoint_url, batch_size=100):
    """Process large datasets in batches.

    Assumes the endpoint accepts limit/offset pagination parameters.
    """

    offset = 0
    while True:
        # Add pagination parameters
        params = {'limit': batch_size, 'offset': offset}
        response = requests.get(endpoint_url, params=params)

        data = response.json()['data']

        if not data:  # No more data
            break

        # Process batch
        for item in data:
            process_item(item)

        offset += batch_size

        # Rate limiting
        time.sleep(1)

# Usage
process_batch_updates(large_dataset_endpoint)

Performance Optimization

Caching Strategies

Client-Side Caching

import hashlib
import time

import requests

class DataCache:
    def __init__(self, ttl_seconds=300):  # 5 minute cache
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, endpoint_url):
        cache_key = hashlib.md5(endpoint_url.encode()).hexdigest()

        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.ttl:
                return cached_data

        # Fetch fresh data
        response = requests.get(endpoint_url)
        data = response.json()

        # Cache it
        self.cache[cache_key] = (data, time.time())
        return data

# Usage
cache = DataCache()
fresh_data = cache.get(endpoint_url)

Connection Pooling

Efficient HTTP Connections

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_resilient_session():
    """Create an HTTP session with retries and connection pooling"""

    session = requests.Session()

    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504]
    )

    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,
        pool_maxsize=10
    )

    session.mount("https://", adapter)
    session.mount("http://", adapter)

    return session

# Usage
session = create_resilient_session()
response = session.get(endpoint_url)

Security and Reliability

Authentication and Authorization

Secure Endpoint Access

import requests

def authenticated_request(endpoint_url, api_key):
    """Make authenticated requests to protected endpoints"""

    headers = {
        'Authorization': f'Bearer {api_key}',
        'User-Agent': 'ApudFlow-Integration/1.0'
    }

    response = requests.get(endpoint_url, headers=headers)
    response.raise_for_status()

    return response.json()

# Usage
secure_data = authenticated_request(endpoint_url, 'your_api_key')

Error Handling and Monitoring

Comprehensive Error Management

import time

import requests

def robust_data_fetch(endpoint_url, max_retries=5):
    """Fetch data with comprehensive error handling"""

    for attempt in range(max_retries):
        try:
            response = requests.get(endpoint_url, timeout=30)
            response.raise_for_status()

            data = response.json()

            # Validate data structure
            if 'data' not in data:
                raise ValueError("Invalid response format")

            return data

        except requests.Timeout:
            print(f"Timeout on attempt {attempt + 1}")
        except requests.HTTPError as e:
            if e.response.status_code >= 500:
                # Server error, retry
                print(f"Server error {e.response.status_code} on attempt {attempt + 1}")
            else:
                # Client error, don't retry
                raise
        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")

        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # Exponential backoff

    raise Exception(f"Failed to fetch data after {max_retries} attempts")

# Usage
try:
    data = robust_data_fetch(endpoint_url)
    process_data(data)
except Exception as e:
    alert_admin(f"Data fetch failed: {e}")

These webhook integration patterns enable real-time data sharing, automated notifications, and seamless system integration, transforming your ApudFlow workflows into powerful data distribution hubs.