Skip to content

chunklet.cli

Functions:

  • chunk_command

    Chunk text or files based on specified parameters.

  • split_command

    Split text or a single file into sentences using the SentenceSplitter.

chunk_command

chunk_command(
    text: Optional[str] = typer.Argument(
        None,
        help="The input text to chunk. If not provided, --source must be used.",
    ),
    source: Optional[List[Path]] = typer.Option(
        None,
        "--source",
        "-s",
        help="Path(s) to one or more files or directories to read input from. Overrides the 'text' argument.",
    ),
    code: bool = typer.Option(
        False,
        "--code",
        help="Use CodeChunker for code files.",
    ),
    doc: bool = typer.Option(
        False,
        "--doc",
        help="Use DocumentChunker for document files.",
    ),
    destination: Optional[Path] = typer.Option(
        None,
        "--destination",
        "-d",
        help="Path to a file (for single output) or a directory (for batch output) to write the chunks.",
    ),
    lang: str = typer.Option(
        "auto",
        "--lang",
        help="Language of the text (e.g., 'en', 'fr', 'auto'). (default: auto)",
    ),
    max_tokens: int = typer.Option(
        None,
        "--max-tokens",
        help="Maximum number of tokens per chunk. Applies to all chunking strategies. (must be >= 12)",
    ),
    max_sentences: int = typer.Option(
        None,
        "--max-sentences",
        help="Maximum number of sentences per chunk. Applies to PlainTextChunker and DocumentChunker. (must be >= 1)",
    ),
    max_section_breaks: Optional[int] = typer.Option(
        None,
        "--max-section-breaks",
        help="Maximum number of section breaks per chunk. Applies to PlainTextChunker and DocumentChunker. (must be >= 1)",
    ),
    overlap_percent: float = typer.Option(
        20.0,
        "--overlap-percent",
        help="Percentage of overlap between chunks (0-85). Applies to PlainTextChunker and DocumentChunker. (default: 20)",
    ),
    offset: int = typer.Option(
        0,
        "--offset",
        help="Starting sentence offset for chunking. Applies to PlainTextChunker and DocumentChunker. (default: 0)",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging.",
    ),
    tokenizer_command: Optional[str] = typer.Option(
        None,
        "--tokenizer-command",
        help="A shell command to use for token counting. The command should take text as stdin and output the token count as a number.",
    ),
    metadata: bool = typer.Option(
        False,
        "--metadata",
        help="Include metadata in the output. If --destination is a directory, metadata is saved as separate .json files; otherwise, it's included inline in the output.",
    ),
    n_jobs: Optional[int] = typer.Option(
        None,
        "--n-jobs",
        help="Number of parallel jobs for batch chunking. (default: None, uses all available cores)",
    ),
    on_errors: OnError = typer.Option(
        OnError.raise_,
        "--on-errors",
        help="How to handle errors during processing: 'raise', 'skip' or 'break'",
    ),
    max_lines: int = typer.Option(
        None,
        "--max-lines",
        help="Maximum number of lines per chunk. Applies to CodeChunker only. (must be >= 5)",
    ),
    max_functions: int = typer.Option(
        None,
        "--max-functions",
        help="Maximum number of functions per chunk. Applies to CodeChunker only. (must be >= 1)",
    ),
    docstring_mode: DocstringMode = typer.Option(
        DocstringMode.all_,
        "--docstring-mode",
        help="Docstring processing strategy for CodeChunker: 'summary', 'all', or 'excluded'. Applies to CodeChunker only.",
    ),
    strict: bool = typer.Option(
        True,
        "--strict",
        help="If True, raise error when structural blocks exceed max_tokens in CodeChunker. If False, split oversized blocks. Applies to CodeChunker only.",
    ),
    include_comments: bool = typer.Option(
        True,
        "--include-comments",
        help="Include comments in output chunks for CodeChunker. Applies to CodeChunker only.",
    ),
)

Chunk text or files based on specified parameters.

Source code in src/chunklet/cli.py
@app.command(name="chunk", help="Chunk text or files based on specified parameters.")
def chunk_command(
    text: Optional[str] = typer.Argument(
        None, help="The input text to chunk. If not provided, --source must be used."
    ),
    source: Optional[List[Path]] = typer.Option(
        None,
        "--source",
        "-s",
        help="Path(s) to one or more files or directories to read input from. Overrides the 'text' argument.",
    ),
    # flags for chunker type
    code: bool = typer.Option(False, "--code", help="Use CodeChunker for code files."),
    doc: bool = typer.Option(
        False, "--doc", help="Use DocumentChunker for document files."
    ),
    destination: Optional[Path] = typer.Option(
        None,
        "--destination",
        "-d",
        help="Path to a file (for single output) or a directory (for batch output) to write the chunks.",
    ),
    lang: str = typer.Option(
        "auto",
        "--lang",
        help="Language of the text (e.g., 'en', 'fr', 'auto'). (default: auto)",
    ),
    max_tokens: int = typer.Option(
        None,
        "--max-tokens",
        help="Maximum number of tokens per chunk. Applies to all chunking strategies. (must be >= 12)",
    ),
    max_sentences: int = typer.Option(
        None,
        "--max-sentences",
        help="Maximum number of sentences per chunk. Applies to PlainTextChunker and DocumentChunker. (must be >= 1)",
    ),
    max_section_breaks: Optional[int] = typer.Option(
        None,
        "--max-section-breaks",
        help="Maximum number of section breaks per chunk. Applies to PlainTextChunker and DocumentChunker. (must be >= 1)",
    ),
    overlap_percent: float = typer.Option(
        20.0,
        "--overlap-percent",
        help="Percentage of overlap between chunks (0-85). Applies to PlainTextChunker and DocumentChunker. (default: 20)",
    ),
    offset: int = typer.Option(
        0,
        "--offset",
        help="Starting sentence offset for chunking. Applies to PlainTextChunker and DocumentChunker. (default: 0)",
    ),
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Enable verbose logging."
    ),
    tokenizer_command: Optional[str] = typer.Option(
        None,
        "--tokenizer-command",
        help=(
            "A shell command to use for token counting. "
            "The command should take text as stdin and output the token count as a number."
        ),
    ),
    metadata: bool = typer.Option(
        False,
        "--metadata",
        help=(
            "Include metadata in the output. If --destination is a directory, "
            "metadata is saved as separate .json files; otherwise, it's "
            "included inline in the output."
        ),
    ),
    # for Batching
    n_jobs: Optional[int] = typer.Option(
        None,
        "--n-jobs",
        help="Number of parallel jobs for batch chunking. (default: None, uses all available cores)",
    ),
    on_errors: OnError = typer.Option(
        OnError.raise_,
        "--on-errors",
        help="How to handle errors during processing: 'raise', 'skip' or 'break'",
    ),
    # CodeChunker specific arguments
    max_lines: int = typer.Option(
        None,
        "--max-lines",
        help="Maximum number of lines per chunk. Applies to CodeChunker only. (must be >= 5)",
    ),
    max_functions: int = typer.Option(
        None,
        "--max-functions",
        help="Maximum number of functions per chunk. Applies to CodeChunker only. (must be >= 1)",
    ),
    docstring_mode: DocstringMode = typer.Option(
        DocstringMode.all_,
        "--docstring-mode",
        help="Docstring processing strategy for CodeChunker: 'summary', 'all', or 'excluded'. Applies to CodeChunker only.",
    ),
    strict: bool = typer.Option(
        True,
        "--strict",
        help="If True, raise error when structural blocks exceed max_tokens in CodeChunker. If False, split oversized blocks. Applies to CodeChunker only.",
    ),
    include_comments: bool = typer.Option(
        True,
        "--include-comments",
        help="Include comments in output chunks for CodeChunker. Applies to CodeChunker only.",
    ),
):
    """
    Chunk text or files based on specified parameters.

    Exactly one input source is accepted: a literal ``text`` argument or one
    or more ``--source`` paths (files and/or directories, expanded
    recursively). The chunker implementation is chosen via ``--code`` /
    ``--doc``; literal text without ``--code`` uses PlainTextChunker.
    Output goes to stdout, a single destination file, or (for batch runs or
    an existing directory destination) one ``.txt`` file per chunk, with
    optional sibling ``.json`` metadata files.
    """
    # --- Input validation logic ---
    provided_inputs = [arg for arg in [text, source] if arg]

    if len(provided_inputs) == 0:
        typer.echo(
            "Error: No input provided. Please provide a text, or use the --source option.",
            err=True,
        )
        typer.echo(
            "💡 Hint: Use 'chunklet --help' for more information and usage examples.",
            err=True,
        )
        raise typer.Exit(code=1)

    if len(provided_inputs) > 1:
        typer.echo(
            "Error: Please provide either a text string, or use the --source option, but not both.",
            err=True,
        )
        typer.echo(
            "💡 Hint: Use 'chunklet --help' for more information and usage examples.",
            err=True,
        )
        raise typer.Exit(code=1)

    if doc and code:
        typer.echo(
            "Error: Please specify either '--doc' or '--code', but not both.",
            err=True,
        )
        raise typer.Exit(code=1)

    # --- Tokenizer setup ---
    token_counter = None
    if tokenizer_command:
        token_counter = create_external_tokenizer(tokenizer_command)

    all_results = []
    # True once the multi-file (batch) code path runs; decides below whether
    # --destination is treated as a directory of per-chunk files.
    is_batch = False

    # Construct chunk_kwargs dynamically
    chunk_kwargs = {
        "max_tokens": max_tokens,
        "token_counter": token_counter,
    }

    if code:
        chunker_instance = CodeChunker(
            verbose=verbose,
            token_counter=token_counter,
        )
        chunk_kwargs.update(
            {
                "max_lines": max_lines,
                "max_functions": max_functions,
                "docstring_mode": docstring_mode,
                "strict": strict,
                "include_comments": include_comments,
            }
        )
    else:
        if text:
            chunker_instance = PlainTextChunker(
                verbose=verbose,
                token_counter=token_counter,
            )
        else:
            chunker_instance = DocumentChunker(
                verbose=verbose,
                token_counter=token_counter,
            )
        chunk_kwargs.update(
            {
                "lang": lang,
                "max_sentences": max_sentences,
                "max_section_breaks": max_section_breaks,
                "overlap_percent": overlap_percent,
                "offset": offset,
            }
        )

    # --- Chunking logic ---
    if text:
        chunks = chunker_instance.chunk(
            text=text,
            **chunk_kwargs,
        )
        all_results.append(chunks)

    elif source:
        file_paths = []

        for path in source:
            path = path.resolve()

            if is_path_like(str(path)):
                if path.is_file():
                    file_paths.append(path)
                elif path.is_dir():
                    file_paths.extend([p for p in path.glob("**/*") if p.is_file()])
                else:
                    # This single 'else' catches paths that pass the heuristic but
                    # either don't exist OR exist but are special file types
                    # (e.g., pipes, sockets, broken symlinks, etc.)
                    typer.echo(
                        f"Warning: '{path}' is path-like but was not found "
                        "or is not a processable file/directory. Skipping.",
                        err=True,
                    )
            else:
                # Fails the path-like regex heuristic check
                typer.echo(
                    f"Warning: '{path}' does not resemble a valid file system path "
                    "(failed heuristic check). Skipping.",
                    err=True,
                )

        if not file_paths:
            typer.echo(
                "Warning: No processable files found in the specified source(s). Exiting.",
                err=True,
            )
            raise typer.Exit(code=0)

        if len(file_paths) == 1 and file_paths[0].suffix not in {
            ".pdf",
            ".epub",
            ".docx",
        }:
            single_file = file_paths[0]
            chunks = chunker_instance.chunk(
                path=single_file,
                **chunk_kwargs,
            )
            all_results.append(chunks)
        else:
            # Batch input logic
            is_batch = True
            all_results_gen = chunker_instance.batch_chunk(
                paths=file_paths,
                n_jobs=n_jobs,
                show_progress=True,
                on_errors=on_errors,
                **chunk_kwargs,
            )
            all_results.append(all_results_gen)

    if not all_results:
        typer.echo(
            "Warning: No chunks were generated. "
            "This might be because the input was empty or did not contain any processable content.",
            err=True,
        )
        raise typer.Exit(code=0)

    # --- Output handling ---

    # Check for conflict: batch output needs one file per chunk, so an
    # existing plain file cannot be the destination.
    # (Previously this fired for single inputs too, contradicting its own
    # message; it is now gated on the batch path.)
    if destination and destination.is_file() and is_batch:
        typer.echo(
            "Error: When processing multiple inputs, '--destination' must be a directory, not a file.",
            err=True,
        )
        raise typer.Exit(code=1)

    # Directory-style output: destination is an existing directory, or the
    # batch path ran (the directory is created on demand).
    # NOTE: the previous `len(destination)` raised TypeError for every
    # destination, since pathlib.Path has no __len__.
    if destination and (destination.is_dir() or is_batch):
        # This is the equivalent of the old `if output_dir:` block
        destination.mkdir(parents=True, exist_ok=True)
        total_chunks_written = 0
        processed_sources = set()

        for res in all_results:
            for chunk_box in res:
                source_name = chunk_box.metadata["source"]
                base_name = Path(source_name).stem

                base_output_filename = (
                    f"{base_name}_chunk_{chunk_box.metadata['chunk_num']}"
                )

                # Write content file
                output_txt_path = destination / f"{base_output_filename}.txt"
                with open(output_txt_path, "w", encoding="utf-8") as f:
                    f.write(chunk_box.content + "\n")

                total_chunks_written += 1

                # Write metadata file if requested
                if metadata:
                    output_json_path = destination / f"{base_output_filename}.json"
                    with open(output_json_path, "w", encoding="utf-8") as f:
                        # Ensures metadata is a standard dict before dumping
                        data_to_dump = (
                            chunk_box.metadata.to_dict()
                            if hasattr(chunk_box.metadata, "to_dict")
                            else dict(chunk_box.metadata)
                        )
                        json.dump(data_to_dump, f, indent=4)

                processed_sources.add(source_name)

        message = (
            f"Successfully processed {len(processed_sources)} input(s) "
            f"and wrote {total_chunks_written} chunk file(s) to {destination}"
        )
        if metadata:
            message += " (with .json metadata files)."
        else:
            message += "."
        typer.echo(message)

    else:
        # This is the equivalent of the old `else:` block (stdout or single output_file)
        output_content = []

        chunk_counter = 0
        for res in all_results:
            for chunk_box in res:
                chunk_counter += 1
                output_content.append(f"## --- Chunk {chunk_counter} ---")
                output_content.append(chunk_box.content)
                output_content.append("")
                if metadata:
                    # Same normalization as the directory branch: tolerate
                    # metadata objects that are plain mappings without to_dict().
                    chunk_metadata = (
                        chunk_box.metadata.to_dict()
                        if hasattr(chunk_box.metadata, "to_dict")
                        else dict(chunk_box.metadata)
                    )
                    output_content.append("\n--- Metadata ---")  # Use a sub-header

                    for key, value in chunk_metadata.items():
                        # Use clean pipe formatting for terminal style tables
                        output_content.append(f"| {key}: {value}")

                    output_content.append("\n")

        output_str = "\n".join(output_content)

        if destination:
            destination.write_text(output_str, encoding="utf-8")
        else:
            typer.echo(output_str)

split_command

split_command(
    text: Optional[str] = typer.Argument(
        None,
        help="The input text to split. If not provided, --source must be used.",
    ),
    source: Optional[Path] = typer.Option(
        None,
        "--source",
        "-s",
        help="Path to a single file to read input from. Cannot be a directory or multiple files.",
    ),
    destination: Optional[Path] = typer.Option(
        None,
        "--destination",
        "-d",
        help="Path to a single file to write the segmented sentences (separated by \\n). Cannot be a directory.",
    ),
    lang: str = typer.Option(
        "auto",
        "--lang",
        help="Language of the text (e.g., 'en', 'fr', 'auto').",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging.",
    ),
)

Split text or a single file into sentences using the SentenceSplitter.

Source code in src/chunklet/cli.py
@app.command(name="split", help="Splits text or a single file into sentences.")
def split_command(
    text: Optional[str] = typer.Argument(
        None, help="The input text to split. If not provided, --source must be used."
    ),
    source: Optional[Path] = typer.Option(
        None,
        "--source",
        "-s",
        help="Path to a single file to read input from. Cannot be a directory or multiple files.",
    ),
    destination: Optional[Path] = typer.Option(
        None,
        "--destination",
        "-d",
        help="Path to a single file to write the segmented sentences (separated by \\n). Cannot be a directory.",
    ),
    lang: str = typer.Option(
        "auto",
        "--lang",
        help="Language of the text (e.g., 'en', 'fr', 'auto').",
    ),
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Enable verbose logging."
    ),
):
    """
    Split text or a single file into sentences using the SentenceSplitter.

    Accepts exactly one input: a literal ``text`` argument or a single
    ``--source`` file (directories are rejected). Sentences are written one
    per line to ``--destination`` when given, otherwise echoed to stdout
    under a small header showing source, language, and split confidence.
    """
    # Validation and Input Acquisition
    provided_inputs = [arg for arg in [text, source] if arg is not None]

    if len(provided_inputs) == 0:
        typer.echo(
            "Error: No input provided. Please use a text argument or the --source option.",
            err=True,
        )
        raise typer.Exit(code=1)

    if len(provided_inputs) > 1:
        typer.echo(
            "Error: Provide either a text string, or use the --source option, but not both.",
            err=True,
        )
        raise typer.Exit(code=1)

    if source:
        # --- Source Constraints ---
        if source.is_dir():
            typer.echo(
                f"Error: Source path '{source}' cannot be a directory for the 'split' command.",
                err=True,
            )
            raise typer.Exit(code=1)
        if not source.is_file():
            typer.echo(
                f"Error: Source path '{source}' not found or is not a file.",
                err=True,
            )
            raise typer.Exit(code=1)

        try:
            input_text = source.read_text(encoding="utf-8")
        except Exception as e:
            typer.echo(f"Error reading source file: {e}", err=True)
            raise typer.Exit(code=1)
    else:
        input_text = text

    # --- Destination Constraint ---
    if destination and destination.is_dir():
        typer.echo(
            f"Error: Destination path '{destination}' cannot be a directory for the 'split' command.",
            err=True,
        )
        raise typer.Exit(code=1)

    # Split Logic
    splitter = SentenceSplitter(verbose=verbose)
    sentences, confidence = splitter.split(input_text, lang=lang)

    # Output Handling
    if destination:
        output_str = "\n".join(sentences)
        # When no --source was given, input came from the 'text' CLI argument
        # (stdin is never read by this command), so label it accordingly.
        source_display = f"from {source.name}" if source else "(from text argument)"

        try:
            destination.write_text(output_str, encoding="utf-8")
            typer.echo(
                f"Successfully split and wrote {len(sentences)} sentences "
                f"{source_display} to {destination} (Confidence: {confidence})",
                err=True,
            )
        except Exception as e:
            typer.echo(f"Error writing to destination file: {e}", err=True)
            raise typer.Exit(code=1)
    else:
        source_display = f"Source: {source.name}" if source else "Source: text argument"

        typer.echo(
            f"--- Sentences ({len(sentences)}): "
            f" [{source_display} | Lang: {lang.upper()} | Confidence: {confidence}] ---"
        )

        for sentence in sentences:
            typer.echo(sentence)