Your LLM Framework Only Needs 100 Lines


Does it really need to be this complicated?

What if we stripped everything away?

Your LLM Framework Only Needs 100 Lines

https://github.com/The-Pocket/PocketFlow

import asyncio, warnings, copy, time

class BaseNode:
    def __init__(self): self.params,self.successors={},{}
    def set_params(self,params): self.params=params
    def next(self,node,action="default"):
        if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
        self.successors[action]=node; return node
    def prep(self,shared): pass
    def exec(self,prep_res): pass
    def post(self,shared,prep_res,exec_res): pass
    def _exec(self,prep_res): return self.exec(prep_res)
    def _run(self,shared): p=self.prep(shared); e=self._exec(p); return self.post(shared,p,e)
    def run(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use Flow.")
        return self._run(shared)
    def __rshift__(self,other): return self.next(other)
    def __sub__(self,action):
        if isinstance(action,str): return _ConditionalTransition(self,action)
        raise TypeError("Action must be a string")

class _ConditionalTransition:
    def __init__(self,src,action): self.src,self.action=src,action
    def __rshift__(self,tgt): return self.src.next(tgt,self.action)

class Node(BaseNode):
    def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
    def exec_fallback(self,prep_res,exc): raise exc
    def _exec(self,prep_res):
        for self.cur_retry in range(self.max_retries):
            try: return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
                if self.wait>0: time.sleep(self.wait)

class BatchNode(Node):
    def _exec(self,items): return [super(BatchNode,self)._exec(i) for i in (items or [])]

class Flow(BaseNode):
    def __init__(self,start=None): super().__init__(); self.start_node=start
    def start(self,start): self.start_node=start; return start
    def get_next_node(self,curr,action):
        nxt=curr.successors.get(action or "default")
        if not nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
        return nxt
    def _orch(self,shared,params=None):
        curr,p,last_action=copy.copy(self.start_node),(params or {**self.params}),None
        while curr: curr.set_params(p); last_action=curr._run(shared); curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    def _run(self,shared): p=self.prep(shared); o=self._orch(shared); return self.post(shared,p,o)
    def post(self,shared,prep_res,exec_res): return exec_res

class BatchFlow(Flow):
    def _run(self,shared):
        pr=self.prep(shared) or []
        for bp in pr: self._orch(shared,{**self.params,**bp})
        return self.post(shared,pr,None)

class AsyncNode(Node):
    async def prep_async(self,shared): pass
    async def exec_async(self,prep_res): pass
    async def exec_fallback_async(self,prep_res,exc): raise exc
    async def post_async(self,shared,prep_res,exec_res): pass
    async def _exec(self,prep_res):
        for i in range(self.max_retries):
            try: return await self.exec_async(prep_res)
            except Exception as e:
                if i==self.max_retries-1: return await self.exec_fallback_async(prep_res,e)
                if self.wait>0: await asyncio.sleep(self.wait)
    async def run_async(self,shared):
        if self.successors: warnings.warn("Node won't run successors. Use AsyncFlow.")
        return await self._run_async(shared)
    async def _run_async(self,shared): p=await self.prep_async(shared); e=await self._exec(p); return await self.post_async(shared,p,e)
    def _run(self,shared): raise RuntimeError("Use run_async.")

class AsyncBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return [await super(AsyncBatchNode,self)._exec(i) for i in items]

class AsyncParallelBatchNode(AsyncNode,BatchNode):
    async def _exec(self,items): return await asyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i) for i in items))

class AsyncFlow(Flow,AsyncNode):
    async def _orch_async(self,shared,params=None):
        curr,p,last_action=copy.copy(self.start_node),(params or {**self.params}),None
        while curr:
            curr.set_params(p)
            last_action=await curr._run_async(shared) if isinstance(curr,AsyncNode) else curr._run(shared)
            curr=copy.copy(self.get_next_node(curr,last_action))
        return last_action
    async def _run_async(self,shared): p=await self.prep_async(shared); o=await self._orch_async(shared); return await self.post_async(shared,p,o)
    async def post_async(self,shared,prep_res,exec_res): return exec_res

class AsyncBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        for bp in pr: await self._orch_async(shared,{**self.params,**bp})
        return await self.post_async(shared,pr,None)

class AsyncParallelBatchFlow(AsyncFlow,BatchFlow):
    async def _run_async(self,shared):
        pr=await self.prep_async(shared) or []
        await asyncio.gather(*(self._orch_async(shared,{**self.params,**bp}) for bp in pr))
        return await self.post_async(shared,pr,None)
| Framework | Abstraction | App Wrappers | Vendor Wrappers | Lines | Size |
|---|---|---|---|---|---|
| LangChain | Agent, Chain | Many (e.g., QA, Summarization) | Many (e.g., OpenAI, Pinecone, etc.) | 405K | +166MB |
| CrewAI | Agent, Chain | Many (e.g., FileReadTool, SerperDevTool) | Many (e.g., OpenAI, Anthropic, Pinecone, etc.) | 18K | +173MB |
| SmolAgent | Agent | Some (e.g., CodeAgent, VisitWebTool) | Some (e.g., DuckDuckGo, Hugging Face, etc.) | 8K | +198MB |
| LangGraph | Agent, Graph | Some (e.g., Semantic Search) | Some (e.g., PostgresStore, SqliteSaver, etc.) | 37K | +51MB |
| AutoGen | Agent | Some (e.g., Tool Agent, Chat Agent) | Many [Optional] (e.g., OpenAI, Pinecone, etc.) | 7K (core-only) | +26MB (core-only) |
| PocketFlow | Graph | None | None | 100 | +56KB |

You will learn how agents ACTUALLY work.

You will learn how RAG is REALLY built.

You will learn from THE GROUND UP.

If you know a little bit of Python.

You're ready.

The Node

class BaseNode:
    def prep(self, shared): pass
    def exec(self, prep_res): pass
    def post(self, shared, prep_res, exec_res): pass
    def _run(self, shared):
        p = self.prep(shared); e = self._exec(p); return self.post(shared, p, e)

prep: Reads from shared store to get ready.


exec: Performs the core task in isolation.


post: Writes results back and returns an 'action' string for the next Node.
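Here are the three phases in action with a toy node (GreetNode is a hypothetical example for illustration, not part of the framework):

class GreetNode(BaseNode):
    def prep(self, shared):
        return shared["name"]           # read from the shared store
    def exec(self, name):
        return f"Hello, {name}!"        # pure computation, no shared access
    def post(self, shared, prep_res, exec_res):
        shared["greeting"] = exec_res   # write the result back
        return "default"                # the action string picks the next node

shared = {"name": "Ada"}
GreetNode().run(shared)
print(shared["greeting"])   # -> Hello, Ada!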

class Node(BaseNode):
    def __init__(self, max_retries=1, wait=0):
        super().__init__()
        self.max_retries, self.wait = max_retries, wait
    def _exec(self, prep_res):
        for self.cur_retry in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception as e:
                if self.cur_retry == self.max_retries - 1:
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0:
                    time.sleep(self.wait)
    def exec_fallback(self, prep_res, exc):
        raise exc
# A node that calls an LLM to summarize a file
class SummarizeFile(Node):
    def prep(self, shared):
        return shared.get("file_content")
    def exec(self, file_content):
        # This would make a real API call to an LLM
        return call_llm(f"Summarize this: {file_content}")
    def exec_fallback(self, file_content, error):
        # If the LLM call fails after all retries...
        print(f"LLM call failed with error: {error}")
        return "Sorry, summarizing the file failed."
    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        return "next"

# Instantiate the node with retry logic
summarizer = SummarizeFile(max_retries=3, wait=5)
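And a quick usage sketch (the file content is made up; call_llm is the helper defined later in the article):

shared = {"file_content": "PocketFlow is a 100-line LLM framework."}
summarizer.run(shared)    # retries exec() up to 3 times, waiting 5 seconds between attempts
print(shared["summary"])  # the summary written by post(), or the fallback message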

The Shared Store

{ }

# The Shared Store is just a dictionary.
shared = {}

# Node A writes to it.
shared["result_from_A"] = 42

# Node B reads from it.
data_for_b = shared["result_from_A"]
class LoadData(Node):
    # This node's only job is to add data to the shared store.
    def post(self, shared, prep_res, exec_res):
        shared["data"] = "Some important text content"
class Summarize(Node):
    def prep(self, shared):
        # Reads the data that the previous node wrote.
        return shared["data"]
    def exec(self, prep_res):
        # Does its work on the prepared data.
        return call_llm(f"Summarize: {prep_res}")
    def post(self, shared, prep_res, exec_res):
        # Writes its own result back to the shared store.
        shared["summary"] = exec_res
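Wiring the two nodes together makes the handoff concrete. A minimal sketch (assumes the call_llm helper defined later in the article):

load_node = LoadData()
summarize_node = Summarize()
load_node >> summarize_node      # default transition

flow = Flow(start=load_node)
shared = {}
flow.run(shared)                 # LoadData writes shared["data"], Summarize reads it
print(shared["summary"])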

The Flow

%%{init: {'theme': 'dark'}}%% graph LR A[Node A] --> B[Node B]

# A simple, default transition
node_a >> node_b

%%{init: {'theme': 'dark'}}%% graph TD A[Review Node] -->|approved| B[Payment Node] A -->|rejected| C[Finish Node]

# Named transitions for branching logic
review_node - "approved" >> payment_node
review_node - "rejected" >> finish_node
class BaseNode:
    def __init__(self):
        self.params, self.successors = {}, {}
    def next(self, node, action="default"):
        # Adds an entry to the node's "address book"
        self.successors[action] = node
        return node
class Flow(BaseNode):
    def _orch(self, shared, params=None):
        # Runs a node, gets the action, finds the next node, repeats
        curr = copy.copy(self.start_node)
        while curr:
            last_action = curr._run(shared)
            curr = self.get_next_node(curr, last_action)
    def get_next_node(self, curr, action):
        # Looks up the action in the current node's successors
        return curr.successors.get(action or "default")
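Put together, that is the entire execution model. Here is a runnable sketch of the branching example above (the stub nodes and the 500 threshold are made up for illustration; no LLM calls involved):

class Review(Node):
    def exec(self, _):
        return 120   # pretend this is an order amount under review
    def post(self, shared, prep_res, exec_res):
        # The returned string is the action that picks the next node
        return "approved" if exec_res <= 500 else "rejected"

class Payment(Node):
    def post(self, shared, prep_res, exec_res):
        shared["status"] = "paid"

class Finish(Node):
    def post(self, shared, prep_res, exec_res):
        shared["status"] = "stopped"

review_node, payment_node, finish_node = Review(), Payment(), Finish()
review_node - "approved" >> payment_node
review_node - "rejected" >> finish_node

shared = {}
Flow(start=review_node).run(shared)
print(shared["status"])   # -> "paid"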

%%{init: {'theme': 'dark'}}%% graph LR subgraph Flow A[Node A] --> B[Node B] end Flow --> C[Node C]

This allows for powerful, nested workflows.
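Because Flow itself subclasses BaseNode, a finished flow can be dropped into a larger flow as if it were a single node. A minimal sketch (node_a, node_b, node_c stand for any Node instances whose post() returns the default action):

# Build an inner pipeline...
node_a >> node_b
inner_flow = Flow(start=node_a)

# ...then treat the whole pipeline as one step in a bigger graph.
inner_flow >> node_c
outer_flow = Flow(start=inner_flow)
outer_flow.run(shared)   # runs A -> B inside the inner flow, then C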

No OpenAI. No Anthropic. No Google.

%%{init: {'theme': 'dark'}}%% graph LR subgraph B[LLM Framework] A[LLM API] end C[You] --> B

Maintenance Nightmare

from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content

Write the function yourself!
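Swapping vendors means writing a different ten-line function, not migrating a framework. A hedged sketch of the same helper against the Anthropic SDK (the model name and parameters are assumptions; check the current SDK docs):

from anthropic import Anthropic
import os

def call_llm(messages):
    client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",  # assumed model name
        max_tokens=1024,
        messages=messages,
    )
    return response.content[0].text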

%%{init: {'theme': 'dark'}}%% graph LR chat[ChatNode] -- "continue" --> chat

A chatbot is just a self-loop.

class ChatNode(Node):
    def prep(self, shared):
        # Initialize messages if this is the first run
        if "messages" not in shared:
            shared["messages"] = []
            print("Welcome to the chat!")
        # Get user input
        user_input = input("\nYou: ")
        # Add user message to history
        shared["messages"].append({"role": "user", "content": user_input})
        # Return all messages for the LLM
        return shared["messages"]
    def exec(self, messages):
        # Call LLM with the entire conversation history
        response = call_llm(messages)
        return response
    def post(self, shared, prep_res, exec_res):
        # Print the assistant's response
        print(f"\nAssistant: {exec_res}")
        # Add assistant message to history
        shared["messages"].append({"role": "assistant", "content": exec_res})
        # Loop back to continue the conversation
        return "continue"

# Create the flow with self-loop
chat_node = ChatNode()
chat_node - "continue" >> chat_node
flow = Flow(start=chat_node)

# Start the chat
if __name__ == "__main__":
    shared = {}
    flow.run(shared)

%%{init: {'theme': 'dark'}}%% graph TD prep["prep()"] --> exec["exec()"] --> post["post()"]

# 1. Initial State
shared = {}

%%{init: {'theme': 'dark'}}%% graph TD style prep fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 2. After prep()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}

%%{init: {'theme': 'dark'}}%% graph TD style exec fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 3. During exec()
# No change to shared store
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"}
    ]
}

%%{init: {'theme': 'dark'}}%% graph TD style post fill:#4CAF50,stroke:#333 prep["prep()"] --> exec["exec()"] --> post["post()"]

# 4. After post()
shared = {
    "messages": [
        {"role": "user", "content": "Hello, who are you?"},
        {"role": "assistant", "content": "I am a helpful AI assistant."}
    ]
}

What you WANT

name: Jane Doe
email: [email protected]

What you GET

"Certainly! The name mentioned in the document is Jane Doe, and you can find her email at [email protected]."

JSON - Fragile

{ "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\"" }

escaping

YAML - Robust

dialogue: |
  Alice said: "Hello Bob.
  How are you?"

no escaping
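Don't take it on faith. A quick sanity check (a sketch using the standard json and PyYAML libraries; the `|-` chomping marker just strips the trailing newline):

import json
import yaml

# The JSON version needs escaped quotes and \n sequences...
j = json.loads('{ "dialogue": "Alice said: \\"Hello Bob.\\nHow are you?\\"" }')

# ...the YAML block scalar is written exactly as the text appears.
y = yaml.safe_load('dialogue: |-\n  Alice said: "Hello Bob.\n  How are you?"')

assert j == y  # same data, no escaping needed on the YAML side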

import yaml

class ResumeParserNode(Node):
    def prep(self, shared):
        # The resume text is expected to be in the shared store
        return shared["resume_text"]
    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested information in YAML format.

**Resume:**
```
{resume_text}
```

**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).

**Example Format:**
```yaml
name: Jane Doe
email: [email protected]
experience:
  - title: Manager
    company: Corp A
```

Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)
        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result
    def post(self, shared, prep_res, exec_res):
        shared["structured_data"] = exec_res

Before

!! JOHN SMTIH !!
contact: 123-456-7890. email me at [email protected]
Work Experience:
- at ABC Corportaion i was the SALES MANAGER
- XYZ Industries, position: ASST. MANAGER

After

# The clean, structured data
{
    "name": "JOHN SMTIH",
    "email": "[email protected]",
    "experience": [
        {"title": "SALES MANAGER", "company": "ABC Corportaion"},
        {"title": "ASST. MANAGER", "company": "XYZ Industries"}
    ]
}

Solution: The BatchNode

Standard Node

class Node:
    # Returns one item to process
    def prep(self, shared):
        return one_item
    # Processes one item
    def exec(self, one_item):
        return one_result
    # Receives one result
    def post(self, shared, prep_res, exec_res):
        pass

BatchNode

class BatchNode(Node):
    # Returns a LIST of items
    def prep(self, shared):
        return [item1, item2, item3]
    # Processes ONE item at a time
    def exec(self, one_item_from_list):
        return one_result
    # Receives a LIST of all results
    def post(self, shared, prep_res, exec_res_list):
        pass


The Implementation: Dumb Simple

class BatchNode(Node):
    # This is the entire implementation.
    def _exec(self, items):
        # It's just a simple for loop.
        return [super(BatchNode, self)._exec(i) for i in (items or [])]

class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        # Assumes a list of resumes is already in the shared store.
        return shared.get("resumes", [])
    def exec(self, resume_text):
        prompt = f"""Analyze the resume below. Output ONLY the requested information in YAML format.

**Resume:**
```
{resume_text}
```

**YAML Output Requirements:**
- Extract `name` (string).
- Extract `email` (string).
- Extract `experience` (list of objects).

**Example Format:**
```yaml
name: Jane Doe
email: [email protected]
experience:
  - title: Manager
    company: Corp A
```

Generate the YAML output now:
"""
        response = call_llm(prompt)
        # A simple way to extract the YAML block from the response
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        structured_result = yaml.safe_load(yaml_str)
        # Assertions to enforce the output structure
        assert "name" in structured_result, "Name is required"
        assert "email" in structured_result, "Email is required"
        return structured_result
    def post(self, shared, prep_res, all_results_list):
        # This runs only ONCE, after all items are processed.
        # It receives a list of all individual results.
        shared["all_parsed_resumes"] = all_results_list

Batch Node Walkthrough

%%{init: {'theme': 'dark'}}%%
graph TD
    A["prep(): returns [resume1, resume2, resume3]"] --> exec_loop
    subgraph exec_loop ["_exec() loop"]
        C["exec(resume1)"] --> E["exec(resume2)"] --> G["exec(resume3)"]
    end
    exec_loop --> H["post(): receives [result1, result2, result3]"]

%%{init: {'theme': 'dark'}}%% graph TD subgraph exec_loop ["_exec() loop"] C["exec(resume1)"] --> E["exec(resume2)"] --> G["exec(resume3)"] end

Total Time = Sum of All Tasks

%%{init: {'theme': 'dark'}}%% sequenceDiagram participant You participant LLM You->>LLM: Process Resume 1? activate LLM Note over You, LLM: 10+ seconds of waiting... LLM-->>You: Result 1 deactivate LLM You->>LLM: Process Resume 2? activate LLM Note over You, LLM: 10+ seconds of waiting... LLM-->>You: Result 2 deactivate LLM

5-20 seconds of pure, wasted time... per resume.

The Slow Chef

%%{init: {'theme': 'dark'}}%% graph TD A[Start Eggs] --> B[Wait for Eggs] B --> C[Serve Eggs] C --> D[Start Toast] D --> E[Wait for Toast] E --> F[Serve Toast]

The Smart Chef

%%{init: {'theme': 'dark'}}%% graph TD subgraph " " A[Start Eggs] --> B[Wait for Eggs] C[Start Toast] --> D[Wait for Toast] end B --> E[Serve Eggs] D --> F[Serve Toast]

async

A label for functions that might wait

await

Pause one task and let others run

import asyncio
import time

async def make_coffee():  # Takes 3s
    print("Start coffee...")
    await asyncio.sleep(3)
    print("Coffee ready!")

async def make_toast():  # Takes 2s
    print("Start toast...")
    await asyncio.sleep(2)
    print("Toast ready!")

async def main():
    start_time = time.time()
    # Run both tasks concurrently
    await asyncio.gather(
        make_coffee(),
        make_toast()
    )
    print(f"Total time: {time.time() - start_time:.2f}s")

asyncio.run(main())

# Output:
# Start coffee...
# Start toast...
# Toast ready!
# Coffee ready!
# Total time: 3.01s

Total Time = Time of Longest Task

Before - Synchronous

from openai import OpenAI
import os

def call_llm(messages):
    client = OpenAI(api_key=...)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content

After - Asynchronous

from openai import AsyncOpenAI
import os

async def call_llm_async(messages):
    client = AsyncOpenAI(api_key=...)
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content

Before - BatchNode

# Before
class BatchResumeParserNode(BatchNode):
    def prep(self, shared):
        return shared.get("resumes", [])
    def exec(self, one_resume_text):
        ...
        response = call_llm(prompt)
        ...
    def post(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list

After - AsyncParallelBatchNode

# After
class BatchResumeParserNode(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared.get("resumes", [])
    async def exec_async(self, one_resume_text):
        ...
        response = await call_llm_async(prompt)
        ...
    async def post_async(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list

BatchNode - Sequential

class BatchNode(Node):
    def _exec(self, items):
        # A simple, sequential for loop
        return [super(BatchNode, self)._exec(i) for i in (items or [])]

AsyncParallelBatchNode - Parallel

class AsyncParallelBatchNode(AsyncNode, BatchNode):
    async def _exec(self, items):
        # Creates tasks and runs them all concurrently
        return await asyncio.gather(
            *(super(AsyncParallelBatchNode, self)._exec(i) for i in items)
        )

A loop becomes a gather. That's it.

Sequential Time

10 resumes × 10 seconds = 100 seconds

Parallel Time

Time of the longest task: roughly 10 seconds

1. Watch for API Rate Limits (a throttling sketch follows below)

2. Ensure Tasks are Independent
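For the rate-limit caveat, one common pattern is to cap in-flight requests with a semaphore. A minimal sketch (the class name, the limit of 5, and the prompt are illustrative assumptions; call_llm_async is the helper defined above):

class ThrottledResumeParserNode(AsyncParallelBatchNode):
    # Hypothetical cap of 5 concurrent requests; tune to your provider's quota.
    limiter = asyncio.Semaphore(5)

    async def prep_async(self, shared):
        return shared.get("resumes", [])
    async def exec_async(self, one_resume_text):
        async with self.limiter:  # at most 5 LLM calls in flight at once
            return await call_llm_async(f"Parse this resume: {one_resume_text}")
    async def post_async(self, shared, prep_res, exec_res_list):
        shared["all_parsed_resumes"] = exec_res_list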

Ask one giant prompt for a finished article and you get a C+: generic, rambling output.

Break big tasks into small tasks

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C[Combiner]

List → Parallel Processing → Combine

class GenerateOutline(Node):
    def prep(self, shared):
        return shared["topic"]
    def exec(self, topic):
        prompt = f"List section titles for {topic}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["section_titles"] = exec_res.strip().split('\n')
class BatchDraftSections(BatchNode):
    def prep(self, shared):
        return shared["section_titles"]
    def exec(self, section_title):
        prompt = f"Write content for: {section_title}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["section_drafts"] = exec_res
class CombineAndRefine(Node):
    def prep(self, shared):
        return shared["section_drafts"]
    def exec(self, section_drafts):
        combined = "\n\n".join(section_drafts)
        prompt = f"Combine and polish: {combined}"
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["final_article"] = exec_res
# Instantiate the nodes
outline_node = GenerateOutline()
batch_draft_node = BatchDraftSections()
combine_node = CombineAndRefine()

# Connect the chain
outline_node >> batch_draft_node >> combine_node

# Create and run
flow = Flow(start=outline_node)
shared = {"topic": "AI Safety"}
flow.run(shared)

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C[Combiner]

# Initial State
shared = {"topic": "AI Safety"}

%%{init: {'theme': 'dark'}}%% graph LR A["[Outliner]"] --> B[BatchDrafter] --> C[Combiner] style A fill:#4CAF50,stroke:#333

# After Outliner - now we have a LIST!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"]
}

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B["[BatchDrafter]"] --> C[Combiner] style B fill:#4CAF50,stroke:#333

# After BatchDrafter - LIST of drafted sections!
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [
        "AI safety introduction content...",
        "Key risks in AI development...",
        "Proposed solutions and approaches..."
    ]
}

%%{init: {'theme': 'dark'}}%% graph LR A[Outliner] --> B[BatchDrafter] --> C["[Combiner]"] style C fill:#4CAF50,stroke:#333

# Final result - combined and polished
shared = {
    "topic": "AI Safety",
    "section_titles": ["Introduction", "Key Risks", "Solutions"],
    "section_drafts": [...],
    "final_article": "AI Safety: A Comprehensive Guide..."
}

By breaking one complex request into 3 simple requests...

%%{init: {'theme': 'dark'}}%% graph LR A[Outline] --> B[Draft] --> C[Combine]

What if it could REWRITE its plan?

What if it could ADAPT on the fly?

What if it could actually THINK?

Magic? Consciousness? A Black Box?

%%{init: {'theme': 'dark'}}%% graph TD A[Decide Node] -->|action: 'search'| B[Search Node] A -->|action: 'answer'| C[Answer Node] B --> A

The Brain: DecideAction Node

class DecideAction(Node):
    def prep(self, shared):
        context = shared.get("context", "No previous search")
        question = shared["question"]
        return question, context
    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Given the context, should you 'search' for more info or 'answer' now?
Context: {context}
Query: {question}
Actions:
- search
- answer
Respond in YAML:
```yaml
action: <your_choice>
search_query: <if searching>
```"""
        response = call_llm(prompt)
        yaml_str = response.split("```yaml")[1].split("```")[0].strip()
        decision = yaml.safe_load(yaml_str)
        return decision
    def post(self, shared, prep_res, exec_res):
        if exec_res["action"] == "search":
            shared["search_query"] = exec_res["search_query"]
        return exec_res["action"]

The Tool: SearchWeb Node

class SearchWeb(Node):
    def prep(self, shared):
        return shared["search_query"]
    def exec(self, search_query):
        # Search the web for information
        search_client = GoogleSearchAPI(api_key="YOUR_API_KEY")
        results = search_client.search({
            "query": search_query,
            "num_results": 3,
            "language": "en"
        })
        formatted_results = f"Results for: {search_query}\n"
        for result in results:
            formatted_results += f"- {result.title}: {result.snippet}\n"
        return formatted_results
    def post(self, shared, prep_res, exec_res):
        previous = shared.get("context", "")
        shared["context"] = previous + "\n\nSEARCH: " + \
            prep_res + "\nRESULTS: " + exec_res
        return "decide"  # Loop back to the brain

The Exit: DirectAnswer Node

class DirectAnswer(Node):
    def prep(self, shared):
        question = shared["question"]
        context = shared.get("context", "")
        return question, context
    def exec(self, inputs):
        question, context = inputs
        prompt = f"""Based on the following information, answer the question.
Question: {question}
Research: {context}
Provide a comprehensive answer using the research results."""
        return call_llm(prompt)
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = exec_res
        return "done"  # End the flow

Wiring the Graph

# Instantiate the nodes
decide_node, search_node, answer_node = DecideAction(), SearchWeb(), DirectAnswer()

# Connect the nodes together
decide_node - "search" >> search_node
decide_node - "answer" >> answer_node
search_node - "decide" >> decide_node  # The loop

flow = Flow(start=decide_node)

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A

# Initial State
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?"
}

# Start the flow
flow.run(shared)

%%{init: {'theme': 'dark'}}%% graph TD A["[DecideAction]"] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A style A fill:#4CAF50,stroke:#333 linkStyle 0 stroke:#FFD700,stroke-width:4px

# After Round 1: Decide
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner"
}
# LLM Output: 'search'

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B["[SearchWeb]"] A -->|answer| C[Answer] B --> A style B fill:#4CAF50,stroke:#333

# After Round 2: Search
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# Node returns: 'decide'

%%{init: {'theme': 'dark'}}%% graph TD A["[DecideAction]"] -->|search| B[SearchWeb] A -->|answer| C[Answer] B --> A style A fill:#4CAF50,stroke:#333 linkStyle 1 stroke:#FFD700,stroke-width:4px

# After Round 3: Decide (with context)
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for foundational discoveries and inventions in machine learning..."
}
# LLM Output: 'answer' (no new data added)

%%{init: {'theme': 'dark'}}%% graph TD A[DecideAction] -->|search| B[SearchWeb] A -->|answer| C["[Answer]"] B --> A style C fill:#4CAF50,stroke:#333

# After Final Step: Answer
shared = {
    "question": "Who won the 2024 Physics Nobel Prize?",
    "search_query": "2024 Physics Nobel Prize winner",
    "context": "SEARCH: 2024 Physics Nobel Prize winner\nRESULTS: The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton...",
    "answer": "The 2024 Nobel Prize in Physics was awarded to John Hopfield and Geoffrey Hinton for their foundational discoveries and inventions that enable machine learning with artificial neural networks."
}
# Flow ends.

✓ Chatbots

✓ Batch Processing

✓ Parallel Workflows

✓ Agents

...all from 100 lines.

The Node. The Shared Store. The Flow.

Want more examples? Of course you do.

https://github.com/The-Pocket/PocketFlow

All built on the same 100-line foundation.

Let me show you something.

Just a more complex workflow. A bigger graph.

https://github.com/The-Pocket/PocketFlow-Tutorial-Codebase-Knowledge

%%{init: {'theme': 'dark'}}%% flowchart TD A[FetchRepo] --> B[IdentifyAbstractions] B --> C[AnalyzeRelationships] C --> D[OrderChapters] D --> E[Batch WriteChapters] E --> F[CombineTutorial]

More popular than the framework itself.

There's one more thing...
