Commit 26eb0c89277a43fc9e8809258eb2fcc1a9d88665

Authored by Justin Frank
1 parent 6556b8b227
Exists in master

Files can now be found on disk based on the csv

Showing 1 changed file with 33 additions and 6 deletions Side-by-side Diff

utils/consume.py View file @ 26eb0c8
... ... @@ -3,6 +3,8 @@
3 3 import logging
4 4 import dask.dataframe as dd
5 5 import pandas as pd
  6 +import sqlite3
  7 +import os
6 8 from watchdog.observers.polling import PollingObserver as Observer
7 9 from watchdog.events import FileSystemEventHandler
8 10  
9 11  
... ... @@ -144,12 +146,12 @@
144 146 # # Event is modified, youu\ can process it now
145 147  
146 148  
147   -def consume(path,line):
  149 +def consume(filePath, mntPath, line, db):
148 150 """Processes neurocube jobs by watching the gdfuse file system.
149 151  
150 152 Parameters
151 153 ----------
152   - path : str
  154 + filePath : str
153 155 The path of the file or directory to watch for changes.
154 156 line : int
155 157 If processing a csv file the last line in the frame that
... ... @@ -162,7 +164,7 @@
162 164 df = pd.DataFrame()
163 165 event_handler = MyCSVHandler(line, queue)
164 166 observer = Observer()
165   - observer.schedule(event_handler, path, recursive=True)
  167 + observer.schedule(event_handler, filePath, recursive=True)
166 168 observer.start()
167 169 try:
168 170 while True:
... ... @@ -170,7 +172,7 @@
170 172 currentJob = queue.dequeue()
171 173 if currentJob != None:
172 174 print(df)
173   - dummy_process(currentJob, df, path)
  175 + dummy_process(currentJob, df, mntPath, db, filePath)
174 176 except KeyboardInterrupt:
175 177 observer.stop()
176 178 observer.join()
... ... @@ -194,7 +196,7 @@
194 196 else:
195 197 return list(range(first,last))
196 198  
197   -def dummy_process(job, df, path):
  199 +def dummy_process(job, df, mntPath, db, filePath):
198 200 """Prints a row from the csv file.
199 201  
200 202 Parameters
201 203  
202 204  
... ... @@ -202,11 +204,16 @@
202 204 mountPoint : str
203 205 The directory to mount the google drive to
204 206 """
205   - df = pd.read_csv(path)
  207 + df = pd.read_csv(filePath)
206 208 row = get_row(job,df)
  209 + gdUrl = row['magicVal'] #TODO: this column name needs to be added to the csv MANUALLY
  210 + GId = parse_gdUrl(gdUrl)
  211 + print(GId)
  212 + path = gid_to_path(GId, mntPath, db)
207 213 print('dummy: ')
208 214 print(job)
209 215 print(row)
  216 + print(path)
210 217  
211 218 def get_row(job,df):
212 219 """Prints a row from the csv file.
... ... @@ -217,4 +224,24 @@
217 224 The directory to mount the google drive to
218 225 """
219 226 return df.iloc[job]
  227 +
  228 +def gid_to_path(GId, mntPath, db):
  229 + conn = sqlite3.connect(db)
  230 + c = conn.cursor()
  231 + relPath = get_relPath(GId, c)
  232 + path = os.path.join(mntPath, relPath)
  233 + conn.close()
  234 + return path
  235 +
  236 +def get_relPath(GId, c):
  237 + while True:
  238 + c.execute("""SELECT path FROM resource WHERE remote_id=:remote_id""", {'remote_id': GId})
  239 + relPath = c.fetchone()
  240 + if relPath != None:
  241 + break
  242 + print('miss')
  243 + return relPath[0]
  244 +
  245 +def parse_gdUrl(gdUrl):
  246 + return gdUrl.split("id=",1)[1]