Whamcloud - gitweb
LU-16267 lnet: fix missing error check in LUTF
[fs/lustre-release.git] / lustre / tests / lutf / python / infra / lutf_file.py
1 import lutf_agent
2 from lutf_basetest import BaseTest
3 from lutf_paramiko import *
4 from clutf_global import *
5 from clutf_agent import *
6 from lutf import me
7 from lutf_exception import LUTFError
8 import os, random, tempfile, shutil, pathlib
9
class LutfDir(BaseTest):
        """Directory wrapper built on top of BaseTest.

        The directory name is stored on the instance and is also handed to
        the BaseTest constructor together with the script path and target.
        """

        def __init__(self, dname, script=os.path.abspath(__file__),
                     target=None):
                """Remember dname, then run the BaseTest constructor.

                dname  -- directory name this object refers to
                script -- forwarded to BaseTest; defaults to this module's
                          absolute path
                target -- forwarded to BaseTest
                """
                self.dname = dname
                super().__init__(script, target, self.dname)

        def listdir(self):
                """Return what os.listdir() reports for the stored directory."""
                entries = os.listdir(self.dname)
                return entries
18
class LutfFile(BaseTest):
        """File wrapper built on top of BaseTest.

        Offers basic open/read/write/close/remove operations on the path
        stored in self.fname, plus line-based search and search-and-replace
        helpers that work on a single file or on every regular file directly
        inside a directory.
        """

        def __init__(self, fname, script=os.path.abspath(__file__),
                     full_path=False, target=None):
                """Store the path and run the BaseTest constructor.

                fname     -- file name; joined onto the current working
                             directory unless full_path is true
                script    -- forwarded to BaseTest; defaults to this module's
                             absolute path
                full_path -- when true, fname is used exactly as given
                target    -- forwarded to BaseTest
                """
                if not full_path:
                        self.fname = os.path.join(os.getcwd(), fname)
                else:
                        self.fname = fname
                super().__init__(script, target, self.fname, full_path=full_path)
                self.file_handle = None

        def _check_open(self):
                """Raise LUTFError unless a file handle is currently open."""
                if not self.file_handle or self.file_handle.closed:
                        raise LUTFError("%s not opened" % self.fname)

        def open(self, mode):
                """Open self.fname with the given mode and keep the handle.

                A handle left over from an earlier open() is closed
                explicitly once the new one has been obtained, instead of
                being dropped silently.
                """
                new_handle = open(self.fname, mode)
                self.close()
                self.file_handle = new_handle

        def write(self, data):
                """Write data to the open file.

                Raises LUTFError if the file is not open.
                """
                self._check_open()
                self.file_handle.write(data)

        def get_full_path(self):
                """Return the path stored for this file."""
                return self.fname

        def readlines(self):
                """Return the remaining lines of the open file as a list.

                Raises LUTFError if the file is not open.
                """
                self._check_open()
                return self.file_handle.readlines()

        def read(self):
                """Return the remaining content of the open file.

                Raises LUTFError if the file is not open.
                """
                self._check_open()
                return self.file_handle.read()

        def isclosed(self):
                """Return True when no handle is held or the handle is closed."""
                if self.file_handle:
                        return self.file_handle.closed
                return True

        def close(self):
                """Close the file handle if one is open; otherwise do nothing."""
                if self.file_handle and not self.file_handle.closed:
                        self.file_handle.close()

        def remove(self):
                """Close the handle if needed, delete the file and reset state.

                After the call self.fname is '' and self.file_handle is None.
                """
                self.close()
                os.remove(self.fname)
                self.fname = ''
                self.file_handle = None

        def _count_matches(self, fname, search):
                """Count the lines of fname containing search.

                Returns (count, first matching line or None).
                """
                count = 0
                found_line = None
                with open(fname) as old_file:
                        try:
                                for line in old_file:
                                        if search in line:
                                                if found_line is None:
                                                        found_line = line
                                                count += 1
                        except UnicodeDecodeError:
                                # Best effort: stop at undecodable data and
                                # report what was counted up to that point.
                                pass
                return count, found_line

        def _replace_matches(self, fname, search, replace):
                """Replace every line of fname containing search with replace.

                The whole matching line is replaced by the replace string as
                given. The new content is built in a temporary file which
                takes the place of fname only when at least one line matched
                and the entire file could be decoded.

                Returns (count, first matching line or None).
                """
                count = 0
                found_line = None
                swap = False
                fh, abs_path = tempfile.mkstemp()
                try:
                        with os.fdopen(fh, 'w') as new_file, \
                             open(fname) as old_file:
                                try:
                                        for line in old_file:
                                                if search in line:
                                                        if found_line is None:
                                                                found_line = line
                                                        new_file.write(replace)
                                                        count += 1
                                                else:
                                                        new_file.write(line)
                                except UnicodeDecodeError:
                                        # The temporary copy is incomplete.
                                        # Installing it would truncate the
                                        # original, so leave the file alone
                                        # and report no replacements.
                                        count = 0
                                        found_line = None
                        swap = count > 0
                finally:
                        # Do not leave the temporary file behind when it is
                        # not going to replace the original (no match,
                        # undecodable data, or an error such as a missing
                        # fname).
                        if not swap:
                                os.remove(abs_path)

                if swap:
                        # Copy the file permissions from the old file to the
                        # new file
                        shutil.copymode(fname, abs_path)
                        # Remove original file
                        os.remove(fname)
                        # Move new file
                        shutil.move(abs_path, fname)
                return count, found_line

        def find_replace_file(self, fname, search, replace, getline=False):
                """Search fname for lines containing search, optionally replacing them.

                fname   -- path of the file to process
                search  -- substring to look for in each line
                replace -- replacement for each matching line; when it is
                           falsy (None or '') the matches are only counted
                getline -- when true, also return the first matching line

                Returns the number of matching lines, or the tuple
                (count, first matching line or None) when getline is true.
                A file that cannot be fully decoded is never modified; in
                replace mode it is reported with a count of 0.

                Raises LUTFError if this object still holds an open handle.
                """
                if not self.isclosed():
                        raise LUTFError("Can not perform operation on file. Close it first")

                # if no replace is provided then just count the instances of
                # the search string
                if not replace:
                        count, found_line = self._count_matches(fname, search)
                else:
                        count, found_line = self._replace_matches(fname, search,
                                                                  replace)

                if getline:
                        return count, found_line
                return count

        def find_replace_global(self, directory, search, replace, getline=False):
                """Run find_replace_file() on every regular file in directory.

                Only the direct entries of directory are visited.
                Sub-directories and other non-regular entries are skipped
                because they cannot be read line by line.

                Returns the total number of matching lines, or the tuple
                (count, list with the first matching line of each file that
                matched) when getline is true.
                """
                count = 0
                lines = []
                for filename in os.listdir(directory):
                        fpath = os.path.join(directory, filename)
                        if not os.path.isfile(fpath):
                                continue
                        c, line = self.find_replace_file(fpath, search, replace,
                                                         getline=True)
                        if c >= 1:
                                lines.append(line)
                        count += c
                if getline:
                        return count, lines
                return count

        def find_replace(self, search, replace):
                """Replace matching lines in self.fname (a file or a directory).

                Returns the number of matching lines, or 0 when self.fname is
                neither a directory nor a regular file.
                """
                count = 0
                if os.path.isdir(self.fname):
                        count = self.find_replace_global(self.fname, search, replace)
                elif os.path.isfile(self.fname):
                        count = self.find_replace_file(self.fname, search, replace)
                return count

        def find(self, search):
                """Count the lines containing search in self.fname.

                self.fname may be a file or a directory. Returns 0 when it is
                neither.
                """
                count = 0
                if os.path.isdir(self.fname):
                        count = self.find_replace_global(self.fname, search, None)
                elif os.path.isfile(self.fname):
                        count = self.find_replace_file(self.fname, search, None)
                return count

        def get(self, search):
                """Count matches of search and fetch the matching line(s).

                Returns (count, line) for a regular file, where line is the
                first matching line or None, and (count, lines) for a
                directory, where lines holds the first matching line of each
                file that matched. Returns (0, None) when self.fname is
                neither a directory nor a regular file.
                """
                count = 0
                # Default result so that a path which is neither a directory
                # nor a file does not leave 'line' unbound.
                line = None
                if os.path.isdir(self.fname):
                        count, line = self.find_replace_global(self.fname, search, None, getline=True)
                elif os.path.isfile(self.fname):
                        count, line = self.find_replace_file(self.fname, search, None, getline=True)
                return count, line
155