diff --git a/utils/consume.py b/utils/consume.py index 707d4d0..f2ede2b 100644 --- a/utils/consume.py +++ b/utils/consume.py @@ -63,45 +63,85 @@ class MyCSVHandler(FileSystemEventHandler): """ currentRow = 0 queue = MyQueue() - currentDf = pd.DataFrame() - def __init__(self, row, queue, df): + def __init__(self, row, queue): + """Initialize the myCsv handler. + + Parameters + ---------- + row : int or list of int + The index of the first row to begin processing at + queue : MyQueue + The queue in which to store row indexes in + """ self.currentRow = row self.queue = queue - self.currentDf = df def on_created(self, event): + """Prints the file path if the file being watched for is created. + + Parameters + ---------- + event : event + The event object representing the event that triggered the handler. + """ print("Watchdog received created event - % s." % event.src_path) # Event is created, you can process it now def on_modified(self, event): + """Prints the file path if the file being watched for is modified and adds the indecies of all + rows in the csv including and after "self.currentRow". + + Parameters + ---------- + event : event + The event object representing the event that triggered the handler. + """ print("Watchdog received modified event - % s." % event.src_path) - self.currentDf = pd.read_csv(event.src_path) - lastRow = self.currentDf.shape[0] - #TODO: generate a list of row numbers from self.current row+1 to the last row in the df + currentDf = pd.read_csv(event.src_path) + lastRow = currentDf.shape[0] rowSpan = get_row_span(self.currentRow, lastRow) - #TODO: put this list of numbers in the queue - print('rowspan: ') - print(rowSpan) + #print('rowspan: ') + #print(rowSpan) self.queue.enqueue(rowSpan) - #TODO: update self.current row self.currentRow = lastRow # Event is modified, youu\ can process it now -class MyZipHandler(FileSystemEventHandler): - queue = [] +# class MyZipHandler(FileSystemEventHandler): +# queue = [] - def __init__(self, row, queue): - self.queue = queue +# def __init__(self, queue): +# """Initialize the MyZipHandler. - def on_created(self, event): - print("Watchdog received created event - % s." % event.src_path) - # Event is created, you can process it now +# Parameters +# ---------- +# queue : MyQueue +# The queue in which to store row indexes in +# """ +# self.queue = queue - def on_modified(self, event): - print("Watchdog received modified event - % s." % event.src_path) - self.queue.append(event.src_path) - # Event is modified, youu\ can process it now +# def on_created(self, event): +# """Prints the file path if the file being watched for is created and adds the indecies of all. + +# Parameters +# ---------- +# event : event +# The event object representing the event that triggered the handler. +# """ +# print("Watchdog received created event - % s." % event.src_path) +# # Event is created, you can process it now + +# def on_modified(self, event): +# """Prints the file path if the file being watched is modified and adds the path to a queue. + +# Parameters +# ---------- +# event : event +# The event object representing the event that triggered the handler. +# """ +# print("Watchdog received modified event - % s." % event.src_path) +# self.queue.append(event.src_path) +# # Event is modified, youu\ can process it now def consume(path,line): @@ -120,13 +160,13 @@ def consume(path,line): """ queue = MyQueue() df = pd.DataFrame() - event_handler = MyCSVHandler(line, queue, df) + event_handler = MyCSVHandler(line, queue) observer = Observer() observer.schedule(event_handler, path, recursive=True) observer.start() try: while True: - time.sleep(1) + time.sleep(1) #This may not be needed currentJob = queue.dequeue() if currentJob != None: print(df) @@ -169,4 +209,11 @@ def dummy_process(job, df, path): print(row) def get_row(job,df): + """Prints a row from the csv file. + + Parameters + ---------- + mountPoint : str + The directory to mount the google drive to + """ return df.iloc[job]