Asynchronous File Processing

Asynchronous file reading optimizes the computational speed of a file reading via parallel or concurrent programming paradigms. There are a few approaches in the realm of parallel programming, such as using [Multiprocessing](https://docs.python.org/3/library/multiprocessing.html) and concurrency-based approaches, including [threads](https://docs.python.org/3/library/threading.html) and [asyncio](https://docs.python.org/3/library/asyncio.html). The asynchronous file reading feature is supported by an off-the-shelf [aiofiles](https://pypi.org/project/aiofiles/) library which is outside the scope of our writing. Hence, we split the file into several smaller CSV files and read them concurrently. We have Asyncio as the base for our solution. ### Source code + [Version 1](https://gist.github.com/kenluck2001/9e3630c5a8f394bd2f7622c033cb0092#file-asyncfilev1-py) + [Version 2](https://gist.github.com/kenluck2001/9e3630c5a8f394bd2f7622c033cb0092#file-asyncfilev2-py) + [Dataset](https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD) ### System Setup The [source code](https://gist.github.com/kenluck2001/9e3630c5a8f394bd2f7622c033cb0092) tested on the system with the following characteristics: + Ubuntu 24.04 + Python 3.12 # Method The first approach in solving a problem with multiprogramming is to determine if the problem is CPU-bound or I/O bound. A rule of thumb to decide is to figure out if the core bottleneck of your serial program is spending time with number crunching, then your problem is CPU-bound. On the contrary, when our serial program has the computational bottleneck of read/write operations. The problem of parallel file reading is mainly I/O-bound, rather than CPU-bound. Asyncio is a cooperative scheduling of threads to solve a problem. This async/await pattern of asyncio is ideal for I/O-bound tasks. [Multiprocessing](https://docs.python.org/3/library/multiprocessing.html) is limited by the number of available CPUs for parallelizing a task. Asynchio uses careful slicing to run multiple threads within a process, while [Multiprocessing](https://docs.python.org/3/library/multiprocessing.html) maps a process to a CPU (single process per core). # Issues + Our solution in [Version 1](https://gist.github.com/kenluck2001/9e3630c5a8f394bd2f7622c033cb0092#file-asyncfilev1-py) did not consider deleting split files. There are several approaches to removing these stale secondary files derived from chunking the original files into a set of ancillary files for concurrent processing. One approach is to use the temporary files provided by Python. However, it is problematic as there is no guarantee that the temporary files get deleted using the lifecycle of the instance of the FileContent class via the destructor. Another approach is to prevent potential race conditions by using unique filenames for derived files, which still has not solved the deletion problem. + In the `processBatch` method of the FileContent class, we optimize for space complexity by using the 'readlines` method of the file object with a predefined buffer size to avoid reading huge bulk of data that can't fit in memory at a time. Hence, we incrementally load chunks of the data. ``` async def processBatch(self, config_file_name: str) -> int: """ Given a file name extract every content in the file """ total_record_count: int = 0 cnt: int = 0 with open(config_file_name, "r") as csvfile: # simplify file reading with an implicit generator lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER) while lines: lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER) return total_record_count ``` + Furthermore, we adopted the approach of catching exceptions, as reading files is an error-prone activity, as demonstrated in the snippet. We have used the argument `return_exceptions` to alter the execution logic. ``` async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = True) -> int: """ Asynchronously read the files. """ tasks = await self.__createTaskList(config_file_name) file_paths = self.splitCSVToMultipleFiles(config_file_name) for file_name in file_paths: tasks.append(self.processBatch(file_name)) count_lst = [ v for v in await asyncio.gather(*tasks, return_exceptions=True) if not isinstance(v, Exception) ] # recreate a task for use and testing if not return_exceptions: tasks = await self.__createTaskList(config_file_name) count_lst = await asyncio.gather(*tasks) return sum(count_lst) ``` + One must avoid mixing [Multiprocessing](https://docs.python.org/3/library/multiprocessing.html) with asyncio as shown in the first part of the snippet below. ``` from multiprocessing import cpu_count from multiprocessing.dummy import Pool from functools import partial async def load_file_contents_using_tasks(config_file_name: str) -> None: ''' DON'T MIX ASYNCIO AND MULTIPROCESSING AS IT MAKES NO SENSE TO CONSIDER I/O BOUND TASKS AS CPU-BOUND TASK ''' file_paths = splitCSVToMultipleFiles(config_file_name) # restrict the number of split files to the number of available CPU total_record_count = 0 available_num_cores = cpu_count() # Don't take all system resources with Pool(processes=available_num_cores) as pool: for record_cnt in pool.map(partial(processBatch), file_paths): total_record_count += record_cnt print(f"Total number of Inserted {total_record_count} records into database") if __name__ == "__main__": # Don't do this asyncio.run(load_file_contents_using_tasks(csvfile)) # instead do load_file_contents_using_tasks(csvfile) ``` # Remedies We took a two-step approach to address the problem by explicitly destroying derived files from the original file in the destructor. We provide side effects in the shared state of the parent_file_lst field of the FileContent class by using a [Context Manager](https://book.pythontips.com/en/latest/context_managers.html). This step helps to prevent stale files and manage the lifecycle of instances of the FileContent class. Hence, we present our solution in [Version 2](https://gist.github.com/kenluck2001/9e3630c5a8f394bd2f7622c033cb0092#file-asyncfilev2-py). # Evaluation The output from executing the program is displayed in the snippet below. ``` ~/asyncfile$ python3 asyncfilev2.py [ASYNC RUNS: ] Average Execution time: 211.65 seconds for 990294 records. [SYNCHRONOUS RUNS: ] Average Execution time: 41.82 seconds for 990294 records. ~/asyncfile$ ``` The [dataset](https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD) used for our evaluation is a [CSV](https://catalog.data.gov/dataset/crime-data-from-2020-to-present) of crime gathered by the Los Angeles Police Department. For consistency, we have used similar shared settings across synchronous and asynchronous file readings to allow for comparisons. I assumed the problem was I/O-bound because I thought a counter is simple in computational complexity. We observed that when an exception gets swallowed, some rows of the CSV file get skipped as asyncio seems slower when we don't return the exception as captured in the argument named `return_exceptions` in load_file_contents_using_tasks of FileContent class. Unfortunately, we experienced a fivefold slowdown on the async version of file reading, as shown by the timing of the runs. This observation follows as the serial version of the file reading is faster than the async version because time slicing in asyncio spends unnecessary time with opening/closing file overhead and saving the states during thread interrupt. The performance degradation when using [asyncio](https://docs.python.org/3/library/asyncio.html) as the number of threads keeps increasing due to rising communication overhead. [Multiprocessing](https://docs.python.org/3/library/multiprocessing.html) is a better option for parallelizing our task. Although the problem looks like an I/O-bound operation in an ideal world should be faster, the bookkeeping operation can be expensive during thread preemption. # Conclusions This work has allowed me to learn [asyncio](https://docs.python.org/3/library/asyncio.html) and how to apply it in the real world. In retrospect, maybe our experiment was not a failure after all. When asyncio gets utilized in the appropriate scenario, it becomes a valuable addition to the programmer's toolbox. We demonstrate the usability of parallel file reading to the problem of CSV processing. However, it is ubiquitous in many other uses. We aimed for pedagogy and not performant source code.

previous here

1/16

next here

Please feel free to donate to support my work by clicking donate here