Whamcloud - gitweb
LU-5223 lmv: build master LMV EA dynamically build via readdir
[fs/lustre-release.git] / lustre / lfsck / lfsck_engine.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_engine.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lu_object.h>
34 #include <dt_object.h>
35 #include <lustre_net.h>
36 #include <lustre_fid.h>
37 #include <obd_support.h>
38 #include <lustre_lib.h>
39
40 #include "lfsck_internal.h"
41
42 static void lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie)
43 {
44         fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
45         *cookie = le64_to_cpu(ent->lde_hash);
46         ent->lde_reclen = le16_to_cpu(ent->lde_reclen);
47         ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
48         ent->lde_attrs = le32_to_cpu(ent->lde_attrs);
49
50         /* Make sure the name is terminated with '0'.
51          * The data (type) after ent::lde_name maybe
52          * broken, but we do not care. */
53         ent->lde_name[ent->lde_namelen] = 0;
54 }
55
56 static void lfsck_di_oit_put(const struct lu_env *env, struct lfsck_instance *lfsck)
57 {
58         const struct dt_it_ops  *iops;
59         struct dt_it            *di;
60
61         spin_lock(&lfsck->li_lock);
62         iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
63         di = lfsck->li_di_oit;
64         lfsck->li_di_oit = NULL;
65         spin_unlock(&lfsck->li_lock);
66         iops->put(env, di);
67 }
68
69 static void lfsck_di_dir_put(const struct lu_env *env, struct lfsck_instance *lfsck)
70 {
71         const struct dt_it_ops  *iops;
72         struct dt_it            *di;
73
74         spin_lock(&lfsck->li_lock);
75         iops = &lfsck->li_obj_dir->do_index_ops->dio_it;
76         di = lfsck->li_di_dir;
77         lfsck->li_di_dir = NULL;
78         lfsck->li_cookie_dir = 0;
79         spin_unlock(&lfsck->li_lock);
80         iops->put(env, di);
81 }
82
83 static void lfsck_close_dir(const struct lu_env *env,
84                             struct lfsck_instance *lfsck)
85 {
86         struct dt_object        *dir_obj  = lfsck->li_obj_dir;
87         const struct dt_it_ops  *dir_iops = &dir_obj->do_index_ops->dio_it;
88         struct dt_it            *dir_di   = lfsck->li_di_dir;
89
90         lfsck_di_dir_put(env, lfsck);
91         dir_iops->fini(env, dir_di);
92         lfsck->li_obj_dir = NULL;
93         lfsck_object_put(env, dir_obj);
94 }
95
96 static int lfsck_update_lma(const struct lu_env *env,
97                             struct lfsck_instance *lfsck, struct dt_object *obj)
98 {
99         struct lfsck_thread_info        *info   = lfsck_env_info(env);
100         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
101         struct dt_device                *dt     = lfsck->li_bottom;
102         struct lustre_mdt_attrs         *lma    = &info->lti_lma;
103         struct lu_buf                   *buf;
104         struct thandle                  *th;
105         int                              fl;
106         int                              rc;
107         ENTRY;
108
109         if (bk->lb_param & LPF_DRYRUN)
110                 RETURN(0);
111
112         buf = lfsck_buf_get(env, info->lti_lma_old, LMA_OLD_SIZE);
113         rc = dt_xattr_get(env, obj, buf, XATTR_NAME_LMA, BYPASS_CAPA);
114         if (rc < 0) {
115                 if (rc != -ENODATA)
116                         RETURN(rc);
117
118                 fl = LU_XATTR_CREATE;
119                 lustre_lma_init(lma, lfsck_dto2fid(obj), LMAC_FID_ON_OST, 0);
120         } else {
121                 if (rc != LMA_OLD_SIZE && rc != sizeof(struct lustre_mdt_attrs))
122                         RETURN(-EINVAL);
123
124                 fl = LU_XATTR_REPLACE;
125                 lustre_lma_swab(lma);
126                 lustre_lma_init(lma, lfsck_dto2fid(obj),
127                                 lma->lma_compat | LMAC_FID_ON_OST,
128                                 lma->lma_incompat);
129         }
130         lustre_lma_swab(lma);
131
132         th = dt_trans_create(env, dt);
133         if (IS_ERR(th))
134                 RETURN(PTR_ERR(th));
135
136         buf = lfsck_buf_get(env, lma, sizeof(*lma));
137         rc = dt_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th);
138         if (rc != 0)
139                 GOTO(stop, rc);
140
141         rc = dt_trans_start(env, dt, th);
142         if (rc != 0)
143                 GOTO(stop, rc);
144
145         rc = dt_xattr_set(env, obj, buf, XATTR_NAME_LMA, fl, th, BYPASS_CAPA);
146
147         GOTO(stop, rc);
148
149 stop:
150         dt_trans_stop(env, dt, th);
151         return rc;
152 }
153
154 static int lfsck_master_dir_engine(const struct lu_env *env,
155                                    struct lfsck_instance *lfsck)
156 {
157         struct lfsck_thread_info        *info   = lfsck_env_info(env);
158         struct dt_object                *dir    = lfsck->li_obj_dir;
159         const struct dt_it_ops          *iops   = &dir->do_index_ops->dio_it;
160         struct dt_it                    *di     = lfsck->li_di_dir;
161         struct lu_dirent                *ent    =
162                         (struct lu_dirent *)info->lti_key;
163         struct lu_fid                   *fid    = &info->lti_fid;
164         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
165         struct ptlrpc_thread            *thread = &lfsck->li_thread;
166         int                              rc;
167         ENTRY;
168
169         do {
170                 struct dt_object *child;
171
172                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY2) &&
173                     cfs_fail_val > 0) {
174                         struct l_wait_info lwi;
175
176                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
177                                           NULL, NULL);
178                         l_wait_event(thread->t_ctl_waitq,
179                                      !thread_is_running(thread),
180                                      &lwi);
181                 }
182
183                 lfsck->li_new_scanned++;
184                 rc = iops->rec(env, di, (struct dt_rec *)ent,
185                                lfsck->li_args_dir);
186                 lfsck_unpack_ent(ent, &lfsck->li_cookie_dir);
187                 if (rc != 0) {
188                         CDEBUG(D_LFSCK, "%s: scan dir failed at rec(), "
189                                "parent "DFID", cookie "LPX64": rc = %d\n",
190                                lfsck_lfsck2name(lfsck),
191                                PFID(lfsck_dto2fid(dir)),
192                                lfsck->li_cookie_dir, rc);
193                         lfsck_fail(env, lfsck, true);
194                         if (bk->lb_param & LPF_FAILOUT)
195                                 RETURN(rc);
196                         else
197                                 goto checkpoint;
198                 }
199
200                 if (ent->lde_attrs & LUDA_IGNORE)
201                         goto checkpoint;
202
203                 *fid = ent->lde_fid;
204                 child = lfsck_object_find(env, lfsck, fid);
205                 if (child == NULL) {
206                         goto checkpoint;
207                 } else if (IS_ERR(child)) {
208                         CDEBUG(D_LFSCK, "%s: scan dir failed at find target, "
209                                "parent "DFID", child %.*s "DFID": rc = %d\n",
210                                lfsck_lfsck2name(lfsck),
211                                PFID(lfsck_dto2fid(dir)),
212                                ent->lde_namelen, ent->lde_name,
213                                PFID(&ent->lde_fid), rc);
214                         lfsck_fail(env, lfsck, true);
215                         if (bk->lb_param & LPF_FAILOUT)
216                                 RETURN(PTR_ERR(child));
217                         else
218                                 goto checkpoint;
219                 }
220
221                 /* XXX: Currently, skip remote object, the consistency for
222                  *      remote object will be processed in LFSCK phase III. */
223                 if (dt_object_exists(child) && !dt_object_remote(child))
224                         rc = lfsck_exec_dir(env, lfsck, child, ent);
225                 lfsck_object_put(env, child);
226                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
227                         RETURN(rc);
228
229 checkpoint:
230                 rc = lfsck_checkpoint(env, lfsck);
231                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
232                         RETURN(rc);
233
234                 /* Rate control. */
235                 lfsck_control_speed(lfsck);
236                 if (unlikely(!thread_is_running(thread))) {
237                         CDEBUG(D_LFSCK, "%s: scan dir exit for engine stop, "
238                                "parent "DFID", cookie "LPX64"\n",
239                                lfsck_lfsck2name(lfsck),
240                                PFID(lfsck_dto2fid(dir)),
241                                lfsck->li_cookie_dir);
242                         RETURN(0);
243                 }
244
245                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
246                         spin_lock(&lfsck->li_lock);
247                         thread_set_flags(thread, SVC_STOPPING);
248                         spin_unlock(&lfsck->li_lock);
249                         RETURN(-EINVAL);
250                 }
251
252                 rc = iops->next(env, di);
253         } while (rc == 0);
254
255         if (rc > 0 && !lfsck->li_oit_over)
256                 lfsck_close_dir(env, lfsck);
257
258         RETURN(rc);
259 }
260
261 static int lfsck_master_oit_engine(const struct lu_env *env,
262                                    struct lfsck_instance *lfsck)
263 {
264         struct lfsck_thread_info        *info   = lfsck_env_info(env);
265         const struct dt_it_ops          *iops   =
266                                 &lfsck->li_obj_oit->do_index_ops->dio_it;
267         struct dt_it                    *di     = lfsck->li_di_oit;
268         struct lu_fid                   *fid    = &info->lti_fid;
269         struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
270         struct ptlrpc_thread            *thread = &lfsck->li_thread;
271         __u32                            idx    =
272                                 lfsck_dev_idx(lfsck->li_bottom);
273         int                              rc;
274         ENTRY;
275
276         do {
277                 struct dt_object *target;
278                 bool              update_lma = false;
279
280                 if (lfsck->li_di_dir != NULL) {
281                         rc = lfsck_master_dir_engine(env, lfsck);
282                         if (rc <= 0)
283                                 RETURN(rc);
284                 }
285
286                 if (unlikely(lfsck->li_oit_over))
287                         RETURN(1);
288
289                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY1) &&
290                     cfs_fail_val > 0) {
291                         struct l_wait_info lwi;
292
293                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
294                                           NULL, NULL);
295                         l_wait_event(thread->t_ctl_waitq,
296                                      !thread_is_running(thread),
297                                      &lwi);
298                 }
299
300                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
301                         RETURN(0);
302
303                 lfsck->li_current_oit_processed = 1;
304                 lfsck->li_new_scanned++;
305                 rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
306                 if (rc != 0) {
307                         CDEBUG(D_LFSCK, "%s: OIT scan failed at rec(): "
308                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
309                         lfsck_fail(env, lfsck, true);
310                         if (rc < 0 && bk->lb_param & LPF_FAILOUT)
311                                 RETURN(rc);
312                         else
313                                 goto checkpoint;
314                 }
315
316                 if (fid_is_idif(fid)) {
317                         __u32 idx1 = fid_idif_ost_idx(fid);
318
319                         LASSERT(!lfsck->li_master);
320
321                         /* It is an old format device, update the LMA. */
322                         if (idx != idx1) {
323                                 struct ost_id *oi = &info->lti_oi;
324
325                                 fid_to_ostid(fid, oi);
326                                 ostid_to_fid(fid, oi, idx);
327                                 update_lma = true;
328                         }
329                 } else if (!fid_is_norm(fid) && !fid_is_igif(fid) &&
330                            !fid_is_last_id(fid) && !fid_is_root(fid) &&
331                            !fid_seq_is_dot(fid_seq(fid))) {
332                         /* If the FID/object is only used locally and invisible
333                          * to external nodes, then LFSCK will not handle it. */
334                         goto checkpoint;
335                 }
336
337                 target = lfsck_object_find(env, lfsck, fid);
338                 if (target == NULL) {
339                         goto checkpoint;
340                 } else if (IS_ERR(target)) {
341                         CDEBUG(D_LFSCK, "%s: OIT scan failed at find target "
342                                DFID", cookie "LPU64": rc = %d\n",
343                                lfsck_lfsck2name(lfsck), PFID(fid),
344                                iops->store(env, di), rc);
345                         lfsck_fail(env, lfsck, true);
346                         if (bk->lb_param & LPF_FAILOUT)
347                                 RETURN(PTR_ERR(target));
348                         else
349                                 goto checkpoint;
350                 }
351
352                 /* XXX: Currently, skip remote object, the consistency for
353                  *      remote object will be processed in LFSCK phase III. */
354                 if (dt_object_exists(target) && !dt_object_remote(target)) {
355                         if (update_lma) {
356                                 rc = lfsck_update_lma(env, lfsck, target);
357                                 if (rc != 0)
358                                         CDEBUG(D_LFSCK, "%s: fail to update "
359                                                "LMA for "DFID": rc = %d\n",
360                                                lfsck_lfsck2name(lfsck),
361                                                PFID(lfsck_dto2fid(target)), rc);
362                         }
363                         if (rc == 0)
364                                 rc = lfsck_exec_oit(env, lfsck, target);
365                 }
366                 lfsck_object_put(env, target);
367                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
368                         RETURN(rc);
369
370 checkpoint:
371                 rc = lfsck_checkpoint(env, lfsck);
372                 if (rc != 0 && bk->lb_param & LPF_FAILOUT)
373                         RETURN(rc);
374
375                 /* Rate control. */
376                 lfsck_control_speed(lfsck);
377
378                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL1)) {
379                         spin_lock(&lfsck->li_lock);
380                         thread_set_flags(thread, SVC_STOPPING);
381                         spin_unlock(&lfsck->li_lock);
382                         RETURN(-EINVAL);
383                 }
384
385                 rc = iops->next(env, di);
386                 if (unlikely(rc > 0))
387                         lfsck->li_oit_over = 1;
388                 else if (likely(rc == 0))
389                         lfsck->li_current_oit_processed = 0;
390
391                 if (unlikely(!thread_is_running(thread))) {
392                         CDEBUG(D_LFSCK, "%s: OIT scan exit for engine stop, "
393                                "cookie "LPU64"\n", lfsck_lfsck2name(lfsck),
394                                iops->store(env, di));
395                         RETURN(0);
396                 }
397         } while (rc == 0 || lfsck->li_di_dir != NULL);
398
399         RETURN(rc);
400 }
401
402 int lfsck_master_engine(void *args)
403 {
404         struct lfsck_thread_args *lta      = args;
405         struct lu_env            *env      = &lta->lta_env;
406         struct lfsck_instance    *lfsck    = lta->lta_lfsck;
407         struct ptlrpc_thread     *thread   = &lfsck->li_thread;
408         struct dt_object         *oit_obj  = lfsck->li_obj_oit;
409         const struct dt_it_ops   *oit_iops = &oit_obj->do_index_ops->dio_it;
410         struct dt_it             *oit_di;
411         struct l_wait_info        lwi      = { 0 };
412         int                       rc;
413         ENTRY;
414
415         oit_di = oit_iops->init(env, oit_obj, lfsck->li_args_oit, BYPASS_CAPA);
416         if (IS_ERR(oit_di)) {
417                 rc = PTR_ERR(oit_di);
418                 CDEBUG(D_LFSCK, "%s: master engine fail to init iteration: "
419                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
420
421                 GOTO(fini_args, rc);
422         }
423
424         spin_lock(&lfsck->li_lock);
425         lfsck->li_di_oit = oit_di;
426         spin_unlock(&lfsck->li_lock);
427         rc = lfsck_prep(env, lfsck, lta->lta_lsp);
428         if (rc != 0)
429                 GOTO(fini_oit, rc);
430
431         CDEBUG(D_LFSCK, "LFSCK entry: oit_flags = %#x, dir_flags = %#x, "
432                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
433                ", pid = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
434                lfsck->li_pos_current.lp_oit_cookie,
435                lfsck->li_pos_current.lp_dir_cookie,
436                PFID(&lfsck->li_pos_current.lp_dir_parent),
437                current_pid());
438
439         spin_lock(&lfsck->li_lock);
440         thread_set_flags(thread, SVC_RUNNING);
441         spin_unlock(&lfsck->li_lock);
442         wake_up_all(&thread->t_ctl_waitq);
443
444         l_wait_event(thread->t_ctl_waitq,
445                      lfsck->li_start_unplug ||
446                      !thread_is_running(thread),
447                      &lwi);
448         if (!thread_is_running(thread))
449                 GOTO(fini_oit, rc = 0);
450
451         if (!cfs_list_empty(&lfsck->li_list_scan) ||
452             cfs_list_empty(&lfsck->li_list_double_scan))
453                 rc = lfsck_master_oit_engine(env, lfsck);
454         else
455                 rc = 1;
456
457         CDEBUG(D_LFSCK, "LFSCK exit: oit_flags = %#x, dir_flags = %#x, "
458                "oit_cookie = "LPU64", dir_cookie = "LPX64", parent = "DFID
459                ", pid = %d, rc = %d\n", lfsck->li_args_oit, lfsck->li_args_dir,
460                lfsck->li_pos_current.lp_oit_cookie,
461                lfsck->li_pos_current.lp_dir_cookie,
462                PFID(&lfsck->li_pos_current.lp_dir_parent),
463                current_pid(), rc);
464
465         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CRASH))
466                 rc = lfsck_post(env, lfsck, rc);
467
468         if (lfsck->li_di_dir != NULL)
469                 lfsck_close_dir(env, lfsck);
470
471 fini_oit:
472         lfsck_di_oit_put(env, lfsck);
473         oit_iops->fini(env, oit_di);
474         if (rc == 1) {
475                 if (!cfs_list_empty(&lfsck->li_list_double_scan))
476                         rc = lfsck_double_scan(env, lfsck);
477                 else
478                         rc = 0;
479         } else {
480                 lfsck_quit(env, lfsck);
481         }
482
483         /* XXX: Purge the pinned objects in the future. */
484
485 fini_args:
486         spin_lock(&lfsck->li_lock);
487         thread_set_flags(thread, SVC_STOPPED);
488         spin_unlock(&lfsck->li_lock);
489         wake_up_all(&thread->t_ctl_waitq);
490         lfsck_thread_args_fini(lta);
491         return rc;
492 }