Compare View

switch
from
...
to
 
Commits (2)

Diff

Showing 4 changed files Side-by-side Diff

... ... @@ -53,11 +53,11 @@
53 53 },
54 54 "nixpkgs_2": {
55 55 "locked": {
56   - "lastModified": 1624634316,
57   - "narHash": "sha256-MUEJzeZZr+nP1V9D19PS9mKVJUJBgnweHFGuFN6hvAQ=",
  56 + "lastModified": 1624727938,
  57 + "narHash": "sha256-K0fYTmUVZTlB8UKBDL0JOmGlYuTWeEaOcpPJUe9NL0U=",
58 58 "owner": "NixOS",
59 59 "repo": "nixpkgs",
60   - "rev": "61e983dbac933aa26cf913aef47aa92a5c40143c",
  60 + "rev": "c74057781fee9d29a28172402c6edeb68dd2d885",
61 61 "type": "github"
62 62 },
63 63 "original": {
... ... @@ -18,22 +18,14 @@
18 18  
19 19 # Explicit settings per package
20 20 };
21   - requirements = ''
22   - pandas
23   - watchdog
24   - debugpy
25   - pytest
26   - sphinx
27   - sphinx_rtd_theme
28   - '';
29   -
  21 + pyEnv = mach-nix-utils.mkPython{
  22 + requirements = builtins.readFile ./requirements.txt;
  23 + providers = mach-provider;
  24 + };
30 25 in
31 26 {
32   - devShell."${system}" = (mach-nix-utils.mkPythonShell {
33   - inherit requirements;
34   - providers = mach-provider;
35   - }) // (pkgs.mkShell {
36   - nativeBuildInputs = [pkgs.google-drive-ocamlfuse];
37   - });
  27 + devShell."${system}" = pkgs.mkShell{
  28 + buildInputs = [pkgs.google-drive-ocamlfuse pyEnv];
  29 + };
38 30 };
39 31 }
requirements.txt View file @ 232ba00
... ... @@ -0,0 +1,7 @@
  1 +pandas
  2 +dask
  3 +watchdog
  4 +debugpy
  5 +pytest
  6 +sphinx
  7 +sphinx_rtd_theme
utils/consume.py View file @ 232ba00
1 1 import sys
2 2 import time
3 3 import logging
4   -import dask
  4 +import dask.dataframe as dd
5 5 import pandas as pd
6   -from watchdog.observers import Observer
  6 +from watchdog.observers.polling import PollingObserver as Observer
7 7 from watchdog.events import FileSystemEventHandler
8 8  
  9 +class MyQueue(object):
  10 + """
  11 + A queue for handling integers.
  12 +
  13 + ...
  14 +
  15 + Attributes
  16 + ----------
  17 + queue : list
  18 + A list of integers representing rows in a dataframe.
  19 + """
  20 + queue = []
  21 +
  22 + def enqueue(self, element):
  23 + """Enqueue an element into the queue.
  24 +
  25 + Parameters
  26 + ----------
  27 + element : int or list of int
  28 + If an int, it is the element to be enqueued into the queue;
  29 + if a list of ints, the list's elements are appended to the queue.
  30 + """
  31 + if isinstance(element, list):
  32 + self.queue.extend(element)
  33 + else:
  34 + self.queue.append(element)
  35 +
  36 + def dequeue(self):
  37 + """Dequeue an element from the queue.
  38 +
  39 + Returns
  40 + ----------
  41 + int or list of int
  42 + the zeroth element in the queue list
  43 + """
  44 + if len(self.queue) == 0:
  45 + return None
  46 + else:
  47 + return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
  48 +
9 49 class MyCSVHandler(FileSystemEventHandler):
10   - current_row = 0
11   - self.queue = []
  50 + currentRow = 0
  51 + queue = MyQueue()
12 52  
13 53 def __init__(self, row, queue):
14   - self.current_row = row
  54 + self.currentRow = row
15 55 self.queue = queue
16 56  
17 57 def on_created(self, event):
... ... @@ -20,18 +60,20 @@ class MyCSVHandler(FileSystemEventHandler):
20 60  
21 61 def on_modified(self, event):
22 62 print("Watchdog received modified event - % s." % event.src_path)
23   - currentDf = dask.read_csv(event.src_path)
  63 + currentDf = dd.read_csv(event.src_path)
24 64 lastRow = currentDf.shape[0].compute()
25 65 #TODO: generate a list of row numbers from self.current row+1 to the last row in the df
26   - rowSpan = get_row_span(self.current_row, lastRow)
  66 + rowSpan = get_row_span(self.currentRow, lastRow)
27 67 #TODO: put this list of numbers in the queue
28   - queue.extend(rowSpan)
  68 + print('rowspan: ')
  69 + print(rowSpan)
  70 + self.queue.enqueue(rowSpan)
29 71 #TODO: update self.current row
30 72 self.currentRow = lastRow
31 73 # Event is modified, you can process it now
32 74  
33 75 class MyZipHandler(FileSystemEventHandler):
34   - self.queue = []
  76 + queue = []
35 77  
36 78 def __init__(self, row, queue):
37 79 self.queue = queue
... ... @@ -42,34 +84,33 @@ class MyZipHandler(FileSystemEventHandler):
42 84  
43 85 def on_modified(self, event):
44 86 print("Watchdog received modified event - % s." % event.src_path)
45   - queue.append(event.src_path)
  87 + self.queue.append(event.src_path)
46 88 # Event is modified, you can process it now
47 89  
48   -class MyQueue(object):
49   - self.queue = []
50   -
51   - def enqueue(self, element):
52   - if isinstance(element, list):
53   - self.queue.extend(element)
54   - else:
55   - self.queue.append(element)
56   - def dequeue(self):
57   - if len(self.queue) == 0:
58   - return None
59   - else:
60   - return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element
61   -
62 90  
63 91 def consume(path,line):
64   - queue = []
65   - event_handler = MyZipHandler(line, queue)
  92 + """Processes neurocube jobs by watching the gdfuse file system.
  93 +
  94 + Parameters
  95 + ----------
  96 + path : str
  97 + The path of the file or directory to watch for changes.
  98 + line : int
  99 + If processing a csv file the last line in the frame that
  100 + has already been processed.
  101 + Notes
  102 + -----
  103 + Ctrl-C will terminate the function.
  104 + """
  105 + queue = MyQueue()
  106 + event_handler = MyCSVHandler(line, queue)
66 107 observer = Observer()
67 108 observer.schedule(event_handler, path, recursive=True)
68 109 observer.start()
69 110 try:
70 111 while True:
71 112 time.sleep(1)
72   - currentJob = queue.deque()
  113 + currentJob = queue.dequeue()
73 114 if currentJob != None:
74 115 dummy_process(currentJob)
75 116 except KeyboardInterrupt:
... ... @@ -77,10 +118,31 @@ def consume(path,line):
77 118 observer.join()
78 119  
79 120 def get_row_span(first,last):
80   - if first+1 == last:
  121 + """Returns a list of integers (first,last].
  122 +
  123 + Parameters
  124 + ----------
  125 + first : int
  126 + the integer before the first element in the list.
  127 + last : int
  128 + the last element in the list
  129 + Returns
  130 + -------
  131 + list of int
  132 + a list of integers spanning the next integer greater than first to last
  133 + """
  134 + if first == last:
81 135 return [last]
82 136 else:
83   - return list(range(first+1,last))
  137 + return list(range(first,last))
84 138  
85 139 def dummy_process(path):
  140 + """Prints a dequeued job as a placeholder for real processing.
  141 +
  142 + Parameters
  143 + ----------
  144 + path : object
  145 + The job to print
  146 + """
  147 + print('dummy: ')
86 148 print(path)