Commit e548365204efb995d1b9fcb7fedeb6ac48edb037

Authored by Justin Frank
1 parent 232ba00896
Exists in master

csv rows can be parsed now

Showing 1 changed file with 32 additions and 8 deletions Side-by-side Diff

utils/consume.py View file @ e548365
... ... @@ -47,12 +47,28 @@
47 47 return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
48 48  
49 49 class MyCSVHandler(FileSystemEventHandler):
  50 + """
  51 + An event handler to watch the csv.
  52 +
  53 + ...
  54 +
  55 + Attributes
  56 + ----------
  57 + currentRow : int
  58 + The next row to be processed
  59 + queue : list
  60 + A list of integers representing rows in a dataframe.
  61 + currentDf : DataFrame
  62 + The csv as it was last read in
  63 + """
50 64 currentRow = 0
51 65 queue = MyQueue()
  66 + currentDf = pd.DataFrame()
52 67  
53   - def __init__(self, row, queue):
  68 + def __init__(self, row, queue, df):
54 69 self.currentRow = row
55 70 self.queue = queue
  71 + self.currentDf = df
56 72  
57 73 def on_created(self, event):
58 74 print("Watchdog received created event - % s." % event.src_path)
... ... @@ -60,8 +76,8 @@
60 76  
61 77 def on_modified(self, event):
62 78 print("Watchdog received modified event - % s." % event.src_path)
63   - currentDf = dd.read_csv(event.src_path)
64   - lastRow = currentDf.shape[0].compute()
  79 + self.currentDf = pd.read_csv(event.src_path)
  80 + lastRow = self.currentDf.shape[0]
65 81 #TODO: generate a list of row numbers from self.current row+1 to the last row in the df
66 82 rowSpan = get_row_span(self.currentRow, lastRow)
67 83 #TODO: put this list of numbers in the queue
... ... @@ -103,7 +119,8 @@
103 119 Ctrl-C will terminate the function.
104 120 """
105 121 queue = MyQueue()
106   - event_handler = MyCSVHandler(line, queue)
  122 + df = pd.DataFrame()
  123 + event_handler = MyCSVHandler(line, queue, df)
107 124 observer = Observer()
108 125 observer.schedule(event_handler, path, recursive=True)
109 126 observer.start()
... ... @@ -112,7 +129,8 @@
112 129 time.sleep(1)
113 130 currentJob = queue.dequeue()
114 131 if currentJob != None:
115   - dummy_process(currentJob)
  132 + print(df)
  133 + dummy_process(currentJob, df, path)
116 134 except KeyboardInterrupt:
117 135 observer.stop()
118 136 observer.join()
119 137  
120 138  
... ... @@ -136,14 +154,20 @@
136 154 else:
137 155 return list(range(first,last))
138 156  
139   -def dummy_process(path):
140   - """Mounts a google drive.
  157 +def dummy_process(job, df, path):
  158 + """Prints a row from the csv file.
141 159  
142 160 Parameters
143 161 ----------
144 162 mountPoint : str
145 163 The directory to mount the google drive to
146 164 """
  165 + df = pd.read_csv(path)
  166 + row = get_row(job,df)
147 167 print('dummy: ')
148   - print(path)
  168 + print(job)
  169 + print(row)
  170 +
  171 +def get_row(job,df):
  172 + return df.iloc[job]