Commit 232ba008961e5533c87ba41f8a5e51ce855d536e

Authored by Justin Frank
1 parent 0db7db210b
Exists in master

csv handler appears to work

Showing 1 changed file with 91 additions and 29 deletions Side-by-side Diff

utils/consume.py View file @ 232ba00
1 1 import sys
2 2 import time
3 3 import logging
4   -import dask
  4 +import dask.dataframe as dd
5 5 import pandas as pd
6   -from watchdog.observers import Observer
  6 +from watchdog.observers.polling import PollingObserver as Observer
7 7 from watchdog.events import FileSystemEventHandler
8 8  
  9 +class MyQueue(object):
  10 + """
  11 + A queue for handling integers.
  12 +
  13 + ...
  14 +
  15 + Attributes
  16 + ----------
  17 + queue : list
  18 + A list of integers representing rows in a dataframe.
  19 + """
  20 + queue = []
  21 +
  22 + def enqueue(self, element):
  23 + """Enque an element into the queue.
  24 +
  25 + Parameters
  26 + ----------
  27 + element : int or list of int
  28 + If an int it is the element to be enqued into the queue
  29 + if a list of ints the list is appended to the queue
  30 + """
  31 + if isinstance(element, list):
  32 + self.queue.extend(element)
  33 + else:
  34 + self.queue.append(element)
  35 +
  36 + def dequeue(self):
  37 + """Dequeue an element from the queue.
  38 +
  39 + Returns
  40 + ----------
  41 + int or list of int
  42 + the zeroth element in the queue list
  43 + """
  44 + if len(self.queue) == 0:
  45 + return None
  46 + else:
  47 + return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
  48 +
9 49 class MyCSVHandler(FileSystemEventHandler):
10   - current_row = 0
11   - self.queue = []
  50 + currentRow = 0
  51 + queue = MyQueue()
12 52  
13 53 def __init__(self, row, queue):
14   - self.current_row = row
  54 + self.currentRow = row
15 55 self.queue = queue
16 56  
17 57 def on_created(self, event):
18 58  
19 59  
20 60  
... ... @@ -20,18 +60,20 @@
20 60  
21 61 def on_modified(self, event):
22 62 print("Watchdog received modified event - % s." % event.src_path)
23   - currentDf = dask.read_csv(event.src_path)
  63 + currentDf = dd.read_csv(event.src_path)
24 64 lastRow = currentDf.shape[0].compute()
25 65 #TODO: generate a list of row numbers from self.current row+1 to the last row in the df
26   - rowSpan = get_row_span(self.current_row, lastRow)
  66 + rowSpan = get_row_span(self.currentRow, lastRow)
27 67 #TODO: put this list of numbers in the queue
28   - queue.extend(rowSpan)
  68 + print('rowspan: ')
  69 + print(rowSpan)
  70 + self.queue.enqueue(rowSpan)
29 71 #TODO: update self.current row
30 72 self.currentRow = lastRow
31 73 # Event is modified, youu\ can process it now
32 74  
33 75 class MyZipHandler(FileSystemEventHandler):
34   - self.queue = []
  76 + queue = []
35 77  
36 78 def __init__(self, row, queue):
37 79 self.queue = queue
38 80  
39 81  
40 82  
41 83  
... ... @@ -42,34 +84,33 @@
42 84  
43 85 def on_modified(self, event):
44 86 print("Watchdog received modified event - % s." % event.src_path)
45   - queue.append(event.src_path)
  87 + self.queue.append(event.src_path)
46 88 # Event is modified, youu\ can process it now
47 89  
48   -class MyQueue(object):
49   - self.queue = []
50 90  
51   - def enqueue(self, element):
52   - if isinstance(element, list):
53   - self.queue.extend(element)
54   - else:
55   - self.queue.append(element)
56   - def dequeue(self):
57   - if len(self.queue) == 0:
58   - return None
59   - else:
60   - return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
61   -
62   -
63 91 def consume(path,line):
64   - queue = []
65   - event_handler = MyZipHandler(line, queue)
  92 + """Processes neurocube jobs by watching the gdfuse file system.
  93 +
  94 + Parameters
  95 + ----------
  96 + path : str
  97 + The path of the file or directory to watch for changes.
  98 + line : int
  99 + If processing a csv file the last line in the frame that
  100 + has already been processed.
  101 + Notes
  102 + -----
  103 + Ctrl-C will terminate the function.
  104 + """
  105 + queue = MyQueue()
  106 + event_handler = MyCSVHandler(line, queue)
66 107 observer = Observer()
67 108 observer.schedule(event_handler, path, recursive=True)
68 109 observer.start()
69 110 try:
70 111 while True:
71 112 time.sleep(1)
72   - currentJob = queue.deque()
  113 + currentJob = queue.dequeue()
73 114 if currentJob != None:
74 115 dummy_process(currentJob)
75 116 except KeyboardInterrupt:
76 117  
77 118  
... ... @@ -77,11 +118,32 @@
77 118 observer.join()
78 119  
79 120 def get_row_span(first,last):
80   - if first+1 == last:
  121 + """Returns a list of integers (first,last].
  122 +
  123 + Parameters
  124 + ----------
  125 + first : int
  126 + the integer before the first element in the list.
  127 + last : int
  128 + the last element in the list
  129 + Returns
  130 + -------
  131 + list of int
  132 + a list of integers spanning the next integer greeater than first to last
  133 + """
  134 + if first == last:
81 135 return [last]
82 136 else:
83   - return list(range(first+1,last))
  137 + return list(range(first,last))
84 138  
85 139 def dummy_process(path):
  140 + """Mounts a google drive.
  141 +
  142 + Parameters
  143 + ----------
  144 + mountPoint : str
  145 + The directory to mount the google drive to
  146 + """
  147 + print('dummy: ')
86 148 print(path)