Whamcloud - gitweb
LU-1346 libcfs: cleanup cfs_curproc_xxx macros
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_master_dir_engine(const struct lu_env *env,
97                                    struct lfsck_instance *lfsck)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         const struct dt_it_ops          *iops   =
101                         &lfsck->li_obj_dir->do_index_ops->dio_it;
102         struct dt_it                    *di     = lfsck->li_di_dir;
103         struct lu_dirent                *ent    = &info->lti_ent;
104         struct lu_fid                   *fid    = &info->lti_fid;
105         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
106         struct ptlrpc_thread            *thread = &lfsck->li_thread;
107         int                              rc;
108         ENTRY;
109
110         do {
111                 struct dt_object *child;
112
113                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
114                     cfs_fail_val > 0) {
115                         struct l_wait_info lwi;
116
117                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
118                                           NULL, NULL);
119                         l_wait_event(thread->t_ctl_waitq,
120                                      !thread_is_running(thread),
121                                      &lwi);
122                 }
123
124                 lfsck->li_new_scanned++;
125                 rc = iops->rec(env, di, (struct dt_rec *)ent,
126                                lfsck->li_args_dir);
127                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
128                 if (rc != 0) {
129                         lfsck_fail(env, lfsck, true);
130                         if (bk->lb_param & LPF_FAILOUT)
131                                 RETURN(rc);
132                         else
133                                 goto checkpoint;
134                 }
135
136                 if (ent->lde_attrs & LUDA_IGNORE)
137                         goto checkpoint;
138
139                 *fid = ent->lde_fid;
140                 child = lfsck_object_find(env, lfsck, fid);
141                 if (child == NULL) {
142                         goto checkpoint;
143                 } else if (IS_ERR(child)) {
144                         lfsck_fail(env, lfsck, true);
145                         if (bk->lb_param & LPF_FAILOUT)
146                                 RETURN(PTR_ERR(child));
147                         else
148                                 goto checkpoint;
149                 }
150
151                 /* XXX: Currently, skip remote object, the consistency for
152                  *      remote object will be processed in LFSCK phase III. */
153                 if (dt_object_exists(child) && !dt_object_remote(child))
154                         rc = lfsck_exec_dir(env, lfsck, child, ent);
155                 lfsck_object_put(env, child);
156                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
157                         RETURN(rc);
158
159 checkpoint:
160                 rc = lfsck_checkpoint(env, lfsck);
161                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
162                         RETURN(rc);
163
164                 /* Rate control. */
165                 lfsck_control_speed(lfsck);
166                 if (unlikely(!thread_is_running(thread)))
167                         RETURN(0);
168
169                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
170                         spin_lock(&lfsck->li_lock);
171                         thread_set_flags(thread, SVC_STOPPING);
172                         spin_unlock(&lfsck->li_lock);
173                         RETURN(-EINVAL);
174                 }
175
176                 rc = iops->next(env, di);
177         } while (rc == 0);
178
179         if (rc > 0 && !lfsck->li_oit_over)
180                 lfsck_close_dir(env, lfsck);
181
182         RETURN(rc);
183 }
184
185 static int lfsck_master_oit_engine(const struct lu_env *env,
186                                    struct lfsck_instance *lfsck)
187 {
188         struct lfsck_thread_info        *info   = lfsck_env_info(env);
189         const struct dt_it_ops          *iops   =
190                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
191         struct dt_it                    *di     = lfsck->li_di_oit;
192         struct lu_fid                   *fid    = &info->lti_fid;
193         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
194         struct ptlrpc_thread            *thread = &lfsck->li_thread;
195         int                              rc;
196         ENTRY;
197
198         do {
199                 struct dt_object *target;
200
201                 if (lfsck->li_di_dir != NULL) {
202                         rc = lfsck_master_dir_engine(env, lfsck);
203                         if (rc <= 0)
204                                 RETURN(rc);
205                 }
206
207                 if (unlikely(lfsck->li_oit_over))
208                         RETURN(1);
209
210                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
211                     cfs_fail_val > 0) {
212                         struct l_wait_info lwi;
213
214                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
215                                           NULL, NULL);
216                         l_wait_event(thread->t_ctl_waitq,
217                                      !thread_is_running(thread),
218                                      &lwi);
219                 }
220
221                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
222                         RETURN(0);
223
224                 lfsck->li_current_oit_processed = 1;
225                 lfsck->li_new_scanned++;
226                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
227                 if (rc != 0) {
228                         lfsck_fail(env, lfsck, true);
229                         if (bk->lb_param & LPF_FAILOUT)
230                                 RETURN(rc);
231                         else
232                                 goto checkpoint;
233                 }
234
235                 target = lfsck_object_find(env, lfsck, fid);
236                 if (target == NULL) {
237                         goto checkpoint;
238                 } else if (IS_ERR(target)) {
239                         lfsck_fail(env, lfsck, true);
240                         if (bk->lb_param & LPF_FAILOUT)
241                                 RETURN(PTR_ERR(target));
242                         else
243                                 goto checkpoint;
244                 }
245
246                 /* XXX: Currently, skip remote object, the consistency for
247                  *      remote object will be processed in LFSCK phase III. */
248                 if (dt_object_exists(target) && !dt_object_remote(target))
249                         rc = lfsck_exec_oit(env, lfsck, target);
250                 lfsck_object_put(env, target);
251                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
252                         RETURN(rc);
253
254 checkpoint:
255                 rc = lfsck_checkpoint(env, lfsck);
256                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
257                         RETURN(rc);
258
259                 /* Rate control. */
260                 lfsck_control_speed(lfsck);
261
262                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
263                         spin_lock(&lfsck->li_lock);
264                         thread_set_flags(thread, SVC_STOPPING);
265                         spin_unlock(&lfsck->li_lock);
266                         RETURN(-EINVAL);
267                 }
268
269                 rc = iops->next(env, di);
270                 if (unlikely(rc > 0))
271                         lfsck->li_oit_over = 1;
272                 else if (likely(rc == 0))
273                         lfsck->li_current_oit_processed = 0;
274
275                 if (unlikely(!thread_is_running(thread)))
276                         RETURN(0);
277         } while (rc == 0 || lfsck->li_di_dir != NULL);
278
279         RETURN(rc);
280 }
281
282 int lfsck_master_engine(void *args)
283 {
284         struct lu_env            env;
285         struct lfsck_instance   *lfsck    = (struct lfsck_instance *)args;
286         struct ptlrpc_thread    *thread   = &lfsck->li_thread;
287         struct dt_object        *oit_obj  = lfsck->li_obj_oit;
288         const struct dt_it_ops  *oit_iops = &oit_obj->do_index_ops->dio_it;
289         struct dt_it            *oit_di;
290         int                      rc;
291         ENTRY;
292
293         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
294         if (rc != 0) {
295                 CERROR("%s: LFSCK, fail to init env, rc = %d\n",
296                        lfsck_lfsck2name(lfsck), rc);
297                 GOTO(noenv, rc);
298         }
299
300         oit_di = oit_iops->init(&env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
301         if (IS_ERR(oit_di)) {
302                 rc = PTR_ERR(oit_di);
303                 CERROR("%s: LFSCK, fail to init iteration, rc = %d\n",
304                        lfsck_lfsck2name(lfsck), rc);
305                 GOTO(fini_env, rc);
306         }
307
308         spin_lock(&lfsck->li_lock);
309         lfsck->li_di_oit = oit_di;
310         spin_unlock(&lfsck->li_lock);
311         rc = lfsck_prep(&env, lfsck);
312         if (rc != 0)
313                 GOTO(fini_oit, rc);
314
315         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = 0x%x, dir_flags = 0x%x, "
316                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
317                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
318                lfsck->li_pos_current.lp_oit_cookie,
319                lfsck->li_pos_current.lp_dir_cookie,
320                PFID(&lfsck->li_pos_current.lp_dir_parent),
321                current_pid());
322
323         spin_lock(&lfsck->li_lock);
324         thread_set_flags(thread, SVC_RUNNING);
325         spin_unlock(&lfsck->li_lock);
326         cfs_waitq_broadcast(&thread->t_ctl_waitq);
327
328         if (!cfs_list_empty(&lfsck->li_list_scan) ||
329             cfs_list_empty(&lfsck->li_list_double_scan))
330                 rc = lfsck_master_oit_engine(&env, lfsck);
331         else
332                 rc = 1;
333
334         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = 0x%x, dir_flags = 0x%x, "
335                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
336                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
337                lfsck->li_pos_current.lp_oit_cookie,
338                lfsck->li_pos_current.lp_dir_cookie,
339                PFID(&lfsck->li_pos_current.lp_dir_parent),
340                current_pid(), rc);
341
342         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
343                 rc = lfsck_post(&env, lfsck, rc);
344         if (lfsck->li_di_dir != NULL)
345                 lfsck_close_dir(&env, lfsck);
346
347 fini_oit:
348         lfsck_di_oit_put(&env, lfsck);
349         oit_iops->fini(&env, oit_di);
350         if (rc == 1) {
351                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
352                         rc = lfsck_double_scan(&env, lfsck);
353                 else
354                         rc = 0;
355         }
356
357         /* XXX: Purge the pinned objects in the future. */
358
359 fini_env:
360         lu_env_fini(&env);
361
362 noenv:
363         spin_lock(&lfsck->li_lock);
364         thread_set_flags(thread, SVC_STOPPED);
365         cfs_waitq_broadcast(&thread->t_ctl_waitq);
366         spin_unlock(&lfsck->li_lock);
367         return rc;
368 }