Compare View
Commits (2)
Diff
Showing 4 changed files Side-by-side Diff
flake.lock
View file @
232ba00
... | ... | @@ -53,11 +53,11 @@ |
53 | 53 | }, |
54 | 54 | "nixpkgs_2": { |
55 | 55 | "locked": { |
56 | - "lastModified": 1624634316, | |
57 | - "narHash": "sha256-MUEJzeZZr+nP1V9D19PS9mKVJUJBgnweHFGuFN6hvAQ=", | |
56 | + "lastModified": 1624727938, | |
57 | + "narHash": "sha256-K0fYTmUVZTlB8UKBDL0JOmGlYuTWeEaOcpPJUe9NL0U=", | |
58 | 58 | "owner": "NixOS", |
59 | 59 | "repo": "nixpkgs", |
60 | - "rev": "61e983dbac933aa26cf913aef47aa92a5c40143c", | |
60 | + "rev": "c74057781fee9d29a28172402c6edeb68dd2d885", | |
61 | 61 | "type": "github" |
62 | 62 | }, |
63 | 63 | "original": { |
flake.nix
View file @
232ba00
... | ... | @@ -18,22 +18,14 @@ |
18 | 18 | |
19 | 19 | # Explicit settings per package |
20 | 20 | }; |
21 | - requirements = '' | |
22 | - pandas | |
23 | - watchdog | |
24 | - debugpy | |
25 | - pytest | |
26 | - sphinx | |
27 | - sphinx_rtd_theme | |
28 | - ''; | |
29 | - | |
21 | + pyEnv = mach-nix-utils.mkPython{ | |
22 | + requirements = builtins.readFile ./requirements.txt; | |
23 | + providers = mach-provider; | |
24 | + }; | |
30 | 25 | in |
31 | 26 | { |
32 | - devShell."${system}" = (mach-nix-utils.mkPythonShell { | |
33 | - inherit requirements; | |
34 | - providers = mach-provider; | |
35 | - }) // (pkgs.mkShell { | |
36 | - nativeBuildInputs = [pkgs.google-drive-ocamlfuse]; | |
37 | - }); | |
27 | + devShell."${system}" = pkgs.mkShell{ | |
28 | + buildInputs = [pkgs.google-drive-ocamlfuse pyEnv]; | |
29 | + }; | |
38 | 30 | }; |
39 | 31 | } |
requirements.txt
View file @
232ba00
utils/consume.py
View file @
232ba00
1 | 1 | import sys |
2 | 2 | import time |
3 | 3 | import logging |
4 | -import dask | |
4 | +import dask.dataframe as dd | |
5 | 5 | import pandas as pd |
6 | -from watchdog.observers import Observer | |
6 | +from watchdog.observers.polling import PollingObserver as Observer | |
7 | 7 | from watchdog.events import FileSystemEventHandler |
8 | 8 | |
9 | +class MyQueue(object): | |
10 | + """ | |
11 | + A queue for handling integers. | |
12 | + | |
13 | + ... | |
14 | + | |
15 | + Attributes | |
16 | + ---------- | |
17 | + queue : list | |
18 | + A list of integers representing rows in a dataframe. | |
19 | + """ | |
20 | + queue = [] | |
21 | + | |
22 | + def enqueue(self, element): | |
23 | + """Enqueue an element into the queue. | |
24 | + | |
25 | + Parameters | |
26 | + ---------- | |
27 | + element : int or list of int | |
28 | + If an int, it is the element to be enqueued into the queue; | |
29 | + if a list of ints, the list is appended to the queue. | |
30 | + """ | |
31 | + if isinstance(element, list): | |
32 | + self.queue.extend(element) | |
33 | + else: | |
34 | + self.queue.append(element) | |
35 | + | |
36 | + def dequeue(self): | |
37 | + """Dequeue an element from the queue. | |
38 | + | |
39 | + Returns | |
40 | + ---------- | |
41 | + int or list of int | |
42 | + the zeroth element in the queue list | |
43 | + """ | |
44 | + if len(self.queue) == 0: | |
45 | + return None | |
46 | + else: | |
47 | + return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element | |
48 | + | |
9 | 49 | class MyCSVHandler(FileSystemEventHandler): |
10 | - current_row = 0 | |
11 | - self.queue = [] | |
50 | + currentRow = 0 | |
51 | + queue = MyQueue() | |
12 | 52 | |
13 | 53 | def __init__(self, row, queue): |
14 | - self.current_row = row | |
54 | + self.currentRow = row | |
15 | 55 | self.queue = queue |
16 | 56 | |
17 | 57 | def on_created(self, event): |
... | ... | @@ -20,18 +60,20 @@ class MyCSVHandler(FileSystemEventHandler): |
20 | 60 | |
21 | 61 | def on_modified(self, event): |
22 | 62 | print("Watchdog received modified event - % s." % event.src_path) |
23 | - currentDf = dask.read_csv(event.src_path) | |
63 | + currentDf = dd.read_csv(event.src_path) | |
24 | 64 | lastRow = currentDf.shape[0].compute() |
25 | 65 | #TODO: generate a list of row numbers from self.current row+1 to the last row in the df |
26 | - rowSpan = get_row_span(self.current_row, lastRow) | |
66 | + rowSpan = get_row_span(self.currentRow, lastRow) | |
27 | 67 | #TODO: put this list of numbers in the queue |
28 | - queue.extend(rowSpan) | |
68 | + print('rowspan: ') | |
69 | + print(rowSpan) | |
70 | + self.queue.enqueue(rowSpan) | |
29 | 71 | #TODO: update self.current row |
30 | 72 | self.currentRow = lastRow |
31 | 73 | # Event is modified, you can process it now |
32 | 74 | |
33 | 75 | class MyZipHandler(FileSystemEventHandler): |
34 | - self.queue = [] | |
76 | + queue = [] | |
35 | 77 | |
36 | 78 | def __init__(self, row, queue): |
37 | 79 | self.queue = queue |
... | ... | @@ -42,34 +84,33 @@ class MyZipHandler(FileSystemEventHandler): |
42 | 84 | |
43 | 85 | def on_modified(self, event): |
44 | 86 | print("Watchdog received modified event - % s." % event.src_path) |
45 | - queue.append(event.src_path) | |
87 | + self.queue.append(event.src_path) | |
46 | 88 | # Event is modified, you can process it now |
47 | 89 | |
48 | -class MyQueue(object): | |
49 | - self.queue = [] | |
50 | - | |
51 | - def enqueue(self, element): | |
52 | - if isinstance(element, list): | |
53 | - self.queue.extend(element) | |
54 | - else: | |
55 | - self.queue.append(element) | |
56 | - def dequeue(self): | |
57 | - if len(self.queue) == 0: | |
58 | - return None | |
59 | - else: | |
60 | - return self.queue.pop(0) #pop(0) specifies to pop the zeroth list element | |
61 | - | |
62 | 90 | |
63 | 91 | def consume(path,line): |
64 | - queue = [] | |
65 | - event_handler = MyZipHandler(line, queue) | |
92 | + """Processes neurocube jobs by watching the gdfuse file system. | |
93 | + | |
94 | + Parameters | |
95 | + ---------- | |
96 | + path : str | |
97 | + The path of the file or directory to watch for changes. | |
98 | + line : int | |
99 | + If processing a csv file the last line in the frame that | |
100 | + has already been processed. | |
101 | + Notes | |
102 | + ----- | |
103 | + Ctrl-C will terminate the function. | |
104 | + """ | |
105 | + queue = MyQueue() | |
106 | + event_handler = MyCSVHandler(line, queue) | |
66 | 107 | observer = Observer() |
67 | 108 | observer.schedule(event_handler, path, recursive=True) |
68 | 109 | observer.start() |
69 | 110 | try: |
70 | 111 | while True: |
71 | 112 | time.sleep(1) |
72 | - currentJob = queue.deque() | |
113 | + currentJob = queue.dequeue() | |
73 | 114 | if currentJob != None: |
74 | 115 | dummy_process(currentJob) |
75 | 116 | except KeyboardInterrupt: |
... | ... | @@ -77,10 +118,31 @@ def consume(path,line): |
77 | 118 | observer.join() |
78 | 119 | |
79 | 120 | def get_row_span(first,last): |
80 | - if first+1 == last: | |
121 | + """Returns a list of integers (first,last]. | |
122 | + | |
123 | + Parameters | |
124 | + ---------- | |
125 | + first : int | |
126 | + the integer before the first element in the list. | |
127 | + last : int | |
128 | + the last element in the list | |
129 | + Returns | |
130 | + ------- | |
131 | + list of int | |
132 | + a list of integers spanning the next integer greater than first to last | |
133 | + """ | |
134 | + if first == last: | |
81 | 135 | return [last] |
82 | 136 | else: |
83 | - return list(range(first+1,last)) | |
137 | + return list(range(first,last)) | |
84 | 138 | |
85 | 139 | def dummy_process(path): |
140 | + """Placeholder job processor that prints the job it receives. | |
141 | + | |
142 | + Parameters | |
143 | + ---------- | |
144 | + path : int or str | |
145 | + The dequeued job to print (e.g. a row number or a file path) | |
146 | + """ | |
147 | + print('dummy: ') | |
86 | 148 | print(path) |