Whamcloud - gitweb
LU-6147 lfsck: NOT purge object by OI scrub
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
84 const char *lfsck_flags_names[] = {
85         "scanned-once",
86         "inconsistent",
87         "upgrade",
88         "incomplete",
89         "crashed_lastid",
90         NULL
91 };
92
93 const char *lfsck_param_names[] = {
94         NULL,
95         "failout",
96         "dryrun",
97         "all_targets",
98         "broadcast",
99         "orphan",
100         "create_ostobj",
101         "create_mdtobj",
102         NULL
103 };
104
105 enum lfsck_verify_lpf_types {
106         LVLT_BY_BOOKMARK        = 0,
107         LVLT_BY_NAMEENTRY       = 1,
108 };
109
110 const char *lfsck_status2names(enum lfsck_status status)
111 {
112         if (unlikely(status < 0 || status >= LS_MAX))
113                 return "unknown";
114
115         return lfsck_status_names[status];
116 }
117
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
119 {
120         spin_lock_init(&ltds->ltd_lock);
121         init_rwsem(&ltds->ltd_rw_sem);
122         INIT_LIST_HEAD(&ltds->ltd_orphan);
123         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124         if (ltds->ltd_tgts_bitmap == NULL)
125                 return -ENOMEM;
126
127         return 0;
128 }
129
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
131 {
132         struct lfsck_tgt_desc   *ltd;
133         struct lfsck_tgt_desc   *next;
134         int                      idx;
135
136         down_write(&ltds->ltd_rw_sem);
137
138         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
139                                  ltd_orphan_list) {
140                 list_del_init(&ltd->ltd_orphan_list);
141                 lfsck_tgt_put(ltd);
142         }
143
144         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145                 up_write(&ltds->ltd_rw_sem);
146
147                 return;
148         }
149
150         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151                 ltd = LTD_TGT(ltds, idx);
152                 if (likely(ltd != NULL)) {
153                         LASSERT(list_empty(&ltd->ltd_layout_list));
154                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_list));
156                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
157
158                         ltds->ltd_tgtnr--;
159                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160                         LTD_TGT(ltds, idx) = NULL;
161                         lfsck_tgt_put(ltd);
162                 }
163         }
164
165         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
166                  ltds->ltd_tgtnr);
167
168         for (idx = 0; idx < TGT_PTRS; idx++) {
169                 if (ltds->ltd_tgts_idx[idx] != NULL) {
170                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171                         ltds->ltd_tgts_idx[idx] = NULL;
172                 }
173         }
174
175         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176         ltds->ltd_tgts_bitmap = NULL;
177         up_write(&ltds->ltd_rw_sem);
178 }
179
180 static int __lfsck_add_target(const struct lu_env *env,
181                               struct lfsck_instance *lfsck,
182                               struct lfsck_tgt_desc *ltd,
183                               bool for_ost, bool locked)
184 {
185         struct lfsck_tgt_descs *ltds;
186         __u32                   index = ltd->ltd_index;
187         int                     rc    = 0;
188         ENTRY;
189
190         if (for_ost)
191                 ltds = &lfsck->li_ost_descs;
192         else
193                 ltds = &lfsck->li_mdt_descs;
194
195         if (!locked)
196                 down_write(&ltds->ltd_rw_sem);
197
198         LASSERT(ltds->ltd_tgts_bitmap != NULL);
199
200         if (index >= ltds->ltd_tgts_bitmap->size) {
201                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202                                     (__u32)BITS_PER_LONG);
203                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204                 cfs_bitmap_t *new_bitmap;
205
206                 while (newsize < index + 1)
207                         newsize <<= 1;
208
209                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210                 if (new_bitmap == NULL)
211                         GOTO(unlock, rc = -ENOMEM);
212
213                 if (ltds->ltd_tgtnr > 0)
214                         cfs_bitmap_copy(new_bitmap, old_bitmap);
215                 ltds->ltd_tgts_bitmap = new_bitmap;
216                 CFS_FREE_BITMAP(old_bitmap);
217         }
218
219         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220                 CERROR("%s: the device %s (%u) is registered already\n",
221                        lfsck_lfsck2name(lfsck),
222                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223                 GOTO(unlock, rc = -EEXIST);
224         }
225
226         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229                         GOTO(unlock, rc = -ENOMEM);
230         }
231
232         LTD_TGT(ltds, index) = ltd;
233         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
234         ltds->ltd_tgtnr++;
235
236         GOTO(unlock, rc = 0);
237
238 unlock:
239         if (!locked)
240                 up_write(&ltds->ltd_rw_sem);
241
242         return rc;
243 }
244
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246                                         struct lfsck_instance *lfsck)
247 {
248         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
249         struct lfsck_tgt_desc   *ltd;
250         struct lfsck_tgt_desc   *next;
251         struct list_head        *head    = &lfsck_ost_orphan_list;
252         int                      rc;
253         bool                     for_ost = true;
254
255 again:
256         spin_lock(&lfsck_instance_lock);
257         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258                 if (ltd->ltd_key == lfsck->li_bottom)
259                         list_move_tail(&ltd->ltd_orphan_list,
260                                        &ltds->ltd_orphan);
261         }
262         spin_unlock(&lfsck_instance_lock);
263
264         down_write(&ltds->ltd_rw_sem);
265         while (!list_empty(&ltds->ltd_orphan)) {
266                 ltd = list_entry(ltds->ltd_orphan.next,
267                                  struct lfsck_tgt_desc,
268                                  ltd_orphan_list);
269                 list_del_init(&ltd->ltd_orphan_list);
270                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271                 /* Do not hold the semaphore for too long time. */
272                 up_write(&ltds->ltd_rw_sem);
273                 if (rc != 0)
274                         return rc;
275
276                 down_write(&ltds->ltd_rw_sem);
277         }
278         up_write(&ltds->ltd_rw_sem);
279
280         if (for_ost) {
281                 ltds = &lfsck->li_mdt_descs;
282                 head = &lfsck_mdt_orphan_list;
283                 for_ost = false;
284                 goto again;
285         }
286
287         return 0;
288 }
289
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292                        struct list_head *list)
293 {
294         struct lfsck_component *com;
295
296         list_for_each_entry(com, list, lc_link) {
297                 if (com->lc_type == type)
298                         return com;
299         }
300         return NULL;
301 }
302
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
305 {
306         struct lfsck_component *com;
307
308         spin_lock(&lfsck->li_lock);
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
310         if (com != NULL)
311                 goto unlock;
312
313         com = __lfsck_component_find(lfsck, type,
314                                      &lfsck->li_list_double_scan);
315         if (com != NULL)
316                 goto unlock;
317
318         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
319
320 unlock:
321         if (com != NULL)
322                 lfsck_component_get(com);
323         spin_unlock(&lfsck->li_lock);
324         return com;
325 }
326
327 void lfsck_component_cleanup(const struct lu_env *env,
328                              struct lfsck_component *com)
329 {
330         if (!list_empty(&com->lc_link))
331                 list_del_init(&com->lc_link);
332         if (!list_empty(&com->lc_link_dir))
333                 list_del_init(&com->lc_link_dir);
334
335         lfsck_component_put(env, com);
336 }
337
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lu_fid *fid, bool locked)
340 {
341         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
342         int                      rc = 0;
343         ENTRY;
344
345         if (!locked)
346                 mutex_lock(&lfsck->li_mutex);
347
348         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
349         if (rc >= 0) {
350                 bk->lb_last_fid = *fid;
351                 /* We do not care about whether the subsequent sub-operations
352                  * failed or not. The worst case is that one FID is lost that
353                  * is not a big issue for the LFSCK since it is relative rare
354                  * for LFSCK create. */
355                 rc = lfsck_bookmark_store(env, lfsck);
356         }
357
358         if (!locked)
359                 mutex_unlock(&lfsck->li_mutex);
360
361         RETURN(rc);
362 }
363
364 /**
365  * Request the specified ibits lock for the given object.
366  *
367  * Before the LFSCK modifying on the namespace visible object,
368  * it needs to acquire related ibits ldlm lock.
369  *
370  * \param[in] env       pointer to the thread context
371  * \param[in] lfsck     pointer to the lfsck instance
372  * \param[in] obj       pointer to the dt_object to be locked
373  * \param[out] lh       pointer to the lock handle
374  * \param[in] ibits     the bits for the ldlm lock to be acquired
375  * \param[in] mode      the mode for the ldlm lock to be acquired
376  *
377  * \retval              0 for success
378  * \retval              negative error number on failure
379  */
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381                      struct dt_object *obj, struct lustre_handle *lh,
382                      __u64 bits, ldlm_mode_t mode)
383 {
384         struct lfsck_thread_info        *info   = lfsck_env_info(env);
385         ldlm_policy_data_t              *policy = &info->lti_policy;
386         struct ldlm_res_id              *resid  = &info->lti_resid;
387         __u64                            flags  = LDLM_FL_ATOMIC_CB;
388         int                              rc;
389
390         LASSERT(lfsck->li_namespace != NULL);
391
392         memset(policy, 0, sizeof(*policy));
393         policy->l_inodebits.bits = bits;
394         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
395         if (dt_object_remote(obj)) {
396                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
397
398                 memset(einfo, 0, sizeof(*einfo));
399                 einfo->ei_type = LDLM_IBITS;
400                 einfo->ei_mode = mode;
401                 einfo->ei_cb_bl = ldlm_blocking_ast;
402                 einfo->ei_cb_cp = ldlm_completion_ast;
403                 einfo->ei_res_id = resid;
404
405                 rc = dt_object_lock(env, obj, lh, einfo, policy);
406         } else {
407                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408                                             LDLM_IBITS, policy, mode,
409                                             &flags, ldlm_blocking_ast,
410                                             ldlm_completion_ast, NULL, NULL,
411                                             0, LVB_T_NONE, NULL, lh);
412         }
413
414         if (rc == ELDLM_OK) {
415                 rc = 0;
416         } else {
417                 memset(lh, 0, sizeof(*lh));
418                 rc = -EIO;
419         }
420
421         return rc;
422 }
423
424 /**
425  * Release the the specified ibits lock.
426  *
427  * If the lock has been acquired before, release it
428  * and cleanup the handle. Otherwise, do nothing.
429  *
430  * \param[in] lh        pointer to the lock handle
431  * \param[in] mode      the mode for the ldlm lock to be released
432  */
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
434 {
435         if (lustre_handle_is_used(lh)) {
436                 ldlm_lock_decref(lh, mode);
437                 memset(lh, 0, sizeof(*lh));
438         }
439 }
440
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442                               struct lfsck_instance *lfsck,
443                               const struct lu_fid *fid)
444 {
445         struct seq_server_site  *ss     =
446                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
447         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
448         int                      rc;
449
450         fld_range_set_mdt(range);
451         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
452         if (rc == 0)
453                 rc = range->lsr_index;
454
455         return rc;
456 }
457
458 const char dot[] = ".";
459 const char dotdot[] = "..";
460 static const char dotlustre[] = ".lustre";
461 static const char lostfound[] = "lost+found";
462
463 /**
464  * Remove the name entry from the .lustre/lost+found directory.
465  *
466  * No need to care about the object referenced by the name entry,
467  * either the name entry is invalid or redundant, or the referenced
468  * object has been processed or will be handled by others.
469  *
470  * \param[in] env       pointer to the thread context
471  * \param[in] lfsck     pointer to the lfsck instance
472  * \param[in] name      the name for the name entry to be removed
473  *
474  * \retval              0 for success
475  * \retval              negative error number on failure
476  */
477 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
478                                        struct lfsck_instance *lfsck,
479                                        const char *name)
480 {
481         struct dt_object        *parent = lfsck->li_lpf_root_obj;
482         struct dt_device        *dev    = lfsck->li_next;
483         struct thandle          *th;
484         struct lustre_handle     lh     = { 0 };
485         int                      rc;
486         ENTRY;
487
488         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
489                               MDS_INODELOCK_UPDATE, LCK_EX);
490         if (rc != 0)
491                 RETURN(rc);
492
493         th = dt_trans_create(env, dev);
494         if (IS_ERR(th))
495                 GOTO(unlock, rc = PTR_ERR(th));
496
497         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
498         if (rc != 0)
499                 GOTO(stop, rc);
500
501         rc = dt_declare_ref_del(env, parent, th);
502         if (rc != 0)
503                 GOTO(stop, rc);
504
505         rc = dt_trans_start(env, dev, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
510                        BYPASS_CAPA);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         dt_write_lock(env, parent, 0);
515         rc = dt_ref_del(env, parent, th);
516         dt_write_unlock(env, parent);
517
518         GOTO(stop, rc);
519
520 stop:
521         dt_trans_stop(env, dev, th);
522
523 unlock:
524         lfsck_ibits_unlock(&lh, LCK_EX);
525
526         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
527                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
528
529         return rc;
530 }
531
532 static int lfsck_create_lpf_local(const struct lu_env *env,
533                                   struct lfsck_instance *lfsck,
534                                   struct dt_object *child,
535                                   struct lu_attr *la,
536                                   struct dt_object_format *dof,
537                                   const char *name)
538 {
539         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
540         struct dt_object        *parent = lfsck->li_lpf_root_obj;
541         struct dt_device        *dev    = lfsck->li_bottom;
542         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
543         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
544         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
545         struct thandle          *th     = NULL;
546         struct linkea_data       ldata  = { NULL };
547         struct lu_buf            linkea_buf;
548         const struct lu_name    *cname;
549         loff_t                   pos    = 0;
550         int                      len    = sizeof(struct lfsck_bookmark);
551         int                      rc;
552         ENTRY;
553
554         rc = linkea_data_new(&ldata,
555                              &lfsck_env_info(env)->lti_linkea_buf2);
556         if (rc != 0)
557                 RETURN(rc);
558
559         cname = lfsck_name_get_const(env, name, strlen(name));
560         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
561         if (rc != 0)
562                 RETURN(rc);
563
564         th = dt_trans_create(env, dev);
565         if (IS_ERR(th))
566                 RETURN(PTR_ERR(th));
567
568         /* 1a. create child */
569         rc = dt_declare_create(env, child, la, NULL, dof, th);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         /* 2a. increase child nlink */
574         rc = dt_declare_ref_add(env, child, th);
575         if (rc != 0)
576                 GOTO(stop, rc);
577
578         /* 3a. insert linkEA for child */
579         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
580                        ldata.ld_leh->leh_len);
581         rc = dt_declare_xattr_set(env, child, &linkea_buf,
582                                   XATTR_NAME_LINK, 0, th);
583         if (rc != 0)
584                 GOTO(stop, rc);
585
586         /* 4a. insert name into parent dir */
587         rec->rec_type = S_IFDIR;
588         rec->rec_fid = cfid;
589         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
590                                (const struct dt_key *)name, th);
591         if (rc != 0)
592                 GOTO(stop, rc);
593
594         /* 5a. increase parent nlink */
595         rc = dt_declare_ref_add(env, parent, th);
596         if (rc != 0)
597                 GOTO(stop, rc);
598
599         /* 6a. update bookmark */
600         rc = dt_declare_record_write(env, bk_obj,
601                                      lfsck_buf_get(env, bk, len), 0, th);
602         if (rc != 0)
603                 GOTO(stop, rc);
604
605         rc = dt_trans_start_local(env, dev, th);
606         if (rc != 0)
607                 GOTO(stop, rc);
608
609         dt_write_lock(env, child, 0);
610         /* 1b.1. create child */
611         rc = dt_create(env, child, la, NULL, dof, th);
612         if (rc != 0)
613                 GOTO(unlock, rc);
614
615         if (unlikely(!dt_try_as_dir(env, child)))
616                 GOTO(unlock, rc = -ENOTDIR);
617
618         /* 1b.2. insert dot into child dir */
619         rec->rec_fid = cfid;
620         rc = dt_insert(env, child, (const struct dt_rec *)rec,
621                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
622         if (rc != 0)
623                 GOTO(unlock, rc);
624
625         /* 1b.3. insert dotdot into child dir */
626         rec->rec_fid = &LU_LPF_FID;
627         rc = dt_insert(env, child, (const struct dt_rec *)rec,
628                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
629         if (rc != 0)
630                 GOTO(unlock, rc);
631
632         /* 2b. increase child nlink */
633         rc = dt_ref_add(env, child, th);
634         if (rc != 0)
635                 GOTO(unlock, rc);
636
637         /* 3b. insert linkEA for child. */
638         rc = dt_xattr_set(env, child, &linkea_buf,
639                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
640         dt_write_unlock(env, child);
641         if (rc != 0)
642                 GOTO(stop, rc);
643
644         /* 4b. insert name into parent dir */
645         rec->rec_fid = cfid;
646         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
647                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
648         if (rc != 0)
649                 GOTO(stop, rc);
650
651         dt_write_lock(env, parent, 0);
652         /* 5b. increase parent nlink */
653         rc = dt_ref_add(env, parent, th);
654         dt_write_unlock(env, parent);
655         if (rc != 0)
656                 GOTO(stop, rc);
657
658         bk->lb_lpf_fid = *cfid;
659         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
660
661         /* 6b. update bookmark */
662         rc = dt_record_write(env, bk_obj,
663                              lfsck_buf_get(env, bk, len), &pos, th);
664
665         GOTO(stop, rc);
666
667 unlock:
668         dt_write_unlock(env, child);
669
670 stop:
671         dt_trans_stop(env, dev, th);
672
673         return rc;
674 }
675
676 static int lfsck_create_lpf_remote(const struct lu_env *env,
677                                    struct lfsck_instance *lfsck,
678                                    struct dt_object *child,
679                                    struct lu_attr *la,
680                                    struct dt_object_format *dof,
681                                    const char *name)
682 {
683         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
684         struct dt_object        *parent = lfsck->li_lpf_root_obj;
685         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
686         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
687         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
688         struct thandle          *th     = NULL;
689         struct linkea_data       ldata  = { NULL };
690         struct lu_buf            linkea_buf;
691         const struct lu_name    *cname;
692         struct dt_device        *dev;
693         loff_t                   pos    = 0;
694         int                      len    = sizeof(struct lfsck_bookmark);
695         int                      rc;
696         ENTRY;
697
698         rc = linkea_data_new(&ldata,
699                              &lfsck_env_info(env)->lti_linkea_buf2);
700         if (rc != 0)
701                 RETURN(rc);
702
703         cname = lfsck_name_get_const(env, name, strlen(name));
704         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
705         if (rc != 0)
706                 RETURN(rc);
707
708         /* Create .lustre/lost+found/MDTxxxx. */
709
710         /* XXX: Currently, cross-MDT create operation needs to create the child
711          *      object firstly, then insert name into the parent directory. For
712          *      this case, the child object resides on current MDT (local), but
713          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
714          *      easy to contain all the sub-modifications orderly within single
715          *      transaction.
716          *
717          *      To avoid more inconsistency, we split the create operation into
718          *      two transactions:
719          *
720          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
721          *         locally.
722          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
723          *         remotely.
724          *
725          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
726          *      repair such inconsistency when LFSCK run next time. */
727
728         /* Transaction I: locally */
729
730         dev = lfsck->li_bottom;
731         th = dt_trans_create(env, dev);
732         if (IS_ERR(th))
733                 RETURN(PTR_ERR(th));
734
735         /* 1a. create child */
736         rc = dt_declare_create(env, child, la, NULL, dof, th);
737         if (rc != 0)
738                 GOTO(stop, rc);
739
740         /* 2a. increase child nlink */
741         rc = dt_declare_ref_add(env, child, th);
742         if (rc != 0)
743                 GOTO(stop, rc);
744
745         /* 3a. insert linkEA for child */
746         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
747                        ldata.ld_leh->leh_len);
748         rc = dt_declare_xattr_set(env, child, &linkea_buf,
749                                   XATTR_NAME_LINK, 0, th);
750         if (rc != 0)
751                 GOTO(stop, rc);
752
753         /* 4a. update bookmark */
754         rc = dt_declare_record_write(env, bk_obj,
755                                      lfsck_buf_get(env, bk, len), 0, th);
756         if (rc != 0)
757                 GOTO(stop, rc);
758
759         rc = dt_trans_start_local(env, dev, th);
760         if (rc != 0)
761                 GOTO(stop, rc);
762
763         dt_write_lock(env, child, 0);
764         /* 1b.1. create child */
765         rc = dt_create(env, child, la, NULL, dof, th);
766         if (rc != 0)
767                 GOTO(unlock, rc);
768
769         if (unlikely(!dt_try_as_dir(env, child)))
770                 GOTO(unlock, rc = -ENOTDIR);
771
772         /* 1b.2. insert dot into child dir */
773         rec->rec_type = S_IFDIR;
774         rec->rec_fid = cfid;
775         rc = dt_insert(env, child, (const struct dt_rec *)rec,
776                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
777         if (rc != 0)
778                 GOTO(unlock, rc);
779
780         /* 1b.3. insert dotdot into child dir */
781         rec->rec_fid = &LU_LPF_FID;
782         rc = dt_insert(env, child, (const struct dt_rec *)rec,
783                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
784         if (rc != 0)
785                 GOTO(unlock, rc);
786
787         /* 2b. increase child nlink */
788         rc = dt_ref_add(env, child, th);
789         if (rc != 0)
790                 GOTO(unlock, rc);
791
792         /* 3b. insert linkEA for child */
793         rc = dt_xattr_set(env, child, &linkea_buf,
794                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
795         if (rc != 0)
796                 GOTO(unlock, rc);
797
798         bk->lb_lpf_fid = *cfid;
799         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
800
801         /* 4b. update bookmark */
802         rc = dt_record_write(env, bk_obj,
803                              lfsck_buf_get(env, bk, len), &pos, th);
804
805         dt_write_unlock(env, child);
806         dt_trans_stop(env, dev, th);
807         if (rc != 0)
808                 RETURN(rc);
809
810         /* Transaction II: remotely */
811
812         dev = lfsck->li_next;
813         th = dt_trans_create(env, dev);
814         if (IS_ERR(th))
815                 RETURN(PTR_ERR(th));
816
817         /* 5a. insert name into parent dir */
818         rec->rec_fid = cfid;
819         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
820                                (const struct dt_key *)name, th);
821         if (rc != 0)
822                 GOTO(stop, rc);
823
824         /* 6a. increase parent nlink */
825         rc = dt_declare_ref_add(env, parent, th);
826         if (rc != 0)
827                 GOTO(stop, rc);
828
829         rc = dt_trans_start(env, dev, th);
830         if (rc != 0)
831                 GOTO(stop, rc);
832
833         /* 5b. insert name into parent dir */
834         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
835                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
836         if (rc != 0)
837                 GOTO(stop, rc);
838
839         dt_write_lock(env, parent, 0);
840         /* 6b. increase parent nlink */
841         rc = dt_ref_add(env, parent, th);
842         dt_write_unlock(env, parent);
843
844         GOTO(stop, rc);
845
846 unlock:
847         dt_write_unlock(env, child);
848 stop:
849         dt_trans_stop(env, dev, th);
850
851         if (rc != 0 && dev == lfsck->li_next)
852                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
853                        "for orphans, but failed to insert the name %s "
854                        "to the .lustre/lost+found/. Such inconsistency "
855                        "will be repaired when LFSCK run next time: rc = %d\n",
856                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
857
858         return rc;
859 }
860
861 /**
862  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
863  *
864  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
865  * orphans and other uncertain inconsistent objects found during the
866  * LFSCK. Such directory will be created by the LFSCK engine on the
867  * local MDT before the LFSCK scanning.
868  *
869  * \param[in] env       pointer to the thread context
870  * \param[in] lfsck     pointer to the lfsck instance
871  *
872  * \retval              0 for success
873  * \retval              negative error number on failure
874  */
875 static int lfsck_create_lpf(const struct lu_env *env,
876                             struct lfsck_instance *lfsck)
877 {
878         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
879         struct lfsck_thread_info *info  = lfsck_env_info(env);
880         struct lu_fid            *cfid  = &info->lti_fid2;
881         struct lu_attr           *la    = &info->lti_la;
882         struct dt_object_format  *dof   = &info->lti_dof;
883         struct dt_object         *parent = lfsck->li_lpf_root_obj;
884         struct dt_object         *child = NULL;
885         struct lustre_handle      lh    = { 0 };
886         char                      name[8];
887         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
888         int                       rc    = 0;
889         ENTRY;
890
891         LASSERT(lfsck->li_master);
892         LASSERT(parent != NULL);
893         LASSERT(lfsck->li_lpf_obj == NULL);
894
895         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
896                               MDS_INODELOCK_UPDATE, LCK_EX);
897         if (rc != 0)
898                 RETURN(rc);
899
900         snprintf(name, 8, "MDT%04x", node);
901         if (fid_is_zero(&bk->lb_lpf_fid)) {
902                 /* There is corner case that: in former LFSCK scanning we have
903                  * created the .lustre/lost+found/MDTxxxx but failed to update
904                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
905                  * it from MDT0 firstly. */
906                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
907                                (const struct dt_key *)name, BYPASS_CAPA);
908                 if (rc != 0 && rc != -ENOENT)
909                         GOTO(unlock, rc);
910
911                 if (rc == 0) {
912                         bk->lb_lpf_fid = *cfid;
913                         rc = lfsck_bookmark_store(env, lfsck);
914                 } else {
915                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
916                 }
917                 if (rc != 0)
918                         GOTO(unlock, rc);
919         } else {
920                 *cfid = bk->lb_lpf_fid;
921         }
922
923         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
924         if (IS_ERR(child))
925                 GOTO(unlock, rc = PTR_ERR(child));
926
927         if (dt_object_exists(child) != 0) {
928                 if (unlikely(!dt_try_as_dir(env, child)))
929                         rc = -ENOTDIR;
930                 else
931                         lfsck->li_lpf_obj = child;
932
933                 GOTO(unlock, rc);
934         }
935
936         memset(la, 0, sizeof(*la));
937         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
938         la->la_mode = S_IFDIR | S_IRWXU;
939         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
940                        LA_UID | LA_GID;
941         memset(dof, 0, sizeof(*dof));
942         dof->dof_type = dt_mode_to_dft(S_IFDIR);
943
944         if (node == 0)
945                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
946         else
947                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
948         if (rc == 0)
949                 lfsck->li_lpf_obj = child;
950
951         GOTO(unlock, rc);
952
953 unlock:
954         lfsck_ibits_unlock(&lh, LCK_EX);
955         if (rc != 0 && child != NULL && !IS_ERR(child))
956                 lu_object_put(env, &child->do_lu);
957
958         return rc;
959 }
960
961 /**
962  * Scan .lustre/lost+found for bad name entries and remove them.
963  *
964  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
965  * index in the system. Any other formatted name is invalid and should be
966  * removed.
967  *
968  * \param[in] env       pointer to the thread context
969  * \param[in] lfsck     pointer to the lfsck instance
970  *
971  * \retval              0 for success
972  * \retval              negative error number on failure
973  */
974 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
975                                       struct lfsck_instance *lfsck)
976 {
977         struct dt_object        *parent = lfsck->li_lpf_root_obj;
978         struct lu_dirent        *ent    =
979                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
980         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
981         struct dt_it            *it;
982         int                      rc;
983         ENTRY;
984
985         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
986         if (IS_ERR(it))
987                 RETURN(PTR_ERR(it));
988
989         rc = iops->load(env, it, 0);
990         if (rc == 0)
991                 rc = iops->next(env, it);
992         else if (rc > 0)
993                 rc = 0;
994
995         while (rc == 0) {
996                 int off = 3;
997
998                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
999                 if (rc != 0)
1000                         break;
1001
1002                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1003                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1004                         goto next;
1005
1006                 /* name length must be strlen("MDTxxxx") */
1007                 if (ent->lde_namelen != 7)
1008                         goto remove;
1009
1010                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1011                         goto remove;
1012
1013                 while (off < 7 && isxdigit(ent->lde_name[off]))
1014                         off++;
1015
1016                 if (off != 7) {
1017
1018 remove:
1019                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1020                                                          ent->lde_name);
1021                         if (rc != 0)
1022                                 break;
1023                 }
1024
1025 next:
1026                 rc = iops->next(env, it);
1027         }
1028
1029         iops->put(env, it);
1030         iops->fini(env, it);
1031
1032         RETURN(rc > 0 ? 0 : rc);
1033 }
1034
1035 static int lfsck_update_lpf_entry(const struct lu_env *env,
1036                                   struct lfsck_instance *lfsck,
1037                                   struct dt_object *parent,
1038                                   struct dt_object *child,
1039                                   const char *name,
1040                                   enum lfsck_verify_lpf_types type)
1041 {
1042         int rc;
1043
1044         if (type == LVLT_BY_BOOKMARK) {
1045                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1046                                              lfsck_dto2fid(child), S_IFDIR);
1047         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1048                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1049                 rc = lfsck_bookmark_store(env, lfsck);
1050
1051                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1052                        " in the bookmark file: rc = %d\n",
1053                        lfsck_lfsck2name(lfsck),
1054                        PFID(lfsck_dto2fid(child)), rc);
1055         }
1056
1057         return rc;
1058 }
1059
1060 /**
1061  * Check whether the @child back references the @parent.
1062  *
1063  * Two cases:
1064  * 1) The child's FID is stored in the bookmark file. If the child back
1065  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1066  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1067  *    the child back references another parent2, then:
1068  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1069  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1070  *      references the child. So keep them there. As the LFSCK processing,
1071  *      the parent3 may be found, then when the LFSCK run next time, the
1072  *      inconsistency can be repaired.
1073  *
1074  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1075  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1076  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1077  *    back references another parent2, then:
1078  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1079  *      from .lustre/lost+found/;
1080  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1081  *      sub-directory name entry and update the child;
1082  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1083  *      or not, then keep them there.
1084  *
1085  * \param[in] env       pointer to the thread context
1086  * \param[in] lfsck     pointer to the lfsck instance
1087  * \param[in] child     pointer to the lost+found sub-directory object
1088  * \param[in] name      the name for lost+found sub-directory object
1089  * \param[out] fid      pointer to the buffer to hold the FID of the object
1090  *                      (called it as parent2) that is referenced via the
1091  *                      child's dotdot entry; it also can be the FID that
1092  *                      is referenced by the name entry under the parent2.
1093  * \param[in] type      to indicate where the child's FID is stored in
1094  *
1095  * \retval              positive number for uncertain inconsistency
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1100                                   struct lfsck_instance *lfsck,
1101                                   struct dt_object *child, const char *name,
1102                                   struct lu_fid *fid,
1103                                   enum lfsck_verify_lpf_types type)
1104 {
1105         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1106         struct lfsck_thread_info *info    = lfsck_env_info(env);
1107         char                     *name2   = info->lti_key;
1108         struct lu_fid            *fid2    = &info->lti_fid3;
1109         struct dt_object         *parent2 = NULL;
1110         struct lustre_handle      lh      = { 0 };
1111         int                       rc;
1112         ENTRY;
1113
1114         fid_zero(fid);
1115         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1116                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1117         if (rc != 0)
1118                 GOTO(linkea, rc);
1119
1120         if (!fid_is_sane(fid))
1121                 GOTO(linkea, rc = -EINVAL);
1122
1123         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1124                 const struct lu_name *cname;
1125
1126                 if (lfsck->li_lpf_obj == NULL) {
1127                         lu_object_get(&child->do_lu);
1128                         lfsck->li_lpf_obj = child;
1129                 }
1130
1131                 cname = lfsck_name_get_const(env, name, strlen(name));
1132                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1133                                          &LU_LPF_FID);
1134                 if (rc == 0)
1135                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1136                                                     name, type);
1137
1138                 GOTO(out_done, rc);
1139         }
1140
1141         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1142         if (IS_ERR(parent2))
1143                 GOTO(linkea, parent2);
1144
1145         if (!dt_object_exists(parent2)) {
1146                 lu_object_put(env, &parent2->do_lu);
1147
1148                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1149         }
1150
1151         if (!dt_try_as_dir(env, parent2)) {
1152                 lu_object_put(env, &parent2->do_lu);
1153
1154                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1155         }
1156
1157 linkea:
1158         /* To prevent rename/unlink race */
1159         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1160                               MDS_INODELOCK_UPDATE, LCK_PR);
1161         if (rc != 0)
1162                 GOTO(out_put, rc);
1163
1164         dt_read_lock(env, child, 0);
1165         rc = lfsck_links_get_first(env, child, name2, fid2);
1166         if (rc != 0) {
1167                 dt_read_unlock(env, child);
1168                 lfsck_ibits_unlock(&lh, LCK_PR);
1169
1170                 GOTO(out_put, rc = 1);
1171         }
1172
1173         /* It is almost impossible that the bookmark file (or the name entry)
1174          * and the linkEA hit the same data corruption. Trust the linkEA. */
1175         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1176                 dt_read_unlock(env, child);
1177                 lfsck_ibits_unlock(&lh, LCK_PR);
1178
1179                 *fid = *fid2;
1180                 if (lfsck->li_lpf_obj == NULL) {
1181                         lu_object_get(&child->do_lu);
1182                         lfsck->li_lpf_obj = child;
1183                 }
1184
1185                 /* Update the child's dotdot entry */
1186                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1187                                              &LU_LPF_FID, S_IFDIR);
1188                 if (rc == 0)
1189                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1190                                                     name, type);
1191
1192                 GOTO(out_put, rc);
1193         }
1194
1195         if (parent2 == NULL || IS_ERR(parent2)) {
1196                 dt_read_unlock(env, child);
1197                 lfsck_ibits_unlock(&lh, LCK_PR);
1198
1199                 GOTO(out_done, rc = 1);
1200         }
1201
1202         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1203                        (const struct dt_key *)name2, BYPASS_CAPA);
1204         dt_read_unlock(env, child);
1205         lfsck_ibits_unlock(&lh, LCK_PR);
1206         if (rc != 0 && rc != -ENOENT)
1207                 GOTO(out_put, rc);
1208
1209         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1210                 if (type == LVLT_BY_BOOKMARK)
1211                         GOTO(out_put, rc = 1);
1212
1213                 /* Trust the name entry, update the child's dotdot entry. */
1214                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1215                                              &LU_LPF_FID, S_IFDIR);
1216
1217                 GOTO(out_put, rc);
1218         }
1219
1220         if (type == LVLT_BY_BOOKMARK) {
1221                 /* Invalid FID record in the bookmark file, reset it. */
1222                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1223                 rc = lfsck_bookmark_store(env, lfsck);
1224
1225                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1226                        " in the bookmark file: rc = %d\n",
1227                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1228         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1229                 /* The name entry is wrong, remove it. */
1230                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1231         }
1232
1233         GOTO(out_put, rc);
1234
1235 out_put:
1236         if (parent2 != NULL && !IS_ERR(parent2))
1237                 lu_object_put(env, &parent2->do_lu);
1238
1239 out_done:
1240         return rc;
1241 }
1242
1243 /**
1244  * Verify the /ROOT/.lustre/lost+found/ directory.
1245  *
1246  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1247  * the LFSCK does not exactly know how to handle, such as orphans. So before
1248  * the LFSCK scanning the system, the consistency of such directory needs to
1249  * be verified firstly to allow the users to use it during the LFSCK.
1250  *
1251  * \param[in] env       pointer to the thread context
1252  * \param[in] lfsck     pointer to the lfsck instance
1253  *
1254  * \retval              positive number for uncertain inconsistency
1255  * \retval              0 for success
1256  * \retval              negative error number on failure
1257  */
1258 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1259 {
1260         struct lfsck_thread_info *info   = lfsck_env_info(env);
1261         struct lu_fid            *pfid   = &info->lti_fid;
1262         struct lu_fid            *cfid   = &info->lti_fid2;
1263         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1264         struct dt_object         *parent;
1265         /* child1's FID is in the bookmark file. */
1266         struct dt_object         *child1 = NULL;
1267         /* child2's FID is in the name entry MDTxxxx. */
1268         struct dt_object         *child2 = NULL;
1269         struct dt_device         *dev    = lfsck->li_bottom;
1270         const struct lu_name     *cname;
1271         char                      name[8];
1272         int                       node   = lfsck_dev_idx(dev);
1273         int                       rc     = 0;
1274         ENTRY;
1275
1276         LASSERT(lfsck->li_master);
1277
1278         if (lfsck->li_lpf_root_obj != NULL)
1279                 RETURN(0);
1280
1281         if (node == 0) {
1282                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1283         } else {
1284                 struct lfsck_tgt_desc *ltd;
1285
1286                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1287                 if (unlikely(ltd == NULL))
1288                         RETURN(-ENXIO);
1289
1290                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1291                                                   &LU_LPF_FID);
1292                 lfsck_tgt_put(ltd);
1293         }
1294
1295         if (IS_ERR(parent))
1296                 RETURN(PTR_ERR(parent));
1297
1298         LASSERT(dt_object_exists(parent));
1299
1300         if (unlikely(!dt_try_as_dir(env, parent))) {
1301                 lu_object_put(env, &parent->do_lu);
1302
1303                 GOTO(put, rc = -ENOTDIR);
1304         }
1305
1306         lfsck->li_lpf_root_obj = parent;
1307         if (node == 0) {
1308                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1309                 if (rc != 0)
1310                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1311                                "for bad sub-directories: rc = %d\n",
1312                                lfsck_lfsck2name(lfsck), rc);
1313         }
1314
1315         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1316                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1317                         struct lu_fid tfid = bk->lb_lpf_fid;
1318
1319                         /* Invalid FID record in the bookmark file, reset it. */
1320                         fid_zero(&bk->lb_lpf_fid);
1321                         rc = lfsck_bookmark_store(env, lfsck);
1322
1323                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1324                                " in the bookmark file: rc = %d\n",
1325                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1326
1327                         if (rc != 0)
1328                                 GOTO(put, rc);
1329                 } else {
1330                         child1 = lfsck_object_find_by_dev(env, dev,
1331                                                           &bk->lb_lpf_fid);
1332                         if (IS_ERR(child1)) {
1333                                 child1 = NULL;
1334                                 goto find_child2;
1335                         }
1336
1337                         if (unlikely(!dt_object_exists(child1) ||
1338                                      dt_object_remote(child1)) ||
1339                                      !S_ISDIR(lfsck_object_type(child1))) {
1340                                 /* Invalid FID record in the bookmark file,
1341                                  * reset it. */
1342                                 fid_zero(&bk->lb_lpf_fid);
1343                                 rc = lfsck_bookmark_store(env, lfsck);
1344
1345                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1346                                        " in the bookmark file: rc = %d\n",
1347                                        lfsck_lfsck2name(lfsck),
1348                                        PFID(lfsck_dto2fid(child1)), rc);
1349
1350                                 if (rc != 0)
1351                                         GOTO(put, rc);
1352
1353                                 lu_object_put(env, &child1->do_lu);
1354                                 child1 = NULL;
1355                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1356                                 GOTO(put, rc = -ENOTDIR);
1357                         }
1358                 }
1359         }
1360
1361 find_child2:
1362         snprintf(name, 8, "MDT%04x", node);
1363         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1364                        (const struct dt_key *)name, BYPASS_CAPA);
1365         if (rc == -ENOENT) {
1366                 if (!fid_is_zero(&bk->lb_lpf_fid))
1367                         goto check_child1;
1368
1369                 GOTO(put, rc = 0);
1370         }
1371
1372         if (rc != 0)
1373                 GOTO(put, rc);
1374
1375         /* Invalid FID in the name entry, remove the name entry. */
1376         if (!fid_is_norm(cfid)) {
1377                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1378                 if (rc != 0)
1379                         GOTO(put, rc);
1380
1381                 goto check_child1;
1382         }
1383
1384         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1385         if (IS_ERR(child2))
1386                 GOTO(put, rc = PTR_ERR(child2));
1387
1388         if (unlikely(!dt_object_exists(child2) ||
1389                      dt_object_remote(child2)) ||
1390                      !S_ISDIR(lfsck_object_type(child2))) {
1391                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1392                 if (rc != 0)
1393                         GOTO(put, rc);
1394
1395                 goto check_child1;
1396         }
1397
1398         if (unlikely(!dt_try_as_dir(env, child2)))
1399                 GOTO(put, rc = -ENOTDIR);
1400
1401         if (child1 == NULL) {
1402                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1403                                             pfid, LVLT_BY_NAMEENTRY);
1404         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1405                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1406                                             pfid, LVLT_BY_BOOKMARK);
1407                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1408                         rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
1409                                                     name, pfid,
1410                                                     LVLT_BY_NAMEENTRY);
1411         } else {
1412                 if (lfsck->li_lpf_obj == NULL) {
1413                         lu_object_get(&child2->do_lu);
1414                         lfsck->li_lpf_obj = child2;
1415                 }
1416
1417                 cname = lfsck_name_get_const(env, name, strlen(name));
1418                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1419         }
1420
1421         GOTO(put, rc);
1422
1423 check_child1:
1424         if (child1 != NULL)
1425                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1426                                             pfid, LVLT_BY_BOOKMARK);
1427
1428         GOTO(put, rc);
1429
1430 put:
1431         if (lfsck->li_lpf_obj != NULL) {
1432                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1433                         lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1434                         lfsck->li_lpf_obj = NULL;
1435                         rc = -ENOTDIR;
1436                 }
1437         } else if (rc == 0) {
1438                 rc = lfsck_create_lpf(env, lfsck);
1439         }
1440
1441         if (child2 != NULL && !IS_ERR(child2))
1442                 lu_object_put(env, &child2->do_lu);
1443         if (child1 != NULL && !IS_ERR(child1))
1444                 lu_object_put(env, &child1->do_lu);
1445
1446         return rc;
1447 }
1448
1449 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1450 {
1451         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1452         struct seq_server_site  *ss;
1453         char                    *prefix;
1454         int                      rc     = 0;
1455         ENTRY;
1456
1457         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1458         if (unlikely(ss == NULL))
1459                 RETURN(-ENXIO);
1460
1461         OBD_ALLOC_PTR(lfsck->li_seq);
1462         if (lfsck->li_seq == NULL)
1463                 RETURN(-ENOMEM);
1464
1465         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1466         if (prefix == NULL)
1467                 GOTO(out, rc = -ENOMEM);
1468
1469         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1470         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1471                              ss->ss_server_seq);
1472         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1473         if (rc != 0)
1474                 GOTO(out, rc);
1475
1476         if (fid_is_sane(&bk->lb_last_fid))
1477                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1478
1479         RETURN(0);
1480
1481 out:
1482         OBD_FREE_PTR(lfsck->li_seq);
1483         lfsck->li_seq = NULL;
1484
1485         return rc;
1486 }
1487
1488 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1489 {
1490         if (lfsck->li_seq != NULL) {
1491                 seq_client_fini(lfsck->li_seq);
1492                 OBD_FREE_PTR(lfsck->li_seq);
1493                 lfsck->li_seq = NULL;
1494         }
1495 }
1496
1497 void lfsck_instance_cleanup(const struct lu_env *env,
1498                             struct lfsck_instance *lfsck)
1499 {
1500         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1501         struct lfsck_component  *com;
1502         struct lfsck_component  *next;
1503         struct lfsck_lmv_unit   *llu;
1504         struct lfsck_lmv_unit   *llu_next;
1505         struct lfsck_lmv        *llmv;
1506         ENTRY;
1507
1508         LASSERT(list_empty(&lfsck->li_link));
1509         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1510
1511         if (lfsck->li_obj_oit != NULL) {
1512                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1513                 lfsck->li_obj_oit = NULL;
1514         }
1515
1516         LASSERT(lfsck->li_obj_dir == NULL);
1517         LASSERT(lfsck->li_lmv == NULL);
1518
1519         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1520                 llmv = &llu->llu_lmv;
1521
1522                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1523                          "still in using: %u\n",
1524                          atomic_read(&llmv->ll_ref));
1525
1526                 lfsck_lmv_put(env, llmv);
1527         }
1528
1529         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1530                 lfsck_component_cleanup(env, com);
1531         }
1532
1533         LASSERT(list_empty(&lfsck->li_list_dir));
1534
1535         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1536                                  lc_link) {
1537                 lfsck_component_cleanup(env, com);
1538         }
1539
1540         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1541                 lfsck_component_cleanup(env, com);
1542         }
1543
1544         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1545         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1546
1547         if (lfsck->li_bookmark_obj != NULL) {
1548                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1549                 lfsck->li_bookmark_obj = NULL;
1550         }
1551
1552         if (lfsck->li_lpf_obj != NULL) {
1553                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1554                 lfsck->li_lpf_obj = NULL;
1555         }
1556
1557         if (lfsck->li_lpf_root_obj != NULL) {
1558                 lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu);
1559                 lfsck->li_lpf_root_obj = NULL;
1560         }
1561
1562         if (lfsck->li_los != NULL) {
1563                 local_oid_storage_fini(env, lfsck->li_los);
1564                 lfsck->li_los = NULL;
1565         }
1566
1567         lfsck_fid_fini(lfsck);
1568
1569         OBD_FREE_PTR(lfsck);
1570 }
1571
1572 static inline struct lfsck_instance *
1573 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1574 {
1575         struct lfsck_instance *lfsck;
1576
1577         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1578                 if (lfsck->li_bottom == key) {
1579                         if (ref)
1580                                 lfsck_instance_get(lfsck);
1581                         if (unlink)
1582                                 list_del_init(&lfsck->li_link);
1583
1584                         return lfsck;
1585                 }
1586         }
1587
1588         return NULL;
1589 }
1590
1591 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1592                                            bool unlink)
1593 {
1594         struct lfsck_instance *lfsck;
1595
1596         spin_lock(&lfsck_instance_lock);
1597         lfsck = __lfsck_instance_find(key, ref, unlink);
1598         spin_unlock(&lfsck_instance_lock);
1599
1600         return lfsck;
1601 }
1602
1603 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1604 {
1605         struct lfsck_instance *tmp;
1606
1607         spin_lock(&lfsck_instance_lock);
1608         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1609                 if (lfsck->li_bottom == tmp->li_bottom) {
1610                         spin_unlock(&lfsck_instance_lock);
1611                         return -EEXIST;
1612                 }
1613         }
1614
1615         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1616         spin_unlock(&lfsck_instance_lock);
1617         return 0;
1618 }
1619
1620 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1621                     const char *prefix)
1622 {
1623         int flag;
1624         int i;
1625         bool newline = (bits != 0 ? false : true);
1626
1627         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1628
1629         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1630                 if (flag & bits) {
1631                         bits &= ~flag;
1632                         if (names[i] != NULL) {
1633                                 if (bits == 0)
1634                                         newline = true;
1635
1636                                 seq_printf(m, "%s%c", names[i],
1637                                            newline ? '\n' : ',');
1638                         }
1639                 }
1640         }
1641
1642         if (!newline)
1643                 seq_printf(m, "\n");
1644         return 0;
1645 }
1646
1647 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1648 {
1649         if (time != 0)
1650                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1651                           cfs_time_current_sec() - time);
1652         else
1653                 seq_printf(m, "%s: N/A\n", prefix);
1654         return 0;
1655 }
1656
1657 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1658                    const char *prefix)
1659 {
1660         if (fid_is_zero(&pos->lp_dir_parent)) {
1661                 if (pos->lp_oit_cookie == 0)
1662                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1663                                    prefix);
1664                 else
1665                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1666                                    prefix, pos->lp_oit_cookie);
1667         } else {
1668                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1669                            prefix, pos->lp_oit_cookie,
1670                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1671         }
1672         return 0;
1673 }
1674
1675 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1676                     struct lfsck_position *pos, bool init)
1677 {
1678         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1679
1680         if (unlikely(lfsck->li_di_oit == NULL)) {
1681                 memset(pos, 0, sizeof(*pos));
1682                 return;
1683         }
1684
1685         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1686         if (!lfsck->li_current_oit_processed && !init)
1687                 pos->lp_oit_cookie--;
1688
1689         LASSERT(pos->lp_oit_cookie > 0);
1690
1691         if (lfsck->li_di_dir != NULL) {
1692                 struct dt_object *dto = lfsck->li_obj_dir;
1693
1694                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1695                                                         lfsck->li_di_dir);
1696
1697                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1698                         fid_zero(&pos->lp_dir_parent);
1699                         pos->lp_dir_cookie = 0;
1700                 } else {
1701                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1702                 }
1703         } else {
1704                 fid_zero(&pos->lp_dir_parent);
1705                 pos->lp_dir_cookie = 0;
1706         }
1707 }
1708
1709 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1710 {
1711         bool dirty = false;
1712
1713         if (limit != LFSCK_SPEED_NO_LIMIT) {
1714                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1715                         lfsck->li_sleep_rate = limit /
1716                                                msecs_to_jiffies(MSEC_PER_SEC);
1717                         lfsck->li_sleep_jif = 1;
1718                 } else {
1719                         lfsck->li_sleep_rate = 1;
1720                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1721                                               limit;
1722                 }
1723         } else {
1724                 lfsck->li_sleep_jif = 0;
1725                 lfsck->li_sleep_rate = 0;
1726         }
1727
1728         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1729                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1730                 dirty = true;
1731         }
1732
1733         return dirty;
1734 }
1735
1736 void lfsck_control_speed(struct lfsck_instance *lfsck)
1737 {
1738         struct ptlrpc_thread *thread = &lfsck->li_thread;
1739         struct l_wait_info    lwi;
1740
1741         if (lfsck->li_sleep_jif > 0 &&
1742             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1743                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1744                                        LWI_ON_SIGNAL_NOOP, NULL);
1745
1746                 l_wait_event(thread->t_ctl_waitq,
1747                              !thread_is_running(thread),
1748                              &lwi);
1749                 lfsck->li_new_scanned = 0;
1750         }
1751 }
1752
1753 void lfsck_control_speed_by_self(struct lfsck_component *com)
1754 {
1755         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1756         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1757         struct l_wait_info       lwi;
1758
1759         if (lfsck->li_sleep_jif > 0 &&
1760             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1761                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1762                                        LWI_ON_SIGNAL_NOOP, NULL);
1763
1764                 l_wait_event(thread->t_ctl_waitq,
1765                              !thread_is_running(thread),
1766                              &lwi);
1767                 com->lc_new_scanned = 0;
1768         }
1769 }
1770
1771 static struct lfsck_thread_args *
1772 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1773                        struct lfsck_component *com,
1774                        struct lfsck_start_param *lsp)
1775 {
1776         struct lfsck_thread_args *lta;
1777         int                       rc;
1778
1779         OBD_ALLOC_PTR(lta);
1780         if (lta == NULL)
1781                 return ERR_PTR(-ENOMEM);
1782
1783         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1784         if (rc != 0) {
1785                 OBD_FREE_PTR(lta);
1786                 return ERR_PTR(rc);
1787         }
1788
1789         lta->lta_lfsck = lfsck_instance_get(lfsck);
1790         if (com != NULL)
1791                 lta->lta_com = lfsck_component_get(com);
1792
1793         lta->lta_lsp = lsp;
1794
1795         return lta;
1796 }
1797
1798 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1799 {
1800         if (lta->lta_com != NULL)
1801                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1802         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1803         lu_env_fini(&lta->lta_env);
1804         OBD_FREE_PTR(lta);
1805 }
1806
1807 struct lfsck_assistant_data *
1808 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1809                           const char *name)
1810 {
1811         struct lfsck_assistant_data *lad;
1812
1813         OBD_ALLOC_PTR(lad);
1814         if (lad != NULL) {
1815                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1816                 if (lad->lad_bitmap == NULL) {
1817                         OBD_FREE_PTR(lad);
1818                         return NULL;
1819                 }
1820
1821                 INIT_LIST_HEAD(&lad->lad_req_list);
1822                 spin_lock_init(&lad->lad_lock);
1823                 INIT_LIST_HEAD(&lad->lad_ost_list);
1824                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1825                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1826                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1827                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1828                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1829                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1830                 lad->lad_ops = lao;
1831                 lad->lad_name = name;
1832         }
1833
1834         return lad;
1835 }
1836
1837 /**
1838  * Generic LFSCK asynchronous communication interpretor function.
1839  * The LFSCK RPC reply for both the event notification and status
1840  * querying will be handled here.
1841  *
1842  * \param[in] env       pointer to the thread context
1843  * \param[in] req       pointer to the LFSCK request
1844  * \param[in] args      pointer to the lfsck_async_interpret_args
1845  * \param[in] rc        the result for handling the LFSCK request
1846  *
1847  * \retval              0 for success
1848  * \retval              negative error number on failure
1849  */
1850 int lfsck_async_interpret_common(const struct lu_env *env,
1851                                  struct ptlrpc_request *req,
1852                                  void *args, int rc)
1853 {
1854         struct lfsck_async_interpret_args *laia = args;
1855         struct lfsck_component            *com  = laia->laia_com;
1856         struct lfsck_assistant_data       *lad  = com->lc_data;
1857         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1858         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1859         struct lfsck_request              *lr   = laia->laia_lr;
1860
1861         LASSERT(com->lc_lfsck->li_master);
1862
1863         switch (lr->lr_event) {
1864         case LE_START:
1865                 if (rc != 0) {
1866                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1867                                "start: rc = %d\n",
1868                                lfsck_lfsck2name(com->lc_lfsck),
1869                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1870                                ltd->ltd_index, lad->lad_name, rc);
1871
1872                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1873                                 struct lfsck_layout *lo = com->lc_file_ram;
1874
1875                                 if (lr->lr_flags & LEF_TO_OST)
1876                                         lfsck_lad_set_bitmap(env, com,
1877                                                              ltd->ltd_index);
1878                                 else
1879                                         lo->ll_flags |= LF_INCOMPLETE;
1880                         } else {
1881                                 struct lfsck_namespace *ns = com->lc_file_ram;
1882
1883                                 /* If some MDT does not join the namespace
1884                                  * LFSCK, then we cannot know whether there
1885                                  * is some name entry on such MDT that with
1886                                  * the referenced MDT-object on this MDT or
1887                                  * not. So the namespace LFSCK on this MDT
1888                                  * cannot handle orphan MDT-objects properly.
1889                                  * So we mark the LFSCK as LF_INCOMPLETE and
1890                                  * skip orphan MDT-objects handling. */
1891                                 ns->ln_flags |= LF_INCOMPLETE;
1892                         }
1893                         break;
1894                 }
1895
1896                 spin_lock(&ltds->ltd_lock);
1897                 if (ltd->ltd_dead) {
1898                         spin_unlock(&ltds->ltd_lock);
1899                         break;
1900                 }
1901
1902                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1903                         struct list_head *list;
1904                         struct list_head *phase_list;
1905
1906                         if (ltd->ltd_layout_done) {
1907                                 spin_unlock(&ltds->ltd_lock);
1908                                 break;
1909                         }
1910
1911                         if (lr->lr_flags & LEF_TO_OST) {
1912                                 list = &lad->lad_ost_list;
1913                                 phase_list = &lad->lad_ost_phase1_list;
1914                         } else {
1915                                 list = &lad->lad_mdt_list;
1916                                 phase_list = &lad->lad_mdt_phase1_list;
1917                         }
1918
1919                         if (list_empty(&ltd->ltd_layout_list))
1920                                 list_add_tail(&ltd->ltd_layout_list, list);
1921                         if (list_empty(&ltd->ltd_layout_phase_list))
1922                                 list_add_tail(&ltd->ltd_layout_phase_list,
1923                                               phase_list);
1924                 } else {
1925                         if (ltd->ltd_namespace_done) {
1926                                 spin_unlock(&ltds->ltd_lock);
1927                                 break;
1928                         }
1929
1930                         if (list_empty(&ltd->ltd_namespace_list))
1931                                 list_add_tail(&ltd->ltd_namespace_list,
1932                                               &lad->lad_mdt_list);
1933                         if (list_empty(&ltd->ltd_namespace_phase_list))
1934                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1935                                               &lad->lad_mdt_phase1_list);
1936                 }
1937                 spin_unlock(&ltds->ltd_lock);
1938                 break;
1939         case LE_STOP:
1940         case LE_PHASE1_DONE:
1941         case LE_PHASE2_DONE:
1942         case LE_PEER_EXIT:
1943                 if (rc != 0 && rc != -EALREADY)
1944                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1945                               "event = %d, rc = %d\n",
1946                               lfsck_lfsck2name(com->lc_lfsck),
1947                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1948                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1949                 break;
1950         case LE_QUERY: {
1951                 struct lfsck_reply *reply;
1952                 struct list_head *list;
1953                 struct list_head *phase_list;
1954
1955                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1956                         list = &ltd->ltd_layout_list;
1957                         phase_list = &ltd->ltd_layout_phase_list;
1958                 } else {
1959                         list = &ltd->ltd_namespace_list;
1960                         phase_list = &ltd->ltd_namespace_phase_list;
1961                 }
1962
1963                 if (rc != 0) {
1964                         spin_lock(&ltds->ltd_lock);
1965                         list_del_init(phase_list);
1966                         list_del_init(list);
1967                         spin_unlock(&ltds->ltd_lock);
1968                         break;
1969                 }
1970
1971                 reply = req_capsule_server_get(&req->rq_pill,
1972                                                &RMF_LFSCK_REPLY);
1973                 if (reply == NULL) {
1974                         rc = -EPROTO;
1975                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1976                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1977                                lad->lad_name, rc);
1978                         spin_lock(&ltds->ltd_lock);
1979                         list_del_init(phase_list);
1980                         list_del_init(list);
1981                         spin_unlock(&ltds->ltd_lock);
1982                         break;
1983                 }
1984
1985                 switch (reply->lr_status) {
1986                 case LS_SCANNING_PHASE1:
1987                         break;
1988                 case LS_SCANNING_PHASE2:
1989                         spin_lock(&ltds->ltd_lock);
1990                         list_del_init(phase_list);
1991                         if (ltd->ltd_dead) {
1992                                 spin_unlock(&ltds->ltd_lock);
1993                                 break;
1994                         }
1995
1996                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1997                                 if (ltd->ltd_layout_done) {
1998                                         spin_unlock(&ltds->ltd_lock);
1999                                         break;
2000                                 }
2001
2002                                 if (lr->lr_flags & LEF_TO_OST)
2003                                         list_add_tail(phase_list,
2004                                                 &lad->lad_ost_phase2_list);
2005                                 else
2006                                         list_add_tail(phase_list,
2007                                                 &lad->lad_mdt_phase2_list);
2008                         } else {
2009                                 if (ltd->ltd_namespace_done) {
2010                                         spin_unlock(&ltds->ltd_lock);
2011                                         break;
2012                                 }
2013
2014                                 list_add_tail(phase_list,
2015                                               &lad->lad_mdt_phase2_list);
2016                         }
2017                         spin_unlock(&ltds->ltd_lock);
2018                         break;
2019                 default:
2020                         spin_lock(&ltds->ltd_lock);
2021                         list_del_init(phase_list);
2022                         list_del_init(list);
2023                         spin_unlock(&ltds->ltd_lock);
2024                         break;
2025                 }
2026                 break;
2027         }
2028         default:
2029                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2030                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2031                 break;
2032         }
2033
2034         if (!laia->laia_shared) {
2035                 lfsck_tgt_put(ltd);
2036                 lfsck_component_put(env, com);
2037         }
2038
2039         return 0;
2040 }
2041
2042 static void lfsck_interpret(const struct lu_env *env,
2043                             struct lfsck_instance *lfsck,
2044                             struct ptlrpc_request *req, void *args, int result)
2045 {
2046         struct lfsck_async_interpret_args *laia = args;
2047         struct lfsck_component            *com;
2048
2049         LASSERT(laia->laia_com == NULL);
2050         LASSERT(laia->laia_shared);
2051
2052         spin_lock(&lfsck->li_lock);
2053         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2054                 laia->laia_com = com;
2055                 lfsck_async_interpret_common(env, req, laia, result);
2056         }
2057
2058         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2059                 laia->laia_com = com;
2060                 lfsck_async_interpret_common(env, req, laia, result);
2061         }
2062         spin_unlock(&lfsck->li_lock);
2063 }
2064
2065 static int lfsck_stop_notify(const struct lu_env *env,
2066                              struct lfsck_instance *lfsck,
2067                              struct lfsck_tgt_descs *ltds,
2068                              struct lfsck_tgt_desc *ltd, __u16 type)
2069 {
2070         struct lfsck_component *com;
2071         int                     rc = 0;
2072         ENTRY;
2073
2074         LASSERT(lfsck->li_master);
2075
2076         spin_lock(&lfsck->li_lock);
2077         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2078         if (com == NULL)
2079                 com = __lfsck_component_find(lfsck, type,
2080                                              &lfsck->li_list_double_scan);
2081         if (com != NULL)
2082                 lfsck_component_get(com);
2083         spin_unlock(&lfsck->li_lock);
2084
2085         if (com != NULL) {
2086                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2087                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2088                 struct lfsck_request              *lr    = &info->lti_lr;
2089                 struct lfsck_assistant_data       *lad   = com->lc_data;
2090                 struct list_head                  *list;
2091                 struct list_head                  *phase_list;
2092                 struct ptlrpc_request_set         *set;
2093
2094                 set = ptlrpc_prep_set();
2095                 if (set == NULL) {
2096                         lfsck_component_put(env, com);
2097
2098                         RETURN(-ENOMEM);
2099                 }
2100
2101                 if (type == LFSCK_TYPE_LAYOUT) {
2102                         list = &ltd->ltd_layout_list;
2103                         phase_list = &ltd->ltd_layout_phase_list;
2104                 } else {
2105                         list = &ltd->ltd_namespace_list;
2106                         phase_list = &ltd->ltd_namespace_phase_list;
2107                 }
2108
2109                 spin_lock(&ltds->ltd_lock);
2110                 if (list_empty(list)) {
2111                         LASSERT(list_empty(phase_list));
2112                         spin_unlock(&ltds->ltd_lock);
2113                         ptlrpc_set_destroy(set);
2114
2115                         RETURN(0);
2116                 }
2117
2118                 list_del_init(phase_list);
2119                 list_del_init(list);
2120                 spin_unlock(&ltds->ltd_lock);
2121
2122                 memset(lr, 0, sizeof(*lr));
2123                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2124                 lr->lr_event = LE_PEER_EXIT;
2125                 lr->lr_active = type;
2126                 lr->lr_status = LS_CO_PAUSED;
2127                 if (ltds == &lfsck->li_ost_descs)
2128                         lr->lr_flags = LEF_TO_OST;
2129
2130                 laia->laia_com = com;
2131                 laia->laia_ltds = ltds;
2132                 atomic_inc(&ltd->ltd_ref);
2133                 laia->laia_ltd = ltd;
2134                 laia->laia_lr = lr;
2135                 laia->laia_shared = 0;
2136
2137                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2138                                          lfsck_async_interpret_common,
2139                                          laia, LFSCK_NOTIFY);
2140                 if (rc != 0) {
2141                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2142                                "co-stop for %s: rc = %d\n",
2143                                lfsck_lfsck2name(lfsck),
2144                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2145                                ltd->ltd_index, lad->lad_name, rc);
2146                         lfsck_tgt_put(ltd);
2147                 } else {
2148                         rc = ptlrpc_set_wait(set);
2149                 }
2150
2151                 ptlrpc_set_destroy(set);
2152                 lfsck_component_put(env, com);
2153         }
2154
2155         RETURN(rc);
2156 }
2157
2158 static int lfsck_async_interpret(const struct lu_env *env,
2159                                  struct ptlrpc_request *req,
2160                                  void *args, int rc)
2161 {
2162         struct lfsck_async_interpret_args *laia = args;
2163         struct lfsck_instance             *lfsck;
2164
2165         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2166                               li_mdt_descs);
2167         lfsck_interpret(env, lfsck, req, laia, rc);
2168         lfsck_tgt_put(laia->laia_ltd);
2169         if (rc != 0 && laia->laia_result != -EALREADY)
2170                 laia->laia_result = rc;
2171
2172         return 0;
2173 }
2174
2175 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2176                         struct lfsck_request *lr,
2177                         struct ptlrpc_request_set *set,
2178                         ptlrpc_interpterer_t interpreter,
2179                         void *args, int request)
2180 {
2181         struct lfsck_async_interpret_args *laia;
2182         struct ptlrpc_request             *req;
2183         struct lfsck_request              *tmp;
2184         struct req_format                 *format;
2185         int                                rc;
2186
2187         switch (request) {
2188         case LFSCK_NOTIFY:
2189                 format = &RQF_LFSCK_NOTIFY;
2190                 break;
2191         case LFSCK_QUERY:
2192                 format = &RQF_LFSCK_QUERY;
2193                 break;
2194         default:
2195                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2196                        exp->exp_obd->obd_name, request, -EINVAL);
2197                 return -EINVAL;
2198         }
2199
2200         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2201         if (req == NULL)
2202                 return -ENOMEM;
2203
2204         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2205         if (rc != 0) {
2206                 ptlrpc_request_free(req);
2207
2208                 return rc;
2209         }
2210
2211         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2212         *tmp = *lr;
2213         ptlrpc_request_set_replen(req);
2214
2215         laia = ptlrpc_req_async_args(req);
2216         *laia = *(struct lfsck_async_interpret_args *)args;
2217         if (laia->laia_com != NULL)
2218                 lfsck_component_get(laia->laia_com);
2219         req->rq_interpret_reply = interpreter;
2220         ptlrpc_set_add_req(set, req);
2221
2222         return 0;
2223 }
2224
2225 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2226                           struct lfsck_start_param *lsp)
2227 {
2228         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2229         struct lfsck_assistant_data     *lad     = com->lc_data;
2230         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2231         struct ptlrpc_thread            *athread = &lad->lad_thread;
2232         struct lfsck_thread_args        *lta;
2233         struct task_struct              *task;
2234         int                              rc;
2235         ENTRY;
2236
2237         lad->lad_assistant_status = 0;
2238         lad->lad_post_result = 0;
2239         lad->lad_to_post = 0;
2240         lad->lad_to_double_scan = 0;
2241         lad->lad_in_double_scan = 0;
2242         lad->lad_exit = 0;
2243         thread_set_flags(athread, 0);
2244
2245         lta = lfsck_thread_args_init(lfsck, com, lsp);
2246         if (IS_ERR(lta))
2247                 RETURN(PTR_ERR(lta));
2248
2249         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2250         if (IS_ERR(task)) {
2251                 rc = PTR_ERR(task);
2252                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2253                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2254                 lfsck_thread_args_fini(lta);
2255         } else {
2256                 struct l_wait_info lwi = { 0 };
2257
2258                 l_wait_event(mthread->t_ctl_waitq,
2259                              thread_is_running(athread) ||
2260                              thread_is_stopped(athread),
2261                              &lwi);
2262                 if (unlikely(!thread_is_running(athread)))
2263                         rc = lad->lad_assistant_status;
2264                 else
2265                         rc = 0;
2266         }
2267
2268         RETURN(rc);
2269 }
2270
2271 int lfsck_checkpoint_generic(const struct lu_env *env,
2272                              struct lfsck_component *com)
2273 {
2274         struct lfsck_assistant_data     *lad     = com->lc_data;
2275         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2276         struct ptlrpc_thread            *athread = &lad->lad_thread;
2277         struct l_wait_info               lwi     = { 0 };
2278
2279         if (com->lc_new_checked == 0)
2280                 return LFSCK_CHECKPOINT_SKIP;
2281
2282         l_wait_event(mthread->t_ctl_waitq,
2283                      list_empty(&lad->lad_req_list) ||
2284                      !thread_is_running(mthread) ||
2285                      thread_is_stopped(athread),
2286                      &lwi);
2287
2288         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2289                 return LFSCK_CHECKPOINT_SKIP;
2290
2291         return 0;
2292 }
2293
2294 void lfsck_post_generic(const struct lu_env *env,
2295                         struct lfsck_component *com, int *result)
2296 {
2297         struct lfsck_assistant_data     *lad     = com->lc_data;
2298         struct ptlrpc_thread            *athread = &lad->lad_thread;
2299         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2300         struct l_wait_info               lwi     = { 0 };
2301
2302         lad->lad_post_result = *result;
2303         if (*result <= 0)
2304                 lad->lad_exit = 1;
2305         lad->lad_to_post = 1;
2306
2307         wake_up_all(&athread->t_ctl_waitq);
2308         l_wait_event(mthread->t_ctl_waitq,
2309                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2310                      thread_is_stopped(athread),
2311                      &lwi);
2312
2313         if (lad->lad_assistant_status < 0)
2314                 *result = lad->lad_assistant_status;
2315 }
2316
2317 int lfsck_double_scan_generic(const struct lu_env *env,
2318                               struct lfsck_component *com, int status)
2319 {
2320         struct lfsck_assistant_data     *lad     = com->lc_data;
2321         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2322         struct ptlrpc_thread            *athread = &lad->lad_thread;
2323         struct l_wait_info               lwi     = { 0 };
2324
2325         if (status != LS_SCANNING_PHASE2)
2326                 lad->lad_exit = 1;
2327         else
2328                 lad->lad_to_double_scan = 1;
2329
2330         wake_up_all(&athread->t_ctl_waitq);
2331         l_wait_event(mthread->t_ctl_waitq,
2332                      lad->lad_in_double_scan ||
2333                      thread_is_stopped(athread),
2334                      &lwi);
2335
2336         if (lad->lad_assistant_status < 0)
2337                 return lad->lad_assistant_status;
2338
2339         return 0;
2340 }
2341
2342 void lfsck_quit_generic(const struct lu_env *env,
2343                         struct lfsck_component *com)
2344 {
2345         struct lfsck_assistant_data     *lad     = com->lc_data;
2346         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2347         struct ptlrpc_thread            *athread = &lad->lad_thread;
2348         struct l_wait_info               lwi     = { 0 };
2349
2350         lad->lad_exit = 1;
2351         wake_up_all(&athread->t_ctl_waitq);
2352         l_wait_event(mthread->t_ctl_waitq,
2353                      thread_is_init(athread) ||
2354                      thread_is_stopped(athread),
2355                      &lwi);
2356 }
2357
2358 /* external interfaces */
2359
2360 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2361 {
2362         struct lu_env           env;
2363         struct lfsck_instance  *lfsck;
2364         int                     rc;
2365         ENTRY;
2366
2367         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2368         if (rc != 0)
2369                 RETURN(rc);
2370
2371         lfsck = lfsck_instance_find(key, true, false);
2372         if (likely(lfsck != NULL)) {
2373                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2374                 lfsck_instance_put(&env, lfsck);
2375         } else {
2376                 rc = -ENXIO;
2377         }
2378
2379         lu_env_fini(&env);
2380
2381         RETURN(rc);
2382 }
2383 EXPORT_SYMBOL(lfsck_get_speed);
2384
2385 int lfsck_set_speed(struct dt_device *key, int val)
2386 {
2387         struct lu_env           env;
2388         struct lfsck_instance  *lfsck;
2389         int                     rc;
2390         ENTRY;
2391
2392         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2393         if (rc != 0)
2394                 RETURN(rc);
2395
2396         lfsck = lfsck_instance_find(key, true, false);
2397         if (likely(lfsck != NULL)) {
2398                 mutex_lock(&lfsck->li_mutex);
2399                 if (__lfsck_set_speed(lfsck, val))
2400                         rc = lfsck_bookmark_store(&env, lfsck);
2401                 mutex_unlock(&lfsck->li_mutex);
2402                 lfsck_instance_put(&env, lfsck);
2403         } else {
2404                 rc = -ENXIO;
2405         }
2406
2407         lu_env_fini(&env);
2408
2409         RETURN(rc);
2410 }
2411 EXPORT_SYMBOL(lfsck_set_speed);
2412
2413 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2414 {
2415         struct lu_env           env;
2416         struct lfsck_instance  *lfsck;
2417         int                     rc;
2418         ENTRY;
2419
2420         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2421         if (rc != 0)
2422                 RETURN(rc);
2423
2424         lfsck = lfsck_instance_find(key, true, false);
2425         if (likely(lfsck != NULL)) {
2426                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2427                 lfsck_instance_put(&env, lfsck);
2428         } else {
2429                 rc = -ENXIO;
2430         }
2431
2432         lu_env_fini(&env);
2433
2434         RETURN(rc);
2435 }
2436 EXPORT_SYMBOL(lfsck_get_windows);
2437
2438 int lfsck_set_windows(struct dt_device *key, int val)
2439 {
2440         struct lu_env           env;
2441         struct lfsck_instance  *lfsck;
2442         int                     rc;
2443         ENTRY;
2444
2445         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2446         if (rc != 0)
2447                 RETURN(rc);
2448
2449         lfsck = lfsck_instance_find(key, true, false);
2450         if (likely(lfsck != NULL)) {
2451                 if (val > LFSCK_ASYNC_WIN_MAX) {
2452                         CWARN("%s: Too large async window size, which "
2453                               "may cause memory issues. The valid range "
2454                               "is [0 - %u]. If you do not want to restrict "
2455                               "the window size for async requests pipeline, "
2456                               "just set it as 0.\n",
2457                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2458                         rc = -EINVAL;
2459                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2460                         mutex_lock(&lfsck->li_mutex);
2461                         lfsck->li_bookmark_ram.lb_async_windows = val;
2462                         rc = lfsck_bookmark_store(&env, lfsck);
2463                         mutex_unlock(&lfsck->li_mutex);
2464                 }
2465                 lfsck_instance_put(&env, lfsck);
2466         } else {
2467                 rc = -ENXIO;
2468         }
2469
2470         lu_env_fini(&env);
2471
2472         RETURN(rc);
2473 }
2474 EXPORT_SYMBOL(lfsck_set_windows);
2475
2476 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2477 {
2478         struct lu_env           env;
2479         struct lfsck_instance  *lfsck;
2480         struct lfsck_component *com;
2481         int                     rc;
2482         ENTRY;
2483
2484         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2485         if (rc != 0)
2486                 RETURN(rc);
2487
2488         lfsck = lfsck_instance_find(key, true, false);
2489         if (likely(lfsck != NULL)) {
2490                 com = lfsck_component_find(lfsck, type);
2491                 if (likely(com != NULL)) {
2492                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2493                         lfsck_component_put(&env, com);
2494                 } else {
2495                         rc = -ENOTSUPP;
2496                 }
2497
2498                 lfsck_instance_put(&env, lfsck);
2499         } else {
2500                 rc = -ENXIO;
2501         }
2502
2503         lu_env_fini(&env);
2504
2505         RETURN(rc);
2506 }
2507 EXPORT_SYMBOL(lfsck_dump);
2508
2509 static int lfsck_stop_all(const struct lu_env *env,
2510                           struct lfsck_instance *lfsck,
2511                           struct lfsck_stop *stop)
2512 {
2513         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2514         struct lfsck_request              *lr     = &info->lti_lr;
2515         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2516         struct ptlrpc_request_set         *set;
2517         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2518         struct lfsck_tgt_desc             *ltd;
2519         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2520         __u32                              idx;
2521         int                                rc     = 0;
2522         int                                rc1    = 0;
2523         ENTRY;
2524
2525         LASSERT(stop->ls_flags & LPF_BROADCAST);
2526
2527         set = ptlrpc_prep_set();
2528         if (unlikely(set == NULL))
2529                 RETURN(-ENOMEM);
2530
2531         memset(lr, 0, sizeof(*lr));
2532         lr->lr_event = LE_STOP;
2533         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2534         lr->lr_status = stop->ls_status;
2535         lr->lr_version = bk->lb_version;
2536         lr->lr_active = LFSCK_TYPES_ALL;
2537         lr->lr_param = stop->ls_flags;
2538
2539         laia->laia_com = NULL;
2540         laia->laia_ltds = ltds;
2541         laia->laia_lr = lr;
2542         laia->laia_result = 0;
2543         laia->laia_shared = 1;
2544
2545         down_read(&ltds->ltd_rw_sem);
2546         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2547                 ltd = lfsck_tgt_get(ltds, idx);
2548                 LASSERT(ltd != NULL);
2549
2550                 laia->laia_ltd = ltd;
2551                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2552                                          lfsck_async_interpret, laia,
2553                                          LFSCK_NOTIFY);
2554                 if (rc != 0) {
2555                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2556                         lfsck_tgt_put(ltd);
2557                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2558                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2559                         rc1 = rc;
2560                 }
2561         }
2562         up_read(&ltds->ltd_rw_sem);
2563
2564         rc = ptlrpc_set_wait(set);
2565         ptlrpc_set_destroy(set);
2566
2567         if (rc == 0)
2568                 rc = laia->laia_result;
2569
2570         if (rc == -EALREADY)
2571                 rc = 0;
2572
2573         if (rc != 0)
2574                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2575                        lfsck_lfsck2name(lfsck), rc);
2576
2577         RETURN(rc != 0 ? rc : rc1);
2578 }
2579
2580 static int lfsck_start_all(const struct lu_env *env,
2581                            struct lfsck_instance *lfsck,
2582                            struct lfsck_start *start)
2583 {
2584         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2585         struct lfsck_request              *lr     = &info->lti_lr;
2586         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2587         struct ptlrpc_request_set         *set;
2588         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2589         struct lfsck_tgt_desc             *ltd;
2590         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2591         __u32                              idx;
2592         int                                rc     = 0;
2593         ENTRY;
2594
2595         LASSERT(start->ls_flags & LPF_BROADCAST);
2596
2597         set = ptlrpc_prep_set();
2598         if (unlikely(set == NULL))
2599                 RETURN(-ENOMEM);
2600
2601         memset(lr, 0, sizeof(*lr));
2602         lr->lr_event = LE_START;
2603         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2604         lr->lr_speed = bk->lb_speed_limit;
2605         lr->lr_version = bk->lb_version;
2606         lr->lr_active = start->ls_active;
2607         lr->lr_param = start->ls_flags;
2608         lr->lr_async_windows = bk->lb_async_windows;
2609         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2610                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2611                        LSV_CREATE_MDTOBJ;
2612
2613         laia->laia_com = NULL;
2614         laia->laia_ltds = ltds;
2615         laia->laia_lr = lr;
2616         laia->laia_result = 0;
2617         laia->laia_shared = 1;
2618
2619         down_read(&ltds->ltd_rw_sem);
2620         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2621                 ltd = lfsck_tgt_get(ltds, idx);
2622                 LASSERT(ltd != NULL);
2623
2624                 laia->laia_ltd = ltd;
2625                 ltd->ltd_layout_done = 0;
2626                 ltd->ltd_namespace_done = 0;
2627                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2628                                          lfsck_async_interpret, laia,
2629                                          LFSCK_NOTIFY);
2630                 if (rc != 0) {
2631                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2632                         lfsck_tgt_put(ltd);
2633                         CERROR("%s: cannot notify MDT %x for LFSCK "
2634                                "start, failout: rc = %d\n",
2635                                lfsck_lfsck2name(lfsck), idx, rc);
2636                         break;
2637                 }
2638         }
2639         up_read(&ltds->ltd_rw_sem);
2640
2641         if (rc != 0) {
2642                 ptlrpc_set_destroy(set);
2643
2644                 RETURN(rc);
2645         }
2646
2647         rc = ptlrpc_set_wait(set);
2648         ptlrpc_set_destroy(set);
2649
2650         if (rc == 0)
2651                 rc = laia->laia_result;
2652
2653         if (rc != 0) {
2654                 struct lfsck_stop *stop = &info->lti_stop;
2655
2656                 CERROR("%s: cannot start LFSCK on some MDTs, "
2657                        "stop all: rc = %d\n",
2658                        lfsck_lfsck2name(lfsck), rc);
2659                 if (rc != -EALREADY) {
2660                         stop->ls_status = LS_FAILED;
2661                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2662                         lfsck_stop_all(env, lfsck, stop);
2663                 }
2664         }
2665
2666         RETURN(rc);
2667 }
2668
2669 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2670                 struct lfsck_start_param *lsp)
2671 {
2672         struct lfsck_start              *start  = lsp->lsp_start;
2673         struct lfsck_instance           *lfsck;
2674         struct lfsck_bookmark           *bk;
2675         struct ptlrpc_thread            *thread;
2676         struct lfsck_component          *com;
2677         struct l_wait_info               lwi    = { 0 };
2678         struct lfsck_thread_args        *lta;
2679         struct task_struct              *task;
2680         int                              rc     = 0;
2681         __u16                            valid  = 0;
2682         __u16                            flags  = 0;
2683         __u16                            type   = 1;
2684         ENTRY;
2685
2686         lfsck = lfsck_instance_find(key, true, false);
2687         if (unlikely(lfsck == NULL))
2688                 RETURN(-ENXIO);
2689
2690         /* System is not ready, try again later. */
2691         if (unlikely(lfsck->li_namespace == NULL))
2692                 GOTO(put, rc = -EAGAIN);
2693
2694         /* start == NULL means auto trigger paused LFSCK. */
2695         if ((start == NULL) &&
2696             (list_empty(&lfsck->li_list_scan) ||
2697              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2698                 GOTO(put, rc = 0);
2699
2700         bk = &lfsck->li_bookmark_ram;
2701         thread = &lfsck->li_thread;
2702         mutex_lock(&lfsck->li_mutex);
2703         spin_lock(&lfsck->li_lock);
2704         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2705                 rc = -EALREADY;
2706                 if (unlikely(start == NULL)) {
2707                         spin_unlock(&lfsck->li_lock);
2708                         GOTO(out, rc);
2709                 }
2710
2711                 while (start->ls_active != 0) {
2712                         if (!(type & start->ls_active)) {
2713                                 type <<= 1;
2714                                 continue;
2715                         }
2716
2717                         com = __lfsck_component_find(lfsck, type,
2718                                                      &lfsck->li_list_scan);
2719                         if (com == NULL)
2720                                 com = __lfsck_component_find(lfsck, type,
2721                                                 &lfsck->li_list_double_scan);
2722                         if (com == NULL) {
2723                                 rc = -EOPNOTSUPP;
2724                                 break;
2725                         }
2726
2727                         if (com->lc_ops->lfsck_join != NULL) {
2728                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2729                                 if (rc != 0 && rc != -EALREADY)
2730                                         break;
2731                         }
2732                         start->ls_active &= ~type;
2733                         type <<= 1;
2734                 }
2735                 spin_unlock(&lfsck->li_lock);
2736                 GOTO(out, rc);
2737         }
2738         spin_unlock(&lfsck->li_lock);
2739
2740         lfsck->li_status = 0;
2741         lfsck->li_oit_over = 0;
2742         lfsck->li_start_unplug = 0;
2743         lfsck->li_drop_dryrun = 0;
2744         lfsck->li_new_scanned = 0;
2745
2746         /* For auto trigger. */
2747         if (start == NULL)
2748                 goto trigger;
2749
2750         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2751                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2752                        lfsck_lfsck2name(lfsck));
2753
2754                 GOTO(out, rc = -EPERM);
2755         }
2756
2757         start->ls_version = bk->lb_version;
2758
2759         if (start->ls_active != 0) {
2760                 struct lfsck_component *next;
2761
2762                 if (start->ls_active == LFSCK_TYPES_ALL)
2763                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2764
2765                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2766                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2767                         GOTO(out, rc = -ENOTSUPP);
2768                 }
2769
2770                 list_for_each_entry_safe(com, next,
2771                                          &lfsck->li_list_scan, lc_link) {
2772                         if (!(com->lc_type & start->ls_active)) {
2773                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2774                                                              false);
2775                                 if (rc != 0)
2776                                         GOTO(out, rc);
2777                         }
2778                 }
2779
2780                 while (start->ls_active != 0) {
2781                         if (type & start->ls_active) {
2782                                 com = __lfsck_component_find(lfsck, type,
2783                                                         &lfsck->li_list_idle);
2784                                 if (com != NULL)
2785                                         /* The component status will be updated
2786                                          * when its prep() is called later by
2787                                          * the LFSCK main engine. */
2788                                         list_move_tail(&com->lc_link,
2789                                                        &lfsck->li_list_scan);
2790                                 start->ls_active &= ~type;
2791                         }
2792                         type <<= 1;
2793                 }
2794         }
2795
2796         if (list_empty(&lfsck->li_list_scan)) {
2797                 /* The speed limit will be used to control both the LFSCK and
2798                  * low layer scrub (if applied), need to be handled firstly. */
2799                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2800                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2801                                 rc = lfsck_bookmark_store(env, lfsck);
2802                                 if (rc != 0)
2803                                         GOTO(out, rc);
2804                         }
2805                 }
2806
2807                 goto trigger;
2808         }
2809
2810         if (start->ls_flags & LPF_RESET)
2811                 flags |= DOIF_RESET;
2812
2813         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2814         if (rc != 0)
2815                 GOTO(out, rc);
2816
2817         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2818                 start->ls_active |= com->lc_type;
2819                 if (flags & DOIF_RESET) {
2820                         rc = com->lc_ops->lfsck_reset(env, com, false);
2821                         if (rc != 0)
2822                                 GOTO(out, rc);
2823                 }
2824         }
2825
2826 trigger:
2827         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2828         if (bk->lb_param & LPF_DRYRUN)
2829                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2830
2831         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2832                 valid |= DOIV_ERROR_HANDLE;
2833                 if (start->ls_flags & LPF_FAILOUT)
2834                         flags |= DOIF_FAILOUT;
2835         }
2836
2837         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2838                 valid |= DOIV_DRYRUN;
2839                 if (start->ls_flags & LPF_DRYRUN)
2840                         flags |= DOIF_DRYRUN;
2841         }
2842
2843         if (!list_empty(&lfsck->li_list_scan))
2844                 flags |= DOIF_OUTUSED;
2845
2846         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2847         thread_set_flags(thread, 0);
2848         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2849         if (IS_ERR(lta))
2850                 GOTO(out, rc = PTR_ERR(lta));
2851
2852         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2853         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2854         if (IS_ERR(task)) {
2855                 rc = PTR_ERR(task);
2856                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2857                        lfsck_lfsck2name(lfsck), rc);
2858                 lfsck_thread_args_fini(lta);
2859
2860                 GOTO(out, rc);
2861         }
2862
2863         l_wait_event(thread->t_ctl_waitq,
2864                      thread_is_running(thread) ||
2865                      thread_is_stopped(thread),
2866                      &lwi);
2867         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2868                 lfsck->li_start_unplug = 1;
2869                 wake_up_all(&thread->t_ctl_waitq);
2870
2871                 GOTO(out, rc = 0);
2872         }
2873
2874         /* release lfsck::li_mutex to avoid deadlock. */
2875         mutex_unlock(&lfsck->li_mutex);
2876         rc = lfsck_start_all(env, lfsck, start);
2877         if (rc != 0) {
2878                 spin_lock(&lfsck->li_lock);
2879                 if (thread_is_stopped(thread)) {
2880                         spin_unlock(&lfsck->li_lock);
2881                 } else {
2882                         lfsck->li_status = LS_FAILED;
2883                         lfsck->li_flags = 0;
2884                         thread_set_flags(thread, SVC_STOPPING);
2885                         spin_unlock(&lfsck->li_lock);
2886
2887                         lfsck->li_start_unplug = 1;
2888                         wake_up_all(&thread->t_ctl_waitq);
2889                         l_wait_event(thread->t_ctl_waitq,
2890                                      thread_is_stopped(thread),
2891                                      &lwi);
2892                 }
2893         } else {
2894                 lfsck->li_start_unplug = 1;
2895                 wake_up_all(&thread->t_ctl_waitq);
2896         }
2897
2898         GOTO(put, rc);
2899
2900 out:
2901         mutex_unlock(&lfsck->li_mutex);
2902
2903 put:
2904         lfsck_instance_put(env, lfsck);
2905
2906         return rc < 0 ? rc : 0;
2907 }
2908 EXPORT_SYMBOL(lfsck_start);
2909
2910 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2911                struct lfsck_stop *stop)
2912 {
2913         struct lfsck_instance   *lfsck;
2914         struct ptlrpc_thread    *thread;
2915         struct l_wait_info       lwi    = { 0 };
2916         int                      rc     = 0;
2917         int                      rc1    = 0;
2918         ENTRY;
2919
2920         lfsck = lfsck_instance_find(key, true, false);
2921         if (unlikely(lfsck == NULL))
2922                 RETURN(-ENXIO);
2923
2924         thread = &lfsck->li_thread;
2925         /* release lfsck::li_mutex to avoid deadlock. */
2926         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2927                 if (!lfsck->li_master) {
2928                         CERROR("%s: only allow to specify '-A' via MDS\n",
2929                                lfsck_lfsck2name(lfsck));
2930
2931                         GOTO(out, rc = -EPERM);
2932                 }
2933
2934                 rc1 = lfsck_stop_all(env, lfsck, stop);
2935         }
2936
2937         mutex_lock(&lfsck->li_mutex);
2938         spin_lock(&lfsck->li_lock);
2939         /* no error if LFSCK is already stopped, or was never started */
2940         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2941                 spin_unlock(&lfsck->li_lock);
2942                 GOTO(out, rc = 0);
2943         }
2944
2945         if (stop != NULL) {
2946                 lfsck->li_status = stop->ls_status;
2947                 lfsck->li_flags = stop->ls_flags;
2948         } else {
2949                 lfsck->li_status = LS_STOPPED;
2950                 lfsck->li_flags = 0;
2951         }
2952
2953         thread_set_flags(thread, SVC_STOPPING);
2954         spin_unlock(&lfsck->li_lock);
2955
2956         wake_up_all(&thread->t_ctl_waitq);
2957         l_wait_event(thread->t_ctl_waitq,
2958                      thread_is_stopped(thread),
2959                      &lwi);
2960
2961         GOTO(out, rc = 0);
2962
2963 out:
2964         mutex_unlock(&lfsck->li_mutex);
2965         lfsck_instance_put(env, lfsck);
2966
2967         return rc != 0 ? rc : rc1;
2968 }
2969 EXPORT_SYMBOL(lfsck_stop);
2970
2971 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2972                     struct lfsck_request *lr, struct thandle *th)
2973 {
2974         int rc = -EOPNOTSUPP;
2975         ENTRY;
2976
2977         switch (lr->lr_event) {
2978         case LE_START: {
2979                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2980                 struct lfsck_start_param  lsp;
2981
2982                 memset(start, 0, sizeof(*start));
2983                 start->ls_valid = lr->lr_valid;
2984                 start->ls_speed_limit = lr->lr_speed;
2985                 start->ls_version = lr->lr_version;
2986                 start->ls_active = lr->lr_active;
2987                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2988                 start->ls_async_windows = lr->lr_async_windows;
2989
2990                 lsp.lsp_start = start;
2991                 lsp.lsp_index = lr->lr_index;
2992                 lsp.lsp_index_valid = 1;
2993                 rc = lfsck_start(env, key, &lsp);
2994                 break;
2995         }
2996         case LE_STOP: {
2997                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2998
2999                 memset(stop, 0, sizeof(*stop));
3000                 stop->ls_status = lr->lr_status;
3001                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3002                 rc = lfsck_stop(env, key, stop);
3003                 break;
3004         }
3005         case LE_PHASE1_DONE:
3006         case LE_PHASE2_DONE:
3007         case LE_FID_ACCESSED:
3008         case LE_PEER_EXIT:
3009         case LE_CONDITIONAL_DESTROY:
3010         case LE_SKIP_NLINK_DECLARE:
3011         case LE_SKIP_NLINK:
3012         case LE_SET_LMV_MASTER:
3013         case LE_SET_LMV_SLAVE:
3014         case LE_PAIRS_VERIFY: {
3015                 struct lfsck_instance  *lfsck;
3016                 struct lfsck_component *com;
3017
3018                 lfsck = lfsck_instance_find(key, true, false);
3019                 if (unlikely(lfsck == NULL))
3020                         RETURN(-ENXIO);
3021
3022                 com = lfsck_component_find(lfsck, lr->lr_active);
3023                 if (likely(com != NULL)) {
3024                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3025                         lfsck_component_put(env, com);
3026                 }
3027
3028                 lfsck_instance_put(env, lfsck);
3029                 break;
3030         }
3031         default:
3032                 break;
3033         }
3034
3035         RETURN(rc);
3036 }
3037 EXPORT_SYMBOL(lfsck_in_notify);
3038
3039 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3040                 struct lfsck_request *lr)
3041 {
3042         struct lfsck_instance  *lfsck;
3043         struct lfsck_component *com;
3044         int                     rc;
3045         ENTRY;
3046
3047         lfsck = lfsck_instance_find(key, true, false);
3048         if (unlikely(lfsck == NULL))
3049                 RETURN(-ENXIO);
3050
3051         com = lfsck_component_find(lfsck, lr->lr_active);
3052         if (likely(com != NULL)) {
3053                 rc = com->lc_ops->lfsck_query(env, com);
3054                 lfsck_component_put(env, com);
3055         } else {
3056                 rc = -ENOTSUPP;
3057         }
3058
3059         lfsck_instance_put(env, lfsck);
3060
3061         RETURN(rc);
3062 }
3063 EXPORT_SYMBOL(lfsck_query);
3064
3065 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3066                              struct ldlm_namespace *ns)
3067 {
3068         struct lfsck_instance  *lfsck;
3069         int                     rc      = -ENXIO;
3070
3071         lfsck = lfsck_instance_find(key, true, false);
3072         if (likely(lfsck != NULL)) {
3073                 lfsck->li_namespace = ns;
3074                 lfsck_instance_put(env, lfsck);
3075                 rc = 0;
3076         }
3077
3078         return rc;
3079 }
3080 EXPORT_SYMBOL(lfsck_register_namespace);
3081
3082 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3083                    struct dt_device *next, struct obd_device *obd,
3084                    lfsck_out_notify notify, void *notify_data, bool master)
3085 {
3086         struct lfsck_instance   *lfsck;
3087         struct dt_object        *root  = NULL;
3088         struct dt_object        *obj   = NULL;
3089         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3090         int                      rc;
3091         ENTRY;
3092
3093         lfsck = lfsck_instance_find(key, false, false);
3094         if (unlikely(lfsck != NULL))
3095                 RETURN(-EEXIST);
3096
3097         OBD_ALLOC_PTR(lfsck);
3098         if (lfsck == NULL)
3099                 RETURN(-ENOMEM);
3100
3101         mutex_init(&lfsck->li_mutex);
3102         spin_lock_init(&lfsck->li_lock);
3103         INIT_LIST_HEAD(&lfsck->li_link);
3104         INIT_LIST_HEAD(&lfsck->li_list_scan);
3105         INIT_LIST_HEAD(&lfsck->li_list_dir);
3106         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3107         INIT_LIST_HEAD(&lfsck->li_list_idle);
3108         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3109         atomic_set(&lfsck->li_ref, 1);
3110         atomic_set(&lfsck->li_double_scan_count, 0);
3111         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3112         lfsck->li_out_notify = notify;
3113         lfsck->li_out_notify_data = notify_data;
3114         lfsck->li_next = next;
3115         lfsck->li_bottom = key;
3116         lfsck->li_obd = obd;
3117
3118         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3119         if (rc != 0)
3120                 GOTO(out, rc);
3121
3122         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3123         if (rc != 0)
3124                 GOTO(out, rc);
3125
3126         fid->f_seq = FID_SEQ_LOCAL_NAME;
3127         fid->f_oid = 1;
3128         fid->f_ver = 0;
3129         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3130         if (rc != 0)
3131                 GOTO(out, rc);
3132
3133         rc = dt_root_get(env, key, fid);
3134         if (rc != 0)
3135                 GOTO(out, rc);
3136
3137         root = dt_locate(env, key, fid);
3138         if (IS_ERR(root))
3139                 GOTO(out, rc = PTR_ERR(root));
3140
3141         if (unlikely(!dt_try_as_dir(env, root)))
3142                 GOTO(out, rc = -ENOTDIR);
3143
3144         lfsck->li_local_root_fid = *fid;
3145         if (master) {
3146                 lfsck->li_master = 1;
3147                 if (lfsck_dev_idx(key) == 0) {
3148                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3149                         const struct lu_name *cname;
3150
3151                         rc = dt_lookup(env, root,
3152                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3153                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3154                         if (rc != 0)
3155                                 GOTO(out, rc);
3156
3157                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3158                         if (IS_ERR(obj))
3159                                 GOTO(out, rc = PTR_ERR(obj));
3160
3161                         if (unlikely(!dt_try_as_dir(env, obj)))
3162                                 GOTO(out, rc = -ENOTDIR);
3163
3164                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3165                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3166                         if (rc != 0)
3167                                 GOTO(out, rc);
3168
3169                         lu_object_put(env, &obj->do_lu);
3170                         obj = dt_locate(env, key, fid);
3171                         if (IS_ERR(obj))
3172                                 GOTO(out, rc = PTR_ERR(obj));
3173
3174                         cname = lfsck_name_get_const(env, dotlustre,
3175                                                      strlen(dotlustre));
3176                         rc = lfsck_verify_linkea(env, key, obj, cname,
3177                                                  &lfsck->li_global_root_fid);
3178                         if (rc != 0)
3179                                 GOTO(out, rc);
3180
3181                         if (unlikely(!dt_try_as_dir(env, obj)))
3182                                 GOTO(out, rc = -ENOTDIR);
3183
3184                         *pfid = *fid;
3185                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3186                                        (const struct dt_key *)lostfound,
3187                                        BYPASS_CAPA);
3188                         if (rc != 0)
3189                                 GOTO(out, rc);
3190
3191                         lu_object_put(env, &obj->do_lu);