Webhook Endpoints - Real-Time Data Sharing with External Applications
Modern applications require real-time data synchronization and automated notifications. The External Data Provider worker enables webhook-style data sharing: it exposes a stable endpoint that external applications poll to pull fresh data from your workflows on demand. Unlike a classic webhook, where the server pushes events to a callback URL, here the consumer decides when to fetch. This creates a simple yet powerful integration pattern for real-time data access.
This guide covers building webhook endpoints, notification systems, and real-time data synchronization patterns.
Webhook Integration Pattern
Real-Time Data Flow
Workflow Processing
- Continuous data processing and analysis
- Real-time signal generation and alerts
- Automated report generation
External Data Provider
- Creates stable API endpoints for data access
- Supports frequent polling for real-time updates
- Provides consistent data format for external consumption
External Application Integration
- Polls endpoints for updates
- Triggers actions based on data changes
- Synchronizes with external systems
Notification System Example
Real-Time Alert Dashboard
Workflow Setup
- Add market data feeds (Twelve Data, Yahoo Finance)
- Include technical indicators (Support/Resistance, RSI)
- Set up condition checks for alert triggers
- Connect alerts to External Data Provider
Alert Data Structure
{
"alerts": [
{
"type": "PRICE_BREAKOUT",
"symbol": "AAPL",
"message": "Price broke above resistance at $200",
"severity": "HIGH",
"timestamp": "2025-12-26T10:15:00Z",
"current_price": 202.50,
"breakout_level": 200.00
},
{
"type": "VOLUME_SPIKE",
"symbol": "TSLA",
"message": "Unusual volume: 45M shares in 5 minutes",
"severity": "MEDIUM",
"timestamp": "2025-12-26T10:12:00Z",
"volume": 45000000,
"avg_volume": 25000000
}
],
"last_updated": "2025-12-26T10:15:00Z",
"active_alerts_count": 2
}
Dashboard Integration Code
import time

import plotly.graph_objects as go
import requests
from dash import Dash, Input, Output, dcc, html
# ApudFlow alerts endpoint polled by the dashboard callback
alerts_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

app = Dash(__name__)

# Page skeleton: a title, a 5-second ticker, the alert list and the timeline chart.
app.layout = html.Div(
    [
        html.H1('Real-Time Trading Alerts'),
        # Update every 5 seconds (interval is given in milliseconds)
        dcc.Interval(id='interval-component', interval=5000, n_intervals=0),
        html.Div(id='alerts-container'),
        dcc.Graph(id='alerts-chart'),
    ]
)
@app.callback(
    [Output('alerts-container', 'children'), Output('alerts-chart', 'figure')],
    Input('interval-component', 'n_intervals')
)
def update_alerts(n):
    """Refresh the alert list and the severity timeline on every interval tick.

    Args:
        n: Tick counter of the interval component. It is not used in the
            body; its change is what triggers the refresh.

    Returns:
        A pair matching the two callback outputs: the list of ``html.Div``
        items for the last 10 alerts, and the timeline figure built from all
        alerts. If anything fails, a single error ``Div`` and an empty figure
        are returned instead so the dashboard keeps rendering.
    """
    # Built once per call rather than once per alert.
    severity_color = {'HIGH': 'red', 'MEDIUM': 'orange', 'LOW': 'yellow'}

    try:
        response = requests.get(alerts_endpoint, timeout=5)
        # Surface 4xx/5xx as an error instead of parsing an error body as alert data.
        response.raise_for_status()
        alerts_data = response.json()['data']
        alerts = alerts_data['alerts']

        # Create alert display (show last 10 alerts)
        alert_items = [
            html.Div(
                [
                    html.Span(
                        f"{alert['timestamp']} - {alert['symbol']}: ",
                        style={'fontWeight': 'bold'},
                    ),
                    html.Span(
                        alert['message'],
                        style={'color': severity_color.get(alert['severity'], 'black')},
                    ),
                ],
                style={'margin': '10px', 'padding': '10px', 'border': '1px solid #ddd'},
            )
            for alert in alerts[-10:]
        ]

        # Create alerts timeline chart: HIGH -> 1, MEDIUM -> 0.5, anything else -> 0.2
        timestamps = [alert['timestamp'] for alert in alerts]
        severities = [
            1 if alert['severity'] == 'HIGH' else 0.5 if alert['severity'] == 'MEDIUM' else 0.2
            for alert in alerts
        ]

        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x=timestamps,
            y=severities,
            mode='markers+lines',
            name='Alert Severity',
            marker=dict(size=10, color=severities, colorscale='Reds', showscale=True)
        ))
        fig.update_layout(title='Alert Timeline', xaxis_title='Time', yaxis_title='Severity')
        return alert_items, fig
    except Exception as e:
        # UI boundary: show the failure in the page rather than breaking the callback.
        return [html.Div(f"Error loading alerts: {e}")], go.Figure()
if __name__ == '__main__':
    # app.run() replaces the legacy app.run_server(), which Dash 3.x rejects.
    app.run(debug=True)
External System Synchronization
CRM Data Integration
Customer Data Workflow
- Process customer interaction data
- Generate customer insights with AI Data Analyzer
- Create personalized recommendations
- Expose insights via External Data Provider
Customer Insights Output
{
"customer_id": "CUST_001",
"insights": {
"engagement_score": 8.5,
"preferred_products": ["AAPL", "MSFT", "GOOGL"],
"risk_profile": "moderate",
"next_best_action": "upgrade_to_premium"
},
"recommendations": [
{
"type": "product_suggestion",
"product": "AAPL_2026_CALLS",
"confidence": 0.82,
"reason": "Based on portfolio analysis and market trends"
}
],
"last_interaction": "2025-12-26T09:30:00Z"
}
CRM Synchronization Script
import requests
import json
from datetime import datetime, timedelta
# ApudFlow endpoint that serves the customer insights payload
insights_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

# CRM API configuration: base URL of the CRM REST API and the key sent as a
# bearer token. The key below is a placeholder; replace it with a real one.
CRM_BASE_URL = "https://api.crm-system.com"
CRM_API_KEY = "your_crm_api_key"
def sync_customer_insights():
    """Sync customer insights from ApudFlow to CRM system.

    Fetches the insights payload, copies the insight fields onto the matching
    CRM customer record and, when the CRM answers 200, logs each
    recommendation for sales follow-up. Non-200 responses are reported with
    ``print`` and end the sync early. Both HTTP calls use a timeout, so a
    stalled server raises ``requests.Timeout`` instead of blocking forever.
    """
    # Fetch latest insights
    response = requests.get(insights_endpoint, timeout=30)
    if response.status_code != 200:
        print(f"Failed to fetch insights: {response.status_code}")
        return

    insights_data = response.json()['data']
    customer_id = insights_data['customer_id']
    insights = insights_data['insights']

    # Update CRM with insights
    crm_payload = {
        "customer_id": customer_id,
        "custom_fields": {
            "engagement_score": insights['engagement_score'],
            "risk_profile": insights['risk_profile'],
            # The product list is sent as a JSON-encoded string, not a nested array.
            "preferred_products": json.dumps(insights['preferred_products']),
            "next_best_action": insights['next_best_action']
        },
        "notes": f"AI Insights updated at {datetime.now().isoformat()}"
    }
    crm_response = requests.put(
        f"{CRM_BASE_URL}/customers/{customer_id}",
        json=crm_payload,
        headers={"Authorization": f"Bearer {CRM_API_KEY}"},
        timeout=30,
    )
    if crm_response.status_code != 200:
        print(f"Failed to update CRM: {crm_response.status_code} - {crm_response.text}")
        return

    print(f"Successfully updated customer {customer_id} in CRM")
    # Log recommendations for sales team
    for rec in insights_data['recommendations']:
        log_recommendation(customer_id, rec)
def log_recommendation(customer_id, recommendation):
    """Log product recommendations for sales follow-up.

    Args:
        customer_id: Identifier of the customer the recommendation belongs to.
        recommendation: Mapping with the keys ``type``, ``product``,
            ``confidence`` and ``reason``.

    The entry is posted with status ``pending_review``. A rejected request is
    reported with ``print`` instead of being silently ignored.
    """
    log_payload = {
        "customer_id": customer_id,
        "recommendation_type": recommendation['type'],
        "product": recommendation['product'],
        "confidence": recommendation['confidence'],
        "reason": recommendation['reason'],
        "created_at": datetime.now().isoformat(),
        "status": "pending_review"
    }
    # Log to CRM or separate recommendations system
    response = requests.post(
        f"{CRM_BASE_URL}/recommendations",
        json=log_payload,
        headers={"Authorization": f"Bearer {CRM_API_KEY}"},
        timeout=30,
    )
    if not response.ok:
        print(
            f"Failed to log recommendation for customer {customer_id}: "
            f"{response.status_code} - {response.text}"
        )
# Sync every 15 minutes
import time

import schedule

schedule.every(15).minutes.do(sync_customer_insights)

# Initial sync
sync_customer_insights()

# schedule only registers jobs; they fire when run_pending() is called, so the
# script has to keep polling it for the 15-minute sync to actually happen.
while True:
    schedule.run_pending()
    time.sleep(1)
IoT and Sensor Data Integration
Real-Time Monitoring Dashboard
Sensor Data Workflow
- Collect sensor readings from external sources
- Apply statistical analysis and anomaly detection
- Generate alerts for threshold breaches
- Provide real-time data access via endpoints
Sensor Monitoring Output
{
"sensor_readings": {
"temperature": {
"current": 75.2,
"average": 72.8,
"min": 68.5,
"max": 78.9,
"unit": "Fahrenheit"
},
"humidity": {
"current": 45.5,
"average": 42.1,
"min": 35.2,
"max": 52.3,
"unit": "Percent"
}
},
"alerts": [
{
"type": "THRESHOLD_BREACH",
"sensor": "temperature",
"message": "Temperature exceeded 75°F threshold",
"value": 75.2,
"threshold": 75.0,
"severity": "WARNING"
}
],
"system_status": "NORMAL",
"last_reading": "2025-12-26T11:45:00Z"
}
IoT Dashboard Integration
import time
from datetime import datetime

import pandas as pd
import plotly.express as px
import requests
import streamlit as st

st.title("Real-Time Sensor Monitoring")

# ApudFlow sensor data endpoint
sensor_endpoint = "https://api.apudflow.io/api/w/2511351432345546/3167507321031740"

# Auto-refresh every 30 seconds: one placeholder is re-filled on each pass so
# the readings are replaced in place.
placeholder = st.empty()

while True:
    try:
        response = requests.get(sensor_endpoint, timeout=5)
        # Report 4xx/5xx as an error instead of parsing an error body as sensor data.
        response.raise_for_status()
        sensor_data = response.json()['data']
        readings = sensor_data['sensor_readings']

        with placeholder.container():
            # Current readings
            col1, col2 = st.columns(2)
            with col1:
                st.metric(
                    "Temperature",
                    f"{readings['temperature']['current']}°F",
                    f"Avg: {readings['temperature']['average']}°F"
                )
            with col2:
                st.metric(
                    "Humidity",
                    f"{readings['humidity']['current']}%",
                    f"Avg: {readings['humidity']['average']}%"
                )

            # Alerts
            if sensor_data['alerts']:
                st.error("Active Alerts:")
                for alert in sensor_data['alerts']:
                    st.write(f"⚠️ {alert['message']}")

            # Historical chart (assuming endpoint provides history)
            # This would be extended to show time-series data
            st.write(f"Last updated: {sensor_data['last_reading']}")
    except Exception as e:
        st.error(f"Error fetching sensor data: {e}")

    time.sleep(30)  # Update every 30 seconds
Advanced Webhook Patterns
Conditional Data Updates
Smart Update Logic
def get_data_if_changed(endpoint_url, last_update=None):
    """Only fetch data if it has been updated since last check.

    Args:
        endpoint_url: URL to request.
        last_update: Value to send in the ``If-Modified-Since`` header. When
            falsy, the request is unconditional.

    Returns:
        ``None`` when the server answers 304 (Not Modified). Otherwise a dict
        with ``data`` (the parsed JSON body) and ``last_modified`` (the
        response's ``Last-Modified`` header, or ``None`` if it is absent).

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
        requests.Timeout: If the server does not answer within 30 seconds.
    """
    headers = {'If-Modified-Since': last_update} if last_update else {}

    response = requests.get(endpoint_url, headers=headers, timeout=30)
    if response.status_code == 304:  # Not Modified
        return None  # No changes

    # Do not hand an error body to the caller as if it were fresh data.
    response.raise_for_status()
    return {
        'data': response.json(),
        'last_modified': response.headers.get('Last-Modified')
    }
# Usage: poll forever, sending the previous Last-Modified value with each
# request so that unchanged data is skipped.
last_update = None
while True:
    result = get_data_if_changed(endpoint_url, last_update)
    if result:
        process_data(result['data'])
        last_update = result['last_modified']
    # Check every minute
    time.sleep(60)
Batch Processing Integration
Efficient Bulk Operations
def process_batch_updates(endpoint_url, batch_size=100):
    """Process large datasets in batches.

    Requests pages with ``limit``/``offset`` query parameters until the
    endpoint returns an empty ``data`` value, passing every item to
    ``process_item``. Sleeps one second between pages as rate limiting.

    Args:
        endpoint_url: URL of the paginated endpoint.
        batch_size: Number of items requested per page.

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
        requests.Timeout: If a page is not returned within 30 seconds.
    """
    offset = 0
    while True:
        # Add pagination parameters
        params = {'limit': batch_size, 'offset': offset}
        response = requests.get(endpoint_url, params=params, timeout=30)
        # Stop on an error response instead of iterating over an error body.
        response.raise_for_status()
        data = response.json()['data']
        if not data:  # No more data
            break

        # Process batch
        for item in data:
            process_item(item)

        # Advance by what was actually received, so no items are skipped if
        # the server returns fewer than batch_size per page.
        offset += len(data)

        # Rate limiting
        time.sleep(1)
# Usage: walk the whole dataset using the default batch size
process_batch_updates(large_dataset_endpoint)
Performance Optimization
Caching Strategies
Client-Side Caching
import hashlib
class DataCache:
    """In-memory cache of endpoint responses with a time-to-live.

    Entries live in ``self.cache``, keyed by the MD5 hex digest of the URL,
    and hold a ``(data, fetched_at)`` pair. MD5 is used only to derive a
    dictionary key, not for security.
    """

    def __init__(self, ttl_seconds=300):  # 5 minute cache
        """Create an empty cache whose entries stay fresh for ``ttl_seconds``."""
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, endpoint_url):
        """Return the parsed JSON for ``endpoint_url``, cached when still fresh.

        A cached value younger than the TTL is returned without a network
        call. Otherwise the endpoint is fetched, and the result is stored and
        returned.

        Raises:
            requests.HTTPError: On a 4xx/5xx response. Nothing is cached then,
                so an error body is never served from the cache.
            requests.Timeout: If the server does not answer within 30 seconds.
        """
        cache_key = hashlib.md5(endpoint_url.encode()).hexdigest()

        entry = self.cache.get(cache_key)
        if entry is not None:
            cached_data, timestamp = entry
            if time.time() - timestamp < self.ttl:
                return cached_data

        # Fetch fresh data
        response = requests.get(endpoint_url, timeout=30)
        response.raise_for_status()
        data = response.json()

        # Cache it
        self.cache[cache_key] = (data, time.time())
        return data
# Usage: repeated calls for the same URL within the TTL (300 seconds by
# default) are answered from memory instead of hitting the endpoint again
cache = DataCache()
fresh_data = cache.get(endpoint_url)
Connection Pooling
Efficient HTTP Connections
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_resilient_session():
    """Create HTTP session with retries and connection pooling.

    Returns:
        A ``requests.Session`` whose ``https://`` and ``http://`` prefixes are
        both served by one adapter configured with a 10-connection pool and a
        retry policy (3 retries, backoff factor 1) for the status codes
        429, 500, 502, 503 and 504.
    """
    retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    pooled_adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, pooled_adapter)
    return session
# Usage: requests made through this session go through the adapter mounted
# by create_resilient_session()
session = create_resilient_session()
response = session.get(endpoint_url)
Security and Reliability
Authentication and Authorization
Secure Endpoint Access
def authenticated_request(endpoint_url, api_key):
    """Make authenticated requests to protected endpoints.

    Args:
        endpoint_url: URL of the protected endpoint.
        api_key: Key sent as a bearer token in the ``Authorization`` header.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
        requests.Timeout: If the server does not answer within 30 seconds.
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'User-Agent': 'ApudFlow-Integration/1.0'
    }
    # A timeout keeps a stalled endpoint from blocking the caller indefinitely.
    response = requests.get(endpoint_url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
# Usage ('your_api_key' is a placeholder for the real key)
secure_data = authenticated_request(endpoint_url, 'your_api_key')
Error Handling and Monitoring
Comprehensive Error Management
def robust_data_fetch(endpoint_url, max_retries=5):
    """Fetch data with comprehensive error handling.

    Tries up to ``max_retries`` times, sleeping ``2 ** attempt`` seconds
    between attempts. Timeouts, 5xx responses, HTTP 429 and unexpected errors
    (including a body without a ``data`` key) are retried. Any other 4xx
    response is re-raised immediately.

    Args:
        endpoint_url: URL to fetch.
        max_retries: Maximum number of attempts.

    Returns:
        The parsed JSON body, which is guaranteed to contain a ``data`` key.

    Raises:
        requests.HTTPError: On a non-retryable client error (4xx except 429).
        Exception: When every attempt failed. The last underlying error is
            attached as ``__cause__``.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            response = requests.get(endpoint_url, timeout=30)
            response.raise_for_status()
            data = response.json()
            # Validate data structure
            if 'data' not in data:
                raise ValueError("Invalid response format")
            return data
        except requests.Timeout as e:
            last_error = e
            print(f"Timeout on attempt {attempt + 1}")
        except requests.HTTPError as e:
            status = e.response.status_code
            if status >= 500:
                # Server error, retry
                last_error = e
                print(f"Server error {status} on attempt {attempt + 1}")
            elif status == 429:
                # Rate limited: transient, so back off and retry
                last_error = e
                print(f"Rate limited (429) on attempt {attempt + 1}")
            else:
                # Client error, don't retry
                raise
        except Exception as e:
            last_error = e
            print(f"Unexpected error on attempt {attempt + 1}: {e}")

        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # Exponential backoff

    raise Exception(f"Failed to fetch data after {max_retries} attempts") from last_error
# Usage: any failure, whether from the fetch or from processing, is routed
# to the admin alert.
try:
    data = robust_data_fetch(endpoint_url)
    process_data(data)
except Exception as e:
    alert_admin(f"Data fetch failed: {e}")
These webhook integration patterns enable real-time data sharing, automated notifications, and seamless system integration, transforming your ApudFlow workflows into powerful data distribution hubs.