OpenAI Prompting Guide

5 days ago 1
#!/usr/bin/env python3
"""
A self-contained **pure-Python 3.9+** utility for applying human-readable
“pseudo-diff” patch files to a collection of text files.
"""

from __future__ import annotations

import pathlib
from dataclasses import dataclass, field
from enum import Enum
from typing import (
    Callable,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)


# --------------------------------------------------------------------------- #
#  Domain objects
# --------------------------------------------------------------------------- #
class ActionType(str, Enum):
    """The kind of operation a patch performs on a single file."""

    ADD = "add"
    DELETE = "delete"
    UPDATE = "update"


@dataclass
class FileChange:
    """A fully resolved change to one file, ready to be applied."""

    type: ActionType
    # Content before the change (set for DELETE and UPDATE).
    old_content: Optional[str] = None
    # Content after the change (set for ADD and UPDATE).
    new_content: Optional[str] = None
    # Destination path when an UPDATE also moves the file.
    move_path: Optional[str] = None


@dataclass
class Commit:
    """A set of resolved file changes keyed by the original file path."""

    changes: Dict[str, FileChange] = field(default_factory=dict)


# --------------------------------------------------------------------------- #
#  Exceptions
# --------------------------------------------------------------------------- #
class DiffError(ValueError):
    """Any problem detected while parsing or applying a patch."""


# --------------------------------------------------------------------------- #
#  Helper dataclasses used while parsing patches
# --------------------------------------------------------------------------- #
@dataclass
class Chunk:
    """One contiguous replacement: delete ``del_lines`` and insert ``ins_lines``."""

    # Line index in the original file where the replacement starts.
    orig_index: int = -1
    del_lines: List[str] = field(default_factory=list)
    ins_lines: List[str] = field(default_factory=list)


@dataclass
class PatchAction:
    """The parsed (not yet applied) action for a single file."""

    type: ActionType
    # Full content of the new file (ADD only).
    new_file: Optional[str] = None
    # Replacements to perform (UPDATE only).
    chunks: List[Chunk] = field(default_factory=list)
    # Destination path when an UPDATE also moves the file.
    move_path: Optional[str] = None


@dataclass
class Patch:
    """All parsed actions of a patch, keyed by file path."""

    actions: Dict[str, PatchAction] = field(default_factory=dict)


# --------------------------------------------------------------------------- #
#  Patch text parser
# --------------------------------------------------------------------------- #
@dataclass
class Parser:
    """Line-oriented parser that turns patch text into a :class:`Patch`."""

    # Current content of every file the patch may update or delete.
    current_files: Dict[str, str]
    # The patch text, split into lines.
    lines: List[str]
    # Index of the line currently being parsed.
    index: int = 0
    # Result, filled in by parse().
    patch: Patch = field(default_factory=Patch)
    # Accumulated inexactness of the context matches (0 means all exact).
    fuzz: int = 0

    # ------------- low-level helpers -------------------------------------- #
    def _cur_line(self) -> str:
        """Return the current line; raise DiffError when input is exhausted."""
        if self.index >= len(self.lines):
            raise DiffError("Unexpected end of input while parsing patch")
        return self.lines[self.index]

    @staticmethod
    def _norm(line: str) -> str:
        """Strip CR so comparisons work for both LF and CRLF input."""
        return line.rstrip("\r")

    # ------------- scanning convenience ----------------------------------- #
    def is_done(self, prefixes: Optional[Tuple[str, ...]] = None) -> bool:
        """
        Return True at end of input, or when the current line starts with
        one of *prefixes*.
        """
        if self.index >= len(self.lines):
            return True
        if (
            prefixes
            and len(prefixes) > 0
            and self._norm(self._cur_line()).startswith(prefixes)
        ):
            return True
        return False

    def startswith(self, prefix: Union[str, Tuple[str, ...]]) -> bool:
        """Return True when the current (CR-normalised) line starts with *prefix*."""
        return self._norm(self._cur_line()).startswith(prefix)

    def read_str(self, prefix: str) -> str:
        """
        Consume the current line if it starts with *prefix* and return the text
        **after** the prefix.  Raises if prefix is empty.

        Returns an empty string, without consuming anything, when the current
        line does not start with *prefix*.
        """
        if prefix == "":
            raise ValueError("read_str() requires a non-empty prefix")
        if self._norm(self._cur_line()).startswith(prefix):
            text = self._cur_line()[len(prefix) :]
            self.index += 1
            return text
        return ""

    def read_line(self) -> str:
        """Return the current raw line and advance."""
        line = self._cur_line()
        self.index += 1
        return line

    # ------------- public entry point -------------------------------------- #
    def parse(self) -> None:
        """
        Parse every file section up to the ``*** End Patch`` sentinel and
        store the resulting actions in ``self.patch``.

        Raises:
            DiffError: on duplicate sections for one path, on updates/deletes
                of files missing from ``current_files``, on adds of files
                already present, on unrecognised lines, or on a missing
                sentinel.
        """
        while not self.is_done(("*** End Patch",)):
            # ---------- UPDATE ---------- #
            path = self.read_str("*** Update File: ")
            if path:
                if path in self.patch.actions:
                    raise DiffError(f"Duplicate update for file: {path}")
                # An optional "Move to" line may directly follow the header.
                move_to = self.read_str("*** Move to: ")
                if path not in self.current_files:
                    raise DiffError(f"Update File Error - missing file: {path}")
                text = self.current_files[path]
                action = self._parse_update_file(text)
                action.move_path = move_to or None
                self.patch.actions[path] = action
                continue

            # ---------- DELETE ---------- #
            path = self.read_str("*** Delete File: ")
            if path:
                if path in self.patch.actions:
                    raise DiffError(f"Duplicate delete for file: {path}")
                if path not in self.current_files:
                    raise DiffError(f"Delete File Error - missing file: {path}")
                self.patch.actions[path] = PatchAction(type=ActionType.DELETE)
                continue

            # ---------- ADD ---------- #
            path = self.read_str("*** Add File: ")
            if path:
                if path in self.patch.actions:
                    raise DiffError(f"Duplicate add for file: {path}")
                if path in self.current_files:
                    raise DiffError(f"Add File Error - file already exists: {path}")
                self.patch.actions[path] = self._parse_add_file()
                continue

            raise DiffError(f"Unknown line while parsing: {self._cur_line()}")

        if not self.startswith("*** End Patch"):
            raise DiffError("Missing *** End Patch sentinel")
        self.index += 1  # consume sentinel

    # ------------- section parsers ---------------------------------------- #
    def _parse_update_file(self, text: str) -> PatchAction:
        """
        Parse the body of an ``*** Update File:`` section against *text*, the
        current content of that file, and return the UPDATE action.

        Each sub-section may begin with an ``@@ <line>`` anchor (or a bare
        ``@@``).  The section's context lines are then located in the file
        and its chunks are rebased to absolute line indices.

        Raises:
            DiffError: when a sub-section starts with an invalid line or its
                context cannot be found in *text*.
        """
        action = PatchAction(type=ActionType.UPDATE)
        lines = text.split("\n")
        index = 0  # search position within the original file
        while not self.is_done(
            (
                "*** End Patch",
                "*** Update File:",
                "*** Delete File:",
                "*** Add File:",
                "*** End of File",
            )
        ):
            def_str = self.read_str("@@ ")
            section_str = ""
            if not def_str and self._norm(self._cur_line()) == "@@":
                section_str = self.read_line()

            # Only the very first sub-section may omit the "@@" marker.
            if not (def_str or section_str or index == 0):
                raise DiffError(f"Invalid line in update section:\n{self._cur_line()}")

            if def_str.strip():
                found = False
                # First try an exact match of the anchor line ahead of the
                # current position, unless it already appears behind it.
                if def_str not in lines[:index]:
                    for i, s in enumerate(lines[index:], index):
                        if s == def_str:
                            index = i + 1
                            found = True
                            break
                # Then retry ignoring surrounding whitespace; such a match
                # counts as fuzz.
                if not found and def_str.strip() not in [
                    s.strip() for s in lines[:index]
                ]:
                    for i, s in enumerate(lines[index:], index):
                        if s.strip() == def_str.strip():
                            index = i + 1
                            self.fuzz += 1
                            found = True
                            break

            next_ctx, chunks, end_idx, eof = peek_next_section(self.lines, self.index)
            new_index, fuzz = find_context(lines, next_ctx, index, eof)
            if new_index == -1:
                ctx_txt = "\n".join(next_ctx)
                raise DiffError(
                    f"Invalid {'EOF ' if eof else ''}context at {index}:\n{ctx_txt}"
                )
            self.fuzz += fuzz
            # Chunk indices are relative to the section; rebase them onto
            # the position where the context was found in the file.
            for ch in chunks:
                ch.orig_index += new_index
                action.chunks.append(ch)
            index = new_index + len(next_ctx)
            self.index = end_idx
        return action

    def _parse_add_file(self) -> PatchAction:
        """
        Parse the body of an ``*** Add File:`` section and return the ADD
        action.  Every body line must start with ``+``.

        Raises:
            DiffError: when a body line does not start with ``+``.
        """
        lines: List[str] = []
        while not self.is_done(
            ("*** End Patch", "*** Update File:", "*** Delete File:", "*** Add File:")
        ):
            s = self.read_line()
            if not s.startswith("+"):
                raise DiffError(f"Invalid Add File line (missing '+'): {s}")
            lines.append(s[1:])  # strip leading '+'
        return PatchAction(type=ActionType.ADD, new_file="\n".join(lines))


# --------------------------------------------------------------------------- #
#  Helper functions
# --------------------------------------------------------------------------- #
def find_context_core(
    lines: List[str], context: List[str], start: int
) -> Tuple[int, int]:
    """
    Find the first position at or after *start* where *context* occurs in
    *lines*.

    Three passes of decreasing strictness are made: exact match (fuzz 0),
    match ignoring trailing whitespace (fuzz 1), and match ignoring leading
    and trailing whitespace (fuzz 100).

    Returns:
        ``(index, fuzz)`` of the first match, ``(start, 0)`` when *context*
        is empty, or ``(-1, 0)`` when nothing matches.
    """
    if not context:
        return start, 0

    for i in range(start, len(lines)):
        if lines[i : i + len(context)] == context:
            return i, 0
    for i in range(start, len(lines)):
        if [s.rstrip() for s in lines[i : i + len(context)]] == [
            s.rstrip() for s in context
        ]:
            return i, 1
    for i in range(start, len(lines)):
        if [s.strip() for s in lines[i : i + len(context)]] == [
            s.strip() for s in context
        ]:
            return i, 100
    return -1, 0


def find_context(
    lines: List[str], context: List[str], start: int, eof: bool
) -> Tuple[int, int]:
    """
    Locate *context* in *lines*, like :func:`find_context_core`.

    When *eof* is true the context is first looked for at the very end of
    the file.  If it is not there, the search falls back to *start* and
    10_000 is added to the returned fuzz.
    """
    if eof:
        new_index, fuzz = find_context_core(lines, context, len(lines) - len(context))
        if new_index != -1:
            return new_index, fuzz
        new_index, fuzz = find_context_core(lines, context, start)
        return new_index, fuzz + 10_000
    return find_context_core(lines, context, start)


def peek_next_section(
    lines: List[str], index: int
) -> Tuple[List[str], List[Chunk], int, bool]:
    """
    Read one update sub-section of patch *lines* starting at *index*.

    Lines are classified by their first character: ``+`` (insert), ``-``
    (delete) or space (context).  An empty line is treated as an empty
    context line.

    Returns:
        A tuple ``(old, chunks, index, eof)`` where *old* holds the context
        and deleted lines (the text expected in the original file), *chunks*
        holds the replacements with ``orig_index`` relative to *old*,
        *index* is the position after the section, and *eof* is True when
        the section ended with ``*** End of File``.

    Raises:
        DiffError: on a line with an invalid prefix, or when the section is
            empty.
    """
    old: List[str] = []
    del_lines: List[str] = []
    ins_lines: List[str] = []
    chunks: List[Chunk] = []
    mode = "keep"
    orig_index = index

    while index < len(lines):
        s = lines[index]
        if s.startswith(
            (
                "@@",
                "*** End Patch",
                "*** Update File:",
                "*** Delete File:",
                "*** Add File:",
                "*** End of File",
            )
        ):
            break
        if s == "***":
            break
        if s.startswith("***"):
            raise DiffError(f"Invalid Line: {s}")
        index += 1

        last_mode = mode
        if s == "":
            s = " "
        if s[0] == "+":
            mode = "add"
        elif s[0] == "-":
            mode = "delete"
        elif s[0] == " ":
            mode = "keep"
        else:
            raise DiffError(f"Invalid Line: {s}")
        s = s[1:]

        # Returning to context lines closes the chunk collected so far.
        if mode == "keep" and last_mode != mode:
            if ins_lines or del_lines:
                chunks.append(
                    Chunk(
                        orig_index=len(old) - len(del_lines),
                        del_lines=del_lines,
                        ins_lines=ins_lines,
                    )
                )
            del_lines, ins_lines = [], []

        if mode == "delete":
            del_lines.append(s)
            old.append(s)
        elif mode == "add":
            ins_lines.append(s)
        elif mode == "keep":
            old.append(s)

    # Flush a chunk that runs up to the end of the section.
    if ins_lines or del_lines:
        chunks.append(
            Chunk(
                orig_index=len(old) - len(del_lines),
                del_lines=del_lines,
                ins_lines=ins_lines,
            )
        )

    if index < len(lines) and lines[index] == "*** End of File":
        index += 1
        return old, chunks, index, True

    if index == orig_index:
        raise DiffError("Nothing in this section")
    return old, chunks, index, False


# --------------------------------------------------------------------------- #
#  Patch → Commit and Commit application
# --------------------------------------------------------------------------- #
def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
    """
    Apply the chunks of an UPDATE *action* to *text* and return the result.

    *path* is used only in error messages.

    Raises:
        DiffError: when *action* is not an UPDATE, a chunk starts beyond the
            end of the file, or chunks overlap.
    """
    if action.type is not ActionType.UPDATE:
        raise DiffError("_get_updated_file called with non-update action")
    orig_lines = text.split("\n")
    dest_lines: List[str] = []
    orig_index = 0

    for chunk in action.chunks:
        if chunk.orig_index > len(orig_lines):
            raise DiffError(
                f"{path}: chunk.orig_index {chunk.orig_index} exceeds file length"
            )
        if orig_index > chunk.orig_index:
            raise DiffError(
                f"{path}: overlapping chunks at {orig_index} > {chunk.orig_index}"
            )

        # Copy the untouched lines before the chunk, emit the inserted
        # lines, then skip over the deleted ones.
        dest_lines.extend(orig_lines[orig_index : chunk.orig_index])
        orig_index = chunk.orig_index

        dest_lines.extend(chunk.ins_lines)
        orig_index += len(chunk.del_lines)

    dest_lines.extend(orig_lines[orig_index:])
    return "\n".join(dest_lines)


def patch_to_commit(patch: Patch, orig: Dict[str, str]) -> Commit:
    """
    Resolve a parsed *patch* against the original file contents *orig*
    (path -> text) into a :class:`Commit`.

    Raises:
        DiffError: when an ADD action carries no content, or an UPDATE
            cannot be applied.
    """
    commit = Commit()
    for path, action in patch.actions.items():
        if action.type is ActionType.DELETE:
            commit.changes[path] = FileChange(
                type=ActionType.DELETE, old_content=orig[path]
            )
        elif action.type is ActionType.ADD:
            if action.new_file is None:
                raise DiffError("ADD action without file content")
            commit.changes[path] = FileChange(
                type=ActionType.ADD, new_content=action.new_file
            )
        elif action.type is ActionType.UPDATE:
            new_content = _get_updated_file(orig[path], action, path)
            commit.changes[path] = FileChange(
                type=ActionType.UPDATE,
                old_content=orig[path],
                new_content=new_content,
                move_path=action.move_path,
            )
    return commit


# --------------------------------------------------------------------------- #
#  User-facing helpers
# --------------------------------------------------------------------------- #
def text_to_patch(text: str, orig: Dict[str, str]) -> Tuple[Patch, int]:
    """
    Parse patch *text* against the original file contents *orig*.

    Returns:
        The parsed :class:`Patch` and the accumulated fuzz.

    Raises:
        DiffError: when the begin/end sentinels are missing or the patch
            body is invalid.
    """
    lines = text.splitlines()  # preserves blank lines, no strip()
    if (
        len(lines) < 2
        or not Parser._norm(lines[0]).startswith("*** Begin Patch")
        or Parser._norm(lines[-1]) != "*** End Patch"
    ):
        raise DiffError("Invalid patch text - missing sentinels")

    # Start at index 1 to skip the "*** Begin Patch" line.
    parser = Parser(current_files=orig, lines=lines, index=1)
    parser.parse()
    return parser.patch, parser.fuzz


def identify_files_needed(text: str) -> List[str]:
    """Return the paths the patch updates, followed by the paths it deletes."""
    lines = text.splitlines()
    return [
        line[len("*** Update File: ") :]
        for line in lines
        if line.startswith("*** Update File: ")
    ] + [
        line[len("*** Delete File: ") :]
        for line in lines
        if line.startswith("*** Delete File: ")
    ]


def identify_files_added(text: str) -> List[str]:
    """Return the paths the patch adds."""
    lines = text.splitlines()
    return [
        line[len("*** Add File: ") :]
        for line in lines
        if line.startswith("*** Add File: ")
    ]


# --------------------------------------------------------------------------- #
#  File-system helpers
# --------------------------------------------------------------------------- #
def load_files(paths: List[str], open_fn: Callable[[str], str]) -> Dict[str, str]:
    """Read every path in *paths* with *open_fn* and return path -> content."""
    return {path: open_fn(path) for path in paths}


def apply_commit(
    commit: Commit,
    write_fn: Callable[[str, str], None],
    remove_fn: Callable[[str], None],
) -> None:
    """
    Apply every change in *commit* using the supplied callbacks.

    Args:
        commit: The resolved changes to apply.
        write_fn: Called as ``write_fn(path, content)`` to create or
            overwrite a file.
        remove_fn: Called as ``remove_fn(path)`` to delete a file.

    Raises:
        DiffError: when an ADD or UPDATE change carries no new content.
    """
    for path, change in commit.changes.items():
        if change.type is ActionType.DELETE:
            remove_fn(path)
        elif change.type is ActionType.ADD:
            if change.new_content is None:
                raise DiffError(f"ADD change for {path} has no content")
            write_fn(path, change.new_content)
        elif change.type is ActionType.UPDATE:
            if change.new_content is None:
                raise DiffError(f"UPDATE change for {path} has no new content")
            target = change.move_path or path
            write_fn(target, change.new_content)
            # Remove the source only when the file really moved.  A
            # "Move to" naming the file's own path (compared as plain
            # strings) would otherwise delete the file just written.
            if change.move_path and change.move_path != path:
                remove_fn(path)


def process_patch(
    text: str,
    open_fn: Callable[[str], str],
    write_fn: Callable[[str, str], None],
    remove_fn: Callable[[str], None],
) -> str:
    """
    Parse patch *text* and apply it through the supplied I/O callbacks.

    Args:
        text: Patch text; must start with ``*** Begin Patch``.
        open_fn: Called as ``open_fn(path)`` to read a file's content.
        write_fn: Called as ``write_fn(path, content)`` to write a file.
        remove_fn: Called as ``remove_fn(path)`` to delete a file.

    Returns:
        The string ``"Done!"`` on success.

    Raises:
        DiffError: when the patch is malformed or cannot be applied.
    """
    if not text.startswith("*** Begin Patch"):
        raise DiffError("Patch text must start with *** Begin Patch")
    paths = identify_files_needed(text)
    orig = load_files(paths, open_fn)
    patch, _fuzz = text_to_patch(text, orig)
    commit = patch_to_commit(patch, orig)
    apply_commit(commit, write_fn, remove_fn)
    return "Done!"


# --------------------------------------------------------------------------- #
#  Default FS helpers
# --------------------------------------------------------------------------- #
def open_file(path: str) -> str:
    """Return the content of *path*, read as UTF-8 text."""
    with open(path, "rt", encoding="utf-8") as fh:
        return fh.read()


def write_file(path: str, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories."""
    target = pathlib.Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("wt", encoding="utf-8") as fh:
        fh.write(content)


def remove_file(path: str) -> None:
    """Delete *path*; a missing file is not an error."""
    pathlib.Path(path).unlink(missing_ok=True)


# --------------------------------------------------------------------------- #
#  CLI entry-point
# --------------------------------------------------------------------------- #
def main() -> None:
    """Read a patch from stdin, apply it to the file system, and report."""
    import sys

    patch_text = sys.stdin.read()
    if not patch_text:
        print("Please pass patch text through stdin", file=sys.stderr)
        return
    try:
        result = process_patch(patch_text, open_file, write_file, remove_file)
    except DiffError as exc:
        print(exc, file=sys.stderr)
        return
    print(result)


if __name__ == "__main__":
    main()
Read Entire Article