6 שלב 2 — Skill-Building

Building Production Pipelines — בניית צינורות עבודה לפרודקשן

איך לבנות צינורות עבודה אוטומטיים מרובי שלבים עם Claude: עיצוב pipeline, דפוס ה-Orchestrator, תקשורת בין שלבים, טיפול בשגיאות, checkpoints, גרסאות, לוגים, ביצוע מקבילי, ו-scaling. אחרי הפרק הזה, תוכלו לבנות pipeline שלוקח קלט גולמי ומייצר תוצר מוגמר — אוטומטית, בצורה אמינה, ועם יכולת שחזור.

מה יהיה לך בסוף הפרק הזה
מה תוכלו לעשות אחרי הפרק הזה
דרישות קדם
הפרויקט שלך

בפרק 5 בניתם צוותי סוכנים — Agent Teams שעובדים במקביל על משימות מורכבות, עם Lead Agent שמתאם ומסנתז תוצאות. עכשיו אנחנו עוברים מ-"עבודה במקביל" ל-"עבודה ברצף" — Pipelines שבהם הפלט של שלב אחד הופך לקלט של השלב הבא, כמו פס ייצור במפעל. בפרק 7 ניקח את ה-Pipelines האלה ונריץ אותם בתוך Docker containers ושרתים מרוחקים — הצעד האחרון לפני שזה באמת production.

מילון מונחים — מושגים חדשים בפרק
מונח (English) תרגום הסבר
Pipeline צינור עבודה רצף אוטומטי של שלבים שבו הפלט של שלב אחד הופך לקלט של השלב הבא — כמו פס ייצור
Orchestrator מנצח / מתזמר הקוד שמנהל את ה-pipeline: מריץ שלבים בסדר, מעביר נתונים, מטפל בשגיאות. זה לא Claude — זה הקוד שלכם
Stage שלב יחידת עבודה בודדת ב-pipeline — מקבלת input מוגדר, מריצה סוכן, ומייצרת output מוגדר
Checkpoint נקודת שמירה שמירת מצב ה-pipeline לדיסק אחרי כל שלב מוצלח — מאפשרת המשך מנקודת הכשל במקום התחלה מחדש
Exponential Backoff השהיה מעריכית אסטרטגיית retry שבה זמן ההמתנה גדל בין ניסיונות: 1 שנייה, 2, 4, 8... מונע הצפה של ה-API
Data Flow זרימת נתונים הדרך שבה מידע עובר בין שלבי ה-pipeline — בזיכרון, דרך קבצים, או שילוב של שניהם
Rollback חזרה לגרסה קודמת שחזור ה-pipeline לגרסה קודמת שעבדה כשגרסה חדשה מייצרת תוצאות גרועות יותר
Backpressure לחץ לאחור מנגנון שמאט את קצב הקלט כשהמערכת לא מספיקה לעבד — מונע קריסה מעומס
Queue תור מבנה נתונים שמחכה — items נכנסים מצד אחד ויוצאים מצד שני. Redis, RabbitMQ, או SQS לדוגמה
Structured Logging לוגים מובנים כתיבת לוגים בפורמט JSON עם שדות קבועים (זמן, שלב, עלות, תוצאה) — מאפשר חיפוש וניתוח אוטומטי
מתחיל 10 דקות מושג חינם

מה זה Pipeline בכלל?

בואו נתחיל מהמושג הבסיסי: Pipeline (צינור עבודה) הוא רצף אוטומטי של שלבים שבו הפלט של שלב אחד הופך לקלט של השלב הבא. זה כמו פס ייצור במפעל — כל תחנה עושה את שלה, מעבירה למוסעה הבאה, ובסוף יוצא מוצר מוגמר.

למה זה חשוב? כי עד עכשיו בנינו סוכנים בודדים (פרקים 3-4) וצוותי סוכנים (פרק 5). סוכן בודד מצוין למשימה ממוקדת. צוות מצוין למשימות שאפשר לפצל ולהריץ במקביל. אבל הרבה תהליכים אמיתיים הם סדרתיים מטבעם — אי אפשר לערוך טקסט שעדיין לא נכתב, אי אפשר לבדוק קוד שעדיין לא נוצר. לזה צריך pipeline.

ההבדל הקריטי בין pipeline לבין "שלוש פקודות שרצות בזו אחר זו": pipeline מוגדר היטב — כל שלב יודע מה הוא מקבל, מה הוא מייצר, ומה קורה כשהוא נכשל. יש לו error handling, logging, checkpoints. הוא מתוכנן להיות אמין ולרוץ שוב ושוב — לא סקריפט חד-פעמי שעובד "אם הכל בסדר".

מסגרת החלטה: Pipeline מול Team מול Script
מבנה מה זה מתי להשתמש דוגמה
Script (פרק 1) קריאה אחת ל-Claude, תוצאה אחת משימה פשוטה שלא צריכה שלבים "תרגם את הקובץ הזה"
Pipeline (הפרק הזה) שרשרת קריאות — כל אחת בונה על הקודמת תהליך סדרתי עם תלויות בין שלבים "תחקור → תכתוב outline → תכתוב מאמר → תערוך → תפרסם"
Team (פרק 5) כמה סוכנים במקביל + Lead שמסנתז תת-משימות עצמאיות שאפשר לפצל "שלושה חוקרים בודקים באג מזוויות שונות"

הכלל: אם שלב B תלוי בפלט של שלב A — זה pipeline. אם שלבים יכולים לרוץ בלי תלות — זה team. אם יש שלב אחד — זה script.

דוגמאות ל-pipelines מהעולם האמיתי:

נתון מייצג

Pipeline מרובה שלבים מייצר תוצאות טובות משמעותית מקריאה אחת ל-Claude, גם כשמשקיעים הרבה בפרומפט — מכיוון שכל שלב ממוקד במשימה אחת, בהקשר נקי, עם הנחיות מדויקות. pipeline בן 7 שלבים לכתיבת תוכן מייצר פרקים של 5,000-10,000 מילים ב-15-25 דקות, בעלות של $3-8 לפרק (לפי אומדן עבור Opus 4.6 נכון למרץ 2026). התוצאה עולה בהרבה על single-prompt generation כי ה-pipeline תופס שגיאות שקריאה יחידה מפספסת.

Claude Code ו-Pipelines — הקשר ישיר

Claude Code עצמו מספק כלים מובנים שרלוונטיים ישירות ל-pipelines. /batch (פברואר 2026) מפרק עבודה ל-30 יחידות, כל אחת ב-git worktree מבודד עם סוכן עצמאי שפותח PR. Hooks (מגרסה 2.0, ספטמבר 2025) מאפשרים להפעיל סקריפטים אוטומטית לפני/אחרי כל tool call — quality gate מושלם בין שלבי pipeline. Skills (SKILL.md) מאפשרים לארוז שלב pipeline כ-/slash-command עם frontmatter שמגדיר כלים, מודל, ו-effort level. וכמובן, Agent SDK (Python v0.1.48, TypeScript v0.2.71 נכון למרץ 2026) הוא הבסיס לבניית pipeline stages פרוגרמטיים — אותם כלים וסוכנים כמו ב-Claude Code, ארוזים כספרייה.

✍ עשו עכשיו 3 דקות

חשבו על תהליך עבודה שאתם עושים שוב ושוב — כתיבת דוח, יצירת מצגת, בדיקת קוד, או כל דבר אחר. רשמו אותו כרצף שלבים: "קודם אני עושה X, אחר כך Y, אחר כך Z." ספרו כמה שלבים יצאו. אם 3-10 שלבים — מצוין, יש לכם מועמד מושלם ל-pipeline.

✍ עשו עכשיו 2 דקות

פתחו Claude Code והקלידו /batch. קראו את התיאור שמופיע. חשבו: האם התהליך שזיהיתם למעלה מתאים ל-/batch (משימות עצמאיות שאפשר לפצל) או ל-pipeline מותאם אישית (שלבים סדרתיים עם תלויות)? רשמו את התשובה — היא תנחה את הבחירה שלכם בהמשך הפרק.

בינוני 20 דקות אסטרטגיה חינם

עיצוב Pipelines מרובי שלבים

הכלל הראשון והחשוב ביותר בעיצוב pipeline: תתחילו מהסוף. מה התוצר הסופי? מאמר מפורסם? קוד שעבר code review? דוח מנותח? הגדירו את הפלט הסופי בצורה מדויקת — ואז עבדו אחורה.

נניח שהתוצר הסופי הוא "מאמר בלוג מפורסם." עובדים אחורה: מה צריך בשביל לפרסם? מאמר ערוך, אופטימיזציית SEO, תמונות. מה צריך בשביל מאמר ערוך? טיוטה שעברה עריכה. מה צריך בשביל טיוטה? outline מפורט + מחקר. מה צריך בשביל outline? brief עם נושא, קהל יעד, וטון. ככה מגלים את השלבים — מהסוף להתחלה.

Stage Specification Template — תבנית מפרט שלב

לכל שלב ב-pipeline, כתבו מפרט שכולל שישה דברים:

שדה מה לכתוב דוגמה
Name שם השלב Research Stage
Input מה השלב מקבל מהשלב הקודם או מהטריגר Content brief (JSON) with topic, audience, tone
Agent Config מודל, כלים, max_turns, הנחיות ספציפיות Sonnet 4.6, WebSearch, maxTurns: 15, "Find 5 sources"
Output מה השלב מייצר בדיוק research.json with sources[], statistics[], key_findings[]
Success Criteria איך יודעים שהשלב הצליח At least 3 sources found, output JSON is valid
Failure Action מה קורה כשנכשל — retry, skip, abort, escalate Retry with different search terms (max 2 retries), then abort
כמה שלבים זה "בדיוק נכון"?

5-8 שלבים זה ה-sweet spot לרוב ה-pipelines. פחות מ-3 שלבים — כנראה שמספיק script פשוט. יותר מ-10 שלבים — כדאי לפצל ל-sub-pipelines. כל שלב שלא מוסיף ערך ברור (ולידציה, שיפור, פורמט) הוא תקורה מיותרת שעולה זמן ועלות.

✍ עשו עכשיו 5 דקות

קחו את תהליך העבודה שזיהיתם קודם. לכל שלב, מלאו את ה-Stage Specification Template — שם, input, output, success criteria, failure action. אל תדלגו על failure action — זה מה שהופך pipeline לאמין.

טעות נפוצה: לעצב pipeline בלי להגדיר success criteria

מפתחים רבים מגדירים שלבים עם input ו-output, אבל שוכחים לשאול "איך יודעים שזה עבד?" בלי success criteria, ה-pipeline ממשיך עם פלט גרוע ושגיאות מצטברות. שלב Research שמצא 0 מקורות? ה-pipeline ממשיך לכתוב מאמר ללא מקורות. הפתרון: לכל שלב, הגדירו בדיקה אוטומטית — JSON תקין? מספיק תוצאות? אורך טקסט סביר? בדקו לפני שמעבירים לשלב הבא.

טעות נפוצה: לבנות pipeline מאפס כשיש כלים מובנים

מפתחים רבים מתחילים לכתוב orchestrator מלא מאפס, עם queue, retry logic, ו-logging — כשלפעמים מספיק להשתמש ב-/batch של Claude Code שכבר עושה את רוב העבודה (פירוק למשימות, worktrees מבודדים, PR לכל יחידה). לפני שמתחילים לבנות, שאלו: "האם /batch או /loop עם skills מספיקים למקרה שלי?" אם כן — חסכתם שבועות של פיתוח. בנו custom pipeline רק כשצריך שליטה מלאה על flow, error handling, או שילוב עם מערכות חיצוניות.

עקרון הממשק הנקי — Clean Interface Principle

כל שלב ב-pipeline צריך להתנהג כמו קופסה שחורה: input מוגדר → עיבוד → output מוגדר. השלב הבא לא צריך לדעת איך השלב הקודם עבד — רק מה הוא ייצר. זה מאפשר:

בפועל, "ממשק נקי" אומר שאתם מגדירים חוזה (contract) לכל שלב: "השלב הזה מקבל JSON עם שדות X, Y, Z ומחזיר JSON עם שדות A, B, C." אם שלב אחד מפר את החוזה — ולידציה תופסת את זה לפני שהשלב הבא רואה פלט פגום. זה הבסיס של pipeline אמין.

✍ עשו עכשיו 3 דקות

לכל שלב ב-pipeline שלכם, כתבו משפט אחד שמתאר את ה-"חוזה": "מקבל: ___. מחזיר: ___." וודאו שה-output של שלב N תואם בדיוק ל-input של שלב N+1. אם יש פער — מצאתם באג בעיצוב.

בינוני 25 דקות הקמה חינם

דפוס ה-Orchestrator — מנצח התזמורת

ה-Orchestrator הוא הקוד שלכם שמנהל את ה-pipeline. זה לא Claude. זה לא סוכן. זה Python script או TypeScript module שיודע: איזה שלבים לרוץ, באיזה סדר, איך להעביר נתונים ביניהם, ומה לעשות כשמשהו נכשל.

למה ה-orchestrator חייב להיות קוד דטרמיניסטי ולא סוכן AI? כי:

Separation of Concerns (הפרדת אחריות) — שלוש שכבות:

שכבה אחריות דוגמה
Orchestrator Flow control, error handling, logging "הרץ שלב 1, בדוק הצלחה, העבר output לשלב 2"
Stages Agent config, prompt engineering, output parsing "הגדר סוכן Sonnet עם WebSearch, תן לו brief, פרסר JSON"
Agents Tool use, reasoning, execution "Claude מחפש באינטרנט, קורא קבצים, מייצר תוכן"

בואו נראה את זה בפרקטיקה. הנה stage wrapper שמראה את ההפרדה — ה-stage מגדיר את הסוכן, ה-orchestrator מנהל את ה-flow:

# stage definition — each stage is a self-contained unit
def create_research_stage():
    """Define the Research stage with its agent configuration"""
    async def execute(input_data: dict) -> dict:
        from claude_agent_sdk import Agent

        agent = Agent(
            model="claude-sonnet-4-6",
            tools=["WebSearch", "WebFetch", "Write"],
            max_turns=15,
            instructions="You are a research agent. Find 5+ sources on the given topic."
        )

        prompt = f"Research this topic: {input_data['topic']}\n"
        prompt += f"Focus areas: {', '.join(input_data.get('focus_areas', []))}"

        result = await agent.run(prompt)

        return {
            "output": {
                "sources": result.structured_output.get("sources", []),
                "statistics": result.structured_output.get("statistics", []),
                "key_findings": result.structured_output.get("key_findings", [])
            },
            "cost": result.cost_usd,
            "tokens": result.total_tokens
        }

    return {
        "name": "research",
        "execute": execute,
        "on_failure": "retry",
        "max_retries": 2
    }

שימו לב: ה-stage לא יודע שום דבר על שלבים אחרים. הוא לא יודע אם הוא שלב 2 או שלב 5. הוא מקבל input, מריץ סוכן, ומחזיר output. ה-orchestrator הוא זה שמחבר את הכל.

הנה מבנה orchestrator בסיסי ב-Python:

# pipeline_orchestrator.py
import json
import time
import logging
from pathlib import Path
from datetime import datetime

class PipelineOrchestrator:
    def __init__(self, name: str, stages: list, output_dir: str = "./output"):
        self.name = name
        self.stages = stages
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.logger = self._setup_logging()

    def _setup_logging(self):
        logger = logging.getLogger(self.name)
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(
            self.output_dir / f"run_{self.run_id}.log"
        )
        handler.setFormatter(logging.Formatter(
            '%(asctime)s | %(levelname)s | %(message)s'
        ))
        logger.addHandler(handler)
        return logger

    def run(self, initial_input: dict) -> dict:
        data = initial_input
        total_cost = 0
        start_time = time.time()

        self.logger.info(f"Pipeline '{self.name}' started | Run: {self.run_id}")

        for i, stage in enumerate(self.stages):
            stage_name = stage["name"]
            self.logger.info(f"Stage {i+1}/{len(self.stages)}: {stage_name} | START")

            try:
                result = stage["execute"](data)
                data = result["output"]
                total_cost += result.get("cost", 0)

                # Checkpoint
                self._save_checkpoint(i, stage_name, data)
                self.logger.info(
                    f"Stage {stage_name} | DONE | "
                    f"cost=${result.get('cost', 0):.3f} | "
                    f"tokens={result.get('tokens', 0)}"
                )

            except Exception as e:
                self.logger.error(f"Stage {stage_name} | FAILED | {e}")
                action = stage.get("on_failure", "abort")
                if action == "retry":
                    # ... retry logic
                    pass
                elif action == "skip":
                    self.logger.warning(f"Skipping {stage_name}")
                    continue
                else:
                    raise

        elapsed = time.time() - start_time
        self.logger.info(
            f"Pipeline COMPLETE | {elapsed:.1f}s | ${total_cost:.3f}"
        )
        return data

    def _save_checkpoint(self, stage_idx, stage_name, data):
        checkpoint = {
            "run_id": self.run_id,
            "stage_idx": stage_idx,
            "stage_name": stage_name,
            "data": data,
            "timestamp": datetime.now().isoformat()
        }
        path = self.output_dir / f"checkpoint_{stage_idx}_{stage_name}.json"
        path.write_text(json.dumps(checkpoint, ensure_ascii=False, indent=2))
✍ עשו עכשיו 3 דקות

קראו את הקוד למעלה בעיון. זהו את שלושת האלמנטים המרכזיים: (1) ה-loop שרץ על שלבים, (2) ה-checkpoint שנשמר אחרי כל שלב, (3) ה-error handling עם הבחירה בין retry/skip/abort. אלו שלוש היכולות שמבדילות orchestrator מ-script פשוט.

Orchestrator ב-TypeScript

אם אתם עובדים ב-TypeScript (בהמשך לפרק 4), הנה אותו רעיון:

// pipeline-orchestrator.ts
import { writeFile, mkdir } from 'fs/promises';
import { join } from 'path';

interface StageResult {
  output: Record<string, any>;
  cost?: number;
  tokens?: number;
}

interface StageConfig {
  name: string;
  execute: (input: Record<string, any>) => Promise<StageResult>;
  onFailure?: 'retry' | 'skip' | 'abort';
  maxRetries?: number;
}

class PipelineOrchestrator {
  private runId: string;

  constructor(
    private name: string,
    private stages: StageConfig[],
    private outputDir: string = './output'
  ) {
    this.runId = new Date().toISOString().replace(/[:.]/g, '-');
  }

  async run(initialInput: Record<string, any>): Promise<Record<string, any>> {
    await mkdir(this.outputDir, { recursive: true });
    let data = initialInput;
    let totalCost = 0;
    const startTime = Date.now();

    console.log(`Pipeline '${this.name}' started | Run: ${this.runId}`);

    for (let i = 0; i < this.stages.length; i++) {
      const stage = this.stages[i];
      console.log(`Stage ${i + 1}/${this.stages.length}: ${stage.name} | START`);

      try {
        const result = await stage.execute(data);
        data = result.output;
        totalCost += result.cost ?? 0;

        await this.saveCheckpoint(i, stage.name, data);
        console.log(`Stage ${stage.name} | DONE | cost=$${(result.cost ?? 0).toFixed(3)}`);

      } catch (error) {
        console.error(`Stage ${stage.name} | FAILED |`, error);
        if (stage.onFailure === 'skip') continue;
        if (stage.onFailure === 'retry') { /* retry logic */ }
        throw error;  // default: abort
      }
    }

    const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);
    console.log(`Pipeline COMPLETE | ${elapsed}s | $${totalCost.toFixed(3)}`);
    return data;
  }

  private async saveCheckpoint(idx: number, name: string, data: any) {
    const checkpoint = {
      runId: this.runId,
      stageIdx: idx,
      stageName: name,
      data,
      timestamp: new Date().toISOString()
    };
    const path = join(this.outputDir, `checkpoint_${idx}_${name}.json`);
    await writeFile(path, JSON.stringify(checkpoint, null, 2));
  }
}

Hooks כ-Quality Gates בתוך Pipeline

מערכת Hooks של Claude Code (מגרסה 2.0) מאפשרת לשלב quality gates אוטומטיים בתוך pipeline. Hooks הם סקריפטים שרצים לפני (PreToolUse) או אחרי (PostToolUse) כל tool call — ואפשר להשתמש בהם כדי לבדוק שהפלט של שלב עומד בתנאים לפני שעוברים לשלב הבא. למשל:

מגרסה 2.1.63 (פברואר 2026) נוספו HTTP Hooks — במקום סקריפט מקומי, ה-hook שולח POST JSON ל-URL ומקבל JSON חזרה. זה מאפשר integration עם מערכות חיצוניות: סיום שלב pipeline → hook שולח notification ל-Slack → pipeline ממשיך. זה לא מחליף את ה-orchestrator, אבל מוסיף שכבת בקרה שרצה ברמת ה-tool call — יותר granular מרמת ה-stage.

✍ עשו עכשיו 3 דקות

חשבו על ה-pipeline שלכם: באיזה שלב quality gate הכי קריטי? כתבו hook פשוט (בדמיון או בקוד) שבודק את הפלט של אותו שלב. מה הבדיקה? מה קורה אם היא נכשלת?

טעות נפוצה: לתת ל-Claude להיות ה-orchestrator

מפתחים שרגילים לפרומפטים ארוכים נוטים לכתוב: "תריץ את ה-pipeline הזה: קודם תחקור, אחר כך תכתוב outline, אחר כך..." — ולתת ל-Claude להחליט מתי כל שלב הסתיים ומה להעביר הלאה. הבעיה: Claude עלול לדלג על שלבים, לשנות את הסדר, או "להחליט" שהמחקר מספק אחרי מקור אחד. הפתרון: ה-orchestrator צריך להיות קוד דטרמיניסטי. Claude מריץ כל שלב, אבל הקוד שלכם מחליט מתי להתקדם.

Skills כשלבי Pipeline

דרך אלגנטית לארגן pipeline ב-Claude Code: כל שלב הוא Skill (קובץ .md עם frontmatter). ה-Skill מגדיר כלים, מודל, effort level, ו-agent — וה-orchestrator מפעיל אותם ברצף. היתרון: שלבים ניתנים לשימוש חוזר, כל שלב הוא /slash-command שאפשר גם להפעיל בנפרד, ו-hot reload תומך בעדכון שלבים בלי לעצור את ה-pipeline.

# .claude/agents/pipeline-research.md (skill as pipeline stage)
---
name: pipeline-research
description: Research stage for content pipeline
model: claude-sonnet-4-6
tools:
  - WebSearch
  - WebFetch
  - Write
---

# Research Stage

You are the Research Agent in a content pipeline.

## Input
Read the brief from `output/brief.json`.

## Task
1. Search for 5+ sources on the topic
2. Extract statistics, quotes, and examples
3. Focus on Israeli market when relevant

## Output
Write results to `output/research.json` with:
- sources[]: URL, title, key quotes
- statistics[]: claim, value, source
- key_findings[]: insight, relevance

ה-orchestrator קורא ל-skills ברצף באמצעות CLI mode: claude -p "/pipeline-research" --bare. הדגל --bare (מגרסה 2.1.81) מדלג על hooks, LSP, ו-plugin sync — מה שמאיץ את ההפעלה ב-pipeline אוטומטי. כל skill כותב output לקובץ מוסכם, ו-skill הבא קורא אותו. file-based data flow, פשוט ויעיל.

🛠 תרגיל: בניית Orchestrator בסיסי

זמן: 25 דקות | תוצר: Pipeline orchestrator עובד עם 3 שלבים

  1. העתיקו את קוד ה-Orchestrator (Python או TypeScript) לפרויקט חדש
  2. צרו 3 stages פשוטים:
    • Stage 1 — Brief: מקבל נושא, מייצר brief עם 3 נקודות מפתח (השתמשו ב-Agent SDK)
    • Stage 2 — Draft: מקבל brief, מייצר טיוטת פסקה (300-500 מילים)
    • Stage 3 — Review: מקבל טיוטה, מייצר ביקורת עם 3 שיפורים מוצעים
  3. הריצו את ה-pipeline עם נושא לבחירתכם
  4. וודאו שיש לוג מלא ו-checkpoint לכל שלב בתיקיית output
  5. נסו להריץ שוב ובדקו שהתוצאות שונות (non-determinism) אבל המבנה זהה

תוצאה צפויה: תיקיית output עם 3 checkpoint files, קובץ log מלא, וטיוטת תוכן שעברה סבב review.

בינוני 15 דקות מושג חינם

תקשורת בין שלבים — Data Flow

שאלת מפתח: איך שלב A מעביר את התוצר שלו לשלב B? יש שלוש גישות, וכל אחת מתאימה למצב אחר.

גישה 1: File-Based — דרך קבצים

שלב A כותב output לקובץ (stage1_output.json), שלב B קורא אותו. זו הגישה הכי פשוטה ו-debuggable — אפשר לפתוח את הקובץ ולראות בדיוק מה שלב A ייצר. החיסרון: איטי יותר לנתונים גדולים בגלל I/O.

גישה 2: In-Memory — בזיכרון

שלב A מחזיר dict/object, ה-orchestrator מעביר אותו ישירות לשלב B. זו הגישה הכי מהירה — אין I/O, הכל בזיכרון. החיסרון: אם ה-pipeline קורס באמצע, הכל אבוד.

גישה 3: Hybrid — שילוב (מומלץ)

נתונים עוברים בין שלבים בזיכרון (מהיר), וגם נשמרים לדיסק כ-checkpoint אחרי כל שלב מוצלח (בטוח). זו הגישה שהשתמשנו בה ב-orchestrator למעלה — ה-data עובר ב-data = result["output"] (זיכרון) וגם נשמר ב-_save_checkpoint() (דיסק).

מסגרת החלטה: איזו גישת Data Flow לבחור?
אם... אז... הסיבה
דיבוג חשוב, הנתונים קטנים File-Based אפשר לראות כל שלב בנפרד
ביצועים קריטיים, ה-pipeline קצר In-Memory אין תקורת I/O
pipeline ארוך, קריסה עולה ביוקר Hybrid (מומלץ) מהירות + בטיחות + resume

The Checkpoint Pattern — שמירת מצב

Checkpoint (נקודת שמירה) הוא הרעיון הכי חשוב ב-pipeline engineering. אחרי כל שלב מוצלח, שמרו את מצב ה-pipeline לדיסק. אם ה-pipeline קורס בשלב 6 מתוך 7, אתם יכולים להמשיך משלב 6 במקום להתחיל מחדש.

בלי checkpoints, pipeline בן 7 שלבים שקורס בשלב 6 — 100% מהעלות בזבוז. עם checkpoints — רק עלות שלב 6 בזבוז, ואתם ממשיכים ממקום שעצרתם.

# resume from checkpoint
def resume_pipeline(self, from_stage: int):
    """Load last checkpoint and resume from given stage"""
    checkpoint_path = self.output_dir / f"checkpoint_{from_stage - 1}_*.json"
    checkpoints = list(self.output_dir.glob(f"checkpoint_{from_stage - 1}_*.json"))

    if not checkpoints:
        raise ValueError(f"No checkpoint found for stage {from_stage - 1}")

    data = json.loads(checkpoints[0].read_text())["data"]

    # Run remaining stages
    for i, stage in enumerate(self.stages[from_stage:], start=from_stage):
        result = stage["execute"](data)
        data = result["output"]
        self._save_checkpoint(i, stage["name"], data)

    return data
✍ עשו עכשיו 2 דקות

חשבו על ה-pipeline שעיצבתם קודם. כמה יעלה כל שלב בערך (בטוקנים ובכסף)? אם השלב האחרון נכשל, כמה עלות הולכת לפח בלי checkpoint? כתבו את המספרים — זה יעזור לכם להבין למה checkpoints הם חובה.

Data Validation — ולידציה בין שלבים

בכל נקודת חיבור בין שלבים, בדקו שהפלט תקין לפני שמעבירים אותו הלאה. זה כמו quality control בפס ייצור — מוצר פגום לא עובר לתחנה הבאה.

# data validation between stages
def validate_stage_output(stage_name: str, output: dict, schema: dict) -> bool:
    """Validate output matches expected schema before passing to next stage"""
    required_fields = schema.get("required", [])
    for field in required_fields:
        if field not in output:
            raise ValueError(
                f"Stage '{stage_name}' output missing required field: '{field}'"
            )
    # Check minimum content length
    if "min_length" in schema:
        content = output.get("content", "")
        if len(content) < schema["min_length"]:
            raise ValueError(
                f"Stage '{stage_name}' output too short: "
                f"{len(content)} < {schema['min_length']}"
            )
    return True
טעות נפוצה: לא לבדוק output בין שלבים

שלב Research מחזיר JSON ריק בגלל שגיאת חיפוש. שלב Draft מקבל את ה-JSON הריק ומייצר מאמר "על סמך המחקר" — שהוא בפועל המצאה מלאה. שלב Edit מנסה לשפר מאמר שמבוסס על אוויר. בסוף מקבלים תוצר שנראה מושלם — אבל הכל בדוי. הפתרון: ולידציה בין כל שני שלבים. JSON תקין? מספיק שדות? תוכן לא ריק? בדקו לפני שממשיכים.

מתקדם 20 דקות אסטרטגיה חינם

טיפול בשגיאות ושחזור

ב-pipeline אמיתי, שגיאות יקרו. לא אם, אלא מתי. API timeout, rate limit, פלט לא צפוי מ-Claude, שגיאת parsing. השאלה היא לא "איך למנוע שגיאות" אלא "מה לעשות כשהן קורות."

The Recovery Ladder — סולם ההתאוששות

כשמשהו נכשל, עולים בסולם עד שמוצאים פתרון:

  1. Retry Same — הריצו שוב את אותה קריאה. פותר שגיאות חולפות (timeout, rate limit). השתמשו ב-exponential backoff: המתנה של 1 שנייה, 2, 4, 8.
  2. Retry Different — שנו משהו בקריאה: פרומפט אחר, מודל אחר, פחות טוקנים. אם הגישה נכשלה, אולי צריך גישה אחרת.
  3. Skip Stage — דלגו על השלב אם הוא לא קריטי. שלב "Polish" שנכשל? ה-pipeline עדיין מייצר תוצר שמיש.
  4. Pause for Human — עצרו את ה-pipeline ושלחו התראה (Slack, email). בן אדם מחליט מה לעשות.
  5. Abort — עצרו הכל. שמרו checkpoint, כתבו לוג, צאו. לשלבים שנכשלים בהם זה מסוכן להמשיך (ולידציה, אבטחה).

Error Matrix — טבלת טיפול בשגיאות

לכל שלב, הגדירו מראש מה קורה בכל סוג שגיאה:

שלב Timeout/Rate Limit פלט לא תקין תוכן ריק שגיאה לא צפויה
Research Retry x3 (backoff) Retry Different (שנה search terms) Abort (אין מה להמשיך בלי מחקר) Abort + Log
Draft Retry x3 (backoff) Retry Different (שנה prompt) Retry Different Abort + Log
Edit Retry x3 (backoff) Skip (הטיוטה עדיין שמישה) Skip Skip + Log
SEO Retry x2 Skip Skip Skip + Log
QA Retry x3 Pause for Human Abort Abort + Log
✍ עשו עכשיו 4 דקות

צרו Error Matrix ל-pipeline שלכם. לכל שלב, כתבו מה לעשות ב-4 סוגי שגיאות: timeout, פלט לא תקין, תוכן ריק, שגיאה לא צפויה. הכלל: שלבים קריטיים → abort. שלבים אופציונליים → skip. שגיאות חולפות → retry.

Retry עם Exponential Backoff

# retry with exponential backoff
import asyncio
import random

async def retry_with_backoff(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0
):
    """Retry a function with exponential backoff and jitter"""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries:
                raise  # Last attempt, give up

            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {total_delay:.1f}s...")
            await asyncio.sleep(total_delay)

שימו לב ל-jitter (רעש אקראי) — זה מונע מצב שבו 10 pipelines שנכשלו באותו רגע גם עושים retry באותו רגע ויוצרים גל נוסף של rate limits.

Retry with Variation — לנסות אחרת

לפעמים retry same לא מספיק. אם הסוכן נכשל לא בגלל timeout אלא בגלל שהגישה שלו לא עבדה, צריך retry different: שנו את הפרומפט, החליפו מודל, או צמצמו את המשימה.

# retry with variation — different approach each attempt
async def retry_with_variation(stage_name: str, data: dict, variations: list):
    """Try different approaches for the same stage"""
    for i, variation in enumerate(variations):
        try:
            print(f"Stage {stage_name}: Attempt {i+1} with variation '{variation['name']}'")
            result = await variation["execute"](data)
            return result
        except Exception as e:
            print(f"Variation '{variation['name']}' failed: {e}")
            if i == len(variations) - 1:
                raise  # All variations exhausted

# Usage:
research_variations = [
    {"name": "broad_search",    "execute": lambda d: research_agent(d, terms="broad")},
    {"name": "specific_search", "execute": lambda d: research_agent(d, terms="specific")},
    {"name": "alternative_sources", "execute": lambda d: research_agent(d, sources="academic")}
]

result = await retry_with_variation("research", data, research_variations)

הדפוס הזה חזק במיוחד לשלבי Research ו-Draft — אם חיפוש רחב נכשל, נסו חיפוש ספציפי. אם סגנון כתיבה אחד לא עובד, נסו סגנון אחר. כל variation היא "גישה שונה" — לא חזרה על אותו דבר.

ההקשר הישראלי: עלויות שגיאות

Pipeline בלי error handling שנכשל ומתחיל מחדש 3 פעמים — כפול 3 בעלויות. עם Opus 4.6 וטוקנים של $15/$75 ל-MTok input/output (נכון למרץ 2026), pipeline בן 7 שלבים שעולה 30-100 ₪ יכול להפוך ל-90-300 ₪ בזבוז. checkpoints + retry logic חוסכים את רוב הנזק. תחשבו על זה כפוליסת ביטוח — ההשקעה קטנה, החיסכון ענק.

בינוני 15 דקות אסטרטגיה חינם

גרסאות ו-Rollback

Pipeline משתנה לאורך זמן: משפרים פרומפטים, מחליפים מודלים, מוסיפים שלבים, מסירים שלבים. בלי ניהול גרסאות, אי אפשר לדעת איזו גרסה של ה-pipeline ייצרה איזו תוצאה. ויותר חשוב — אי אפשר לחזור לגרסה שעבדה כשמשהו מתקלקל.

שלוש רמות של Versioning

רמה מה מנהלים בגרסאות איך
Pipeline Versioning שינויים ב-orchestrator, שלבים, סדר Git tags (v1.0, v1.1, v2.0)
Prompt Versioning שינויים בפרומפטים של כל שלב קבצים נפרדים: prompts/research-v1.md, research-v2.md
Output Versioning תוצרים של כל ריצה תיקיות timestamped: output/20260324_143000/

Prompt Versioning — הכי חשוב

פרומפטים משתנים הרבה יותר לעתים קרובות מקוד. הכלל: לעולם אל תערכו פרומפט in-place. תמיד צרו גרסה חדשה. זה מאפשר:

# prompt versioning pattern
pipeline/
├── prompts/
│   ├── research/
│   │   ├── v1.md          # הגרסה המקורית
│   │   ├── v2.md          # שיפור — מקורות יותר ספציפיים
│   │   └── v3.md          # ניסיון עם format שונה
│   ├── draft/
│   │   ├── v1.md
│   │   └── v2.md
│   └── edit/
│       └── v1.md
├── config.json            # מגדיר איזו גרסת prompt כל שלב משתמש
└── orchestrator.py
# config.json — which prompt version each stage uses
{
  "pipeline_version": "1.2.0",
  "stages": {
    "research": { "prompt": "prompts/research/v2.md", "model": "claude-sonnet-4-6" },
    "draft":    { "prompt": "prompts/draft/v1.md",    "model": "claude-opus-4-6" },
    "edit":     { "prompt": "prompts/edit/v1.md",     "model": "claude-sonnet-4-6" }
  }
}
✍ עשו עכשיו 3 דקות

צרו תיקיית prompts/ בפרויקט ה-pipeline שלכם. לכל שלב, צרו תת-תיקייה עם קובץ v1.md שמכיל את הפרומפט הנוכחי. צרו config.json שמצביע על כל הגרסאות. מהרגע הזה, כשתשנו פרומפט — תמיד v2.md חדש.

Rollback Procedure

כשגרסה חדשה של ה-pipeline מייצרת תוצאות גרועות יותר:

  1. זהו את הגרסה האחרונה שעבדה (Git log + output comparison)
  2. חזרו ל-Git tag של אותה גרסה: git checkout v1.1.0
  3. הריצו ריצת בדיקה כדי לוודא שהתוצאות חזרו לאיכות הצפויה
  4. חקרו למה הגרסה החדשה נכשלה — בדרך כלל שינוי פרומפט שנראה תמים
טעות נפוצה: לערוך פרומפטים in-place

"רק שינוי קטן בפרומפט של שלב 3..." — ופתאום ה-pipeline מייצר תוצאות שונות לגמרי. בלי גרסה קודמת שמורה, אי אפשר לחזור. הפתרון: גרסאות פרומפט הן בחינם (קבצי טקסט). הרגל: כל שינוי = קובץ חדש. v1, v2, v3. לעולם לא מוחקים גרסה ישנה.

בינוני 15 דקות כלי חינם

לוגים, מוניטורינג ועלויות

Pipeline בלי לוגים הוא כמו נהיגה בחושך. כשמשהו נכשל — ואיפה זה יקרה — אתם צריכים לדעת בדיוק מה קרה, באיזה שלב, כמה זה עלה, וכמה זמן לקח.

Structured Logging

Structured Logging (לוגים מובנים) אומר שכל שורת לוג היא JSON עם שדות קבועים, לא טקסט חופשי. זה מאפשר חיפוש, סינון, וניתוח אוטומטי.

# structured log entry — what every stage should log
{
  "timestamp": "2026-03-24T14:30:00.000Z",
  "run_id": "20260324_143000",
  "pipeline": "content-pipeline",
  "stage": "research",
  "stage_index": 1,
  "event": "stage_complete",
  "duration_seconds": 45.2,
  "tokens_input": 1250,
  "tokens_output": 3800,
  "cost_usd": 0.082,
  "model": "claude-sonnet-4-6",
  "success": true,
  "output_summary": "Found 5 sources, 12 statistics, 8 key findings"
}

Log Levels

Level מה לכתוב מתי
DEBUG שיחת סוכן מלאה, כל tool call פיתוח ודיבוג בלבד
INFO התחלה/סיום שלב, החלטות מפתח, תוצאות ריצה רגילה
WARN שגיאות שנפתרו (retry הצליח), ביצועים חריגים דברים שכדאי לשים לב אליהם
ERROR כשלונות, שלבים שנדלגו, aborts דברים שדורשים טיפול
✍ עשו עכשיו 5 דקות

הוסיפו structured logging לשלב אחד ב-pipeline שלכם. לוגו: זמן התחלה, זמן סיום, עלות בטוקנים, עלות בדולר, האם הצליח, וסיכום פלט של שורה אחת. שמרו את הלוג כ-JSON ב-output directory.

Cost Tracking — מעקב עלויות

כסף. בואו נדבר על כסף. Pipelines הם הרכיב הכי יקר בעבודה עם Claude כי הם מבצעים קריאות API מרובות. צריך לעקוב אחרי עלויות ברמת שלב, ריצה, יום, ושבוע.

# cost tracking aggregation
class CostTracker:
    def __init__(self):
        self.costs = []

    def add(self, stage: str, cost: float, tokens: int):
        self.costs.append({
            "stage": stage,
            "cost": cost,
            "tokens": tokens,
            "timestamp": datetime.now().isoformat()
        })

    def summary(self) -> dict:
        total = sum(c["cost"] for c in self.costs)
        by_stage = {}
        for c in self.costs:
            stage = c["stage"]
            by_stage[stage] = by_stage.get(stage, 0) + c["cost"]

        # Find most expensive stage
        most_expensive = max(by_stage, key=by_stage.get) if by_stage else None

        return {
            "total_cost": round(total, 4),
            "by_stage": by_stage,
            "most_expensive_stage": most_expensive,
            "recommendation": (
                f"Optimize '{most_expensive}' — "
                f"it's {by_stage[most_expensive]/total*100:.0f}% of total cost"
            ) if most_expensive else "No data"
        }

Pipeline Monitoring עם כלי Claude Code

Claude Code עצמו מספק כלים שמתאימים למעקב אחרי pipelines:

Alerting — התראות

הגדירו התראות על:

מתקדם 15 דקות תרגול חינם

ביצוע מקבילי של שלבים

לא כל שלב ב-pipeline חייב לחכות לקודמו. לפעמים יש שלבים עצמאיים שאפשר להריץ במקביל. הטריק: לזהות אילו שלבים באמת עצמאיים ואילו רק נראים עצמאיים.

דוגמה: אחרי שלב Research, שלבי "Outline" ו-"Image Generation" יכולים לרוץ במקביל — כי Outline צריך את תוצאות המחקר, ו-Image Generation גם צריך את תוצאות המחקר, אבל הם לא צריכים אחד את השני.

# parallel stages in Python
import asyncio

async def run_parallel_stages(data: dict, stages: list) -> dict:
    """Run multiple stages in parallel and merge their outputs"""
    tasks = [stage["execute"](data) for stage in stages]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    merged_output = {}
    for stage, result in zip(stages, results):
        if isinstance(result, Exception):
            print(f"Stage {stage['name']} failed: {result}")
            if stage.get("on_failure") != "skip":
                raise result
            continue
        merged_output[stage["name"]] = result["output"]

    return merged_output
// parallel stages in TypeScript
async function runParallelStages(
  data: Record<string, any>,
  stages: StageConfig[]
): Promise<Record<string, any>> {
  const results = await Promise.allSettled(
    stages.map(stage => stage.execute(data))
  );

  const merged: Record<string, any> = {};
  for (let i = 0; i < stages.length; i++) {
    const result = results[i];
    if (result.status === 'fulfilled') {
      merged[stages[i].name] = result.value.output;
    } else {
      console.error(`Stage ${stages[i].name} failed:`, result.reason);
      if (stages[i].onFailure !== 'skip') throw result.reason;
    }
  }
  return merged;
}
מסגרת החלטה: מקבילי או סדרתי?
שאלה אם כן אם לא
שלב B צריך את הפלט של שלב A? סדרתי אפשר מקביל
שלב B כותב לאותו קובץ כמו שלב A? סדרתי (מניעת קונפליקט) אפשר מקביל
שני השלבים יחד חורגים מ-rate limit? סדרתי (או עם throttling) מקביל
✍ עשו עכשיו 3 דקות

הסתכלו על ה-pipeline שלכם. האם יש שני שלבים שמקבלים את אותו input ולא תלויים זה בזה? אם כן — סמנו אותם כמועמדים לביצוע מקבילי. חשבו: כמה זמן זה יחסוך?

טעות נפוצה: ריצה מקבילית בלי לבדוק rate limits

ביצוע מקבילי מכפיל את קריאות ה-API. אם יש לכם rate limit של 60 requests/minute ו-pipeline בן 7 שלבים, ריצה סדרתית צריכה 7 requests. ריצה עם 3 שלבים מקבילים צריכה 5 requests — אבל 3 מהם נשלחים באותו רגע. אם ה-rate limit שלכם הוא per-second ולא per-minute, שלושה requests בו-זמנית יכולים לגרום ל-429 errors ו-backoff אגרסיבי שמאט את כל ה-pipeline. הפתרון: תכננו את המקביליות בהתאם ל-rate limits שלכם, לא רק ל-CPU. בדקו את ה-statusline של Claude Code — מגרסה 2.1.80 היא מציגה rate-limit windows (5 שעות ו-7 ימים) בזמן אמת.

🛠 תרגיל: הוספת שלבים מקבילים ל-Pipeline

זמן: 20 דקות | תוצר: Pipeline עם לפחות שלב מקבילי אחד

  1. קחו את ה-pipeline מהתרגיל הקודם (Brief → Draft → Review)
  2. הוסיפו שלב "Keywords" שרץ במקביל לשלב Draft — שניהם מקבלים את ה-Brief כ-input
  3. שלב Keywords מייצר רשימה של 10 מילות מפתח שקשורות לנושא
  4. עדכנו את ה-orchestrator להריץ Draft ו-Keywords במקביל (asyncio.gather / Promise.all)
  5. שלב Review מקבל את שניהם: את הטיוטה ואת מילות המפתח
  6. בדקו שהלוג מראה ששני השלבים רצו במקביל (timestamps חופפים)

תוצאה צפויה: Pipeline מהיר יותר שמייצר טיוטה ומילות מפתח בו-זמנית.

בינוני 20 דקות ניתוח חינם

דוגמה מייצגת: ה-Pipeline בן 7 השלבים

זו לא דוגמה תיאורטית. זה pipeline אמיתי, שפועל בפרודקשן, ויצר את הפרקים של nVision Academy שאתם קוראים עכשיו. 87 פרקים, למעלה מ-600,000 מילים, נכתבו באמצעות pipeline אוטומטי מרובה שלבים. בואו ננתח אותו.

7 השלבים

שלב שם תפקיד מודל כלים
1 Brief Agent מקבל נושא → מייצר brief מובנה עם מטרות, קהל יעד, טון, ונקודות מפתח Sonnet 4.6 Read, Write
2 Research Agent מחפש באינטרנט נתונים עדכניים, סטטיסטיקות, דוגמאות, ודעות מומחים Sonnet 4.6 WebSearch, WebFetch, Write
3 Outline Agent יוצר outline מפורט מה-brief + מחקר — כותרות, תת-נושאים, נקודות Sonnet 4.6 Read, Write
4 Writing Agent כותב את הפרק המלא לפי ה-outline, שומר על קול אחיד ועומק (5K-10K מילים) Opus 4.6 Read, Write, WebFetch
5 Editor Agent בודק בהירות, דיוק, שלמות, דקדוק, עקביות טון, ואיכות עברית Opus 4.6 Read, Edit
6 SEO/Format Agent אופטימיזציית כותרות, תגיות, פורמט HTML, עקביות CSS classes Sonnet 4.6 Read, Edit, Grep
7 QA Agent בדיקה סופית: קישורים, עובדות, שלמות כל הסעיפים, אין placeholder text Sonnet 4.6 Read, Grep, Bash

ביצועים

מספרים מהשטח (אומדנים)

הסיבה שה-pipeline הזה עובד: כל שלב ממוקד במשימה אחת. ה-Research Agent לא צריך לכתוב — הוא רק מחפש. ה-Writing Agent לא צריך לחפש — הוא מקבל מחקר מוכן. ה-Editor Agent לא צריך לכתוב מחדש — הוא רק מתקן. הפוקוס הזה מייצר איכות גבוהה יותר מ-"תכתוב מאמר מלא מאפס" בפרומפט אחד.

איך השלבים מתחברים — זרימת הנתונים

בואו נעקוב אחרי ריצה אחת של ה-pipeline מתחילתה ועד סופה:

  1. Input: "פרק 6: Building Production Pipelines, קורס CC Production & SDK, פרופיל Skill-Building"
  2. Brief Agent (30 שניות): מייצר brief.json עם מטרות למידה, קהל יעד (מפתחים), טון (מנטורי), 12 נקודות מפתח
  3. Research Agent (3-5 דקות): מחפש ב-WebSearch מאמרים על AI pipelines, מוצא סטטיסטיקות, דוגמאות. מייצר research.json עם 8 מקורות ו-15 עובדות
  4. Outline Agent (1-2 דקות): מקבל brief + research, מייצר outline מפורט עם 11 סקציות, כותרות H2 ו-H3, ונקודות תבליט
  5. Writing Agent (8-12 דקות): זה השלב הכבד — Opus 4.6 כותב את הפרק המלא, 5K-10K מילים, עם כל 15 אלמנטי Gold Standard. מייצר HTML
  6. Editor Agent (3-5 דקות): קורא את ה-HTML, מתקן עברית, בודק שכל CSS class קיים, מוסיף סימני פיסוק חסרים
  7. SEO/Format Agent (1-2 דקות): מוודא עקביות כותרות, תגיות section-tags על כל H2, ניקוי HTML
  8. QA Agent (1-2 דקות): בדיקה סופית — אין placeholder text, כל הסקציות קיימות, אין HTML שבור

סה"כ: ~20 דקות, ~$5. התוצר: פרק מלא, ערוך, מפורמט, ומוכן לפרסום. ותשוו את זה לכתיבה ידנית שלוקחת 8-15 שעות.

איך Claude Code מאיץ את ה-Pipeline

ה-pipeline הזה לא רץ "סתם" — הוא ממנף תכונות ספציפיות של Claude Code:

✍ עשו עכשיו 3 דקות

פתחו טרמינל והריצו: claude -p "Summarize this file in 3 bullets: [path to a file]". שימו לב לזמן — זו קריאה אחת של pipeline stage ב-headless mode. עכשיו הריצו את אותו דבר עם --bare. ראו כמה זמן נחסך ב-startup. זה ההבדל כש-pipeline רץ את זה 7 פעמים.

למה Pipeline עולה על Single Prompt?

היבט Single Prompt Pipeline בן 7 שלבים
Focus הסוכן צריך לעשות הכל בבת אחת כל שלב ממוקד במשימה אחת
Context Context מלא = פחות מקום לתוכן כל שלב מתחיל עם context נקי
Error Recovery שגיאה = התחלה מחדש של הכל שגיאה = חזרה על שלב אחד
Quality סביר עבור טקסט קצר גבוה ועקבי עבור תוכן ארוך
Debugging "משהו לא טוב" — אבל מה? "שלב 3 ייצר outline חלש" — ברור
✍ עשו עכשיו 4 דקות

בחרו דומיין שלכם (פיתוח, שיווק, תוכן, דאטה). כתבו pipeline בן 5-7 שלבים שמתאים לדומיין שלכם, בפורמט: שם שלב → input → output → מודל. השתמשו ב-case study הזה כהשראה, לא כעותק.

בינוני 15 דקות אסטרטגיה חינם

Pipeline Patterns לתחומים שונים

לכל תחום יש מבנה pipeline טבעי משלו. הנה חמישה דפוסים עם הסבר מתי כל אחד מתאים.

1. Code Generation Pipeline

Requirements → Design → Implementation → Testing → Review → Documentation → Deploy

מתי: כשצריך לייצר feature שלם מ-requirements. שלב Testing הוא quality gate — אם טסטים נכשלים, ה-pipeline חוזר לשלב Implementation (retry different). שלב Review משתמש ב-Opus לביקורת קוד מעמיקה.

2. Data Processing Pipeline

Ingest → Validate → Clean → Transform → Analyze → Report → Archive

מתי: כשמעבדים נתונים שמגיעים מבחוץ. שלב Validate הוא quality gate קריטי — נתונים לא תקינים = abort. שלבי Clean ו-Transform יכולים לרוץ עם Sonnet (זול יותר) כי המשימה מכנית.

3. Customer Support Pipeline

Receive → Classify → Research → Draft Response → Review → Send → Follow-up

מתי: כשמטפלים בפניות לקוחות. שלב Classify מחליט אם הפנייה טכנית, billing, או כללית — וזה משנה את הנתיב (routing). שלב Review חובה לפני שליחה — לעולם אל תשלחו תשובה ללקוח בלי review.

4. DevOps Pipeline

Monitor → Detect → Diagnose → Fix → Test → Deploy → Verify

מתי: כש-monitoring מזהה anomaly. שלב Detect כולל סף (threshold) — לא כל anomaly שווה תגובה. שלב Fix חייב להיות זהיר — ב-production, fix שגוי גרוע יותר מאי-עשייה.

5. Content Marketing Pipeline

Ideation → Research → Draft → Edit → SEO → Publish → Promote

מתי: כשמייצרים תוכן שיווקי. שלב Ideation יכול לרוץ ב-batch — לייצר 20 רעיונות, בן אדם בוחר, Pipeline ממשיך עם הנבחר. שלבי Edit ו-SEO יכולים לרוץ במקביל.

ההקשר הישראלי: pipelines בסטארטאפ

סטארטאפ ישראלי שמפתח מוצר SaaS יכול להשתמש ב-Code Generation Pipeline לפיצ'רים חדשים, ב-Customer Support Pipeline לטיפול בפניות (בעברית ובאנגלית — שלב Classify מזהה שפה ו-routing מתאים), וב-Content Marketing Pipeline לבלוג ולרשתות חברתיות. שלושה pipelines, שלושה תחומים, אותו orchestrator בסיסי. חברת fintech ישראלית יכולה להוסיף שלב compliance שבודק התאמה לרגולציה של רשות שוק ההון — pipeline stage ספציפי לשוק המקומי שאין לו מקבילה ב-templates הגנריים.

Pipelines בתעשייה הישראלית

חברות טכנולוגיה ישראליות שמשתמשות ב-AI pipelines בפרודקשן מדווחות על חיסכון של 60-80% בזמן עבודה ידנית על תהליכים חוזרים כמו code review, תיעוד, ועיבוד נתונים. היתרון הייחודי של Pipeline מול קריאה בודדת: ב-pipeline אפשר לשלב שלב של לוקליזציה לעברית שבודק RTL, ניקוד, ומינוח ישראלי — שלב שנכשל ב-single prompt כי אין מספיק context לעשות הכל בבת אחת.

The Universal Pipeline Template

כל pipeline שונה, אבל יש תבנית אוניברסלית שמתאימה כנקודת התחלה:

Input → Validate → Process (1-3 stages) → Review → Output → Archive

התחילו מ-3 שלבים: Process → Review → Output. הוסיפו שלבים רק כשהאיכות דורשת את זה. Pipeline עם 12 שלבים שרוב השלבים לא מוסיפים ערך — הוא בזבוז זמן, כסף, ומורכבות. הכלל: כל שלב שמוסיפים חייב לענות על השאלה "מה התוצר יפסיד בלי השלב הזה?" אם אין תשובה ברורה — לא מוסיפים.

שימו לב: over-engineering הוא הטעות מספר 1

מפתחים רבים מתלהבים מ-pipelines ובונים 12 שלבים מהיום הראשון. התוצאה: pipeline שלוקח 45 דקות ועולה $15 לריצה, כש-pipeline בן 5 שלבים היה מייצר תוצאה כמעט זהה ב-15 דקות ו-$5. הכלל: התחילו עם 3-5 שלבים. הוסיפו שלב רק כשיש הוכחה שהתוצר משתפר. מדדו לפני ואחרי.

🛠 תרגיל: עיצוב Pipeline לדומיין שלכם

זמן: 25 דקות | תוצר: מסמך Pipeline Design מלא

  1. בחרו דומיין שרלוונטי לעבודה שלכם (אם לא בטוחים — בחרו Content Marketing)
  2. הגדירו את הפלט הסופי — מה ה-pipeline מייצר?
  3. עבדו אחורה ורשמו 5-7 שלבים
  4. לכל שלב, מלאו Stage Specification: Name, Input, Agent Config, Output, Success Criteria, Failure Action
  5. זהו שלבים שאפשר להריץ במקביל
  6. צרו Error Matrix — מה קורה בכל סוג שגיאה
  7. הוסיפו אומדן עלות ל-run אחד

תוצאה צפויה: מסמך Pipeline Design מלא שאפשר לבנות ממנו orchestrator.

✍ עשו עכשיו 2 דקות

מבין 5 ה-patterns שראינו, בחרו את הקרוב ביותר לעבודה שלכם. רשמו: (1) איזה pattern? (2) כמה שלבים הייתם צריכים? (3) איזה שלב הוא הכי קריטי (שם ה-abort הכי חשוב)?

מתקדם 15 דקות אסטרטגיה חינם

Scaling — הרחבת Pipelines

Pipeline אחד שעובד — מצוין. עכשיו רוצים להריץ 10 pipelines במקביל. או 100. זה דורש חשיבה שונה.

Horizontal Scaling — הרחבה אופקית

במקום pipeline אחד שרץ מהר יותר, מריצים כמה pipelines במקביל. לדוגמה: 10 מאמרים שרצים ב-10 instances של אותו pipeline בו-זמנית. הצוואר בקבוק הוא לא ה-CPU — הוא ה-API rate limits.

Queue-Based Architecture

ארכיטקטורת תור: במקום להריץ pipelines ישירות, שולחים inputs לתור (Redis, RabbitMQ, Amazon SQS). Workers שולפים מהתור ומריצים pipelines. יתרונות:

Cost Budgeting — תקצוב עלויות

כשמריצים pipelines ב-scale, עלויות יכולות לברוח. הגדירו:

מגבלה דוגמה פעולה כשחורגים
Per-run limit $10 לריצה Abort pipeline + alert
Daily limit $100 ליום Pause queue + email
Monthly budget $2,000 לחודש Shutdown + management alert
טעות נפוצה: לא לחשב את עלות ה-retries בתקציב

מפתחים מחשבים "pipeline עולה $5 לריצה, 20 ריצות ביום = $100 ליום." אבל שוכחים retries. אם שיעור כשל של 10% עם ממוצע 2 retries לכשל, העלות האמיתית היא $110-115 ליום — עלייה של 15%. עם Opus 4.6 שהוא המודל היקר (input: ~$15/MTok, output: ~$75/MTok לפי אומדן), retry אחד של שלב writing יכול לעלות $2-4. הפתרון: תמיד הוסיפו 20% buffer לתקציב ה-pipeline עבור retries, timeouts, ו-debugging. תעקבו אחרי retry rate כמטריקה — אם הוא עולה, יש בעיה בפרומפט או ב-API.

The Backpressure Pattern

Backpressure (לחץ לאחור) הוא מנגנון שמאט את קצב הקלט כשהמערכת לא מספיקה לעבד. אם 100 items מחכים בתור אבל ה-workers מספיקים לעבד רק 10 בשעה — אל תוסיפו עוד items לתור. חכו שהתור יתרוקן, או הוסיפו workers.

# simple backpressure implementation
class PipelineQueue:
    def __init__(self, max_queue_size: int = 50, max_concurrent: int = 5):
        self.max_queue_size = max_queue_size
        self.max_concurrent = max_concurrent
        self.queue = []
        self.running = 0

    def can_accept(self) -> bool:
        """Check if queue can accept new items"""
        return len(self.queue) < self.max_queue_size

    async def submit(self, input_data: dict) -> bool:
        if not self.can_accept():
            print(f"Queue full ({len(self.queue)}/{self.max_queue_size}). "
                  f"Rejecting input. Try again later.")
            return False
        self.queue.append(input_data)
        await self._process_if_capacity()
        return True

    async def _process_if_capacity(self):
        while self.queue and self.running < self.max_concurrent:
            item = self.queue.pop(0)
            self.running += 1
            # Run pipeline in background
            asyncio.create_task(self._run_and_release(item))

    async def _run_and_release(self, input_data):
        try:
            await self.pipeline.run(input_data)
        finally:
            self.running -= 1
            await self._process_if_capacity()

Monitoring at Scale

כשמריצים עשרות ומאות pipelines, אתם צריכים dashboard מרכזי שמציג: כמה pipelines רצים עכשיו, כמה מחכים בתור, מה שיעור ההצלחה, מה העלות המצטברת. אפשר לבנות dashboard פשוט ב-HTML שקורא מקבצי הלוג — או להשתמש בכלים כמו Grafana עם JSON logs.

✍ עשו עכשיו 2 דקות

חשבו: אם הייתם צריכים להריץ את ה-pipeline שלכם על 100 items, כמה זמן זה היה לוקח סדרתית? כמה זמן עם 5 workers במקביל? מה rate limit ה-API שלכם מאפשר? כתבו את 3 המספרים האלה — הם מגדירים את תקרת ה-scaling שלכם.

ההקשר הישראלי: rate limits ותקציבים

סטארטאפ ישראלי שמריץ pipeline לעיבוד 500 כרטיסי support ביום: בעלות ממוצעת של 2 ₪ לכרטיס, זה 1,000 ₪ ליום, ~22,000 ₪ לחודש, ~$6,000. לא מעט. הגדרת cost budget וניטור יומי הם חובה — לא מותרות. טיפ: התחילו עם 50 כרטיסים ביום, מדדו עלות אמיתית, ואז הגדילו בהדרגה.

🛠 תרגיל: בניית Pipeline מלא מקצה לקצה

זמן: 30 דקות | תוצר: Production Pipeline מלא

  1. בחרו אחד: Content Pipeline, Code Review Pipeline, או Data Processing Pipeline
  2. בנו orchestrator עם לפחות 4 שלבים (השתמשו בקוד מהפרק כ-base)
  3. כל שלב משתמש ב-Agent SDK (Python או TypeScript) עם agent configuration ספציפי
  4. הוסיפו checkpoint אחרי כל שלב + resume function
  5. הוסיפו Error Matrix עם לפחות retry ו-abort
  6. הוסיפו structured logging — JSON log לכל שלב
  7. הוסיפו cost tracking שמסכם עלות כוללת בסוף
  8. הריצו את ה-pipeline על input אמיתי ובדקו את ה-output, logs, ו-checkpoints

תוצאה צפויה: תיקיית output עם: תוצר סופי, checkpoint files, structured log, ודוח עלות.

שגרת עבודה — Pipelines

בנוסף לשגרה מפרקים קודמים (ניהול סוכנים, צוותים):

תדירות משימה זמן
יומי בדקו את הלוגים של pipelines שרצו — שגיאות, עלויות חריגות, זמנים ארוכים 5 דקות
יומי וודאו שעלות יומית לא חרגה מהתקציב שהגדרתם 2 דקות
שבועי השוו תוצרים מריצות שונות — האם האיכות עקבית? 15 דקות
שבועי בדקו cost breakdown לפי שלב — מהו השלב הכי יקר? אפשר לאופטמז? 10 דקות
שבועי בדקו אם יש פרומפטים שצריכים update (מודל חדש, API שהשתנה) 10 דקות
חודשי עשו A/B test על פרומפט חדש מול פרומפט ישן בשלב אחד 30 דקות
חודשי בדקו אם שלבים חדשים נדרשים או שאפשר לאחד שלבים קיימים 20 דקות
חודשי עדכנו version tag ו-changelog של ה-pipeline 10 דקות
🎯 אם אתם עושים רק דבר אחד מהפרק הזה

בנו pipeline בן 3 שלבים: Brief → Draft → Review. השתמשו ב-Orchestrator מהפרק (Python או TypeScript), צרו 3 stages שכל אחד מריץ Agent SDK, הוסיפו checkpoint אחרי כל שלב. 30 דקות עבודה. מהרגע הזה יש לכם תשתית pipeline שאפשר להרחיב ל-5 שלבים, 7, 10 — פשוט מוסיפים stages. כל שיפור נוסף — error matrix, parallel stages, cost tracking, versioning — הוא בונוס שנבנה על הבסיס הזה.

✅ בדוק את עצמך — 5 שאלות

ענו על לפחות 4 מתוך 5 כדי לעבור:

  1. למה ה-Orchestrator צריך להיות קוד דטרמיניסטי ולא סוכן AI? מה הסיכון בלתת ל-Claude לנהל את ה-flow?
    (רמז: דילוג על שלבים, שינוי סדר, החלטות לא צפויות, עלות טוקנים על flow control)
  2. מה ההבדל בין Pipeline ל-Team, ואיך מחליטים מה להשתמש?
    (רמז: תלות בין שלבים — סדרתי vs. מקבילי, פלט של אחד הוא קלט של השני)
  3. למה checkpoint חשוב, וכמה כסף pipeline בן 7 שלבים בלי checkpoints יכול לבזבז?
    (רמז: קריסה בשלב 6 = 100% בזבוז בלי checkpoints, ~17% בזבוז עם checkpoints)
  4. למה לא לערוך פרומפטים in-place? מה היתרון של prompt versioning?
    (רמז: rollback, A/B testing, השוואת תוצאות, אי אפשר לחזור בלי גרסה קודמת)
  5. מה ה-"Recovery Ladder" ובאיזה סדר עולים עליו? מתי skip ומתי abort?
    (רמז: retry same → retry different → skip → human → abort. שלב אופציונלי = skip, שלב קריטי = abort)
סיכום הפרק

בפרק הזה עברנו מסוכנים בודדים וצוותים ל-צינורות עבודה אוטומטיים — המבנה שהכי מתאים לתהליכים סדרתיים שבהם כל שלב בונה על הקודם. התובנה המרכזית: pipeline טוב הוא לא רק "שרשרת פרומפטים" — הוא מערכת הנדסית עם orchestrator דטרמיניסטי, checkpoints ששומרים מצב, error handling עם recovery ladder, prompt versioning שמאפשר rollback, ו-structured logging שמאפשר ניתוח עלויות ודיבוג. ראינו pipeline אמיתי בן 7 שלבים שמייצר תוכן ברמה גבוהה ב-15-25 דקות, ולמדנו שהפוקוס של כל שלב במשימה אחת הוא מה שמייצר את האיכות. בפרק הבא נעבור ל-Docker, שרתים מרוחקים, ופריסה ארגונית — ניקח את ה-Pipelines האלה ונריץ אותם בסביבות production אמיתיות, עם isolation, אבטחה, ו-scaling.

☑ צ'קליסט השלמת הפרק