diff --git a/utils/consume.py b/utils/consume.py
index 0d997b4..707d4d0 100644
--- a/utils/consume.py
+++ b/utils/consume.py
@@ -47,12 +47,28 @@ class MyQueue(object):
         return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
 
 class MyCSVHandler(FileSystemEventHandler):
+    """
+    An event handler to watch the csv.
+
+    ...
+
+    Attributes
+    ----------
+    currentRow : int
+        The next row to be processed
+    queue : list
+        A list of integers representing rows in a dataframe.
+    currentDf : DataFrame
+        The csv as it was last read in
+    """
     currentRow = 0
     queue = MyQueue()
+    currentDf = pd.DataFrame()
 
-    def __init__(self, row, queue):
+    def __init__(self, row, queue, df):
         self.currentRow = row
         self.queue = queue
+        self.currentDf = df
 
     def on_created(self, event):
         print("Watchdog received created event - % s." % event.src_path)
@@ -60,8 +76,8 @@ class MyCSVHandler(FileSystemEventHandler):
 
     def on_modified(self, event):
         print("Watchdog received modified event - % s." % event.src_path)
-        currentDf = dd.read_csv(event.src_path)
-        lastRow = currentDf.shape[0].compute()
+        self.currentDf = pd.read_csv(event.src_path)
+        lastRow = self.currentDf.shape[0]
         #TODO: generate a list of row numbers from self.current row+1 to the last row in the df
         rowSpan = get_row_span(self.currentRow, lastRow)
         #TODO: put this list of numbers in the queue
@@ -103,7 +119,8 @@ def consume(path,line):
     Ctrl-C will terminate the function.
     """
     queue = MyQueue()
-    event_handler = MyCSVHandler(line, queue)
+    df = pd.DataFrame()
+    event_handler = MyCSVHandler(line, queue, df)
     observer = Observer()
     observer.schedule(event_handler, path, recursive=True)
     observer.start()
@@ -112,7 +129,8 @@ def consume(path,line):
             time.sleep(1)
             currentJob = queue.dequeue()
             if currentJob != None:
-                dummy_process(currentJob)
+                print(df)
+                dummy_process(currentJob, df, path)
     except KeyboardInterrupt:
         observer.stop()
     observer.join()
@@ -136,13 +154,19 @@ def get_row_span(first,last):
     else:
         return list(range(first,last))
 
-def dummy_process(path):
-    """Mounts a google drive.
+def dummy_process(job, df, path):
+    """Prints a row from the csv file.
 
     Parameters
     ----------
     mountPoint : str
         The directory to mount the google drive to
     """
+    df = pd.read_csv(path)
+    row = get_row(job,df)
     print('dummy: ')
-    print(path)
+    print(job)
+    print(row)
+
+def get_row(job,df):
+    return df.iloc[job]