Whamcloud - gitweb
LU-2915 lfsck: LFSCK 1.5 technical debts (2)
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <lustre_net.h>
39 #include <lustre_fid.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42
43 #include "lfsck_internal.h"
44
45 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
46 {
47         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
48         *cookie = le64_to_cpu(ent->lde_hash);
49         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
50         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
51         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
52
53         /* Make sure the name is terminated with '0'.
54          * The data (type) after ent::lde_name maybe
55          * broken, but we do not care. */
56         ent->lde_name[ent->lde_namelen] = 0;
57 }
58
59 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
60 {
61         const struct dt_it_ops  *iops;
62         struct dt_it            *di;
63
64         spin_lock(&lfsck->li_lock);
65         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
66         di = lfsck->li_di_oit;
67         lfsck->li_di_oit = NULL;
68         spin_unlock(&lfsck->li_lock);
69         iops->put(env, di);
70 }
71
72 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
73 {
74         const struct dt_it_ops  *iops;
75         struct dt_it            *di;
76
77         spin_lock(&lfsck->li_lock);
78         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
79         di = lfsck->li_di_dir;
80         lfsck->li_di_dir = NULL;
81         lfsck->li_cookie_dir = 0;
82         spin_unlock(&lfsck->li_lock);
83         iops->put(env, di);
84 }
85
86 static void lfsck_close_dir(const struct lu_env *env,
87                             struct lfsck_instance *lfsck)
88 {
89         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
90         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
91         struct dt_it            *dir_di   = lfsck->li_di_dir;
92
93         lfsck_di_dir_put(env, lfsck);
94         dir_iops->fini(env, dir_di);
95         lfsck->li_obj_dir = NULL;
96         lfsck_object_put(env, dir_obj);
97 }
98
99 static int lfsck_master_dir_engine(const struct lu_env *env,
100                                    struct lfsck_instance *lfsck)
101 {
102         struct lfsck_thread_info        *info   = lfsck_env_info(env);
103         const struct dt_it_ops          *iops   =
104                         &lfsck->li_obj_dir->do_index_ops->dio_it;
105         struct dt_it                    *di     = lfsck->li_di_dir;
106         struct lu_dirent                *ent    = &info->lti_ent;
107         struct lu_fid                   *fid    = &info->lti_fid;
108         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
109         struct ptlrpc_thread            *thread = &lfsck->li_thread;
110         int                              rc;
111         ENTRY;
112
113         do {
114                 struct dt_object *child;
115
116                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
117                     cfs_fail_val > 0) {
118                         struct l_wait_info lwi;
119
120                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
121                                           NULL, NULL);
122                         l_wait_event(thread->t_ctl_waitq,
123                                      !thread_is_running(thread),
124                                      &lwi);
125                 }
126
127                 lfsck->li_new_scanned++;
128                 rc = iops->rec(env, di, (struct dt_rec *)ent,
129                                lfsck->li_args_dir);
130                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
131                 if (rc != 0) {
132                         lfsck_fail(env, lfsck, true);
133                         if (bk->lb_param & LPF_FAILOUT)
134                                 RETURN(rc);
135                         else
136                                 goto checkpoint;
137                 }
138
139                 if (ent->lde_attrs & LUDA_IGNORE)
140                         goto checkpoint;
141
142                 *fid = ent->lde_fid;
143                 child = lfsck_object_find(env, lfsck, fid);
144                 if (child == NULL) {
145                         goto checkpoint;
146                 } else if (IS_ERR(child)) {
147                         lfsck_fail(env, lfsck, true);
148                         if (bk->lb_param & LPF_FAILOUT)
149                                 RETURN(PTR_ERR(child));
150                         else
151                                 goto checkpoint;
152                 }
153
154                 /* XXX: Currently, skip remote object, the consistency for
155                  *      remote object will be processed in LFSCK phase III. */
156                 if (dt_object_exists(child) && !dt_object_remote(child))
157                         rc = lfsck_exec_dir(env, lfsck, child, ent);
158                 lfsck_object_put(env, child);
159                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
160                         RETURN(rc);
161
162 checkpoint:
163                 rc = lfsck_checkpoint(env, lfsck);
164                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
165                         RETURN(rc);
166
167                 /* Rate control. */
168                 lfsck_control_speed(lfsck);
169                 if (unlikely(!thread_is_running(thread)))
170                         RETURN(0);
171
172                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
173                         spin_lock(&lfsck->li_lock);
174                         thread_set_flags(thread, SVC_STOPPING);
175                         spin_unlock(&lfsck->li_lock);
176                         RETURN(-EINVAL);
177                 }
178
179                 rc = iops->next(env, di);
180         } while (rc == 0);
181
182         if (rc > 0 && !lfsck->li_oit_over)
183                 lfsck_close_dir(env, lfsck);
184
185         RETURN(rc);
186 }
187
188 static int lfsck_master_oit_engine(const struct lu_env *env,
189                                    struct lfsck_instance *lfsck)
190 {
191         struct lfsck_thread_info        *info   = lfsck_env_info(env);
192         const struct dt_it_ops          *iops   =
193                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
194         struct dt_it                    *di     = lfsck->li_di_oit;
195         struct lu_fid                   *fid    = &info->lti_fid;
196         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
197         struct ptlrpc_thread            *thread = &lfsck->li_thread;
198         int                              rc;
199         ENTRY;
200
201         do {
202                 struct dt_object *target;
203
204                 if (lfsck->li_di_dir != NULL) {
205                         rc = lfsck_master_dir_engine(env, lfsck);
206                         if (rc <= 0)
207                                 RETURN(rc);
208                 }
209
210                 if (unlikely(lfsck->li_oit_over))
211                         RETURN(1);
212
213                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
214                     cfs_fail_val > 0) {
215                         struct l_wait_info lwi;
216
217                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
218                                           NULL, NULL);
219                         l_wait_event(thread->t_ctl_waitq,
220                                      !thread_is_running(thread),
221                                      &lwi);
222                 }
223
224                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
225                         RETURN(0);
226
227                 lfsck->li_current_oit_processed = 1;
228                 lfsck->li_new_scanned++;
229                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
230                 if (rc != 0) {
231                         lfsck_fail(env, lfsck, true);
232                         if (bk->lb_param & LPF_FAILOUT)
233                                 RETURN(rc);
234                         else
235                                 goto checkpoint;
236                 }
237
238                 target = lfsck_object_find(env, lfsck, fid);
239                 if (target == NULL) {
240                         goto checkpoint;
241                 } else if (IS_ERR(target)) {
242                         lfsck_fail(env, lfsck, true);
243                         if (bk->lb_param & LPF_FAILOUT)
244                                 RETURN(PTR_ERR(target));
245                         else
246                                 goto checkpoint;
247                 }
248
249                 /* XXX: Currently, skip remote object, the consistency for
250                  *      remote object will be processed in LFSCK phase III. */
251                 if (dt_object_exists(target) && !dt_object_remote(target))
252                         rc = lfsck_exec_oit(env, lfsck, target);
253                 lfsck_object_put(env, target);
254                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
255                         RETURN(rc);
256
257 checkpoint:
258                 rc = lfsck_checkpoint(env, lfsck);
259                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
260                         RETURN(rc);
261
262                 /* Rate control. */
263                 lfsck_control_speed(lfsck);
264
265                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
266                         spin_lock(&lfsck->li_lock);
267                         thread_set_flags(thread, SVC_STOPPING);
268                         spin_unlock(&lfsck->li_lock);
269                         RETURN(-EINVAL);
270                 }
271
272                 rc = iops->next(env, di);
273                 if (unlikely(rc > 0))
274                         lfsck->li_oit_over = 1;
275                 else if (likely(rc == 0))
276                         lfsck->li_current_oit_processed = 0;
277
278                 if (unlikely(!thread_is_running(thread)))
279                         RETURN(0);
280         } while (rc == 0 || lfsck->li_di_dir != NULL);
281
282         RETURN(rc);
283 }
284
285 int lfsck_master_engine(void *args)
286 {
287         struct lu_env            env;
288         struct lfsck_instance   *lfsck    = (struct lfsck_instance *)args;
289         struct ptlrpc_thread    *thread   = &lfsck->li_thread;
290         struct dt_object        *oit_obj  = lfsck->li_obj_oit;
291         const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
292         struct dt_it            *oit_di;
293         int                      rc;
294         ENTRY;
295
296         cfs_daemonize("lfsck_master");
297         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
298         if (rc != 0) {
299                 CERROR("%s: LFSCK, fail to init env, rc = %d\n",
300                        lfsck_lfsck2name(lfsck), rc);
301                 GOTO(noenv, rc);
302         }
303
304         oit_di = oit_iops->init(&env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
305         if (IS_ERR(oit_di)) {
306                 rc = PTR_ERR(oit_di);
307                 CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
308                        lfsck_lfsck2name(lfsck), rc);
309                 GOTO(fini_env, rc);
310         }
311
312         spin_lock(&lfsck->li_lock);
313         lfsck->li_di_oit = oit_di;
314         spin_unlock(&lfsck->li_lock);
315         rc = lfsck_prep(&env, lfsck);
316         if (rc != 0)
317                 GOTO(fini_oit, rc);
318
319         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
320                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
321                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
322                lfsck->li_pos_current.lp_oit_cookie,
323                lfsck->li_pos_current.lp_dir_cookie,
324                PFID(&lfsck->li_pos_current.lp_dir_parent),
325                cfs_curproc_pid());
326
327         spin_lock(&lfsck->li_lock);
328         thread_set_flags(thread, SVC_RUNNING);
329         spin_unlock(&lfsck->li_lock);
330         cfs_waitq_broadcast(&thread->t_ctl_waitq);
331
332         if (!cfs_list_empty(&lfsck->li_list_scan) ||
333             cfs_list_empty(&lfsck->li_list_double_scan))
334                 rc = lfsck_master_oit_engine(&env, lfsck);
335         else
336                 rc = 1;
337
338         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
339                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
340                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
341                lfsck->li_pos_current.lp_oit_cookie,
342                lfsck->li_pos_current.lp_dir_cookie,
343                PFID(&lfsck->li_pos_current.lp_dir_parent),
344                cfs_curproc_pid(), rc);
345
346         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
347                 rc = lfsck_post(&env, lfsck, rc);
348         if (lfsck->li_di_dir != NULL)
349                 lfsck_close_dir(&env, lfsck);
350
351 fini_oit:
352         lfsck_di_oit_put(&env, lfsck);
353         oit_iops->fini(&env, oit_di);
354         if (rc == 1) {
355                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
356                         rc = lfsck_double_scan(&env, lfsck);
357                 else
358                         rc = 0;
359         }
360
361         /* XXX: Purge the pinned objects in the future. */
362
363 fini_env:
364         lu_env_fini(&env);
365
366 noenv:
367         spin_lock(&lfsck->li_lock);
368         thread_set_flags(thread, SVC_STOPPED);
369         cfs_waitq_broadcast(&thread->t_ctl_waitq);
370         spin_unlock(&lfsck->li_lock);
371         return rc;
372 }