A lightweight MapReduce micro-framework for word counting, implemented in Python.
- Clone this repository and navigate to the root of the project:

  ```bash
  git clone https://github.com/ggcr/py-mapreduce.git
  cd py-mapreduce
  ```

- Create an environment and install dependencies:

  ```bash
  conda create -n pymapreduce python=3.10 -y
  conda activate pymapreduce
  pip install requests
  ```

There are two ways to run the framework:
- In one terminal window, start the workers:

  ```bash
  python3 -m src.http_worker -N 30 -M 30
  ```

- In another terminal window, start the driver:

  ```bash
  python3 -m src.main -N 30 -M 30 inputs/*.txt
  ```

Alternatively, simply run the driver, which will start the workers automatically:

```bash
python3 -m src.main -N 30 -M 30 inputs/*.txt
```

> **Note**
> The framework will create a maximum of `max(N, M)` workers. These workers are reused for both map and reduce tasks, optimizing resource usage.
> For example, if N=30 and M=20, a total of 30 workers will be created and reused for both phases of the MapReduce process.
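As a rough sketch (not the framework's actual startup code; `start_workers` is a hypothetical helper), sizing and launching the pool could look like this, using the same module and worker-id convention as the retry snippet shown later:

```python
import subprocess
import sys

def start_workers(N: int, M: int) -> list[subprocess.Popen]:
    # One HTTP worker per slot, capped at max(N, M); the same processes
    # serve both the map and the reduce phase.
    return [
        subprocess.Popen([sys.executable, '-m', 'src.http_worker', '-id', str(n + 1)])
        for n in range(max(N, M))
    ]
```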
Due to the constraint of only being able to send metadata between the driver and workers, the driver splits input files into N chunks and stores them in an intermediate directory at `files/chunks/`. This approach allows work to be distributed across workers while maintaining the requirement of metadata-only transfer.
```
├── files
│   ├── chunks
│   │   ├── chunk_0.pkl
│   │   ├── ...
│   │   └── chunk_N.pkl
│   ├── intermediate
│   │   ├── mr-0-1
│   │   ├── ...
│   │   └── mr-N-M
│   └── out
│       ├── out-0
│       ├── ...
│       └── out-M
├── inputs
│   ├── pg-being_ernest.txt
│   ├── pg-dorian_gray.txt
│   └── ...
└── src
    ├── driver.py
    ├── http_worker.py
    ├── main.py
    ├── utils.py
    └── worker.py
```
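A minimal sketch of the chunking step described above (`split_into_chunks` is a hypothetical helper; the real splitting logic lives in the source under `src/`):

```python
import pickle
from pathlib import Path

def split_into_chunks(input_paths: list[str], n: int, chunks_dir: str = "files/chunks") -> list[str]:
    """Split the concatenated input text into n pickled chunks (illustrative sketch)."""
    Path(chunks_dir).mkdir(parents=True, exist_ok=True)
    text = "".join(Path(p).read_text() for p in input_paths)
    step = -(-len(text) // n)  # ceiling division so all text is covered
    chunk_paths = []
    for i in range(n):
        path = Path(chunks_dir) / f"chunk_{i}.pkl"
        with open(path, "wb") as f:
            pickle.dump(text[i * step:(i + 1) * step], f)
        chunk_paths.append(str(path))
    return chunk_paths  # only these paths (metadata) cross the driver/worker boundary
```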
The framework uses a simple HTTP-based communication protocol between the driver and workers. Here's an example sequence for N=4 and M=3:
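In lieu of a full trace, the sketch below shows what that sequence might look like from the driver's side. This is a hedged illustration: the `/map` payload mirrors the `map_worker` snippet in the next section, while the reduce-phase details are assumptions.

```python
import requests

# Map phase for N=4: the driver sends each worker the path of its chunk.
# Payload fields mirror the map_worker snippet shown below; worker n
# listens on port 8000 + n + 1.
for n in range(4):
    payload = {
        'n': n, 'M': 3, 'chunk': f"files/chunks/chunk_{n}.pkl",
        'BUCKETS_PARENT_PATH': 'files/intermediate',
    }
    requests.post(f"http://localhost:{8001 + n}/map", json=payload).raise_for_status()

# The reduce phase proceeds analogously: once all map tasks have finished,
# the driver sends each of the M=3 reducers its bucket id (the endpoint and
# payload shape are assumptions, not taken from the source).
```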
One nice feature of this framework is the ability to dynamically create workers when needed. This is particularly useful when a worker is not responding or has not been started. Here's how it works:
```python
import subprocess
import sys
import time

import requests

def map_worker(self, n: int, chunk_path: str, retries: int = 0) -> None:
    try:
        payload = {
            'n': n, 'M': self.M, 'chunk': chunk_path,
            'BUCKETS_PARENT_PATH': self.BUCKETS_PARENT_PATH,
        }
        response = requests.post(url=f"{self.WORKERS_URL}:{8000 + n + 1}/map", json=payload)
        response.raise_for_status()  # raises an HTTPError unless the status code is 2xx (OK)
    except requests.exceptions.ConnectionError:
        print(f"[DRIVER] HTTP Worker {self.WORKERS_URL}:{8000 + n + 1} is not up. Attempting retry {retries + 1}")
        subprocess.Popen([sys.executable, '-m', 'src.http_worker', '-id', str(n + 1)])
        if retries < 5:
            time.sleep(1)  # give the freshly spawned worker time to start listening
            self.map_worker(n, chunk_path, retries + 1)  # recursive retry
```

This method attempts to send a map task to a worker. If the worker is not responding (`ConnectionError`), the driver will:
- Log the failure and retry attempt.
- Spawn a new worker process using `subprocess.Popen`.
- Wait for 1 second to allow the worker to start up.
- Retry the map task, up to a maximum of 5 attempts.
This approach lets the framework recover from missing or unresponsive workers, making it more robust and easier to use. Users don't need to start every worker manually; the system creates them as needed.
Several areas for improvement:

- **State Reset Method**: The `reset_state()` method removes all the `files/` sub-directories between runs. Without it, a subsequent run with different N and M values, or with very small inputs, could reuse the previous execution's intermediate results and produce incorrect output. A minimal sketch of this method follows the list.
- **Dynamic Worker Creation**: The recursive retry mechanism for creating workers doesn't accurately determine how long to wait for the subprocess to start. Implementing a socket-based communication system would provide more reliable worker initialization from the driver.
- **File Chunking**: Creating N chunks and storing them in the filesystem may be inefficient, especially for large files, despite the use of serialization.
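A minimal sketch of what `reset_state()` could look like, assuming it simply deletes the three `files/` sub-directories shown in the project tree above (the real implementation may differ):

```python
import shutil
from pathlib import Path

def reset_state(self) -> None:
    # Sketch: wipe chunks, intermediate buckets, and outputs from the
    # previous run so stale files cannot corrupt the next execution.
    for sub in ("chunks", "intermediate", "out"):
        shutil.rmtree(Path("files") / sub, ignore_errors=True)
```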
For more details on the implementation, please refer to the source code in the `src/` directory.