From 232ba008961e5533c87ba41f8a5e51ce855d536e Mon Sep 17 00:00:00 2001 From: Justin Frank Date: Sat, 26 Jun 2021 15:18:42 -0500 Subject: [PATCH] csv handler appears to work --- utils/consume.py | 120 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 91 insertions(+), 29 deletions(-) diff --git a/utils/consume.py b/utils/consume.py index 15d6223..0d997b4 100644 --- a/utils/consume.py +++ b/utils/consume.py @@ -1,17 +1,57 @@ import sys import time import logging -import dask +import dask.dataframe as dd import pandas as pd -from watchdog.observers import Observer +from watchdog.observers.polling import PollingObserver as Observer from watchdog.events import FileSystemEventHandler +class MyQueue(object): + """ + A queue for handling integers. + + ... + + Attributes + ---------- + queue : list + A list of integers representing rows in a dataframe. + """ + queue = [] + + def enqueue(self, element): + """Enque an element into the queue. + + Parameters + ---------- + element : int or list of int + If an int it is the element to be enqued into the queue + if a list of ints the list is appended to the queue + """ + if isinstance(element, list): + self.queue.extend(element) + else: + self.queue.append(element) + + def dequeue(self): + """Dequeue an element from the queue. + + Returns + ---------- + int or list of int + the zeroth element in the queue list + """ + if len(self.queue) == 0: + return None + else: + return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element + class MyCSVHandler(FileSystemEventHandler): - current_row = 0 - self.queue = [] + currentRow = 0 + queue = MyQueue() def __init__(self, row, queue): - self.current_row = row + self.currentRow = row self.queue = queue def on_created(self, event): @@ -20,18 +60,20 @@ class MyCSVHandler(FileSystemEventHandler): def on_modified(self, event): print("Watchdog received modified event - % s." % event.src_path) - currentDf = dask.read_csv(event.src_path) + currentDf = dd.read_csv(event.src_path) lastRow = currentDf.shape[0].compute() #TODO: generate a list of row numbers from self.current row+1 to the last row in the df - rowSpan = get_row_span(self.current_row, lastRow) + rowSpan = get_row_span(self.currentRow, lastRow) #TODO: put this list of numbers in the queue - queue.extend(rowSpan) + print('rowspan: ') + print(rowSpan) + self.queue.enqueue(rowSpan) #TODO: update self.current row self.currentRow = lastRow # Event is modified, youu\ can process it now class MyZipHandler(FileSystemEventHandler): - self.queue = [] + queue = [] def __init__(self, row, queue): self.queue = queue @@ -42,34 +84,33 @@ class MyZipHandler(FileSystemEventHandler): def on_modified(self, event): print("Watchdog received modified event - % s." % event.src_path) - queue.append(event.src_path) + self.queue.append(event.src_path) # Event is modified, youu\ can process it now -class MyQueue(object): - self.queue = [] - - def enqueue(self, element): - if isinstance(element, list): - self.queue.extend(element) - else: - self.queue.append(element) - def dequeue(self): - if len(self.queue) == 0: - return None - else: - return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element - def consume(path,line): - queue = [] - event_handler = MyZipHandler(line, queue) + """Processes neurocube jobs by watching the gdfuse file system. + + Parameters + ---------- + path : str + The path of the file or directory to watch for changes. + line : int + If processing a csv file the last line in the frame that + has already been processed. + Notes + ----- + Ctrl-C will terminate the function. + """ + queue = MyQueue() + event_handler = MyCSVHandler(line, queue) observer = Observer() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: time.sleep(1) - currentJob = queue.deque() + currentJob = queue.dequeue() if currentJob != None: dummy_process(currentJob) except KeyboardInterrupt: @@ -77,10 +118,31 @@ def consume(path,line): observer.join() def get_row_span(first,last): - if first+1 == last: + """Returns a list of integers (first,last]. + + Parameters + ---------- + first : int + the integer before the first element in the list. + last : int + the last element in the list + Returns + ------- + list of int + a list of integers spanning the next integer greeater than first to last + """ + if first == last: return [last] else: - return list(range(first+1,last)) + return list(range(first,last)) def dummy_process(path): + """Mounts a google drive. + + Parameters + ---------- + mountPoint : str + The directory to mount the google drive to + """ + print('dummy: ') print(path) -- 1.9.1