PHP MapReduce


A high-performance, framework-agnostic MapReduce implementation for PHP that processes large datasets using parallel workers and disk-based storage.


  • Process datasets larger than RAM using memory-efficient disk storage
  • Stream iterators without loading them into memory - process large files, database cursors, and API responses efficiently
  • Utilize multiple CPU cores for true parallel processing
  • Framework-agnostic - works with any PHP project
  • Handle millions of records with predictable memory usage
Install via Composer:

composer require spiritinlife/php-mapreduce

Requirements: PHP 8.1+

Word count example:

use Spiritinlife\MapReduce\MapReduceBuilder;

$documents = [
    'doc1' => 'the quick brown fox',
    'doc2' => 'the lazy dog',
    'doc3' => 'quick brown dogs',
];

$results = (new MapReduceBuilder())
    ->input($documents)
    ->map(function ($text) {
        foreach (str_word_count(strtolower($text), 1) as $word) {
            yield [$word, 1];
        }
    })
    ->reduce(function ($word, $counts) {
        return array_sum($counts);
    })
    ->execute();

// Results are streamed as a generator - iterate to process results
foreach ($results as $word => $count) {
    // Output: 'quick' => 2, 'brown' => 2, 'the' => 2, ...
}

MapReduce processes data in three parallel phases:

  1. Map Phase - Input is split and processed in parallel. Each worker emits key-value pairs to partitioned temporary files.
  2. Shuffle Phase - Partitions are sorted and grouped in parallel using memory-efficient external sorting.
  3. Reduce Phase - Partitions are processed in parallel, aggregating values by key.
Input → [Parallel Map] → [Parallel Shuffle & Sort] → [Parallel Reduce] → Results
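
As a mental model, the following single-process sketch shows what each phase computes for the word-count data above. This is an illustration only, not the library's implementation - the real phases run in parallel worker processes and spill intermediate data to partitioned files on disk.

// Single-process sketch of the three phases (illustration only)
$documents = ['doc1' => 'the quick brown fox', 'doc2' => 'the lazy dog'];

// Map phase: emit [key, value] pairs
$pairs = [];
foreach ($documents as $text) {
    foreach (str_word_count(strtolower($text), 1) as $word) {
        $pairs[] = [$word, 1];
    }
}

// Shuffle phase: group values by key (the library sorts partition files externally)
$groups = [];
foreach ($pairs as [$key, $value]) {
    $groups[$key][] = $value;
}
ksort($groups);

// Reduce phase: aggregate the values for each key
$results = [];
foreach ($groups as $key => $values) {
    $results[$key] = array_sum($values); // e.g. 'the' => 2
}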

MapReduce accepts any iterable as input - arrays, generators, iterators, or custom iterables.

💡 True Streaming: Large files, database cursors, and paginated API responses are consumed as they are produced - the entire dataset is never loaded into memory.

Arrays and Basic Iterables

// Simple array
->input(['a', 'b', 'c'])

// Associative array
->input(['doc1' => 'text', 'doc2' => 'more text'])

// ArrayIterator
->input(new ArrayIterator($data))
Files

// Read file line by line (memory-efficient)
->input(new SplFileObject('large-file.txt'))

// Generator from file
->input((function () {
    $handle = fopen('data.csv', 'r');
    while (($line = fgets($handle)) !== false) {
        yield $line;
    }
    fclose($handle);
})())
CSV Files

// Parse CSV rows
->input((function () {
    $file = fopen('data.csv', 'r');
    fgetcsv($file); // Skip header
    while (($row = fgetcsv($file)) !== false) {
        yield [
            'name' => $row[0],
            'value' => $row[1],
        ];
    }
    fclose($file);
})())
JSON Lines

// Process newline-delimited JSON
->input((function () {
    $file = fopen('data.jsonl', 'r');
    while (($line = fgets($file)) !== false) {
        yield json_decode($line, true);
    }
    fclose($file);
})())
Database Results

// PDO result set
$stmt = $pdo->query('SELECT * FROM large_table');
->input($stmt)

// Generator for memory efficiency
->input((function () use ($pdo) {
    $stmt = $pdo->prepare('SELECT * FROM users');
    $stmt->execute();
    while ($row = $stmt->fetch()) {
        yield $row;
    }
})())
Directory Traversal

// Process all files in a directory
->input(new RecursiveIteratorIterator(
    new RecursiveDirectoryIterator('/path/to/logs')
))

// Custom file filter
->input((function () {
    $dir = new RecursiveDirectoryIterator('/path/to/data');
    $iterator = new RecursiveIteratorIterator($dir);
    foreach ($iterator as $file) {
        if ($file->isFile() && $file->getExtension() === 'log') {
            yield $file->getPathname() => file_get_contents($file);
        }
    }
})())

Core Methods

input()

Set the input data - any array or iterable.

->input($documents)
->input(new ArrayIterator($data))

map()

Define the transformation function. Receives input values and must yield [$key, $value] pairs.

->map(function ($value) {
    // Process and emit key-value pairs
    yield [$newKey, $newValue];
})

reduce(callable $reducer)

Define the aggregation function. Receives all values for each key.

->reduce(function ($key, array $values) {
    return array_sum($values);
})

execute()

Run the job and return results as a generator.
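
A minimal usage sketch - $documents, $wordCountMapper, and $wordCountReducer stand in for the input array and the two closures from the word-count example above:

// $wordCountMapper / $wordCountReducer are the closures from the word-count example
$results = (new MapReduceBuilder())
    ->input($documents)
    ->map($wordCountMapper)
    ->reduce($wordCountReducer)
    ->execute();

// Option 1: stream results one key at a time (constant memory)
foreach ($results as $word => $count) {
    echo "{$word}: {$count}\n";
}

// Option 2: materialize into an array when the aggregated result fits in memory.
// Note: a generator can only be iterated once, so use one option per execute() call.
// $counts = iterator_to_array($results);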

Configuration Methods (Optional)

concurrent()

Number of parallel workers. Default: 4

->concurrent(8) // Use 8 CPU cores

partitions()

Number of reduce partitions. Default: same as concurrent()

->partitions(16) // More parallelism in reduce phase

Tip: Use 1-2x the concurrency level. More partitions = better parallelism but more overhead.

chunkSize()

Controls memory usage during shuffle-phase sorting. Default: 10000

Determines how many records accumulate in memory before being sorted and written as a chunk file during external sorting. This only affects the shuffle phase.

->chunkSize(50000)  // High-memory system
->chunkSize(2000)   // Memory-constrained

Memory impact: chunkSize × average_record_size bytes per sort operation

bufferSize()

Controls I/O write buffering across all phases (map, shuffle, reduce). Default: 1000

Determines how many records are buffered in memory before flushing to disk. Affects the frequency of fwrite() system calls. Used by all phases, not just shuffle.

->bufferSize(5000)  // Large datasets, plenty of RAM
->bufferSize(500)   // Memory-constrained

Memory impact: bufferSize × average_record_size bytes per writer (multiple writers in map phase)

Guidelines:

  • Small datasets (<10K): 100-500
  • Medium datasets (10K-1M): 1000-5000
  • Large datasets (>1M): 5000-10000

💡 Why Two Parameters?

  • chunkSize controls sorting granularity (shuffle only) - larger values create fewer chunk files to merge
  • bufferSize controls write batching (all phases) - larger values reduce system calls

These are independent memory concerns. For example, in the shuffle phase, a 10,000-record chunk will be written in ~10 buffer flushes if bufferSize=1000.
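
A back-of-the-envelope sketch of that math; the average record size below is an assumed figure for illustration, not something measured from the library:

$chunkSize  = 10000;   // records accumulated per sort chunk (shuffle phase)
$bufferSize = 1000;    // records buffered per write (all phases)
$avgRecord  = 200;     // assumed average serialized record size, in bytes

$sortMemory      = $chunkSize * $avgRecord;         // ~2 MB held per sort operation
$writeBuffer     = $bufferSize * $avgRecord;        // ~200 KB held per writer
$flushesPerChunk = intdiv($chunkSize, $bufferSize); // ~10 buffer flushes per chunk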

workingDirectory(string $path)

Directory for temporary files. Default: system temp

->workingDirectory('/mnt/fast-ssd/tmp')

Tip: Use SSD storage for better performance on large datasets.

partitionBy(callable $partitioner)

Custom function to control which partition a key goes to. Must return an integer from 0 to partitions - 1.

->partitionBy(function ($key, $numPartitions) {
    return ord($key[0]) % $numPartitions; // Partition by first letter
})
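
For keys that would otherwise cluster (as partitioning by first letter might), a hash-based partitioner is a common alternative. A sketch using plain PHP functions; the library's built-in default may already behave similarly:

->partitionBy(function ($key, $numPartitions) {
    // Hash the key so keys spread evenly across partitions
    return crc32((string) $key) % $numPartitions;
})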
Sales Aggregation

$sales = [
    ['product' => 'Widget', 'amount' => 100],
    ['product' => 'Gadget', 'amount' => 150],
    ['product' => 'Widget', 'amount' => 200],
];

$totals = (new MapReduceBuilder())
    ->input($sales)
    ->map(fn($sale) => yield [$sale['product'], $sale['amount']])
    ->reduce(fn($product, $amounts) => [
        'total' => array_sum($amounts),
        'average' => array_sum($amounts) / count($amounts),
        'count' => count($amounts),
    ])
    ->execute();

foreach ($totals as $product => $stats) {
    echo "{$product}: {$stats['total']}\n";
}

Building an Inverted Index

$documents = [
    ['id' => 1, 'text' => 'the quick brown fox'],
    ['id' => 2, 'text' => 'the lazy dog'],
    ['id' => 3, 'text' => 'quick brown animals'],
];

$invertedIndex = (new MapReduceBuilder())
    ->input($documents)
    ->map(function ($doc) {
        foreach (array_unique(str_word_count(strtolower($doc['text']), 1)) as $word) {
            yield [$word, $doc['id']];
        }
    })
    ->reduce(fn($word, $docIds) => [
        'documents' => array_unique($docIds),
        'frequency' => count($docIds),
    ])
    ->execute();

foreach ($invertedIndex as $word => $index) {
    echo "{$word}: appears in {$index['frequency']} documents\n";
}
Log File Analysis

$stats = (new MapReduceBuilder())
    ->input(file('access.log'))
    ->map(function ($line) {
        if (preg_match('/^(\S+).*?"GET (\S+).*?" (\d+)/', $line, $m)) {
            yield ["ip:{$m[1]}", 1];
            yield ["status:{$m[3]}", 1];
        }
    })
    ->reduce(fn($key, $counts) => array_sum($counts))
    ->concurrent(8)
    ->execute();

foreach ($stats as $key => $count) {
    echo "{$key}: {$count}\n";
}
Performance Tuning

(new MapReduceBuilder())
    ->concurrent(8)              // CPU cores (or 2-3x for I/O tasks)
    ->partitions(16)             // 1-2x concurrency
    ->chunkSize(50000)           // Shuffle sort memory (larger = fewer merges)
    ->bufferSize(5000)           // Write buffer across all phases (larger = fewer syscalls)
    ->workingDirectory('/ssd')   // Use fast storage for large jobs

Low Memory System (2GB RAM):

->chunkSize(2000)   // Small sort chunks
->bufferSize(500)   // Small write buffers

High Performance System (SSD, 32GB RAM):

->chunkSize(100000)   // Large sort chunks for fast sorting
->bufferSize(10000)   // Large buffers to minimize disk I/O
->workingDirectory('/mnt/fast-ssd/tmp')

Tuning independently:

  • Slow disk? Increase bufferSize to batch more writes
  • Limited RAM during shuffle? Decrease chunkSize to reduce sort memory
  • Many concurrent map workers? Decrease bufferSize (memory = workers × partitions × bufferSize - see the sketch below)
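
To make that formula concrete, here is a rough estimate of peak map-phase buffer memory under the high-performance settings above; the per-record byte size is an assumption for illustration:

$workers    = 8;      // concurrent()
$partitions = 16;     // partitions()
$bufferSize = 5000;   // bufferSize(): records per write buffer
$avgRecord  = 200;    // assumed bytes per buffered record

// Each map worker keeps a write buffer per partition, so peak buffered
// memory during the map phase is roughly:
$peakBytes = $workers * $partitions * $bufferSize * $avgRecord; // ~128 MB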

Benchmarks

Tested on Apple M2 Pro (10-core), 16GB RAM, SSD:

Records   Concurrency   Time     Memory
10K       4             0.3s     2MB
100K      4             2.9s     6MB
1M        8             33s      8MB
10M       8             6m12s    73MB

Run your own benchmarks:

php bin/benchmark-readme.php

Run tests:

composer test            # Run test suite
composer test-coverage   # With coverage report
composer analyse         # Static analysis (PHPStan)
composer cs-check        # Code style check (PSR-12)
Limitations:

  • Single machine only (not a distributed cluster)
  • Requires disk space for intermediate files
  • Process spawning overhead makes it inefficient for tiny datasets (<1000 records)

MIT License - see LICENSE file
