AG-UI replaces one-shot API calls with a live event stream — negotiation rounds, price updates, LOI status, and boss approvals flow to every portal in real time. One backend, four frontends, zero polling.
All events are JSON objects streamed over SSE. S→C = server to client. C→S = client to server (via POST).
{
"type": "ROUND_COMPLETE", // event type; the client looks up its registered handlers by this value
"ts": 1746518400000, // Unix ms timestamp (the server fills this with Date.now())
"session_id": "NEG-001", // negotiation session ID (the neg_id from the /v1/negotiate request)
"user_id": "usr_abc123", // who this is for (RLS); the event bus also delivers to the subscriber whose id equals this value
"data": { // event-specific payload; passed to client handlers as their first argument
"round": 3, // 1-based negotiation round
"agent_price": 2780, // price extracted from the agent's message this round
"broker_price": 2860, // price extracted from the broker's message this round
"gap_pct": 2.88, // (broker_price - agent_price) / agent_price * 100
"next_action": "continue" // NOTE(review): the /v1/negotiate route in this document does not emit this field — confirm
}
}
const Anthropic = require('@anthropic-ai/sdk');
const anthropic = new Anthropic();

/**
 * Runs one streamed Claude turn and forwards every text delta to the client
 * as a TEXT_CHUNK event.
 *
 * @param {object} request - Body for `anthropic.messages.stream` (model, max_tokens, system, messages).
 * @param {string} role - Speaker label attached to each TEXT_CHUNK ('agent' or 'trader').
 * @param {number} round - 1-based negotiation round.
 * @param {(type: string, data: object) => void} emit - SSE event writer.
 * @returns {Promise<string>} The complete message text for this turn.
 */
async function streamNegotiationTurn(request, role, round, emit) {
  const stream = await anthropic.messages.stream(request);
  let text = '';
  for await (const chunk of stream) {
    if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
      text += chunk.delta.text;
      emit('TEXT_CHUNK', { text: chunk.delta.text, role, round });
    }
  }
  return text;
}

// POST /v1/negotiate — returns SSE stream
//
// Event sequence: NEGOTIATION_STARTED, then per round TEXT_CHUNK* (agent),
// TEXT_CHUNK* (trader), ROUND_COMPLETE. When the prices converge the stream
// emits AWAIT_HUMAN and blocks on the boss decision, finishing with either
// LOI_GENERATED or NEGOTIATION_FAILED. The response is always ended, including
// when a Claude call or LOI generation throws.
app.post('/v1/negotiate', authenticate, async (req, res) => {
  const { neg_id, product, qty, unit, target, max, buyer, strategy } = req.body;

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no'); // disable nginx buffering
  res.flushHeaders(); // let the client see the stream open before the first event

  // Track the connection on `res`: its 'close' event fires when the
  // underlying connection terminates, and also after a normal res.end().
  let clientGone = false;
  res.on('close', () => {
    clientGone = true;
  });

  const emit = (type, data) => {
    // Never write to a response that is already closed or ended.
    if (clientGone || res.writableEnded) return;
    res.write(`data: ${JSON.stringify({ type, ts: Date.now(), session_id: neg_id, data })}\n\n`);
  };

  const maxRounds = strategy === 'aggressive' ? 7 : 5;
  const history = [];
  let agentPrice = null;
  let brokerPrice = null;

  try {
    emit('NEGOTIATION_STARTED', { neg_id, product, qty, unit, buyer, strategy, maxRounds });

    // NOTE(review): the negotiation intentionally keeps running if the client
    // disconnects, so a later boss approval can still produce an LOI. Confirm
    // whether it should be aborted instead to save API calls.
    for (let round = 1; round <= maxRounds; round++) {
      // ── AGENT TURN (streaming) ─────────────────────
      const agentMsg = await streamNegotiationTurn(
        {
          model: 'claude-sonnet-4-20250514',
          max_tokens: 300,
          system: buildAgentSystem(product, qty, unit, target, max, buyer, strategy, round, maxRounds),
          messages: [
            ...history,
            { role: 'user', content: round === 1 ? 'Start. Make opening offer.' : 'Make your counter-offer.' },
          ],
        },
        'agent',
        round,
        emit,
      );
      agentPrice = extractPrice(agentMsg);
      history.push({ role: 'assistant', content: agentMsg });

      // ── BROKER TURN (streaming) ────────────────────
      // The broker sees the same transcript with the roles mirrored.
      const brokerHistory = history.map((m) => ({
        role: m.role === 'assistant' ? 'user' : 'assistant',
        content: m.content,
      }));
      const brokerMsg = await streamNegotiationTurn(
        {
          model: 'claude-sonnet-4-20250514',
          max_tokens: 300,
          system: buildBrokerSystem(product, target, max, round),
          messages: [...brokerHistory, { role: 'user', content: 'Respond to the offer.' }],
        },
        'trader',
        round,
        emit,
      );
      brokerPrice = extractPrice(brokerMsg);
      history.push({ role: 'user', content: brokerMsg });

      // 99 is the "no usable prices" sentinel; clients call gap_pct.toFixed(),
      // so this must stay a number.
      const gapPct = agentPrice && brokerPrice ? ((brokerPrice - agentPrice) / agentPrice) * 100 : 99;
      emit('ROUND_COMPLETE', { round, agent_price: agentPrice, broker_price: brokerPrice, gap_pct: gapPct });

      // ── CHECK DEAL ────────────────────────────────
      if (agentPrice && brokerPrice && brokerPrice <= max && gapPct < 2.5) {
        const dealPrice = Math.round((agentPrice + brokerPrice) / 2);

        // AWAIT_HUMAN — stop and wait for Boss
        emit('AWAIT_HUMAN', {
          deal_price: dealPrice,
          total_value: dealPrice * qty,
          loi_fee: 500,
          credits_used: strategy === 'aggressive' ? 25 : 15,
          round,
        });

        // Wait for boss decision (stored in DB by approval endpoint)
        const decision = await waitForBossDecision(neg_id); // polls Supabase

        if (decision.action === 'approve') {
          // The approval endpoint stores an optional override_price; honour it
          // when the decision carries one, otherwise keep the negotiated price.
          // NOTE(review): assumes waitForBossDecision exposes the stored
          // column as `decision.override_price` — confirm.
          const override = Number(decision.override_price);
          const finalPrice = Number.isFinite(override) && override > 0 ? override : dealPrice;

          const loiRef = await generateLOI({ neg_id, product, qty, unit, buyer, dealPrice: finalPrice });
          emit('LOI_GENERATED', {
            loi_ref: loiRef,
            deal_price: finalPrice,
            total_value: finalPrice * qty,
            pdf_url: `/loi/${loiRef}.pdf`,
          });
        } else {
          emit('NEGOTIATION_FAILED', { reason: 'boss_rejected' });
        }
        break;
      }

      if (round === maxRounds) {
        emit('NEGOTIATION_FAILED', {
          reason: 'max_rounds',
          last_agent_price: agentPrice,
          last_broker_price: brokerPrice,
        });
      }
    }
  } catch (err) {
    // Headers are already sent, so the failure has to be reported in-band.
    console.error('Negotiation stream failed', neg_id, err);
    emit('NEGOTIATION_FAILED', { reason: 'internal_error' });
  } finally {
    if (!res.writableEnded) res.end();
  }
});

// Boss approval endpoint
//
// Body: { action: 'approve' | 'reject', override_price?: number }.
// Responds 400 on invalid input, 500 when the decision could not be stored,
// and { ok: true } once the decision is written.
app.post('/v1/negotiations/:id/approve', authenticate, requireBoss, async (req, res) => {
  const { action, override_price } = req.body;

  if (action !== 'approve' && action !== 'reject') {
    return res.status(400).json({ ok: false, error: 'action must be "approve" or "reject"' });
  }
  if (override_price != null && !(Number.isFinite(Number(override_price)) && Number(override_price) > 0)) {
    return res.status(400).json({ ok: false, error: 'override_price must be a positive number' });
  }

  try {
    // The Supabase client reports failures through `error` on the result
    // rather than by throwing, so it must be checked explicitly.
    const result = await supabase
      .from('negotiations')
      .update({ boss_decision: action, override_price })
      .eq('id', req.params.id);
    if (result && result.error) {
      console.error('Failed to store boss decision', req.params.id, result.error);
      return res.status(500).json({ ok: false, error: 'failed to store decision' });
    }
    return res.json({ ok: true });
  } catch (err) {
    console.error('Failed to store boss decision', req.params.id, err);
    return res.status(500).json({ ok: false, error: 'failed to store decision' });
  }
});
All portals connect to a single personal stream filtered by role/user_id. One connection, all events.
// GET /v1/stream — persistent SSE connection per authenticated user
app.get('/v1/stream', authenticate, (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  // Send headers now; otherwise the client sees nothing until the first
  // event or heartbeat is written.
  res.flushHeaders();

  const userId = req.caller.id;
  const role = req.caller.role; // boss|buyer|trader|supplier

  const send = (event) => {
    if (res.writableEnded) return;
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  };

  // Register this connection in event bus
  const unsubscribe = eventBus.subscribe(userId, role, send);

  // Heartbeat — interval from AG_UI_HEARTBEAT_INTERVAL (ms), default 15s
  const heartbeatMs = Number(process.env.AG_UI_HEARTBEAT_INTERVAL) || 15000;
  const hb = setInterval(() => {
    send({ type: 'HEARTBEAT', ts: Date.now() });
  }, heartbeatMs);

  // 'close' on the response fires when the connection is terminated.
  res.on('close', () => {
    unsubscribe();
    clearInterval(hb);
  });
});

// Event bus — pub/sub with role-based routing
//
// `subscribers` maps userId → Set of { role, cb } so that one user can hold
// several live connections (e.g. two browser tabs) without one replacing or
// tearing down another.
const eventBus = {
  subscribers: new Map(),

  /**
   * Registers a connection.
   * @param {string} userId
   * @param {string} role
   * @param {(event: object) => void} cb - Invoked with every event routed to this connection.
   * @returns {() => void} Unsubscribe function; removes only this connection.
   */
  subscribe(userId, role, cb) {
    const entry = { role, cb };
    let entries = this.subscribers.get(userId);
    if (!entries) {
      entries = new Set();
      this.subscribers.set(userId, entries);
    }
    entries.add(entry);

    return () => {
      entries.delete(entry);
      // Drop the user's bucket only if it is empty and still the current one.
      if (entries.size === 0 && this.subscribers.get(userId) === entries) {
        this.subscribers.delete(userId);
      }
    };
  },

  /**
   * Delivers an event to every matching connection.
   * @param {object} event - Event envelope; `event.user_id` addresses a single user.
   * @param {string[]} [targetRoles=[]] - Roles that should receive the event.
   */
  publish(event, targetRoles = []) {
    for (const [userId, entries] of this.subscribers) {
      for (const { role, cb } of entries) {
        // Boss sees everything. Others see events for their role or addressed to them.
        if (role === 'boss' || targetRoles.includes(role) || event.user_id === userId) {
          try {
            cb(event);
          } catch (err) {
            // One broken connection must not block delivery to the others.
            console.error('eventBus subscriber failed', userId, err);
          }
        }
      }
    }
  },
};
-- One row per negotiation session. The boss approval endpoint writes
-- boss_decision and override_price on this table.
CREATE TABLE negotiations (
  id            TEXT PRIMARY KEY,              -- NEG-001
  product       TEXT NOT NULL,
  qty           INT NOT NULL,
  unit          TEXT DEFAULT 'MT',
  buyer_id      UUID REFERENCES users(id),
  buyer_name    TEXT,
  broker_id     UUID REFERENCES users(id),
  target_price  NUMERIC(12,2),
  max_price     NUMERIC(12,2),
  strategy      TEXT DEFAULT 'standard',
  status        TEXT DEFAULT 'running',        -- running|awaiting_boss|completed|failed
  current_round INT DEFAULT 0,
  max_rounds    INT DEFAULT 5,
  agent_price   NUMERIC(12,2),
  broker_price  NUMERIC(12,2),
  deal_price    NUMERIC(12,2),
  boss_decision TEXT,                          -- approve|reject
  override_price NUMERIC(12,2),
  loi_ref       TEXT,
  credits_used  INT DEFAULT 0,
  created_at    TIMESTAMPTZ DEFAULT NOW(),
  updated_at    TIMESTAMPTZ DEFAULT NOW()      -- set on insert only; nothing here refreshes it on update
);

-- RLS: buyers see own deals, traders see assigned, boss sees all
ALTER TABLE negotiations ENABLE ROW LEVEL SECURITY;

-- Buyers: read-only access to rows where they are the buyer.
CREATE POLICY buyer_own ON negotiations
  FOR SELECT USING (buyer_id = auth.uid());

-- Traders/brokers: read-only access to rows assigned to them.
CREATE POLICY broker_assigned ON negotiations
  FOR SELECT USING (broker_id = auth.uid());

-- Boss: every command on every row.
CREATE POLICY boss_all ON negotiations
  FOR ALL USING (is_boss(auth.uid()));
One JS class. Each portal imports it and registers handlers for the events it cares about.
// agui-client.js — include in every portal

/**
 * Browser client for the AG-UI event stream.
 *
 * Opens an EventSource on /v1/stream, dispatches each incoming event to the
 * handlers registered for its `type`, and reconnects with exponential
 * back-off (1s doubling up to 30s, reset once a connection opens).
 *
 * NOTE(review): EventSource cannot send custom headers, so the token travels
 * in the query string and may end up in server/proxy logs — prefer a
 * short-lived stream token if that is a concern.
 */
class SloiAGUI {
  /**
   * @param {string} authToken - Token sent as `?token=` on the stream and as a Bearer token on POSTs.
   */
  constructor(authToken) {
    this.token = authToken;
    // Prototype-less map so event types such as "constructor" cannot resolve
    // to inherited Object members.
    this.handlers = Object.create(null);
    this.es = null;
    this.reconnectDelay = 1000;
    this.reconnectTimer = null;
    this.closed = false;
  }

  /**
   * Registers a handler for one event type.
   * @param {string} eventType
   * @param {(data: any, event: object) => void} handler
   * @returns {SloiAGUI} this, for chaining.
   */
  on(eventType, handler) {
    if (!this.handlers[eventType]) this.handlers[eventType] = [];
    this.handlers[eventType].push(handler);
    return this; // chainable
  }

  /**
   * Opens (or re-opens) the event stream.
   * @returns {SloiAGUI} this, for chaining.
   */
  connect() {
    this.closed = false;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.es) this.es.close(); // never keep two live connections

    const source = new EventSource(`/v1/stream?token=${encodeURIComponent(this.token)}`);
    this.es = source;

    source.onmessage = (e) => {
      let event;
      try {
        event = JSON.parse(e.data);
      } catch (err) {
        console.error('AG-UI parse error', err);
        return;
      }
      if (!event || typeof event !== 'object') return;

      const handlers = this.handlers[event.type] || [];
      for (const handler of handlers) {
        // Isolate handlers: one throwing must not starve the others.
        try {
          handler(event.data, event);
        } catch (err) {
          console.error('AG-UI handler error', event.type, err);
        }
      }
    };

    source.onerror = () => {
      source.close();
      // Ignore errors from a connection that was replaced or deliberately closed.
      if (this.closed || this.es !== source) return;

      const delay = this.reconnectDelay;
      this.reconnectDelay = Math.min(delay * 2, 30000);
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        if (!this.closed) this.connect();
      }, delay);
    };

    source.onopen = () => {
      this.reconnectDelay = 1000;
    };

    return this;
  }

  /**
   * Sends the boss decision for a negotiation.
   * @param {string} negId - Negotiation ID.
   * @param {string} action - 'approve' or 'reject'.
   * @param {?number} [overridePrice=null] - Optional price override.
   * @returns {Promise<Response>} The fetch response (not checked for `ok`).
   */
  async humanResponse(negId, action, overridePrice = null) {
    return fetch(`/v1/negotiations/${encodeURIComponent(negId)}/approve`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.token}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ action, override_price: overridePrice }),
    });
  }

  /** Closes the stream and cancels any pending reconnect. */
  disconnect() {
    this.closed = true;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.es) this.es.close();
  }
}

// Export for use in each portal
if (typeof window !== 'undefined') window.SloiAGUI = SloiAGUI;
// Admin portal: live negotiation transcript, approval box, leads and prices.
const agui = new SloiAGUI(AUTH_TOKEN)
  .on('TEXT_CHUNK', ({ text, role, round }) => {
    // Stream tokens into the correct bubble
    const bubbleId = 'bubble-r' + round + '-' + role;
    let bubble = document.getElementById(bubbleId);
    if (!bubble) {
      bubble = addBubble(role, '', role.toUpperCase() + ' Round ' + round);
      // Tag the new bubble so later chunks of the same turn find it instead
      // of opening a fresh bubble per token.
      if (!bubble.id) bubble.id = bubbleId;
    }
    // Append as a text node: model output is untrusted, and `innerHTML +=`
    // would also re-parse the whole bubble on every token.
    bubble.append(text);
    chatArea.scrollTop = chatArea.scrollHeight;
  })
  .on('ROUND_COMPLETE', ({ round, agent_price, broker_price, gap_pct }) => {
    updateRoundsBar(round, maxRounds);
    updatePriceDisplay(agent_price, broker_price);
    addBubble('system', `Round ${round} complete · Gap: ${gap_pct.toFixed(1)}%`);
  })
  .on('AWAIT_HUMAN', ({ deal_price, total_value, loi_fee, credits_used }) => {
    showApprovalBox(deal_price, total_value, loi_fee, credits_used);
    // Boss clicks → agui.humanResponse(negId, 'approve')
  })
  .on('LOI_GENERATED', ({ loi_ref, deal_price, pdf_url }) => {
    // NOTE(review): pdf_url is received but not used below — the "Download"
    // label looks like it was meant to link to it; confirm.
    addBubble('loi', `✅ LOI ${loi_ref} · $${deal_price.toLocaleString()} · ` + `📄 Download`);
  })
  .on('LEAD_FOUND', ({ name, country, value, relevance, reason }) => {
    appendLeadCard({ name, country, value, relevance, reason });
    document.getElementById('tc-lead').textContent = ++leadCount + ' new';
  })
  .on('PRICE_UPDATE', ({ sku, price, change_pct }) => {
    updatePriceRow(sku, price, change_pct);
  })
  .connect();
// Boss War Room: approval popup for deals awaiting a human decision.
agui.on('AWAIT_HUMAN', (data, event) => {
  // Show approval popup in War Room — top priority
  const popup = document.getElementById('approval-popup');

  // Build the popup with DOM nodes and listeners instead of an HTML string:
  // session_id comes from the event and must never be interpolated into
  // markup or inline onclick JavaScript.
  const make = (tag, className, text) => {
    const node = document.createElement(tag);
    if (className) node.className = className;
    if (text !== undefined) node.textContent = text;
    return node;
  };
  const decisionButton = (label, action) => {
    const button = make('button', '', label);
    button.addEventListener('click', () => {
      agui
        .humanResponse(event.session_id, action)
        .catch((err) => console.error('Boss decision failed', err));
    });
    return button;
  };

  const actions = make('div', 'ab-actions');
  actions.append(decisionButton('✅ Approve', 'approve'), decisionButton('✗ Reject', 'reject'));

  const box = make('div', 'approval-box');
  box.append(
    make('div', 'ab-title', '⚡ Boss approval required'),
    make('div', 'ab-neg', event.session_id),
    make('div', 'ab-price', `$${data.deal_price.toLocaleString()}/MT`),
    make('div', 'ab-total', `Total: $${data.total_value.toLocaleString()}`),
    actions,
  );

  const overlay = make('div', 'approval-overlay');
  overlay.append(box);

  popup.replaceChildren(overlay);
  popup.style.display = 'block';

  // Also notify via Telegram Bot. Best-effort: a failure is logged and must
  // not affect the popup. The body is JSON, so declare it as such.
  fetch('/v1/telegram-bot/notify', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ type: 'AWAIT_HUMAN', ...data }),
  }).catch((err) => console.error('Telegram notify failed', err));
});

// Hide popup after decision
agui.on('LOI_GENERATED', () => {
  document.getElementById('approval-popup').style.display = 'none';
  incrementLOICounter();
});

// A rejection ends with NEGOTIATION_FAILED rather than LOI_GENERATED, so the
// popup has to be closed on that event too.
agui.on('NEGOTIATION_FAILED', () => {
  document.getElementById('approval-popup').style.display = 'none';
});
// Buyer portal: round progress, pending-approval notice, LOIs and prices.
agui
  .on('ROUND_COMPLETE', ({ round, agent_price, broker_price }) => {
    // A price can be missing for a round; show a dash instead of "$undefined".
    const usd = (price) => (price == null ? '—' : `$${price.toLocaleString()}`);

    // Update negotiate tab status — buyer sees round progress
    document.getElementById('neg-status').textContent =
      `Round ${round} · Agent: ${usd(agent_price)} · Supplier: ${usd(broker_price)}`;
    document.getElementById('neg-badge').textContent = `● R${round}`;
  })
  .on('AWAIT_HUMAN', ({ deal_price }) => {
    // Buyer sees "Pending boss approval" — cannot approve themselves.
    // Plain text, so textContent rather than innerHTML.
    document.getElementById('neg-status').textContent =
      `⏳ Pending boss approval · Deal at $${deal_price.toLocaleString()}/MT`;
  })
  .on('LOI_GENERATED', ({ loi_ref, pdf_url }) => {
    // Switch to LOIs tab and show download
    show('lois');
    prependLOI({ ref: loi_ref, pdf_url });
    showToast('✅ LOI generated! ' + loi_ref);
  })
  .on('PRICE_UPDATE', ({ sku, price, change_pct }) => {
    exUpdatePrice(sku, price, change_pct); // update Exchange tab
  })
  .connect();
- npm install @anthropic-ai/sdk
- routes/negotiate.js with SSE endpoint + streaming Claude calls
- routes/stream.js — global SSE stream with eventBus + role filtering
- POST /v1/negotiations/:id/approve — writes boss decision to Supabase
- negotiations table with RLS policies
- waitForBossDecision(negId) — polls Supabase realtime subscription
- X-Accel-Buffering: no header on Railway (disables nginx buffering for SSE)
- agui-client.js — shared SloiAGUI class
- POST /v1/telegram-bot/notify — forwards AWAIT_HUMAN to Telegram Bot webhook
- POST /v1/negotiations/:id/approve
- TELEGRAM_BOT_TOKEN to Railway ENV

ANTHROPIC_API_KEY=sk-ant-xxxx # Claude API — streaming enabled
SUPABASE_URL=https://xxx.supabase.co
SUPABASE_SERVICE_KEY=eyJh...
TELEGRAM_BOT_TOKEN=https://... # from Telegram Bot VPS setup
AG_UI_HEARTBEAT_INTERVAL=15000 # ms between heartbeats
Boss approves on Telegram → Admin popup closes → Buyer sees LOI → Trader gets notification → all from a single SSE stream.