Whamcloud - gitweb
LU-4525 lfsck: distinguish objects visibility by LFSCK
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_update_lma(const struct lu_env *env,
97                             struct lfsck_instance *lfsck, struct dt_object *obj)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
101         struct dt_device                *dt     = lfsck->li_bottom;
102         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
103         struct lu_buf                   *buf;
104         struct thandle                  *th;
105         int                              fl;
106         int                              rc;
107         ENTRY;
108
109         if (bk->lb_param & LPF_DRYRUN)
110                 RETURN(0);
111
112         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
114         if (rc < 0) {
115                 if (rc != -ENODATA)
116                         RETURN(rc);
117
118                 fl = LU_XATTR_CREATE;
119                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
120         } else {
121                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
122                         RETURN(-EINVAL);
123
124                 fl = LU_XATTR_REPLACE;
125                 lustre_lma_swab(lma);
126                 lustre_lma_init(lma, lfsck_dto2fid(obj),
127                                 lma->lma_compat | LMAC_FID_ON_OST,
128                                 lma->lma_incompat);
129         }
130         lustre_lma_swab(lma);
131
132         th = dt_trans_create(env, dt);
133         if (IS_ERR(th))
134                 RETURN(PTR_ERR(th));
135
136         buf = lfsck_buf_get(env, lma, sizeof(*lma));
137         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
138         if (rc != 0)
139                 GOTO(stop, rc);
140
141         rc = dt_trans_start(env, dt, th);
142         if (rc != 0)
143                 GOTO(stop, rc);
144
145         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
146
147         GOTO(stop, rc);
148
149 stop:
150         dt_trans_stop(env, dt, th);
151         return rc;
152 }
153
154 static int lfsck_master_dir_engine(const struct lu_env *env,
155                                    struct lfsck_instance *lfsck)
156 {
157         struct lfsck_thread_info        *info   = lfsck_env_info(env);
158         const struct dt_it_ops          *iops   =
159                         &lfsck->li_obj_dir->do_index_ops->dio_it;
160         struct dt_it                    *di     = lfsck->li_di_dir;
161         struct lu_dirent                *ent    = &info->lti_ent;
162         struct lu_fid                   *fid    = &info->lti_fid;
163         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
164         struct ptlrpc_thread            *thread = &lfsck->li_thread;
165         int                              rc;
166         ENTRY;
167
168         do {
169                 struct dt_object *child;
170
171                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
172                     cfs_fail_val > 0) {
173                         struct l_wait_info lwi;
174
175                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
176                                           NULL, NULL);
177                         l_wait_event(thread->t_ctl_waitq,
178                                      !thread_is_running(thread),
179                                      &lwi);
180                 }
181
182                 lfsck->li_new_scanned++;
183                 rc = iops->rec(env, di, (struct dt_rec *)ent,
184                                lfsck->li_args_dir);
185                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
186                 if (rc != 0) {
187                         lfsck_fail(env, lfsck, true);
188                         if (bk->lb_param & LPF_FAILOUT)
189                                 RETURN(rc);
190                         else
191                                 goto checkpoint;
192                 }
193
194                 if (ent->lde_attrs & LUDA_IGNORE)
195                         goto checkpoint;
196
197                 *fid = ent->lde_fid;
198                 child = lfsck_object_find(env, lfsck, fid);
199                 if (child == NULL) {
200                         goto checkpoint;
201                 } else if (IS_ERR(child)) {
202                         lfsck_fail(env, lfsck, true);
203                         if (bk->lb_param & LPF_FAILOUT)
204                                 RETURN(PTR_ERR(child));
205                         else
206                                 goto checkpoint;
207                 }
208
209                 /* XXX: Currently, skip remote object, the consistency for
210                  *      remote object will be processed in LFSCK phase III. */
211                 if (dt_object_exists(child) && !dt_object_remote(child))
212                         rc = lfsck_exec_dir(env, lfsck, child, ent);
213                 lfsck_object_put(env, child);
214                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
215                         RETURN(rc);
216
217 checkpoint:
218                 rc = lfsck_checkpoint(env, lfsck);
219                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
220                         RETURN(rc);
221
222                 /* Rate control. */
223                 lfsck_control_speed(lfsck);
224                 if (unlikely(!thread_is_running(thread)))
225                         RETURN(0);
226
227                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
228                         spin_lock(&lfsck->li_lock);
229                         thread_set_flags(thread, SVC_STOPPING);
230                         spin_unlock(&lfsck->li_lock);
231                         RETURN(-EINVAL);
232                 }
233
234                 rc = iops->next(env, di);
235         } while (rc == 0);
236
237         if (rc > 0 && !lfsck->li_oit_over)
238                 lfsck_close_dir(env, lfsck);
239
240         RETURN(rc);
241 }
242
243 static int lfsck_master_oit_engine(const struct lu_env *env,
244                                    struct lfsck_instance *lfsck)
245 {
246         struct lfsck_thread_info        *info   = lfsck_env_info(env);
247         const struct dt_it_ops          *iops   =
248                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
249         struct dt_it                    *di     = lfsck->li_di_oit;
250         struct lu_fid                   *fid    = &info->lti_fid;
251         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
252         struct ptlrpc_thread            *thread = &lfsck->li_thread;
253         __u32                            idx    =
254                                 lfsck_dev_idx(lfsck->li_bottom);
255         int                              rc;
256         ENTRY;
257
258         do {
259                 struct dt_object *target;
260                 bool              update_lma = false;
261
262                 if (lfsck->li_di_dir != NULL) {
263                         rc = lfsck_master_dir_engine(env, lfsck);
264                         if (rc <= 0)
265                                 RETURN(rc);
266                 }
267
268                 if (unlikely(lfsck->li_oit_over))
269                         RETURN(1);
270
271                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
272                     cfs_fail_val > 0) {
273                         struct l_wait_info lwi;
274
275                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
276                                           NULL, NULL);
277                         l_wait_event(thread->t_ctl_waitq,
278                                      !thread_is_running(thread),
279                                      &lwi);
280                 }
281
282                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
283                         RETURN(0);
284
285                 lfsck->li_current_oit_processed = 1;
286                 lfsck->li_new_scanned++;
287                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
288                 if (rc != 0) {
289                         lfsck_fail(env, lfsck, true);
290                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
291                                 RETURN(rc);
292                         else
293                                 goto checkpoint;
294                 }
295
296                 if (fid_is_idif(fid)) {
297                         __u32 idx1 = fid_idif_ost_idx(fid);
298
299                         LASSERT(!lfsck->li_master);
300
301                         /* It is an old format device, update the LMA. */
302                         if (idx != idx1) {
303                                 struct ost_id *oi = &info->lti_oi;
304
305                                 fid_to_ostid(fid, oi);
306                                 ostid_to_fid(fid, oi, idx);
307                                 update_lma = true;
308                         }
309                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
310                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
311                            !fid_seq_is_dot(fid_seq(fid))) {
312                         /* If the FID/object is only used locally and invisible
313                          * to external nodes, then LFSCK will not handle it. */
314                         goto checkpoint;
315                 }
316
317                 target = lfsck_object_find(env, lfsck, fid);
318                 if (target == NULL) {
319                         goto checkpoint;
320                 } else if (IS_ERR(target)) {
321                         lfsck_fail(env, lfsck, true);
322                         if (bk->lb_param & LPF_FAILOUT)
323                                 RETURN(PTR_ERR(target));
324                         else
325                                 goto checkpoint;
326                 }
327
328                 /* XXX: Currently, skip remote object, the consistency for
329                  *      remote object will be processed in LFSCK phase III. */
330                 if (dt_object_exists(target) && !dt_object_remote(target)) {
331                         if (update_lma)
332                                 rc = lfsck_update_lma(env, lfsck, target);
333                         if (rc == 0)
334                                 rc = lfsck_exec_oit(env, lfsck, target);
335                 }
336                 lfsck_object_put(env, target);
337                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
338                         RETURN(rc);
339
340 checkpoint:
341                 rc = lfsck_checkpoint(env, lfsck);
342                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
343                         RETURN(rc);
344
345                 /* Rate control. */
346                 lfsck_control_speed(lfsck);
347
348                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
349                         spin_lock(&lfsck->li_lock);
350                         thread_set_flags(thread, SVC_STOPPING);
351                         spin_unlock(&lfsck->li_lock);
352                         RETURN(-EINVAL);
353                 }
354
355                 rc = iops->next(env, di);
356                 if (unlikely(rc > 0))
357                         lfsck->li_oit_over = 1;
358                 else if (likely(rc == 0))
359                         lfsck->li_current_oit_processed = 0;
360
361                 if (unlikely(!thread_is_running(thread)))
362                         RETURN(0);
363         } while (rc == 0 || lfsck->li_di_dir != NULL);
364
365         RETURN(rc);
366 }
367
368 int lfsck_master_engine(void *args)
369 {
370         struct lfsck_thread_args *lta      = args;
371         struct lu_env            *env      = &lta->lta_env;
372         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
373         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
374         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
375         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
376         struct dt_it             *oit_di;
377         int                       rc;
378         ENTRY;
379
380         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
381         if (IS_ERR(oit_di)) {
382                 rc = PTR_ERR(oit_di);
383                 CERROR("%s: LFSCK, fail to init iteration: rc = %d\n",
384                        lfsck_lfsck2name(lfsck), rc);
385
386                 GOTO(fini_args, rc);
387         }
388
389         spin_lock(&lfsck->li_lock);
390         lfsck->li_di_oit = oit_di;
391         spin_unlock(&lfsck->li_lock);
392         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
393         if (rc != 0)
394                 GOTO(fini_oit, rc);
395
396         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
397                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
398                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
399                lfsck->li_pos_current.lp_oit_cookie,
400                lfsck->li_pos_current.lp_dir_cookie,
401                PFID(&lfsck->li_pos_current.lp_dir_parent),
402                current_pid());
403
404         spin_lock(&lfsck->li_lock);
405         thread_set_flags(thread, SVC_RUNNING);
406         spin_unlock(&lfsck->li_lock);
407         wake_up_all(&thread->t_ctl_waitq);
408
409         if (!cfs_list_empty(&lfsck->li_list_scan) ||
410             cfs_list_empty(&lfsck->li_list_double_scan))
411                 rc = lfsck_master_oit_engine(env, lfsck);
412         else
413                 rc = 1;
414
415         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
416                "oit_cookie = "LPU64", dir_cookie = "LPU64", parent = "DFID
417                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
418                lfsck->li_pos_current.lp_oit_cookie,
419                lfsck->li_pos_current.lp_dir_cookie,
420                PFID(&lfsck->li_pos_current.lp_dir_parent),
421                current_pid(), rc);
422
423         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
424                 rc = lfsck_post(env, lfsck, rc);
425
426         if (lfsck->li_di_dir != NULL)
427                 lfsck_close_dir(env, lfsck);
428
429 fini_oit:
430         lfsck_di_oit_put(env, lfsck);
431         oit_iops->fini(env, oit_di);
432         if (rc == 1) {
433                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
434                         rc = lfsck_double_scan(env, lfsck);
435                 else
436                         rc = 0;
437         } else {
438                 lfsck_quit(env, lfsck);
439         }
440
441         /* XXX: Purge the pinned objects in the future. */
442
443 fini_args:
444         spin_lock(&lfsck->li_lock);
445         thread_set_flags(thread, SVC_STOPPED);
446         spin_unlock(&lfsck->li_lock);
447         wake_up_all(&thread->t_ctl_waitq);
448         lfsck_thread_args_fini(lta);
449         return rc;
450 }