@@ -0,0 +1,888 @@
from __future__ import annotations
from typing import Any , Dict , List , Optional
from urllib . parse import urljoin
import json
import re
import asyncio
from agentui . providers . http_client import build_client
# --- Templating helpers ------------------------------------------------------
_OUT_MACRO_RE = re . compile ( r " \ [ \ [ \ s*OUT \ s*[: \ s] \ s*([^ \ ]]+?) \ s* \ ] \ ] " , re . IGNORECASE )
_VAR_MACRO_RE = re . compile ( r " \ [ \ [ \ s*VAR \ s*[: \ s] \ s*([^ \ ]]+?) \ s* \ ] \ ] " , re . IGNORECASE )
# Unified prompt fragment macro (provider-specific JSON fragment)
_PROMPT_MACRO_RE = re . compile ( r " \ [ \ [ \ s*PROMPT \ s* \ ] \ ] " , re . IGNORECASE )
# Short form: [[OUT1]] -> best-effort text from node n1
_OUT_SHORT_RE = re . compile ( r " \ [ \ [ \ s*OUT \ s*( \ d+) \ s* \ ] \ ] " , re . IGNORECASE )
_BRACES_RE = re . compile ( r " \ { \ { \ s*([^}]+?) \ s* \ } \ } " )
def _split_path ( path : str ) - > List [ str ] :
return [ p . strip ( ) for p in str ( path ) . split ( " . " ) if str ( p ) . strip ( ) ]
def _get_by_path ( obj : Any , path : Optional [ str ] ) - > Any :
if path is None or path == " " :
return obj
cur = obj
for seg in _split_path ( path ) :
if isinstance ( cur , dict ) :
if seg in cur :
cur = cur [ seg ]
else :
return None
elif isinstance ( cur , list ) :
try :
idx = int ( seg )
except Exception : # noqa: BLE001
return None
if 0 < = idx < len ( cur ) :
cur = cur [ idx ]
else :
return None
else :
return None
return cur
def _stringify_for_template ( val : Any ) - > str :
if val is None :
return " "
if isinstance ( val , bool ) :
# JSON-friendly booleans (useful when embedding into JSON-like templates)
return " true " if val else " false "
if isinstance ( val , ( dict , list ) ) :
try :
return json . dumps ( val , ensure_ascii = False )
except Exception : # noqa: BLE001
return str ( val )
return str ( val )
def _deep_find_text ( obj : Any , max_nodes : int = 5000 ) - > Optional [ str ] :
"""
Best-effort поиск первого текстового значения в глубине структуры JSON.
Сначала пытаемся по ключам content/text, затем общий обход.
"""
try :
# Быстрые ветки
if isinstance ( obj , str ) :
return obj
if isinstance ( obj , dict ) :
c = obj . get ( " content " )
if isinstance ( c , str ) :
return c
t = obj . get ( " text " )
if isinstance ( t , str ) :
return t
parts = obj . get ( " parts " )
if isinstance ( parts , list ) and parts :
for p in parts :
if isinstance ( p , dict ) and isinstance ( p . get ( " text " ) , str ) :
return p . get ( " text " )
# Общий нерекурсивный обход в ширину
queue : List [ Any ] = [ obj ]
seen = 0
while queue and seen < max_nodes :
cur = queue . pop ( 0 )
seen + = 1
if isinstance ( cur , str ) :
return cur
if isinstance ( cur , dict ) :
# часто встречающиеся поля
for k in ( " text " , " content " ) :
v = cur . get ( k )
if isinstance ( v , str ) :
return v
# складываем все значения
for v in cur . values ( ) :
queue . append ( v )
elif isinstance ( cur , list ) :
for it in cur :
queue . append ( it )
except Exception :
pass
return None
def _best_text_from_outputs ( node_out : Any ) - > str :
"""
Унифицированное извлечение " текста " из выхода ноды.
Поддерживает:
- PromptTemplate: { " text " : ...}
- LLMInvoke: { " response_text " : ...}
- ProviderCall/RawForward: { " result " : <provider_json>}, извлекаем текст для openai/gemini/claude
- Общий глубокий поиск текста, если специфичные ветки не сработали
"""
# Строка сразу
if isinstance ( node_out , str ) :
return node_out
if not isinstance ( node_out , dict ) :
return " "
# Явные короткие поля
if isinstance ( node_out . get ( " response_text " ) , str ) and node_out . get ( " response_text " ) :
return str ( node_out [ " response_text " ] )
if isinstance ( node_out . get ( " text " ) , str ) and node_out . get ( " text " ) :
return str ( node_out [ " text " ] )
res = node_out . get ( " result " )
base = res if isinstance ( res , ( dict , list ) ) else node_out
# OpenAI
try :
if isinstance ( base , dict ) :
ch0 = ( base . get ( " choices " ) or [ { } ] ) [ 0 ]
msg = ch0 . get ( " message " ) or { }
c = msg . get ( " content " )
if isinstance ( c , str ) :
return c
except Exception :
pass
# Gemini
try :
if isinstance ( base , dict ) :
cand0 = ( base . get ( " candidates " ) or [ { } ] ) [ 0 ]
content = cand0 . get ( " content " ) or { }
parts0 = ( content . get ( " parts " ) or [ { } ] ) [ 0 ]
t = parts0 . get ( " text " )
if isinstance ( t , str ) :
return t
except Exception :
pass
# Claude
try :
if isinstance ( base , dict ) :
blocks = base . get ( " content " ) or [ ]
texts = [ b . get ( " text " ) for b in blocks if isinstance ( b , dict ) and isinstance ( b . get ( " text " ) , str ) ]
if texts :
return " \n " . join ( texts )
except Exception :
pass
# Общий глубокий поиск
txt = _deep_find_text ( base )
return txt or " "
def _extract_out_node_id_from_ref ( s : Any ) - > Optional [ str ] :
"""
Извлекает node_id из строки с макросом [[OUT:nodeId(.path)*]].
Возвращает None, если макрос не найден.
"""
if not isinstance ( s , str ) :
return None
m = _OUT_MACRO_RE . search ( s )
if not m :
return None
body = m . group ( 1 ) . strip ( )
node_id = body . split ( " . " , 1 ) [ 0 ] . strip ( )
return node_id or None
def _resolve_in_value ( source : Any , context : Dict [ str , Any ] , values : Dict [ str , Dict [ str , Any ] ] ) - > Any :
"""
Разрешает входные связи/макросы в значение для inputs:
- Нестроковые значения возвращаются как есть
- " macro:path " → берёт значение из context по точечному пути
- " [[VAR:path]] " → берёт значение из context
- " [[OUT:nodeId(.path)*]] " → берёт из уже вычисленных выходов ноды
- " nodeId(.path)* " → ссылка на выходы ноды
- Иначе пытается взять из context по пути; если не найдено, оставляет исходную строку
"""
if not isinstance ( source , str ) :
return source
s = source . strip ( )
# macro:path
if s . lower ( ) . startswith ( " macro: " ) :
path = s . split ( " : " , 1 ) [ 1 ] . strip ( )
return _get_by_path ( context , path )
# [[VAR: path]]
m = _VAR_MACRO_RE . fullmatch ( s )
if m :
path = m . group ( 1 ) . strip ( )
return _get_by_path ( context , path )
# [[OUT: nodeId(.path)*]]
m = _OUT_MACRO_RE . fullmatch ( s )
if m :
body = m . group ( 1 ) . strip ( )
if " . " in body :
node_id , rest = body . split ( " . " , 1 )
node_val = values . get ( node_id )
return _get_by_path ( node_val , rest )
node_val = values . get ( body )
return node_val
# "nodeId(.path)*"
if " . " in s :
node_id , rest = s . split ( " . " , 1 )
if node_id in values :
return _get_by_path ( values . get ( node_id ) , rest )
if s in values :
return values . get ( s )
# fallback: from context by dotted path or raw string
ctx_val = _get_by_path ( context , s )
return ctx_val if ctx_val is not None else source
def render_template_simple ( template : str , context : Dict [ str , Any ] , out_map : Dict [ str , Any ] ) - > str :
"""
Простая подстановка:
- {{ path }} — берёт из context (или {{ OUT.node.path }} для выходов)
- Поддержка фильтра по умолчанию: {{ path|default(value) }}
value может быть числом, строкой ( ' .. ' / " .. " ), массивом/объектом в виде литерала.
- [[VAR:path]] — берёт из context
- [[OUT:nodeId(.path)*]] — берёт из out_map
Возвращает строку.
"""
if template is None :
return " "
s = str ( template )
# 1) Макросы [[VAR:...]] и [[OUT:...]]
def repl_var ( m : re . Match ) - > str :
path = m . group ( 1 ) . strip ( )
val = _get_by_path ( context , path )
return _stringify_for_template ( val )
def repl_out ( m : re . Match ) - > str :
body = m . group ( 1 ) . strip ( )
if " . " in body :
node_id , rest = body . split ( " . " , 1 )
node_val = out_map . get ( node_id )
val = _get_by_path ( node_val , rest )
else :
val = out_map . get ( body )
return _stringify_for_template ( val )
s = _VAR_MACRO_RE . sub ( repl_var , s )
s = _OUT_MACRO_RE . sub ( repl_out , s )
# [[OUT1]] → текст из ноды n1 (best-effort)
def repl_out_short ( m : re . Match ) - > str :
try :
num = int ( m . group ( 1 ) )
node_id = f " n { num } "
node_out = out_map . get ( node_id )
txt = _best_text_from_outputs ( node_out )
return _stringify_for_template ( txt )
except Exception :
return " "
s = _OUT_SHORT_RE . sub ( repl_out_short , s )
# [[PROMPT]] expands to raw provider-specific JSON fragment prepared in context["PROMPT"]
s = _PROMPT_MACRO_RE . sub ( lambda _m : str ( context . get ( " PROMPT " ) or " " ) , s )
# 2) Подстановки {{ ... }} (+ simple default filter)
def repl_braces ( m : re . Match ) - > str :
expr = m . group ( 1 ) . strip ( )
def eval_path ( p : str ) - > Any :
p = p . strip ( )
if p . startswith ( " OUT. " ) :
body = p [ 4 : ]
if " . " in body :
node_id , rest = body . split ( " . " , 1 )
node_val = out_map . get ( node_id )
return _get_by_path ( node_val , rest )
return out_map . get ( body )
return _get_by_path ( context , p )
default_match = re . match ( r " ([^|]+) \ | \ s*default \ ((.*) \ ) \ s*$ " , expr )
if default_match :
base_path = default_match . group ( 1 ) . strip ( )
fallback_raw = default_match . group ( 2 ) . strip ( )
# Снимем внешние кавычки, если это строковый литерал
if len ( fallback_raw ) > = 2 and ( ( fallback_raw [ 0 ] == " ' " and fallback_raw [ - 1 ] == " ' " ) or ( fallback_raw [ 0 ] == ' " ' and fallback_raw [ - 1 ] == ' " ' ) ) :
fallback_val : Any = fallback_raw [ 1 : - 1 ]
else :
# Иначе оставляем как есть (числа/массивы/объекты — литералами)
fallback_val = fallback_raw
raw_val = eval_path ( base_path )
val = raw_val if raw_val not in ( None , " " ) else fallback_val
else :
val = eval_path ( expr )
return _stringify_for_template ( val )
s = _BRACES_RE . sub ( repl_braces , s )
return s
def detect_vendor ( payload : Dict [ str , Any ] ) - > str :
if not isinstance ( payload , dict ) :
return " unknown "
if " anthropic_version " in payload or payload . get ( " provider " ) == " anthropic " :
return " claude "
# Gemini typical payload keys
if " contents " in payload or " generationConfig " in payload :
return " gemini "
# OpenAI typical keys
if " messages " in payload or " model " in payload :
return " openai "
return " unknown "
class ExecutionError ( Exception ) :
pass
class Node :
type_name : str = " Base "
def __init__ ( self , node_id : str , config : Optional [ Dict [ str , Any ] ] = None ) - > None :
self . node_id = node_id
self . config = config or { }
async def run ( self , inputs : Dict [ str , Any ] , context : Dict [ str , Any ] ) - > Dict [ str , Any ] : # noqa: D401
""" Execute node with inputs and context. Return dict of outputs. """
raise NotImplementedError
# Регистрация поддерживаемых типов нод (минимальный набор)
NODE_REGISTRY : Dict [ str , Any ] = { }
class PipelineExecutor :
def __init__ ( self , pipeline : Dict [ str , Any ] ) - > None :
self . pipeline = pipeline
self . nodes_by_id : Dict [ str , Node ] = { }
for n in pipeline . get ( " nodes " , [ ] ) :
node_cls = NODE_REGISTRY . get ( n . get ( " type " ) )
if not node_cls :
raise ExecutionError ( f " Unknown node type: { n . get ( ' type ' ) } " )
self . nodes_by_id [ n [ " id " ] ] = node_cls ( n [ " id " ] , n . get ( " config " , { } ) )
async def run ( self , context : Dict [ str , Any ] ) - > Dict [ str , Any ] :
"""
Исполнитель пайплайна с динамическим порядком на основе зависимостей графа.
Новый режим: волновое (level-by-level) исполнение с параллелизмом и барьером.
Все узлы «готовой волны» стартуют параллельно, ждём всех, затем открывается следующая волна.
Ограничение параллелизма берётся из pipeline.parallel_limit (по умолчанию 8).
Политика ошибок: fail-fast — при исключении любой задачи волны прерываем пайплайн.
"""
nodes : List [ Dict [ str , Any ] ] = list ( self . pipeline . get ( " nodes " , [ ] ) )
id_set = set ( self . nodes_by_id . keys ( ) )
# Собираем зависимости: node_id -> set(parent_ids), и обратные связи dependents
deps_map : Dict [ str , set ] = { n [ " id " ] : set ( ) for n in nodes }
dependents : Dict [ str , set ] = { n [ " id " ] : set ( ) for n in nodes }
for n in nodes :
nid = n [ " id " ]
for _ , source in ( n . get ( " in " ) or { } ) . items ( ) :
if not isinstance ( source , str ) :
# Нестрочные значения считаем константами — зависимостей нет
continue
if source . startswith ( " macro: " ) :
# Макросы берутся из контекста, без зависимостей
continue
# [[VAR:...]] — макрос из контекста, зависимостей нет
if re . fullmatch ( r " \ [ \ [ \ s*VAR \ s*[: \ s] \ s*[^ \ ]]+ \ s* \ ] \ ] " , source . strip ( ) ) :
continue
# [[OUT:nodeId(.key)*]] — зависимость от указанной ноды
out_ref_node = _extract_out_node_id_from_ref ( source )
if out_ref_node and out_ref_node in id_set :
deps_map [ nid ] . add ( out_ref_node )
dependents [ out_ref_node ] . add ( nid )
continue
# Ссылки вида "node.outKey" или "node"
src_id = source . split ( " . " , 1 ) [ 0 ] if " . " in source else source
if src_id in id_set :
deps_map [ nid ] . add ( src_id )
dependents [ src_id ] . add ( nid )
# Входящие степени и первая волна
in_degree : Dict [ str , int ] = { nid : len ( deps ) for nid , deps in deps_map . items ( ) }
ready : List [ str ] = [ nid for nid , deg in in_degree . items ( ) if deg == 0 ]
processed : List [ str ] = [ ]
values : Dict [ str , Dict [ str , Any ] ] = { }
last_result : Dict [ str , Any ] = { }
node_def_by_id : Dict [ str , Dict [ str , Any ] ] = { n [ " id " ] : n for n in nodes }
# Параметры параллелизма
try :
parallel_limit = int ( self . pipeline . get ( " parallel_limit " , 8 ) )
except Exception :
parallel_limit = 8
if parallel_limit < = 0 :
parallel_limit = 1
# Вспомогательная корутина исполнения одной ноды со снапшотом OUT
async def exec_one ( node_id : str , values_snapshot : Dict [ str , Any ] ) - > tuple [ str , Dict [ str , Any ] ] :
ndef = node_def_by_id . get ( node_id )
if not ndef :
raise ExecutionError ( f " Node definition not found: { node_id } " )
node = self . nodes_by_id [ node_id ]
# Снимок контекста и OUT на момент старта волны
ctx = dict ( context )
ctx [ " OUT " ] = values_snapshot
# Разрешаем inputs для ноды
inputs : Dict [ str , Any ] = { }
for name , source in ( ndef . get ( " in " ) or { } ) . items ( ) :
inputs [ name ] = _resolve_in_value ( source , ctx , values_snapshot )
out = await node . run ( inputs , ctx )
return node_id , out
# Волновое исполнение
while ready :
wave_nodes = list ( ready )
ready = [ ] # будет заполнено после завершения волны
wave_results : Dict [ str , Dict [ str , Any ] ] = { }
# Один общий снапшот OUT для всей волны (барьер — узлы волны не видят результаты друг друга)
values_snapshot = dict ( values )
# Чанковый запуск с лимитом parallel_limit
for i in range ( 0 , len ( wave_nodes ) , parallel_limit ) :
chunk = wave_nodes [ i : i + parallel_limit ]
# fail-fast: при исключении любой задачи gather бросит и отменит остальные
results = await asyncio . gather (
* ( exec_one ( nid , values_snapshot ) for nid in chunk ) ,
return_exceptions = False ,
)
# Коммитим результаты чанка в локальное хранилище волны
for nid , out in results :
wave_results [ nid ] = out
last_result = out # обновляем на каждом успешном результате
# После завершения волны — коммитим все её результаты в общие values
values . update ( wave_results )
processed . extend ( wave_nodes )
# Обновляем входящие степени для зависимых и формируем следующую волну
for done in wave_nodes :
for child in dependents . get ( done , ( ) ) :
in_degree [ child ] - = 1
next_ready = [ nid for nid , deg in in_degree . items ( ) if deg == 0 and nid not in processed and nid not in wave_nodes ]
# Исключаем уже учтённые и добавляем только те, которые действительно готовы
ready = next_ready
# Проверка на циклы/недостижимые ноды
if len ( processed ) != len ( nodes ) :
remaining = [ n [ " id " ] for n in nodes if n [ " id " ] not in processed ]
raise ExecutionError ( f " Cycle detected or unresolved dependencies among nodes: { remaining } " )
return last_result
class ProviderCallNode ( Node ) :
type_name = " ProviderCall "
# --- Prompt Manager helpers -------------------------------------------------
def _get_blocks ( self ) - > List [ Dict [ str , Any ] ] :
""" Return normalized list of prompt blocks from config. """
raw = self . config . get ( " blocks " ) or self . config . get ( " prompt_blocks " ) or [ ]
if not isinstance ( raw , list ) :
return [ ]
norm : List [ Dict [ str , Any ] ] = [ ]
for i , b in enumerate ( raw ) :
if not isinstance ( b , dict ) :
continue
role = str ( b . get ( " role " , " user " ) ) . lower ( ) . strip ( )
if role not in { " system " , " user " , " assistant " , " tool " } :
role = " user "
# order fallback: keep original index if not provided/correct
try :
order = int ( b . get ( " order " ) ) if b . get ( " order " ) is not None else i
except Exception : # noqa: BLE001
order = i
norm . append (
{
" id " : b . get ( " id " ) or f " b { i } " ,
" name " : b . get ( " name " ) or f " Block { i + 1 } " ,
" role " : role ,
" prompt " : b . get ( " prompt " ) or " " ,
" enabled " : bool ( b . get ( " enabled " , True ) ) ,
" order " : order ,
}
)
return norm
def _render_blocks_to_unified ( self , context : Dict [ str , Any ] ) - > List [ Dict [ str , Any ] ] :
"""
Filter+sort+render blocks to unified messages:
[ { role, content, name?}]
"""
out_map = context . get ( " OUT " ) or { }
blocks = [ b for b in self . _get_blocks ( ) if b . get ( " enabled " , True ) ]
blocks . sort ( key = lambda x : x . get ( " order " , 0 ) )
messages : List [ Dict [ str , Any ] ] = [ ]
for b in blocks :
content = render_template_simple ( str ( b . get ( " prompt " ) or " " ) , context , out_map )
msg = { " role " : b [ " role " ] , " content " : content }
if b . get ( " name " ) :
msg [ " name " ] = b [ " name " ]
messages . append ( msg )
return messages
def _messages_to_payload ( self , provider : str , messages : List [ Dict [ str , Any ] ] , context : Dict [ str , Any ] ) - > Dict [ str , Any ] :
""" Convert unified messages to provider-specific request payload. """
params = context . get ( " params " ) or { }
model = context . get ( " model " ) or " "
if provider == " openai " :
payload : Dict [ str , Any ] = {
" model " : model ,
" messages " : [
{ k : v for k , v in { " role " : m [ " role " ] , " content " : m [ " content " ] , " name " : m . get ( " name " ) } . items ( ) if v is not None }
for m in messages
] ,
" temperature " : params . get ( " temperature " , 0.7 ) ,
}
if params . get ( " max_tokens " ) is not None :
payload [ " max_tokens " ] = params . get ( " max_tokens " )
if params . get ( " top_p " ) is not None :
payload [ " top_p " ] = params . get ( " top_p " )
if params . get ( " stop " ) is not None :
payload [ " stop " ] = params . get ( " stop " )
return payload
if provider == " gemini " :
sys_text = " \n \n " . join ( [ m [ " content " ] for m in messages if m [ " role " ] == " system " ] ) . strip ( )
contents = [ ]
for m in messages :
if m [ " role " ] == " system " :
continue
role = " model " if m [ " role " ] == " assistant " else " user "
contents . append ( { " role " : role , " parts " : [ { " text " : m [ " content " ] } ] } )
gen_cfg : Dict [ str , Any ] = { }
if params . get ( " temperature " ) is not None :
gen_cfg [ " temperature " ] = params . get ( " temperature " )
if params . get ( " max_tokens " ) is not None :
gen_cfg [ " maxOutputTokens " ] = params . get ( " max_tokens " )
if params . get ( " top_p " ) is not None :
gen_cfg [ " topP " ] = params . get ( " top_p " )
if params . get ( " stop " ) is not None :
gen_cfg [ " stopSequences " ] = params . get ( " stop " )
payload = { " model " : model , " contents " : contents }
if sys_text :
payload [ " systemInstruction " ] = { " parts " : [ { " text " : sys_text } ] }
if gen_cfg :
payload [ " generationConfig " ] = gen_cfg
return payload
if provider == " claude " :
sys_text = " \n \n " . join ( [ m [ " content " ] for m in messages if m [ " role " ] == " system " ] ) . strip ( )
msgs = [ ]
for m in messages :
if m [ " role " ] == " system " :
continue
role = m [ " role " ] if m [ " role " ] in { " user " , " assistant " } else " user "
msgs . append ( { " role " : role , " content " : [ { " type " : " text " , " text " : m [ " content " ] } ] } )
payload : Dict [ str , Any ] = {
" model " : model ,
" messages " : msgs ,
" anthropic_version " : context . get ( " anthropic_version " , " 2023-06-01 " ) ,
}
if sys_text :
payload [ " system " ] = sys_text
if params . get ( " temperature " ) is not None :
payload [ " temperature " ] = params . get ( " temperature " )
if params . get ( " max_tokens " ) is not None :
payload [ " max_tokens " ] = params . get ( " max_tokens " )
if params . get ( " top_p " ) is not None :
payload [ " top_p " ] = params . get ( " top_p " )
if params . get ( " stop " ) is not None :
payload [ " stop " ] = params . get ( " stop " )
return payload
return { }
def _blocks_struct_for_template ( self , provider : str , messages : List [ Dict [ str , Any ] ] , context : Dict [ str , Any ] ) - > Dict [ str , Any ] :
"""
Сформировать структуру для вставки в шаблон (template) из Prompt Blocks.
Возвращает provider-специфичные ключи, которые можно вставлять в JSON:
- openai: { " messages " : [...] , " system_text " : " ... " }
- gemini: { " contents " : [...], " systemInstruction " : { ...}, " system_text " : " ... " }
- claude: { " system_text " : " ... " , " system " : " ... " , " messages " : [...] }
"""
provider = ( provider or " openai " ) . lower ( )
# Гарантируем список
msgs = messages or [ ]
if provider == " openai " :
# Уже в формате {"role","content","name?"}
sys_text = " \n \n " . join ( [ m [ " content " ] for m in msgs if m . get ( " role " ) == " system " ] ) . strip ( )
# Вставляем как есть (editor будет встраивать JSON массива без кавычек)
return {
" messages " : [
{ k : v for k , v in { " role " : m [ " role " ] , " content " : m . get ( " content " ) , " name " : m . get ( " name " ) } . items ( ) if v is not None }
for m in msgs
] ,
" system_text " : sys_text ,
}
if provider == " gemini " :
sys_text = " \n \n " . join ( [ m [ " content " ] for m in msgs if m . get ( " role " ) == " system " ] ) . strip ( )
contents = [ ]
for m in msgs :
if m . get ( " role " ) == " system " :
continue
role = " model " if m . get ( " role " ) == " assistant " else " user "
contents . append ( { " role " : role , " parts " : [ { " text " : str ( m . get ( " content " ) or " " ) } ] } )
sys_instr = { " parts " : [ { " text " : sys_text } ] } if sys_text else { } # всегда корректный JSON-объект
return {
" contents " : contents ,
" systemInstruction " : sys_instr ,
" system_text " : sys_text ,
}
if provider == " claude " :
sys_text = " \n \n " . join ( [ m [ " content " ] for m in msgs if m . get ( " role " ) == " system " ] ) . strip ( )
out_msgs = [ ]
for m in msgs :
if m . get ( " role " ) == " system " :
continue
role = m . get ( " role " )
role = role if role in { " user " , " assistant " } else " user "
out_msgs . append ( { " role " : role , " content " : [ { " type " : " text " , " text " : str ( m . get ( " content " ) or " " ) } ] } )
return {
" system_text " : sys_text ,
" system " : sys_text , # удобно для шаблона: "system": "{{ pm.system_text }}"
" messages " : out_msgs ,
}
# По умолчанию ничего, но это валидный JSON
return { " messages " : [ ] }
async def run ( self , inputs : Dict [ str , Any ] , context : Dict [ str , Any ] ) - > Dict [ str , Any ] :
provider = ( self . config . get ( " provider " ) or " openai " ) . lower ( )
# Support provider-specific configs stored in UI as provider_configs.{provider}
prov_cfg : Dict [ str , Any ] = { }
try :
cfgs = self . config . get ( " provider_configs " ) or { }
if isinstance ( cfgs , dict ) :
prov_cfg = cfgs . get ( provider ) or { }
except Exception : # noqa: BLE001
prov_cfg = { }
base_url = prov_cfg . get ( " base_url " ) or self . config . get ( " base_url " )
if not base_url :
raise ExecutionError ( f " Node { self . node_id } ( { self . type_name } ) requires ' base_url ' in config " )
if not str ( base_url ) . startswith ( ( " http:// " , " https:// " ) ) :
base_url = " http:// " + str ( base_url )
endpoint_tmpl : str = prov_cfg . get ( " endpoint " ) or self . config . get ( " endpoint " ) or " "
template : str = prov_cfg . get ( " template " ) or self . config . get ( " template " ) or " {} "
headers_json : str = prov_cfg . get ( " headers " ) or self . config . get ( " headers " ) or " {} "
# Default endpoints if not set
if not endpoint_tmpl :
if provider == " openai " :
endpoint_tmpl = " /v1/chat/completions "
elif provider == " gemini " :
endpoint_tmpl = " /v1beta/models/ {{ model }}:generateContent "
elif provider == " claude " :
endpoint_tmpl = " /v1/messages "
# Подготовим Prompt Blocks + pm-структуру для шаблона
unified_msgs = self . _render_blocks_to_unified ( context )
pm_struct = self . _blocks_struct_for_template ( provider , unified_msgs , context )
# Расширяем контекст для рендера шаблонов
render_ctx = dict ( context )
render_ctx [ " pm " ] = pm_struct
# Единый JSON-фрагмент PROMPT для шаблонов: [[PROMPT]]
prompt_fragment = " "
try :
if provider == " openai " :
prompt_fragment = ' " messages " : ' + json . dumps ( pm_struct . get ( " messages " , [ ] ) , ensure_ascii = False )
elif provider == " gemini " :
parts = [ ]
contents = pm_struct . get ( " contents " )
if contents is not None :
parts . append ( ' " contents " : ' + json . dumps ( contents , ensure_ascii = False ) )
sysi = pm_struct . get ( " systemInstruction " )
# даже если пустой объект {}, это валидно
if sysi is not None :
parts . append ( ' " systemInstruction " : ' + json . dumps ( sysi , ensure_ascii = False ) )
prompt_fragment = " , " . join ( parts )
elif provider == " claude " :
parts = [ ]
sys_text = pm_struct . get ( " system_text " ) or pm_struct . get ( " system " )
if sys_text is not None :
parts . append ( ' " system " : ' + json . dumps ( sys_text , ensure_ascii = False ) )
msgs = pm_struct . get ( " messages " )
if msgs is not None :
parts . append ( ' " messages " : ' + json . dumps ( msgs , ensure_ascii = False ) )
prompt_fragment = " , " . join ( parts )
except Exception : # noqa: BLE001
prompt_fragment = " "
render_ctx [ " PROMPT " ] = prompt_fragment
# Render helper с поддержкой [[VAR]], [[OUT]] и {{ ... }}
def render ( s : str ) - > str :
return render_template_simple ( s or " " , render_ctx , render_ctx . get ( " OUT " ) or { } )
# Рендер endpoint с макросами/шаблонами
endpoint = render ( endpoint_tmpl )
# Формируем тело ТОЛЬКО из template/[[PROMPT]] (без сырого payload/входов)
try :
rendered = render ( template )
payload = json . loads ( rendered )
except Exception :
# Fallback: используем генерацию из Prompt Blocks в формате провайдера
payload = self . _messages_to_payload ( provider , unified_msgs , context )
# Заголовки — полностью из редактируемого JSON с макросами
try :
headers_src = render ( headers_json ) if headers_json else " {} "
headers = json . loads ( headers_src ) if headers_src else { }
if not isinstance ( headers , dict ) :
raise ValueError ( " headers must be a JSON object " )
except Exception as exc : # noqa: BLE001
raise ExecutionError ( f " ProviderCall headers invalid JSON: { exc } " )
# Итоговый URL
if not base_url . startswith ( ( " http:// " , " https:// " ) ) :
base_url = " http:// " + base_url
url = endpoint if endpoint . startswith ( " http " ) else urljoin ( base_url . rstrip ( ' / ' ) + ' / ' , endpoint . lstrip ( ' / ' ) )
# Debug logs to validate config selection and payload
try :
payload_preview = " "
try :
payload_preview = json . dumps ( payload , ensure_ascii = False ) [ : 400 ]
except Exception :
payload_preview = str ( payload ) [ : 400 ]
print ( f " DEBUG: ProviderCallNode provider= { provider } URL= { url } " )
print ( f " DEBUG: ProviderCallNode headers_keys= { list ( headers . keys ( ) ) } " )
print ( f " DEBUG: ProviderCallNode payload_preview= { payload_preview } " )
except Exception :
pass
async with build_client ( ) as client :
resp = await client . post ( url , json = payload , headers = { " Content-Type " : " application/json " , * * headers } )
resp . raise_for_status ( )
data = resp . json ( )
# Извлекаем текст best-effort
text = None
if provider == " openai " :
try :
text = data . get ( " choices " , [ { } ] ) [ 0 ] . get ( " message " , { } ) . get ( " content " )
except Exception : # noqa: BLE001
text = None
elif provider == " gemini " :
try :
text = data . get ( " candidates " , [ { } ] ) [ 0 ] . get ( " content " , { } ) . get ( " parts " , [ { } ] ) [ 0 ] . get ( " text " )
except Exception : # noqa: BLE001
text = None
elif provider == " claude " :
try :
blocks = data . get ( " content " ) or [ ]
texts = [ b . get ( " text " ) for b in blocks if isinstance ( b , dict ) and b . get ( " type " ) == " text " ]
text = " \n " . join ( [ t for t in texts if isinstance ( t , str ) ] )
except Exception : # noqa: BLE001
text = None
return { " result " : data , " response_text " : text or " " }
class RawForwardNode ( Node ) :
type_name = " RawForward "
async def run ( self , inputs : Dict [ str , Any ] , context : Dict [ str , Any ] ) - > Dict [ str , Any ] :
incoming = context . get ( " incoming " , { } )
raw_payload = incoming . get ( " json " )
base_url : Optional [ str ] = self . config . get ( " base_url " )
override_path : Optional [ str ] = self . config . get ( " override_path " )
# Разрешаем макросы в конфиге RawForward (base_url, override_path)
out_map_for_macros = context . get ( " OUT " ) or { }
if base_url :
base_url = render_template_simple ( str ( base_url ) , context , out_map_for_macros )
if override_path :
override_path = render_template_simple ( str ( override_path ) , context , out_map_for_macros )
# Если base_url не указан, включаем автодетекцию
if not base_url :
vendor = detect_vendor ( raw_payload )
if vendor == " openai " :
base_url = " https://api.openai.com "
elif vendor == " claude " :
base_url = " https://api.anthropic.com "
elif vendor == " gemini " :
base_url = " https://generativelanguage.googleapis.com "
else :
raise ExecutionError ( f " Node { self . node_id } ( { self . type_name } ): ' base_url ' is not configured and vendor could not be detected. " )
# Гарантируем наличие схемы у base_url
if not base_url . startswith ( ( " http:// " , " https:// " ) ) :
base_url = " http:// " + base_url
path = override_path or incoming . get ( " path " ) or " / "
query = incoming . get ( " query " )
if query :
path_with_qs = f " { path } ? { query } "
else :
path_with_qs = path
url = urljoin ( base_url . rstrip ( " / " ) + " / " , path_with_qs . lstrip ( " / " ) )
passthrough_headers : bool = bool ( self . config . get ( " passthrough_headers " , True ) )
extra_headers_json : str = self . config . get ( " extra_headers " ) or " {} "
# Макросы в extra_headers
try :
extra_headers_src = render_template_simple ( extra_headers_json , context , out_map_for_macros ) if extra_headers_json else " {} "
extra_headers = json . loads ( extra_headers_src ) if extra_headers_src else { }
if not isinstance ( extra_headers , dict ) :
raise ValueError ( " extra_headers must be an object " )
except Exception as exc : # noqa: BLE001
raise ExecutionError ( f " RawForward extra_headers invalid JSON: { exc } " )
headers : Dict [ str , str ] = { }
if passthrough_headers :
inc_headers = incoming . get ( " headers " ) or { }
# Копируем все заголовки, кроме Host и Content-Length
for k , v in inc_headers . items ( ) :
if k . lower ( ) not in [ ' host ' , ' content-length ' ] :
headers [ k ] = v
# Убедимся, что Content-Type на месте, если его не было
if ' content-type ' not in { k . lower ( ) for k in headers } :
headers [ ' Content-Type ' ] = ' application/json '
headers . update ( extra_headers )
print ( f " DEBUG: RawForwardNode sending request to URL: { url } " )
print ( f " DEBUG: RawForwardNode sending with HEADERS: { headers } " )
async with build_client ( ) as client :
resp = await client . post ( url , json = raw_payload , headers = headers )
# Логируем ответ от целевого API для диагностики
try :
data = resp . json ( )
print ( f " DEBUG: RawForwardNode received response. Status: { resp . status_code } , Body: { data } " )
except Exception :
data = { " error " : " Failed to decode JSON from upstream " , " text " : resp . text }
print ( f " DEBUG: RawForwardNode received non-JSON response. Status: { resp . status_code } , Text: { resp . text } " )
# Не выбрасываем исключение, а просто проксируем ответ
# resp.raise_for_status()
return { " result " : data }
NODE_REGISTRY . update ( {
ProviderCallNode . type_name : ProviderCallNode ,
RawForwardNode . type_name : RawForwardNode ,
} )