Whamcloud - gitweb
3d8e75b09e1d9f11a1eca99422be1c98119883ad
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <libcfs/list.h>
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <md_object.h>
39 #include <lustre_fld.h>
40 #include <lustre_lib.h>
41 #include <lustre_net.h>
42 #include <lustre_lfsck.h>
43 #include <lustre/lustre_lfsck_user.h>
44
45 #include "lfsck_internal.h"
46
47 #define LFSCK_CHECKPOINT_SKIP   1
48
/* Define the lfsck thread key; the per-thread data attached to it is a
 * struct lfsck_thread_info, which lfsck_key_fini() releases. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
51
52 static void lfsck_key_fini(const struct lu_context *ctx,
53                            struct lu_context_key *key, void *data)
54 {
55         struct lfsck_thread_info *info = data;
56
57         lu_buf_free(&info->lti_linkea_buf);
58         lu_buf_free(&info->lti_linkea_buf2);
59         lu_buf_free(&info->lti_big_buf);
60         OBD_FREE_PTR(info);
61 }
62
/* Context key for the LFSCK thread info, tagged with both the
 * LCT_MD_THREAD and the LCT_DT_THREAD context types. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);

/* File-scope lists. The two global orphan lists are walked with
 * lfsck_instance_lock held (see lfsck_add_target_from_orphan()).
 * NOTE(review): lfsck_instance_list is presumably protected by the same
 * lock -- confirm against its users. */
static struct list_head lfsck_instance_list;
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
static DEFINE_SPINLOCK(lfsck_instance_lock);
70
/* NULL-terminated table of LFSCK flag names.
 * NOTE(review): presumably indexed by flag bit position -- confirm
 * against the code that prints the flags. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
79
/* Table of LFSCK parameter names. Slot 0 is intentionally NULL (no name
 * for that position) and the table is terminated by a trailing NULL.
 * NOTE(review): presumably indexed by parameter bit position -- confirm
 * against the code that prints the parameters. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        "create_mdtobj",
        NULL
};
91
/* NOTE(review): judging by the names, this selects how the lost+found
 * (LPF) object is verified: via the bookmark record or via its name
 * entry -- confirm against the users of this enum. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
96
97 static inline void
98 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
99 {
100         if (type == LFSCK_TYPE_LAYOUT) {
101                 ltd->ltd_layout_status = LS_MAX;
102                 ltd->ltd_layout_repaired = 0;
103         } else {
104                 ltd->ltd_namespace_status = LS_MAX;
105                 ltd->ltd_namespace_repaired = 0;
106         }
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = lfsck_ltd2tgt(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146                         LASSERT(list_empty(&ltd->ltd_namespace_list));
147                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
148
149                         ltds->ltd_tgtnr--;
150                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
151                         lfsck_assign_tgt(ltds, NULL, idx);
152                         lfsck_tgt_put(ltd);
153                 }
154         }
155
156         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
157                  ltds->ltd_tgtnr);
158
159         for (idx = 0; idx < TGT_PTRS; idx++) {
160                 if (ltds->ltd_tgts_idx[idx] != NULL) {
161                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
162                         ltds->ltd_tgts_idx[idx] = NULL;
163                 }
164         }
165
166         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
167         ltds->ltd_tgts_bitmap = NULL;
168         up_write(&ltds->ltd_rw_sem);
169 }
170
171 static int __lfsck_add_target(const struct lu_env *env,
172                               struct lfsck_instance *lfsck,
173                               struct lfsck_tgt_desc *ltd,
174                               bool for_ost, bool locked)
175 {
176         struct lfsck_tgt_descs *ltds;
177         __u32                   index = ltd->ltd_index;
178         int                     rc    = 0;
179         ENTRY;
180
181         if (for_ost)
182                 ltds = &lfsck->li_ost_descs;
183         else
184                 ltds = &lfsck->li_mdt_descs;
185
186         if (!locked)
187                 down_write(&ltds->ltd_rw_sem);
188
189         LASSERT(ltds->ltd_tgts_bitmap != NULL);
190
191         if (index >= ltds->ltd_tgts_bitmap->size) {
192                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
193                                     (__u32)BITS_PER_LONG);
194                 struct cfs_bitmap *old_bitmap = ltds->ltd_tgts_bitmap;
195                 struct cfs_bitmap *new_bitmap;
196
197                 while (newsize < index + 1)
198                         newsize <<= 1;
199
200                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
201                 if (new_bitmap == NULL)
202                         GOTO(unlock, rc = -ENOMEM);
203
204                 if (ltds->ltd_tgtnr > 0)
205                         cfs_bitmap_copy(new_bitmap, old_bitmap);
206                 ltds->ltd_tgts_bitmap = new_bitmap;
207                 CFS_FREE_BITMAP(old_bitmap);
208         }
209
210         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
211                 CERROR("%s: the device %s (%u) is registered already\n",
212                        lfsck_lfsck2name(lfsck),
213                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
214                 GOTO(unlock, rc = -EEXIST);
215         }
216
217         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
218                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
219                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
220                         GOTO(unlock, rc = -ENOMEM);
221         }
222
223         lfsck_assign_tgt(ltds, ltd, index);
224         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
225         ltds->ltd_tgtnr++;
226
227         GOTO(unlock, rc = 0);
228
229 unlock:
230         if (!locked)
231                 up_write(&ltds->ltd_rw_sem);
232
233         return rc;
234 }
235
236 static int lfsck_add_target_from_orphan(const struct lu_env *env,
237                                         struct lfsck_instance *lfsck)
238 {
239         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
240         struct lfsck_tgt_desc   *ltd;
241         struct lfsck_tgt_desc   *next;
242         struct list_head        *head    = &lfsck_ost_orphan_list;
243         int                      rc;
244         bool                     for_ost = true;
245
246 again:
247         spin_lock(&lfsck_instance_lock);
248         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
249                 if (ltd->ltd_key == lfsck->li_bottom)
250                         list_move_tail(&ltd->ltd_orphan_list,
251                                        &ltds->ltd_orphan);
252         }
253         spin_unlock(&lfsck_instance_lock);
254
255         down_write(&ltds->ltd_rw_sem);
256         while (!list_empty(&ltds->ltd_orphan)) {
257                 ltd = list_entry(ltds->ltd_orphan.next,
258                                  struct lfsck_tgt_desc,
259                                  ltd_orphan_list);
260                 list_del_init(&ltd->ltd_orphan_list);
261                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
262                 /* Do not hold the semaphore for too long time. */
263                 up_write(&ltds->ltd_rw_sem);
264                 if (rc != 0)
265                         return rc;
266
267                 down_write(&ltds->ltd_rw_sem);
268         }
269         up_write(&ltds->ltd_rw_sem);
270
271         if (for_ost) {
272                 ltds = &lfsck->li_mdt_descs;
273                 head = &lfsck_mdt_orphan_list;
274                 for_ost = false;
275                 goto again;
276         }
277
278         return 0;
279 }
280
281 static inline struct lfsck_component *
282 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
283                        struct list_head *list)
284 {
285         struct lfsck_component *com;
286
287         list_for_each_entry(com, list, lc_link) {
288                 if (com->lc_type == type)
289                         return com;
290         }
291         return NULL;
292 }
293
294 struct lfsck_component *
295 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
296 {
297         struct lfsck_component *com;
298
299         spin_lock(&lfsck->li_lock);
300         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
301         if (com != NULL)
302                 goto unlock;
303
304         com = __lfsck_component_find(lfsck, type,
305                                      &lfsck->li_list_double_scan);
306         if (com != NULL)
307                 goto unlock;
308
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
310
311 unlock:
312         if (com != NULL)
313                 lfsck_component_get(com);
314         spin_unlock(&lfsck->li_lock);
315         return com;
316 }
317
318 void lfsck_component_cleanup(const struct lu_env *env,
319                              struct lfsck_component *com)
320 {
321         if (!list_empty(&com->lc_link))
322                 list_del_init(&com->lc_link);
323         if (!list_empty(&com->lc_link_dir))
324                 list_del_init(&com->lc_link_dir);
325
326         lfsck_component_put(env, com);
327 }
328
329 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
330                     struct lu_fid *fid, bool locked)
331 {
332         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
333         int                      rc = 0;
334         ENTRY;
335
336         if (!locked)
337                 mutex_lock(&lfsck->li_mutex);
338
339         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
340         if (rc >= 0) {
341                 bk->lb_last_fid = *fid;
342                 /* We do not care about whether the subsequent sub-operations
343                  * failed or not. The worst case is that one FID is lost that
344                  * is not a big issue for the LFSCK since it is relative rare
345                  * for LFSCK create. */
346                 rc = lfsck_bookmark_store(env, lfsck);
347         }
348
349         if (!locked)
350                 mutex_unlock(&lfsck->li_mutex);
351
352         RETURN(rc);
353 }
354
355 static int __lfsck_ibits_lock(const struct lu_env *env,
356                               struct lfsck_instance *lfsck,
357                               struct dt_object *obj, struct ldlm_res_id *resid,
358                               struct lustre_handle *lh, __u64 bits,
359                               enum ldlm_mode mode)
360 {
361         struct lfsck_thread_info        *info   = lfsck_env_info(env);
362         union ldlm_policy_data          *policy = &info->lti_policy;
363         __u64                            flags  = LDLM_FL_ATOMIC_CB;
364         int                              rc;
365
366         LASSERT(lfsck->li_namespace != NULL);
367
368         memset(policy, 0, sizeof(*policy));
369         policy->l_inodebits.bits = bits;
370         if (dt_object_remote(obj)) {
371                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
372
373                 memset(einfo, 0, sizeof(*einfo));
374                 einfo->ei_type = LDLM_IBITS;
375                 einfo->ei_mode = mode;
376                 einfo->ei_cb_bl = ldlm_blocking_ast;
377                 einfo->ei_cb_cp = ldlm_completion_ast;
378                 einfo->ei_res_id = resid;
379
380                 rc = dt_object_lock(env, obj, lh, einfo, policy);
381         } else {
382                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
383                                             LDLM_IBITS, policy, mode,
384                                             &flags, ldlm_blocking_ast,
385                                             ldlm_completion_ast, NULL, NULL,
386                                             0, LVB_T_NONE, NULL, lh);
387         }
388
389         if (rc == ELDLM_OK) {
390                 rc = 0;
391         } else {
392                 memset(lh, 0, sizeof(*lh));
393                 rc = -EIO;
394         }
395
396         return rc;
397 }
398
399 /**
400  * Request the specified ibits lock for the given object.
401  *
402  * Before the LFSCK modifying on the namespace visible object,
403  * it needs to acquire related ibits ldlm lock.
404  *
405  * \param[in] env       pointer to the thread context
406  * \param[in] lfsck     pointer to the lfsck instance
407  * \param[in] obj       pointer to the dt_object to be locked
408  * \param[out] lh       pointer to the lock handle
409  * \param[in] bits      the bits for the ldlm lock to be acquired
410  * \param[in] mode      the mode for the ldlm lock to be acquired
411  *
412  * \retval              0 for success
413  * \retval              negative error number on failure
414  */
415 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
416                      struct dt_object *obj, struct lustre_handle *lh,
417                      __u64 bits, enum ldlm_mode mode)
418 {
419         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
420
421         LASSERT(!lustre_handle_is_used(lh));
422
423         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
424         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
425 }
426
427 /**
428  * Release the the specified ibits lock.
429  *
430  * If the lock has been acquired before, release it
431  * and cleanup the handle. Otherwise, do nothing.
432  *
433  * \param[in] lh        pointer to the lock handle
434  * \param[in] mode      the mode for the ldlm lock to be released
435  */
436 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
437 {
438         if (lustre_handle_is_used(lh)) {
439                 ldlm_lock_decref(lh, mode);
440                 memset(lh, 0, sizeof(*lh));
441         }
442 }
443
444 /**
445  * Request compound ibits locks for the given <obj, name> pairs.
446  *
447  * Before the LFSCK modifying on the namespace visible object, it needs to
448  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
449  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
450  * modificationis (such as insert name entry to the directory) may be too
451  * coarse-grained and not efficient.
452  *
453  * The lfsck_lock() will request compound ibits locks on the specified
454  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
455  * lock on the directory object, and the regular ibits lock on the name hash.
456  *
457  * \param[in] env       pointer to the thread context
458  * \param[in] lfsck     pointer to the lfsck instance
459  * \param[in] obj       pointer to the dt_object to be locked
460  * \param[in] name      used for building the PDO lock resource
461  * \param[out] llh      pointer to the lfsck_lock_handle
462  * \param[in] bits      the bits for the ldlm lock to be acquired
463  * \param[in] mode      the mode for the ldlm lock to be acquired
464  *
465  * \retval              0 for success
466  * \retval              negative error number on failure
467  */
468 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
469                struct dt_object *obj, const char *name,
470                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
471 {
472         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
473         int                 rc;
474
475         LASSERT(S_ISDIR(lfsck_object_type(obj)));
476         LASSERT(name != NULL);
477         LASSERT(name[0] != 0);
478         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
479         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
480
481         switch (mode) {
482         case LCK_EX:
483                 llh->llh_pdo_mode = LCK_EX;
484                 break;
485         case LCK_PW:
486                 llh->llh_pdo_mode = LCK_CW;
487                 break;
488         case LCK_PR:
489                 llh->llh_pdo_mode = LCK_CR;
490                 break;
491         default:
492                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
493                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
494                        PFID(lfsck_dto2fid(obj)));
495                 LBUG();
496         }
497
498         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
499         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
500                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
501         if (rc != 0)
502                 return rc;
503
504         llh->llh_reg_mode = mode;
505         resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
506         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
507         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
508                                 bits, llh->llh_reg_mode);
509         if (rc != 0)
510                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
511
512         return rc;
513 }
514
515 /**
516  * Release the the compound ibits locks.
517  *
518  * \param[in] llh       pointer to the lfsck_lock_handle to be released
519  */
520 void lfsck_unlock(struct lfsck_lock_handle *llh)
521 {
522         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
523         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
524 }
525
526 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
527                               struct lfsck_instance *lfsck,
528                               const struct lu_fid *fid)
529 {
530         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
531         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
532         int                      rc;
533
534         fld_range_set_mdt(range);
535         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
536         if (rc == 0)
537                 rc = range->lsr_index;
538
539         return rc;
540 }
541
/* Name strings for directory entries. dot and dotdot are the names
 * inserted into a newly created directory (see
 * lfsck_create_lpf_local()); dotlustre and lostfound are the
 * ".lustre" and "lost+found" names. */
const char dot[] = ".";
const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
546
547 /**
548  * Remove the name entry from the .lustre/lost+found directory.
549  *
550  * No need to care about the object referenced by the name entry,
551  * either the name entry is invalid or redundant, or the referenced
552  * object has been processed or will be handled by others.
553  *
554  * \param[in] env       pointer to the thread context
555  * \param[in] lfsck     pointer to the lfsck instance
556  * \param[in] name      the name for the name entry to be removed
557  *
558  * \retval              0 for success
559  * \retval              negative error number on failure
560  */
561 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
562                                        struct lfsck_instance *lfsck,
563                                        const char *name)
564 {
565         struct dt_object        *parent = lfsck->li_lpf_root_obj;
566         struct dt_device        *dev    = lfsck_obj2dev(parent);
567         struct thandle          *th;
568         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
569         int                      rc;
570         ENTRY;
571
572         rc = lfsck_lock(env, lfsck, parent, name, llh,
573                         MDS_INODELOCK_UPDATE, LCK_PW);
574         if (rc != 0)
575                 RETURN(rc);
576
577         th = dt_trans_create(env, dev);
578         if (IS_ERR(th))
579                 GOTO(unlock, rc = PTR_ERR(th));
580
581         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
582         if (rc != 0)
583                 GOTO(stop, rc);
584
585         rc = dt_declare_ref_del(env, parent, th);
586         if (rc != 0)
587                 GOTO(stop, rc);
588
589         rc = dt_trans_start_local(env, dev, th);
590         if (rc != 0)
591                 GOTO(stop, rc);
592
593         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
594         if (rc != 0)
595                 GOTO(stop, rc);
596
597         dt_write_lock(env, parent, 0);
598         rc = dt_ref_del(env, parent, th);
599         dt_write_unlock(env, parent);
600
601         GOTO(stop, rc);
602
603 stop:
604         dt_trans_stop(env, dev, th);
605
606 unlock:
607         lfsck_unlock(llh);
608
609         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
610                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
611
612         return rc;
613 }
614
615 static int lfsck_create_lpf_local(const struct lu_env *env,
616                                   struct lfsck_instance *lfsck,
617                                   struct dt_object *child,
618                                   struct lu_attr *la,
619                                   struct dt_object_format *dof,
620                                   const char *name)
621 {
622         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
623         struct dt_object        *parent = lfsck->li_lpf_root_obj;
624         struct dt_device        *dev    = lfsck_obj2dev(child);
625         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
626         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
627         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
628         struct thandle          *th     = NULL;
629         struct linkea_data       ldata  = { NULL };
630         struct lu_buf            linkea_buf;
631         const struct lu_name    *cname;
632         loff_t                   pos    = 0;
633         int                      len    = sizeof(struct lfsck_bookmark);
634         int                      rc;
635         ENTRY;
636
637         rc = linkea_data_new(&ldata,
638                              &lfsck_env_info(env)->lti_linkea_buf2);
639         if (rc != 0)
640                 RETURN(rc);
641
642         cname = lfsck_name_get_const(env, name, strlen(name));
643         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
644         if (rc != 0)
645                 RETURN(rc);
646
647         th = dt_trans_create(env, dev);
648         if (IS_ERR(th))
649                 RETURN(PTR_ERR(th));
650
651         /* 1a. create child */
652         rc = dt_declare_create(env, child, la, NULL, dof, th);
653         if (rc != 0)
654                 GOTO(stop, rc);
655
656         if (!dt_try_as_dir(env, child))
657                 GOTO(stop, rc = -ENOTDIR);
658
659         /* 2a. increase child nlink */
660         rc = dt_declare_ref_add(env, child, th);
661         if (rc != 0)
662                 GOTO(stop, rc);
663
664         /* 3a. insert dot into child dir */
665         rec->rec_type = S_IFDIR;
666         rec->rec_fid = cfid;
667         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
668                                (const struct dt_key *)dot, th);
669         if (rc != 0)
670                 GOTO(stop, rc);
671
672         /* 4a. insert dotdot into child dir */
673         rec->rec_fid = &LU_LPF_FID;
674         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
675                                (const struct dt_key *)dotdot, th);
676         if (rc != 0)
677                 GOTO(stop, rc);
678
679         /* 5a. insert linkEA for child */
680         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
681                        ldata.ld_leh->leh_len);
682         rc = dt_declare_xattr_set(env, child, &linkea_buf,
683                                   XATTR_NAME_LINK, 0, th);
684         if (rc != 0)
685                 GOTO(stop, rc);
686
687         /* 6a. insert name into parent dir */
688         rec->rec_type = S_IFDIR;
689         rec->rec_fid = cfid;
690         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
691                                (const struct dt_key *)name, th);
692         if (rc != 0)
693                 GOTO(stop, rc);
694
695         /* 7a. increase parent nlink */
696         rc = dt_declare_ref_add(env, parent, th);
697         if (rc != 0)
698                 GOTO(stop, rc);
699
700         /* 8a. update bookmark */
701         rc = dt_declare_record_write(env, bk_obj,
702                                      lfsck_buf_get(env, bk, len), 0, th);
703         if (rc != 0)
704                 GOTO(stop, rc);
705
706         rc = dt_trans_start_local(env, dev, th);
707         if (rc != 0)
708                 GOTO(stop, rc);
709
710         dt_write_lock(env, child, 0);
711         /* 1b. create child */
712         rc = dt_create(env, child, la, NULL, dof, th);
713         if (rc != 0)
714                 GOTO(unlock, rc);
715
716         /* 2b. increase child nlink */
717         rc = dt_ref_add(env, child, th);
718         if (rc != 0)
719                 GOTO(unlock, rc);
720
721         /* 3b. insert dot into child dir */
722         rec->rec_fid = cfid;
723         rc = dt_insert(env, child, (const struct dt_rec *)rec,
724                        (const struct dt_key *)dot, th, 1);
725         if (rc != 0)
726                 GOTO(unlock, rc);
727
728         /* 4b. insert dotdot into child dir */
729         rec->rec_fid = &LU_LPF_FID;
730         rc = dt_insert(env, child, (const struct dt_rec *)rec,
731                        (const struct dt_key *)dotdot, th, 1);
732         if (rc != 0)
733                 GOTO(unlock, rc);
734
735         /* 5b. insert linkEA for child. */
736         rc = dt_xattr_set(env, child, &linkea_buf,
737                           XATTR_NAME_LINK, 0, th);
738         dt_write_unlock(env, child);
739         if (rc != 0)
740                 GOTO(stop, rc);
741
742         /* 6b. insert name into parent dir */
743         rec->rec_fid = cfid;
744         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
745                        (const struct dt_key *)name, th, 1);
746         if (rc != 0)
747                 GOTO(stop, rc);
748
749         dt_write_lock(env, parent, 0);
750         /* 7b. increase parent nlink */
751         rc = dt_ref_add(env, parent, th);
752         dt_write_unlock(env, parent);
753         if (rc != 0)
754                 GOTO(stop, rc);
755
756         bk->lb_lpf_fid = *cfid;
757         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
758
759         /* 8b. update bookmark */
760         rc = dt_record_write(env, bk_obj,
761                              lfsck_buf_get(env, bk, len), &pos, th);
762
763         GOTO(stop, rc);
764
765 unlock:
766         dt_write_unlock(env, child);
767
768 stop:
769         dt_trans_stop(env, dev, th);
770
771         return rc;
772 }
773
774 static int lfsck_create_lpf_remote(const struct lu_env *env,
775                                    struct lfsck_instance *lfsck,
776                                    struct dt_object *child,
777                                    struct lu_attr *la,
778                                    struct dt_object_format *dof,
779                                    const char *name)
780 {
781         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
782         struct dt_object        *parent = lfsck->li_lpf_root_obj;
783         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
784         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
785         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
786         struct thandle          *th     = NULL;
787         struct linkea_data       ldata  = { NULL };
788         struct lu_buf            linkea_buf;
789         const struct lu_name    *cname;
790         struct dt_device        *dev;
791         loff_t                   pos    = 0;
792         int                      len    = sizeof(struct lfsck_bookmark);
793         int                      rc;
794         ENTRY;
795
796         rc = linkea_data_new(&ldata,
797                              &lfsck_env_info(env)->lti_linkea_buf2);
798         if (rc != 0)
799                 RETURN(rc);
800
801         cname = lfsck_name_get_const(env, name, strlen(name));
802         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
803         if (rc != 0)
804                 RETURN(rc);
805
806         /* Create .lustre/lost+found/MDTxxxx. */
807
808         /* XXX: Currently, cross-MDT create operation needs to create the child
809          *      object firstly, then insert name into the parent directory. For
810          *      this case, the child object resides on current MDT (local), but
811          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
812          *      easy to contain all the sub-modifications orderly within single
813          *      transaction.
814          *
815          *      To avoid more inconsistency, we split the create operation into
816          *      two transactions:
817          *
818          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
819          *         locally.
820          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
821          *         remotely.
822          *
823          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
824          *      repair such inconsistency when LFSCK run next time. */
825
826         /* Transaction I: locally */
827
828         dev = lfsck_obj2dev(child);
829         th = dt_trans_create(env, dev);
830         if (IS_ERR(th))
831                 RETURN(PTR_ERR(th));
832
833         /* 1a. create child */
834         rc = dt_declare_create(env, child, la, NULL, dof, th);
835         if (rc != 0)
836                 GOTO(stop, rc);
837
838         if (!dt_try_as_dir(env, child))
839                 GOTO(stop, rc = -ENOTDIR);
840
841         /* 2a. increase child nlink */
842         rc = dt_declare_ref_add(env, child, th);
843         if (rc != 0)
844                 GOTO(stop, rc);
845
846         /* 3a. insert dot into child dir */
847         rec->rec_type = S_IFDIR;
848         rec->rec_fid = cfid;
849         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
850                                (const struct dt_key *)dot, th);
851         if (rc != 0)
852                 GOTO(stop, rc);
853
854         /* 4a. insert dotdot into child dir */
855         rec->rec_fid = &LU_LPF_FID;
856         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
857                                (const struct dt_key *)dotdot, th);
858         if (rc != 0)
859                 GOTO(stop, rc);
860
861         /* 5a. insert linkEA for child */
862         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
863                        ldata.ld_leh->leh_len);
864         rc = dt_declare_xattr_set(env, child, &linkea_buf,
865                                   XATTR_NAME_LINK, 0, th);
866         if (rc != 0)
867                 GOTO(stop, rc);
868
869         /* 6a. update bookmark */
870         rc = dt_declare_record_write(env, bk_obj,
871                                      lfsck_buf_get(env, bk, len), 0, th);
872         if (rc != 0)
873                 GOTO(stop, rc);
874
875         rc = dt_trans_start_local(env, dev, th);
876         if (rc != 0)
877                 GOTO(stop, rc);
878
879         dt_write_lock(env, child, 0);
880         /* 1b. create child */
881         rc = dt_create(env, child, la, NULL, dof, th);
882         if (rc != 0)
883                 GOTO(unlock, rc);
884
885         /* 2b. increase child nlink */
886         rc = dt_ref_add(env, child, th);
887         if (rc != 0)
888                 GOTO(unlock, rc);
889
890         /* 3b. insert dot into child dir */
891         rec->rec_type = S_IFDIR;
892         rec->rec_fid = cfid;
893         rc = dt_insert(env, child, (const struct dt_rec *)rec,
894                        (const struct dt_key *)dot, th, 1);
895         if (rc != 0)
896                 GOTO(unlock, rc);
897
898         /* 4b. insert dotdot into child dir */
899         rec->rec_fid = &LU_LPF_FID;
900         rc = dt_insert(env, child, (const struct dt_rec *)rec,
901                        (const struct dt_key *)dotdot, th, 1);
902         if (rc != 0)
903                 GOTO(unlock, rc);
904
905         /* 5b. insert linkEA for child */
906         rc = dt_xattr_set(env, child, &linkea_buf,
907                           XATTR_NAME_LINK, 0, th);
908         if (rc != 0)
909                 GOTO(unlock, rc);
910
911         bk->lb_lpf_fid = *cfid;
912         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
913
914         /* 6b. update bookmark */
915         rc = dt_record_write(env, bk_obj,
916                              lfsck_buf_get(env, bk, len), &pos, th);
917
918         dt_write_unlock(env, child);
919         dt_trans_stop(env, dev, th);
920         if (rc != 0)
921                 RETURN(rc);
922
923         /* Transaction II: remotely */
924
925         dev = lfsck_obj2dev(parent);
926         th = dt_trans_create(env, dev);
927         if (IS_ERR(th))
928                 RETURN(PTR_ERR(th));
929
930         th->th_sync = 1;
931         /* 5a. insert name into parent dir */
932         rec->rec_fid = cfid;
933         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
934                                (const struct dt_key *)name, th);
935         if (rc != 0)
936                 GOTO(stop, rc);
937
938         /* 6a. increase parent nlink */
939         rc = dt_declare_ref_add(env, parent, th);
940         if (rc != 0)
941                 GOTO(stop, rc);
942
943         rc = dt_trans_start_local(env, dev, th);
944         if (rc != 0)
945                 GOTO(stop, rc);
946
947         /* 5b. insert name into parent dir */
948         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
949                        (const struct dt_key *)name, th, 1);
950         if (rc != 0)
951                 GOTO(stop, rc);
952
953         dt_write_lock(env, parent, 0);
954         /* 6b. increase parent nlink */
955         rc = dt_ref_add(env, parent, th);
956         dt_write_unlock(env, parent);
957
958         GOTO(stop, rc);
959
960 unlock:
961         dt_write_unlock(env, child);
962 stop:
963         dt_trans_stop(env, dev, th);
964
965         if (rc != 0 && dev == lfsck_obj2dev(parent))
966                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
967                        "for orphans, but failed to insert the name %s "
968                        "to the .lustre/lost+found/. Such inconsistency "
969                        "will be repaired when LFSCK run next time: rc = %d\n",
970                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
971
972         return rc;
973 }
974
975 /**
976  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
977  *
978  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
979  * orphans and other uncertain inconsistent objects found during the
980  * LFSCK. Such directory will be created by the LFSCK engine on the
981  * local MDT before the LFSCK scanning.
982  *
983  * \param[in] env       pointer to the thread context
984  * \param[in] lfsck     pointer to the lfsck instance
985  *
986  * \retval              0 for success
987  * \retval              negative error number on failure
988  */
989 static int lfsck_create_lpf(const struct lu_env *env,
990                             struct lfsck_instance *lfsck)
991 {
992         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
993         struct lfsck_thread_info *info  = lfsck_env_info(env);
994         struct lu_fid            *cfid  = &info->lti_fid2;
995         struct lu_attr           *la    = &info->lti_la;
996         struct dt_object_format  *dof   = &info->lti_dof;
997         struct dt_object         *parent = lfsck->li_lpf_root_obj;
998         struct dt_object         *child = NULL;
999         struct lfsck_lock_handle *llh   = &info->lti_llh;
1000         char                      name[8];
1001         int                       node  = lfsck_dev_idx(lfsck);
1002         int                       rc    = 0;
1003         ENTRY;
1004
1005         LASSERT(lfsck->li_master);
1006         LASSERT(parent != NULL);
1007         LASSERT(lfsck->li_lpf_obj == NULL);
1008
1009         snprintf(name, 8, "MDT%04x", node);
1010         rc = lfsck_lock(env, lfsck, parent, name, llh,
1011                         MDS_INODELOCK_UPDATE, LCK_PW);
1012         if (rc != 0)
1013                 RETURN(rc);
1014
1015         if (fid_is_zero(&bk->lb_lpf_fid)) {
1016                 /* There is corner case that: in former LFSCK scanning we have
1017                  * created the .lustre/lost+found/MDTxxxx but failed to update
1018                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1019                  * it from MDT0 firstly. */
1020                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1021                                (const struct dt_key *)name);
1022                 if (rc != 0 && rc != -ENOENT)
1023                         GOTO(unlock, rc);
1024
1025                 if (rc == 0) {
1026                         bk->lb_lpf_fid = *cfid;
1027                         rc = lfsck_bookmark_store(env, lfsck);
1028                 } else {
1029                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1030                 }
1031                 if (rc != 0)
1032                         GOTO(unlock, rc);
1033         } else {
1034                 *cfid = bk->lb_lpf_fid;
1035         }
1036
1037         child = lfsck_object_find_bottom(env, lfsck, cfid);
1038         if (IS_ERR(child))
1039                 GOTO(unlock, rc = PTR_ERR(child));
1040
1041         if (dt_object_exists(child) != 0) {
1042                 if (unlikely(!dt_try_as_dir(env, child)))
1043                         rc = -ENOTDIR;
1044                 else
1045                         lfsck->li_lpf_obj = child;
1046
1047                 GOTO(unlock, rc);
1048         }
1049
1050         memset(la, 0, sizeof(*la));
1051         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
1052         la->la_mode = S_IFDIR | S_IRWXU;
1053         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1054                        LA_UID | LA_GID;
1055         memset(dof, 0, sizeof(*dof));
1056         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1057
1058         if (node == 0)
1059                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1060         else
1061                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1062         if (rc == 0)
1063                 lfsck->li_lpf_obj = child;
1064
1065         GOTO(unlock, rc);
1066
1067 unlock:
1068         lfsck_unlock(llh);
1069         if (rc != 0 && child != NULL && !IS_ERR(child))
1070                 lfsck_object_put(env, child);
1071
1072         return rc;
1073 }
1074
1075 /**
1076  * Scan .lustre/lost+found for bad name entries and remove them.
1077  *
1078  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1079  * index in the system. Any other formatted name is invalid and should be
1080  * removed.
1081  *
1082  * \param[in] env       pointer to the thread context
1083  * \param[in] lfsck     pointer to the lfsck instance
1084  *
1085  * \retval              0 for success
1086  * \retval              negative error number on failure
1087  */
1088 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1089                                       struct lfsck_instance *lfsck)
1090 {
1091         struct dt_object        *parent = lfsck->li_lpf_root_obj;
1092         struct lu_dirent        *ent    =
1093                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1094         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1095         struct dt_it            *it;
1096         int                      rc;
1097         ENTRY;
1098
1099         it = iops->init(env, parent, LUDA_64BITHASH);
1100         if (IS_ERR(it))
1101                 RETURN(PTR_ERR(it));
1102
1103         rc = iops->load(env, it, 0);
1104         if (rc == 0)
1105                 rc = iops->next(env, it);
1106         else if (rc > 0)
1107                 rc = 0;
1108
1109         while (rc == 0) {
1110                 int off = 3;
1111
1112                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1113                 if (rc != 0)
1114                         break;
1115
1116                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1117                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1118                         goto next;
1119
1120                 /* name length must be strlen("MDTxxxx") */
1121                 if (ent->lde_namelen != 7)
1122                         goto remove;
1123
1124                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1125                         goto remove;
1126
1127                 while (off < 7 && isxdigit(ent->lde_name[off]))
1128                         off++;
1129
1130                 if (off != 7) {
1131
1132 remove:
1133                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1134                                                          ent->lde_name);
1135                         if (rc != 0)
1136                                 break;
1137                 }
1138
1139 next:
1140                 rc = iops->next(env, it);
1141         }
1142
1143         iops->put(env, it);
1144         iops->fini(env, it);
1145
1146         RETURN(rc > 0 ? 0 : rc);
1147 }
1148
1149 static int lfsck_update_lpf_entry(const struct lu_env *env,
1150                                   struct lfsck_instance *lfsck,
1151                                   struct dt_object *parent,
1152                                   struct dt_object *child,
1153                                   const char *name,
1154                                   enum lfsck_verify_lpf_types type)
1155 {
1156         int rc;
1157
1158         if (type == LVLT_BY_BOOKMARK) {
1159                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1160                                              lfsck_dto2fid(child), S_IFDIR);
1161         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1162                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1163                 rc = lfsck_bookmark_store(env, lfsck);
1164
1165                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1166                        " in the bookmark file: rc = %d\n",
1167                        lfsck_lfsck2name(lfsck),
1168                        PFID(lfsck_dto2fid(child)), rc);
1169         }
1170
1171         return rc;
1172 }
1173
1174 /**
1175  * Check whether the @child back references the @parent.
1176  *
1177  * Two cases:
1178  * 1) The child's FID is stored in the bookmark file. If the child back
1179  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1180  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1181  *    the child back references another parent2, then:
1182  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1183  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1184  *      references the child. So keep them there. As the LFSCK processing,
1185  *      the parent3 may be found, then when the LFSCK run next time, the
1186  *      inconsistency can be repaired.
1187  *
1188  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1189  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1190  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1191  *    back references another parent2, then:
1192  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1193  *      from .lustre/lost+found/;
1194  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1195  *      sub-directory name entry and update the child;
1196  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1197  *      or not, then keep them there.
1198  *
1199  * \param[in] env       pointer to the thread context
1200  * \param[in] lfsck     pointer to the lfsck instance
1201  * \param[in] child     pointer to the lost+found sub-directory object
1202  * \param[in] name      the name for lost+found sub-directory object
1203  * \param[out] fid      pointer to the buffer to hold the FID of the object
1204  *                      (called it as parent2) that is referenced via the
1205  *                      child's dotdot entry; it also can be the FID that
1206  *                      is referenced by the name entry under the parent2.
1207  * \param[in] type      to indicate where the child's FID is stored in
1208  *
1209  * \retval              positive number for uncertain inconsistency
1210  * \retval              0 for success
1211  * \retval              negative error number on failure
1212  */
1213 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1214                                   struct lfsck_instance *lfsck,
1215                                   struct dt_object *child, const char *name,
1216                                   struct lu_fid *fid,
1217                                   enum lfsck_verify_lpf_types type)
1218 {
1219         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1220         struct lfsck_thread_info *info    = lfsck_env_info(env);
1221         char                     *name2   = info->lti_key;
1222         struct lu_fid            *fid2    = &info->lti_fid3;
1223         struct dt_object         *parent2 = NULL;
1224         struct lustre_handle      lh      = { 0 };
1225         int                       rc;
1226         ENTRY;
1227
1228         fid_zero(fid);
1229         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1230                        (const struct dt_key *)dotdot);
1231         if (rc != 0)
1232                 GOTO(linkea, rc);
1233
1234         if (!fid_is_sane(fid))
1235                 GOTO(linkea, rc = -EINVAL);
1236
1237         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1238                 const struct lu_name *cname;
1239
1240                 if (lfsck->li_lpf_obj == NULL) {
1241                         lu_object_get(&child->do_lu);
1242                         lfsck->li_lpf_obj = child;
1243                 }
1244
1245                 cname = lfsck_name_get_const(env, name, strlen(name));
1246                 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1247                 if (rc == 0)
1248                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1249                                                     name, type);
1250
1251                 GOTO(out_done, rc);
1252         }
1253
1254         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1255         if (IS_ERR(parent2))
1256                 GOTO(linkea, parent2);
1257
1258         if (!dt_object_exists(parent2)) {
1259                 lfsck_object_put(env, parent2);
1260
1261                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1262         }
1263
1264         if (!dt_try_as_dir(env, parent2)) {
1265                 lfsck_object_put(env, parent2);
1266
1267                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1268         }
1269
1270 linkea:
1271         /* To prevent rename/unlink race */
1272         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1273                               MDS_INODELOCK_UPDATE, LCK_PR);
1274         if (rc != 0)
1275                 GOTO(out_put, rc);
1276
1277         dt_read_lock(env, child, 0);
1278         rc = lfsck_links_get_first(env, child, name2, fid2);
1279         if (rc != 0) {
1280                 dt_read_unlock(env, child);
1281                 lfsck_ibits_unlock(&lh, LCK_PR);
1282
1283                 GOTO(out_put, rc = 1);
1284         }
1285
1286         /* It is almost impossible that the bookmark file (or the name entry)
1287          * and the linkEA hit the same data corruption. Trust the linkEA. */
1288         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1289                 dt_read_unlock(env, child);
1290                 lfsck_ibits_unlock(&lh, LCK_PR);
1291
1292                 *fid = *fid2;
1293                 if (lfsck->li_lpf_obj == NULL) {
1294                         lu_object_get(&child->do_lu);
1295                         lfsck->li_lpf_obj = child;
1296                 }
1297
1298                 /* Update the child's dotdot entry */
1299                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1300                                              &LU_LPF_FID, S_IFDIR);
1301                 if (rc == 0)
1302                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1303                                                     name, type);
1304
1305                 GOTO(out_put, rc);
1306         }
1307
1308         if (parent2 == NULL || IS_ERR(parent2)) {
1309                 dt_read_unlock(env, child);
1310                 lfsck_ibits_unlock(&lh, LCK_PR);
1311
1312                 GOTO(out_done, rc = 1);
1313         }
1314
1315         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1316                        (const struct dt_key *)name2);
1317         dt_read_unlock(env, child);
1318         lfsck_ibits_unlock(&lh, LCK_PR);
1319         if (rc != 0 && rc != -ENOENT)
1320                 GOTO(out_put, rc);
1321
1322         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1323                 if (type == LVLT_BY_BOOKMARK)
1324                         GOTO(out_put, rc = 1);
1325
1326                 /* Trust the name entry, update the child's dotdot entry. */
1327                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1328                                              &LU_LPF_FID, S_IFDIR);
1329
1330                 GOTO(out_put, rc);
1331         }
1332
1333         if (type == LVLT_BY_BOOKMARK) {
1334                 /* Invalid FID record in the bookmark file, reset it. */
1335                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1336                 rc = lfsck_bookmark_store(env, lfsck);
1337
1338                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1339                        " in the bookmark file: rc = %d\n",
1340                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1341         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1342                 /* The name entry is wrong, remove it. */
1343                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1344         }
1345
1346         GOTO(out_put, rc);
1347
1348 out_put:
1349         if (parent2 != NULL && !IS_ERR(parent2))
1350                 lfsck_object_put(env, parent2);
1351
1352 out_done:
1353         return rc;
1354 }
1355
1356 /**
1357  * Verify the /ROOT/.lustre/lost+found/ directory.
1358  *
1359  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1360  * the LFSCK does not exactly know how to handle, such as orphans. So before
1361  * the LFSCK scanning the system, the consistency of such directory needs to
1362  * be verified firstly to allow the users to use it during the LFSCK.
1363  *
1364  * \param[in] env       pointer to the thread context
1365  * \param[in] lfsck     pointer to the lfsck instance
1366  *
1367  * \retval              positive number for uncertain inconsistency
1368  * \retval              0 for success
1369  * \retval              negative error number on failure
1370  */
1371 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1372 {
1373         struct lfsck_thread_info *info   = lfsck_env_info(env);
1374         struct lu_fid            *pfid   = &info->lti_fid;
1375         struct lu_fid            *cfid   = &info->lti_fid2;
1376         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1377         struct dt_object         *parent;
1378         /* child1's FID is in the bookmark file. */
1379         struct dt_object         *child1 = NULL;
1380         /* child2's FID is in the name entry MDTxxxx. */
1381         struct dt_object         *child2 = NULL;
1382         const struct lu_name     *cname;
1383         char                      name[8];
1384         int                       node   = lfsck_dev_idx(lfsck);
1385         int                       rc     = 0;
1386         ENTRY;
1387
1388         LASSERT(lfsck->li_master);
1389
1390         if (lfsck->li_lpf_root_obj != NULL)
1391                 RETURN(0);
1392
1393         if (node == 0) {
1394                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1395                                                   &LU_LPF_FID);
1396         } else {
1397                 struct lfsck_tgt_desc *ltd;
1398
1399                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1400                 if (unlikely(ltd == NULL))
1401                         RETURN(-ENXIO);
1402
1403                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1404                                                   &LU_LPF_FID);
1405                 lfsck_tgt_put(ltd);
1406         }
1407
1408         if (IS_ERR(parent))
1409                 RETURN(PTR_ERR(parent));
1410
1411         LASSERT(dt_object_exists(parent));
1412
1413         if (unlikely(!dt_try_as_dir(env, parent))) {
1414                 lfsck_object_put(env, parent);
1415
1416                 GOTO(put, rc = -ENOTDIR);
1417         }
1418
1419         lfsck->li_lpf_root_obj = parent;
1420         if (node == 0) {
1421                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1422                 if (rc != 0)
1423                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1424                                "for bad sub-directories: rc = %d\n",
1425                                lfsck_lfsck2name(lfsck), rc);
1426         }
1427
1428         /* child2 */
1429         snprintf(name, 8, "MDT%04x", node);
1430         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1431                        (const struct dt_key *)name);
1432         if (rc == -ENOENT) {
1433                 rc = 0;
1434                 goto find_child1;
1435         }
1436
1437         if (rc != 0)
1438                 GOTO(put, rc);
1439
1440         /* Invalid FID in the name entry, remove the name entry. */
1441         if (!fid_is_norm(cfid)) {
1442                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1443                 if (rc != 0)
1444                         GOTO(put, rc);
1445
1446                 goto find_child1;
1447         }
1448
1449         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1450         if (IS_ERR(child2))
1451                 GOTO(put, rc = PTR_ERR(child2));
1452
1453         if (unlikely(!dt_object_exists(child2) ||
1454                      dt_object_remote(child2)) ||
1455                      !S_ISDIR(lfsck_object_type(child2))) {
1456                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1457                 if (rc != 0)
1458                         GOTO(put, rc);
1459
1460                 goto find_child1;
1461         }
1462
1463         if (unlikely(!dt_try_as_dir(env, child2))) {
1464                 lfsck_object_put(env, child2);
1465                 child2 = NULL;
1466                 rc = -ENOTDIR;
1467         }
1468
1469 find_child1:
1470         if (fid_is_zero(&bk->lb_lpf_fid))
1471                 goto check_child2;
1472
1473         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1474                 if (lfsck->li_lpf_obj == NULL) {
1475                         lu_object_get(&child2->do_lu);
1476                         lfsck->li_lpf_obj = child2;
1477                 }
1478
1479                 cname = lfsck_name_get_const(env, name, strlen(name));
1480                 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1481
1482                 GOTO(put, rc);
1483         }
1484
1485         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1486                 struct lu_fid tfid = bk->lb_lpf_fid;
1487
1488                 /* Invalid FID record in the bookmark file, reset it. */
1489                 fid_zero(&bk->lb_lpf_fid);
1490                 rc = lfsck_bookmark_store(env, lfsck);
1491
1492                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1493                        " in the bookmark file: rc = %d\n",
1494                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1495
1496                 if (rc != 0)
1497                         GOTO(put, rc);
1498
1499                 goto check_child2;
1500         }
1501
1502         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1503         if (IS_ERR(child1)) {
1504                 child1 = NULL;
1505                 goto check_child2;
1506         }
1507
1508         if (unlikely(!dt_object_exists(child1) ||
1509                      dt_object_remote(child1)) ||
1510                      !S_ISDIR(lfsck_object_type(child1))) {
1511                 /* Invalid FID record in the bookmark file, reset it. */
1512                 fid_zero(&bk->lb_lpf_fid);
1513                 rc = lfsck_bookmark_store(env, lfsck);
1514
1515                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1516                        " in the bookmark file: rc = %d\n",
1517                        lfsck_lfsck2name(lfsck),
1518                        PFID(lfsck_dto2fid(child1)), rc);
1519
1520                 if (rc != 0)
1521                         GOTO(put, rc);
1522
1523                 lfsck_object_put(env, child1);
1524                 child1 = NULL;
1525                 goto check_child2;
1526         }
1527
1528         if (unlikely(!dt_try_as_dir(env, child1))) {
1529                 lfsck_object_put(env, child1);
1530                 child1 = NULL;
1531                 rc = -ENOTDIR;
1532                 goto check_child2;
1533         }
1534
1535         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1536                                     LVLT_BY_BOOKMARK);
1537         if (lu_fid_eq(pfid, &LU_LPF_FID))
1538                 GOTO(put, rc);
1539
1540 check_child2:
1541         if (child2 != NULL)
1542                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1543                                             pfid, LVLT_BY_NAMEENTRY);
1544
1545         GOTO(put, rc);
1546
1547 put:
1548         if (lfsck->li_lpf_obj != NULL) {
1549                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1550                         lfsck_object_put(env, lfsck->li_lpf_obj);
1551                         lfsck->li_lpf_obj = NULL;
1552                         rc = -ENOTDIR;
1553                 }
1554         } else if (rc == 0) {
1555                 rc = lfsck_create_lpf(env, lfsck);
1556         }
1557
1558         if (child2 != NULL && !IS_ERR(child2))
1559                 lfsck_object_put(env, child2);
1560         if (child1 != NULL && !IS_ERR(child1))
1561                 lfsck_object_put(env, child1);
1562
1563         return rc;
1564 }
1565
1566 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1567 {
1568         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1569         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1570         char                    *prefix;
1571         int                      rc     = 0;
1572         ENTRY;
1573
1574         if (unlikely(ss == NULL))
1575                 RETURN(-ENXIO);
1576
1577         OBD_ALLOC_PTR(lfsck->li_seq);
1578         if (lfsck->li_seq == NULL)
1579                 RETURN(-ENOMEM);
1580
1581         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1582         if (prefix == NULL)
1583                 GOTO(out, rc = -ENOMEM);
1584
1585         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1586         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1587                              ss->ss_server_seq);
1588         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1589         if (rc != 0)
1590                 GOTO(out, rc);
1591
1592         if (fid_is_sane(&bk->lb_last_fid))
1593                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1594
1595         RETURN(0);
1596
1597 out:
1598         OBD_FREE_PTR(lfsck->li_seq);
1599         lfsck->li_seq = NULL;
1600
1601         return rc;
1602 }
1603
1604 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1605 {
1606         if (lfsck->li_seq != NULL) {
1607                 seq_client_fini(lfsck->li_seq);
1608                 OBD_FREE_PTR(lfsck->li_seq);
1609                 lfsck->li_seq = NULL;
1610         }
1611 }
1612
1613 void lfsck_instance_cleanup(const struct lu_env *env,
1614                             struct lfsck_instance *lfsck)
1615 {
1616         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1617         struct lfsck_component  *com;
1618         struct lfsck_component  *next;
1619         struct lfsck_lmv_unit   *llu;
1620         struct lfsck_lmv_unit   *llu_next;
1621         struct lfsck_lmv        *llmv;
1622         ENTRY;
1623
1624         LASSERT(list_empty(&lfsck->li_link));
1625         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1626
1627         if (lfsck->li_obj_oit != NULL) {
1628                 lfsck_object_put(env, lfsck->li_obj_oit);
1629                 lfsck->li_obj_oit = NULL;
1630         }
1631
1632         LASSERT(lfsck->li_obj_dir == NULL);
1633         LASSERT(lfsck->li_lmv == NULL);
1634
1635         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1636                 llmv = &llu->llu_lmv;
1637
1638                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1639                          "still in using: %u\n",
1640                          atomic_read(&llmv->ll_ref));
1641
1642                 lfsck_lmv_put(env, llmv);
1643         }
1644
1645         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1646                 lfsck_component_cleanup(env, com);
1647         }
1648
1649         LASSERT(list_empty(&lfsck->li_list_dir));
1650
1651         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1652                                  lc_link) {
1653                 lfsck_component_cleanup(env, com);
1654         }
1655
1656         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1657                 lfsck_component_cleanup(env, com);
1658         }
1659
1660         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1661         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1662
1663         if (lfsck->li_lfsck_dir != NULL) {
1664                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1665                 lfsck->li_lfsck_dir = NULL;
1666         }
1667
1668         if (lfsck->li_bookmark_obj != NULL) {
1669                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1670                 lfsck->li_bookmark_obj = NULL;
1671         }
1672
1673         if (lfsck->li_lpf_obj != NULL) {
1674                 lfsck_object_put(env, lfsck->li_lpf_obj);
1675                 lfsck->li_lpf_obj = NULL;
1676         }
1677
1678         if (lfsck->li_lpf_root_obj != NULL) {
1679                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1680                 lfsck->li_lpf_root_obj = NULL;
1681         }
1682
1683         if (lfsck->li_los != NULL) {
1684                 local_oid_storage_fini(env, lfsck->li_los);
1685                 lfsck->li_los = NULL;
1686         }
1687
1688         lfsck_fid_fini(lfsck);
1689
1690         OBD_FREE_PTR(lfsck);
1691 }
1692
1693 static inline struct lfsck_instance *
1694 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1695 {
1696         struct lfsck_instance *lfsck;
1697
1698         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1699                 if (lfsck->li_bottom == key) {
1700                         if (ref)
1701                                 lfsck_instance_get(lfsck);
1702                         if (unlink)
1703                                 list_del_init(&lfsck->li_link);
1704
1705                         return lfsck;
1706                 }
1707         }
1708
1709         return NULL;
1710 }
1711
1712 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1713                                            bool unlink)
1714 {
1715         struct lfsck_instance *lfsck;
1716
1717         spin_lock(&lfsck_instance_lock);
1718         lfsck = __lfsck_instance_find(key, ref, unlink);
1719         spin_unlock(&lfsck_instance_lock);
1720
1721         return lfsck;
1722 }
1723
1724 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1725 {
1726         struct lfsck_instance *tmp;
1727
1728         spin_lock(&lfsck_instance_lock);
1729         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1730                 if (lfsck->li_bottom == tmp->li_bottom) {
1731                         spin_unlock(&lfsck_instance_lock);
1732                         return -EEXIST;
1733                 }
1734         }
1735
1736         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1737         spin_unlock(&lfsck_instance_lock);
1738         return 0;
1739 }
1740
1741 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1742                     const char *prefix)
1743 {
1744         int flag;
1745         int i;
1746         bool newline = (bits != 0 ? false : true);
1747         int rc;
1748
1749         rc = seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1750         if (rc < 0)
1751                 return rc;
1752
1753         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1754                 if (flag & bits) {
1755                         bits &= ~flag;
1756                         if (names[i] != NULL) {
1757                                 if (bits == 0)
1758                                         newline = true;
1759
1760                                 rc = seq_printf(m, "%s%c", names[i],
1761                                                 newline ? '\n' : ',');
1762                                 if (rc < 0)
1763                                         return rc;
1764                         }
1765                 }
1766         }
1767
1768         if (!newline)
1769                 rc = seq_printf(m, "\n");
1770
1771         return rc;
1772 }
1773
1774 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
1775 {
1776         int rc;
1777
1778         if (time == 0) {
1779                 rc = seq_printf(m, "%s_time: N/A\n", name);
1780                 if (rc == 0)
1781                         rc = seq_printf(m, "time_since_%s: N/A\n", name);
1782
1783                 return rc;
1784         }
1785
1786         rc = seq_printf(m, "%s_time: "LPU64"\n", name, time);
1787         if (rc == 0)
1788                 rc = seq_printf(m, "time_since_%s: "LPU64" seconds\n",
1789                                 name, cfs_time_current_sec() - time);
1790
1791         return rc;
1792 }
1793
1794 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1795                    const char *prefix)
1796 {
1797         if (fid_is_zero(&pos->lp_dir_parent)) {
1798                 if (pos->lp_oit_cookie == 0)
1799                         return seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1800
1801                 return seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1802                                   prefix, pos->lp_oit_cookie);
1803         }
1804
1805         return seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1806                           prefix, pos->lp_oit_cookie,
1807                           PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1808 }
1809
1810 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1811                     struct lfsck_position *pos, bool init)
1812 {
1813         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1814
1815         if (unlikely(lfsck->li_di_oit == NULL)) {
1816                 memset(pos, 0, sizeof(*pos));
1817                 return;
1818         }
1819
1820         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1821         if (!lfsck->li_current_oit_processed && !init)
1822                 pos->lp_oit_cookie--;
1823
1824         LASSERT(pos->lp_oit_cookie > 0);
1825
1826         if (lfsck->li_di_dir != NULL) {
1827                 struct dt_object *dto = lfsck->li_obj_dir;
1828
1829                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1830                                                         lfsck->li_di_dir);
1831
1832                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1833                         fid_zero(&pos->lp_dir_parent);
1834                         pos->lp_dir_cookie = 0;
1835                 } else {
1836                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1837                 }
1838         } else {
1839                 fid_zero(&pos->lp_dir_parent);
1840                 pos->lp_dir_cookie = 0;
1841         }
1842 }
1843
1844 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1845 {
1846         bool dirty = false;
1847
1848         if (limit != LFSCK_SPEED_NO_LIMIT) {
1849                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1850                         lfsck->li_sleep_rate = limit /
1851                                                msecs_to_jiffies(MSEC_PER_SEC);
1852                         lfsck->li_sleep_jif = 1;
1853                 } else {
1854                         lfsck->li_sleep_rate = 1;
1855                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1856                                               limit;
1857                 }
1858         } else {
1859                 lfsck->li_sleep_jif = 0;
1860                 lfsck->li_sleep_rate = 0;
1861         }
1862
1863         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1864                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1865                 dirty = true;
1866         }
1867
1868         return dirty;
1869 }
1870
1871 void lfsck_control_speed(struct lfsck_instance *lfsck)
1872 {
1873         struct ptlrpc_thread *thread = &lfsck->li_thread;
1874         struct l_wait_info    lwi;
1875
1876         if (lfsck->li_sleep_jif > 0 &&
1877             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1878                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1879                                        LWI_ON_SIGNAL_NOOP, NULL);
1880
1881                 l_wait_event(thread->t_ctl_waitq,
1882                              !thread_is_running(thread),
1883                              &lwi);
1884                 lfsck->li_new_scanned = 0;
1885         }
1886 }
1887
1888 void lfsck_control_speed_by_self(struct lfsck_component *com)
1889 {
1890         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1891         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1892         struct l_wait_info       lwi;
1893
1894         if (lfsck->li_sleep_jif > 0 &&
1895             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1896                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1897                                        LWI_ON_SIGNAL_NOOP, NULL);
1898
1899                 l_wait_event(thread->t_ctl_waitq,
1900                              !thread_is_running(thread),
1901                              &lwi);
1902                 com->lc_new_scanned = 0;
1903         }
1904 }
1905
1906 static struct lfsck_thread_args *
1907 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1908                        struct lfsck_component *com,
1909                        struct lfsck_start_param *lsp)
1910 {
1911         struct lfsck_thread_args *lta;
1912         int                       rc;
1913
1914         OBD_ALLOC_PTR(lta);
1915         if (lta == NULL)
1916                 return ERR_PTR(-ENOMEM);
1917
1918         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1919         if (rc != 0) {
1920                 OBD_FREE_PTR(lta);
1921                 return ERR_PTR(rc);
1922         }
1923
1924         lta->lta_lfsck = lfsck_instance_get(lfsck);
1925         if (com != NULL)
1926                 lta->lta_com = lfsck_component_get(com);
1927
1928         lta->lta_lsp = lsp;
1929
1930         return lta;
1931 }
1932
1933 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1934 {
1935         if (lta->lta_com != NULL)
1936                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1937         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1938         lu_env_fini(&lta->lta_env);
1939         OBD_FREE_PTR(lta);
1940 }
1941
1942 struct lfsck_assistant_data *
1943 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1944                           const char *name)
1945 {
1946         struct lfsck_assistant_data *lad;
1947
1948         OBD_ALLOC_PTR(lad);
1949         if (lad != NULL) {
1950                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1951                 if (lad->lad_bitmap == NULL) {
1952                         OBD_FREE_PTR(lad);
1953                         return NULL;
1954                 }
1955
1956                 INIT_LIST_HEAD(&lad->lad_req_list);
1957                 spin_lock_init(&lad->lad_lock);
1958                 INIT_LIST_HEAD(&lad->lad_ost_list);
1959                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1960                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1961                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1962                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1963                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1964                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1965                 lad->lad_ops = lao;
1966                 lad->lad_name = name;
1967         }
1968
1969         return lad;
1970 }
1971
1972 struct lfsck_assistant_object *
1973 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1974                             const struct lu_attr *attr, __u64 cookie,
1975                             bool is_dir)
1976 {
1977         struct lfsck_assistant_object   *lso;
1978
1979         OBD_ALLOC_PTR(lso);
1980         if (lso == NULL)
1981                 return ERR_PTR(-ENOMEM);
1982
1983         lso->lso_fid = *fid;
1984         if (attr != NULL)
1985                 lso->lso_attr = *attr;
1986
1987         atomic_set(&lso->lso_ref, 1);
1988         lso->lso_oit_cookie = cookie;
1989         if (is_dir)
1990                 lso->lso_is_dir = 1;
1991
1992         return lso;
1993 }
1994
1995 struct dt_object *
1996 lfsck_assistant_object_load(const struct lu_env *env,
1997                             struct lfsck_instance *lfsck,
1998                             struct lfsck_assistant_object *lso)
1999 {
2000         struct dt_object *obj;
2001
2002         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2003         if (IS_ERR(obj))
2004                 return obj;
2005
2006         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2007                 lso->lso_dead = 1;
2008                 lfsck_object_put(env, obj);
2009
2010                 return ERR_PTR(-ENOENT);
2011         }
2012
2013         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
2014                 lfsck_object_put(env, obj);
2015
2016                 return ERR_PTR(-ENOTDIR);
2017         }
2018
2019         return obj;
2020 }
2021
2022 /**
2023  * Generic LFSCK asynchronous communication interpretor function.
2024  * The LFSCK RPC reply for both the event notification and status
2025  * querying will be handled here.
2026  *
2027  * \param[in] env       pointer to the thread context
2028  * \param[in] req       pointer to the LFSCK request
2029  * \param[in] args      pointer to the lfsck_async_interpret_args
2030  * \param[in] rc        the result for handling the LFSCK request
2031  *
2032  * \retval              0 for success
2033  * \retval              negative error number on failure
2034  */
2035 int lfsck_async_interpret_common(const struct lu_env *env,
2036                                  struct ptlrpc_request *req,
2037                                  void *args, int rc)
2038 {
2039         struct lfsck_async_interpret_args *laia = args;
2040         struct lfsck_component            *com  = laia->laia_com;
2041         struct lfsck_assistant_data       *lad  = com->lc_data;
2042         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
2043         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
2044         struct lfsck_request              *lr   = laia->laia_lr;
2045
2046         LASSERT(com->lc_lfsck->li_master);
2047
2048         switch (lr->lr_event) {
2049         case LE_START:
2050                 if (rc != 0) {
2051                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2052                                "start: rc = %d\n",
2053                                lfsck_lfsck2name(com->lc_lfsck),
2054                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2055                                ltd->ltd_index, lad->lad_name, rc);
2056
2057                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2058                                 struct lfsck_layout *lo = com->lc_file_ram;
2059
2060                                 if (lr->lr_flags & LEF_TO_OST)
2061                                         lfsck_lad_set_bitmap(env, com,
2062                                                              ltd->ltd_index);
2063                                 else
2064                                         lo->ll_flags |= LF_INCOMPLETE;
2065                         } else {
2066                                 struct lfsck_namespace *ns = com->lc_file_ram;
2067
2068                                 /* If some MDT does not join the namespace
2069                                  * LFSCK, then we cannot know whether there
2070                                  * is some name entry on such MDT that with
2071                                  * the referenced MDT-object on this MDT or
2072                                  * not. So the namespace LFSCK on this MDT
2073                                  * cannot handle orphan MDT-objects properly.
2074                                  * So we mark the LFSCK as LF_INCOMPLETE and
2075                                  * skip orphan MDT-objects handling. */
2076                                 ns->ln_flags |= LF_INCOMPLETE;
2077                         }
2078                         break;
2079                 }
2080
2081                 spin_lock(&ltds->ltd_lock);
2082                 if (ltd->ltd_dead) {
2083                         spin_unlock(&ltds->ltd_lock);
2084                         break;
2085                 }
2086
2087                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2088                         struct list_head *list;
2089                         struct list_head *phase_list;
2090
2091                         if (ltd->ltd_layout_done) {
2092                                 spin_unlock(&ltds->ltd_lock);
2093                                 break;
2094                         }
2095
2096                         if (lr->lr_flags & LEF_TO_OST) {
2097                                 list = &lad->lad_ost_list;
2098                                 phase_list = &lad->lad_ost_phase1_list;
2099                         } else {
2100                                 list = &lad->lad_mdt_list;
2101                                 phase_list = &lad->lad_mdt_phase1_list;
2102                         }
2103
2104                         if (list_empty(&ltd->ltd_layout_list))
2105                                 list_add_tail(&ltd->ltd_layout_list, list);
2106                         if (list_empty(&ltd->ltd_layout_phase_list))
2107                                 list_add_tail(&ltd->ltd_layout_phase_list,
2108                                               phase_list);
2109                 } else {
2110                         if (ltd->ltd_namespace_done) {
2111                                 spin_unlock(&ltds->ltd_lock);
2112                                 break;
2113                         }
2114
2115                         if (list_empty(&ltd->ltd_namespace_list))
2116                                 list_add_tail(&ltd->ltd_namespace_list,
2117                                               &lad->lad_mdt_list);
2118                         if (list_empty(&ltd->ltd_namespace_phase_list))
2119                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2120                                               &lad->lad_mdt_phase1_list);
2121                 }
2122                 spin_unlock(&ltds->ltd_lock);
2123                 break;
2124         case LE_STOP:
2125         case LE_PHASE1_DONE:
2126         case LE_PHASE2_DONE:
2127         case LE_PEER_EXIT:
2128                 if (rc != 0 && rc != -EALREADY)
2129                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2130                               "event = %d, rc = %d\n",
2131                               lfsck_lfsck2name(com->lc_lfsck),
2132                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2133                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2134                 break;
2135         case LE_QUERY: {
2136                 struct lfsck_reply *reply;
2137                 struct list_head *list;
2138                 struct list_head *phase_list;
2139
2140                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2141                         list = &ltd->ltd_layout_list;
2142                         phase_list = &ltd->ltd_layout_phase_list;
2143                 } else {
2144                         list = &ltd->ltd_namespace_list;
2145                         phase_list = &ltd->ltd_namespace_phase_list;
2146                 }
2147
2148                 if (rc != 0) {
2149                         if (lr->lr_flags & LEF_QUERY_ALL) {
2150                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2151                                 break;
2152                         }
2153
2154                         spin_lock(&ltds->ltd_lock);
2155                         list_del_init(phase_list);
2156                         list_del_init(list);
2157                         spin_unlock(&ltds->ltd_lock);
2158                         break;
2159                 }
2160
2161                 reply = req_capsule_server_get(&req->rq_pill,
2162                                                &RMF_LFSCK_REPLY);
2163                 if (reply == NULL) {
2164                         rc = -EPROTO;
2165                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2166                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2167                                lad->lad_name, rc);
2168
2169                         if (lr->lr_flags & LEF_QUERY_ALL) {
2170                                 lfsck_reset_ltd_status(ltd, com->lc_type);
2171                                 break;
2172                         }
2173
2174                         spin_lock(&ltds->ltd_lock);
2175                         list_del_init(phase_list);
2176                         list_del_init(list);
2177                         spin_unlock(&ltds->ltd_lock);
2178                         break;
2179                 }
2180
2181                 if (lr->lr_flags & LEF_QUERY_ALL) {
2182                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2183                                 ltd->ltd_layout_status = reply->lr_status;
2184                                 ltd->ltd_layout_repaired = reply->lr_repaired;
2185                         } else {
2186                                 ltd->ltd_namespace_status = reply->lr_status;
2187                                 ltd->ltd_namespace_repaired =
2188                                                         reply->lr_repaired;
2189                         }
2190                         break;
2191                 }
2192
2193                 switch (reply->lr_status) {
2194                 case LS_SCANNING_PHASE1:
2195                         break;
2196                 case LS_SCANNING_PHASE2:
2197                         spin_lock(&ltds->ltd_lock);
2198                         list_del_init(phase_list);
2199                         if (ltd->ltd_dead) {
2200                                 spin_unlock(&ltds->ltd_lock);
2201                                 break;
2202                         }
2203
2204                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2205                                 if (ltd->ltd_layout_done) {
2206                                         spin_unlock(&ltds->ltd_lock);
2207                                         break;
2208                                 }
2209
2210                                 if (lr->lr_flags & LEF_TO_OST)
2211                                         list_add_tail(phase_list,
2212                                                 &lad->lad_ost_phase2_list);
2213                                 else
2214                                         list_add_tail(phase_list,
2215                                                 &lad->lad_mdt_phase2_list);
2216                         } else {
2217                                 if (ltd->ltd_namespace_done) {
2218                                         spin_unlock(&ltds->ltd_lock);
2219                                         break;
2220                                 }
2221
2222                                 list_add_tail(phase_list,
2223                                               &lad->lad_mdt_phase2_list);
2224                         }
2225                         spin_unlock(&ltds->ltd_lock);
2226                         break;
2227                 default:
2228                         spin_lock(&ltds->ltd_lock);
2229                         list_del_init(phase_list);
2230                         list_del_init(list);
2231                         spin_unlock(&ltds->ltd_lock);
2232                         break;
2233                 }
2234                 break;
2235         }
2236         default:
2237                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2238                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2239                 break;
2240         }
2241
2242         if (!laia->laia_shared) {
2243                 lfsck_tgt_put(ltd);
2244                 lfsck_component_put(env, com);
2245         }
2246
2247         return 0;
2248 }
2249
2250 static void lfsck_interpret(const struct lu_env *env,
2251                             struct lfsck_instance *lfsck,
2252                             struct ptlrpc_request *req, void *args, int result)
2253 {
2254         struct lfsck_async_interpret_args *laia = args;
2255         struct lfsck_component            *com;
2256
2257         LASSERT(laia->laia_com == NULL);
2258         LASSERT(laia->laia_shared);
2259
2260         spin_lock(&lfsck->li_lock);
2261         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2262                 laia->laia_com = com;
2263                 lfsck_async_interpret_common(env, req, laia, result);
2264         }
2265
2266         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2267                 laia->laia_com = com;
2268                 lfsck_async_interpret_common(env, req, laia, result);
2269         }
2270         spin_unlock(&lfsck->li_lock);
2271 }
2272
2273 static int lfsck_stop_notify(const struct lu_env *env,
2274                              struct lfsck_instance *lfsck,
2275                              struct lfsck_tgt_descs *ltds,
2276                              struct lfsck_tgt_desc *ltd, __u16 type)
2277 {
2278         struct lfsck_component *com;
2279         int                     rc = 0;
2280         ENTRY;
2281
2282         LASSERT(lfsck->li_master);
2283
2284         spin_lock(&lfsck->li_lock);
2285         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2286         if (com == NULL)
2287                 com = __lfsck_component_find(lfsck, type,
2288                                              &lfsck->li_list_double_scan);
2289         if (com != NULL)
2290                 lfsck_component_get(com);
2291         spin_unlock(&lfsck->li_lock);
2292
2293         if (com != NULL) {
2294                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2295                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2296                 struct lfsck_request              *lr    = &info->lti_lr;
2297                 struct lfsck_assistant_data       *lad   = com->lc_data;
2298                 struct list_head                  *list;
2299                 struct list_head                  *phase_list;
2300                 struct ptlrpc_request_set         *set;
2301
2302                 set = ptlrpc_prep_set();
2303                 if (set == NULL) {
2304                         lfsck_component_put(env, com);
2305
2306                         RETURN(-ENOMEM);
2307                 }
2308
2309                 if (type == LFSCK_TYPE_LAYOUT) {
2310                         list = &ltd->ltd_layout_list;
2311                         phase_list = &ltd->ltd_layout_phase_list;
2312                 } else {
2313                         list = &ltd->ltd_namespace_list;
2314                         phase_list = &ltd->ltd_namespace_phase_list;
2315                 }
2316
2317                 spin_lock(&ltds->ltd_lock);
2318                 if (list_empty(list)) {
2319                         LASSERT(list_empty(phase_list));
2320                         spin_unlock(&ltds->ltd_lock);
2321                         ptlrpc_set_destroy(set);
2322
2323                         RETURN(0);
2324                 }
2325
2326                 list_del_init(phase_list);
2327                 list_del_init(list);
2328                 spin_unlock(&ltds->ltd_lock);
2329
2330                 memset(lr, 0, sizeof(*lr));
2331                 lr->lr_index = lfsck_dev_idx(lfsck);
2332                 lr->lr_event = LE_PEER_EXIT;
2333                 lr->lr_active = type;
2334                 lr->lr_status = LS_CO_PAUSED;
2335                 if (ltds == &lfsck->li_ost_descs)
2336                         lr->lr_flags = LEF_TO_OST;
2337
2338                 memset(laia, 0, sizeof(*laia));
2339                 laia->laia_com = com;
2340                 laia->laia_ltds = ltds;
2341                 atomic_inc(&ltd->ltd_ref);
2342                 laia->laia_ltd = ltd;
2343                 laia->laia_lr = lr;
2344
2345                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2346                                          lfsck_async_interpret_common,
2347                                          laia, LFSCK_NOTIFY);
2348                 if (rc != 0) {
2349                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2350                                "co-stop for %s: rc = %d\n",
2351                                lfsck_lfsck2name(lfsck),
2352                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2353                                ltd->ltd_index, lad->lad_name, rc);
2354                         lfsck_tgt_put(ltd);
2355                 } else {
2356                         rc = ptlrpc_set_wait(set);
2357                 }
2358
2359                 ptlrpc_set_destroy(set);
2360                 lfsck_component_put(env, com);
2361         }
2362
2363         RETURN(rc);
2364 }
2365
2366 static int lfsck_async_interpret(const struct lu_env *env,
2367                                  struct ptlrpc_request *req,
2368                                  void *args, int rc)
2369 {
2370         struct lfsck_async_interpret_args *laia = args;
2371         struct lfsck_instance             *lfsck;
2372
2373         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2374                               li_mdt_descs);
2375         lfsck_interpret(env, lfsck, req, laia, rc);
2376         lfsck_tgt_put(laia->laia_ltd);
2377         if (rc != 0 && laia->laia_result != -EALREADY)
2378                 laia->laia_result = rc;
2379
2380         return 0;
2381 }
2382
2383 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2384                         struct lfsck_request *lr,
2385                         struct ptlrpc_request_set *set,
2386                         ptlrpc_interpterer_t interpreter,
2387                         void *args, int request)
2388 {
2389         struct lfsck_async_interpret_args *laia;
2390         struct ptlrpc_request             *req;
2391         struct lfsck_request              *tmp;
2392         struct req_format                 *format;
2393         int                                rc;
2394
2395         switch (request) {
2396         case LFSCK_NOTIFY:
2397                 format = &RQF_LFSCK_NOTIFY;
2398                 break;
2399         case LFSCK_QUERY:
2400                 format = &RQF_LFSCK_QUERY;
2401                 break;
2402         default:
2403                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2404                        exp->exp_obd->obd_name, request, -EINVAL);
2405                 return -EINVAL;
2406         }
2407
2408         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2409         if (req == NULL)
2410                 return -ENOMEM;
2411
2412         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2413         if (rc != 0) {
2414                 ptlrpc_request_free(req);
2415
2416                 return rc;
2417         }
2418
2419         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2420         *tmp = *lr;
2421         ptlrpc_request_set_replen(req);
2422
2423         laia = ptlrpc_req_async_args(req);
2424         *laia = *(struct lfsck_async_interpret_args *)args;
2425         if (laia->laia_com != NULL)
2426                 lfsck_component_get(laia->laia_com);
2427         req->rq_interpret_reply = interpreter;
2428         req->rq_allow_intr = 1;
2429         ptlrpc_set_add_req(set, req);
2430
2431         return 0;
2432 }
2433
2434 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2435 {
2436         struct lfsck_thread_info          *info  = lfsck_env_info(env);
2437         struct lfsck_request              *lr    = &info->lti_lr;
2438         struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2439         struct lfsck_instance             *lfsck = com->lc_lfsck;
2440         struct lfsck_tgt_descs            *ltds  = &lfsck->li_mdt_descs;
2441         struct lfsck_tgt_desc             *ltd;
2442         struct ptlrpc_request_set         *set;
2443         int                                idx;
2444         int                                rc;
2445         ENTRY;
2446
2447         memset(lr, 0, sizeof(*lr));
2448         lr->lr_event = LE_QUERY;
2449         lr->lr_active = com->lc_type;
2450         lr->lr_flags = LEF_QUERY_ALL;
2451
2452         memset(laia, 0, sizeof(*laia));
2453         laia->laia_com = com;
2454         laia->laia_lr = lr;
2455
2456         set = ptlrpc_prep_set();
2457         if (set == NULL)
2458                 RETURN(-ENOMEM);
2459
2460 again:
2461         laia->laia_ltds = ltds;
2462         down_read(&ltds->ltd_rw_sem);
2463         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2464                 ltd = lfsck_tgt_get(ltds, idx);
2465                 LASSERT(ltd != NULL);
2466
2467                 laia->laia_ltd = ltd;
2468                 up_read(&ltds->ltd_rw_sem);
2469                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2470                                          lfsck_async_interpret_common,
2471                                          laia, LFSCK_QUERY);
2472                 if (rc != 0) {
2473                         struct lfsck_assistant_data *lad = com->lc_data;
2474
2475                         CDEBUG(D_LFSCK, "%s: Fail to query %s %x for stat %s: "
2476                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2477                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2478                                ltd->ltd_index, lad->lad_name, rc);
2479                         lfsck_reset_ltd_status(ltd, com->lc_type);
2480                         lfsck_tgt_put(ltd);
2481                 }
2482                 down_read(&ltds->ltd_rw_sem);
2483         }
2484         up_read(&ltds->ltd_rw_sem);
2485
2486         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2487                 ltds = &lfsck->li_ost_descs;
2488                 lr->lr_flags |= LEF_TO_OST;
2489                 goto again;
2490         }
2491
2492         rc = ptlrpc_set_wait(set);
2493         ptlrpc_set_destroy(set);
2494
2495         RETURN(rc);
2496 }
2497
2498 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2499                           struct lfsck_start_param *lsp)
2500 {
2501         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2502         struct lfsck_assistant_data     *lad     = com->lc_data;
2503         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2504         struct ptlrpc_thread            *athread = &lad->lad_thread;
2505         struct lfsck_thread_args        *lta;
2506         struct task_struct              *task;
2507         int                              rc;
2508         ENTRY;
2509
2510         lad->lad_assistant_status = 0;
2511         lad->lad_post_result = 0;
2512         lad->lad_to_post = 0;
2513         lad->lad_to_double_scan = 0;
2514         lad->lad_in_double_scan = 0;
2515         lad->lad_exit = 0;
2516         lad->lad_advance_lock = false;
2517         thread_set_flags(athread, 0);
2518
2519         lta = lfsck_thread_args_init(lfsck, com, lsp);
2520         if (IS_ERR(lta))
2521                 RETURN(PTR_ERR(lta));
2522
2523         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2524         if (IS_ERR(task)) {
2525                 rc = PTR_ERR(task);
2526                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2527                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2528                 lfsck_thread_args_fini(lta);
2529         } else {
2530                 struct l_wait_info lwi = { 0 };
2531
2532                 l_wait_event(mthread->t_ctl_waitq,
2533                              thread_is_running(athread) ||
2534                              thread_is_stopped(athread),
2535                              &lwi);
2536                 if (unlikely(!thread_is_running(athread)))
2537                         rc = lad->lad_assistant_status;
2538                 else
2539                         rc = 0;
2540         }
2541
2542         RETURN(rc);
2543 }
2544
2545 int lfsck_checkpoint_generic(const struct lu_env *env,
2546                              struct lfsck_component *com)
2547 {
2548         struct lfsck_assistant_data     *lad     = com->lc_data;
2549         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2550         struct ptlrpc_thread            *athread = &lad->lad_thread;
2551         struct l_wait_info               lwi     = { 0 };
2552
2553         l_wait_event(mthread->t_ctl_waitq,
2554                      list_empty(&lad->lad_req_list) ||
2555                      !thread_is_running(mthread) ||
2556                      thread_is_stopped(athread),
2557                      &lwi);
2558
2559         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2560                 return LFSCK_CHECKPOINT_SKIP;
2561
2562         return 0;
2563 }
2564
2565 void lfsck_post_generic(const struct lu_env *env,
2566                         struct lfsck_component *com, int *result)
2567 {
2568         struct lfsck_assistant_data     *lad     = com->lc_data;
2569         struct ptlrpc_thread            *athread = &lad->lad_thread;
2570         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2571         struct l_wait_info               lwi     = { 0 };
2572
2573         lad->lad_post_result = *result;
2574         if (*result <= 0)
2575                 lad->lad_exit = 1;
2576         lad->lad_to_post = 1;
2577
2578         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2579                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2580
2581         wake_up_all(&athread->t_ctl_waitq);
2582         l_wait_event(mthread->t_ctl_waitq,
2583                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2584                      thread_is_stopped(athread),
2585                      &lwi);
2586
2587         if (lad->lad_assistant_status < 0)
2588                 *result = lad->lad_assistant_status;
2589
2590         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2591                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2592 }
2593
2594 int lfsck_double_scan_generic(const struct lu_env *env,
2595                               struct lfsck_component *com, int status)
2596 {
2597         struct lfsck_assistant_data     *lad     = com->lc_data;
2598         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2599         struct ptlrpc_thread            *athread = &lad->lad_thread;
2600         struct l_wait_info               lwi     = { 0 };
2601
2602         if (status != LS_SCANNING_PHASE2)
2603                 lad->lad_exit = 1;
2604         else
2605                 lad->lad_to_double_scan = 1;
2606
2607         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2608                "status %d\n",
2609                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2610
2611         wake_up_all(&athread->t_ctl_waitq);
2612         l_wait_event(mthread->t_ctl_waitq,
2613                      lad->lad_in_double_scan ||
2614                      thread_is_stopped(athread),
2615                      &lwi);
2616
2617         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2618                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2619                lad->lad_assistant_status);
2620
2621         if (lad->lad_assistant_status < 0)
2622                 return lad->lad_assistant_status;
2623
2624         return 0;
2625 }
2626
2627 void lfsck_quit_generic(const struct lu_env *env,
2628                         struct lfsck_component *com)
2629 {
2630         struct lfsck_assistant_data     *lad     = com->lc_data;
2631         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2632         struct ptlrpc_thread            *athread = &lad->lad_thread;
2633         struct l_wait_info               lwi     = { 0 };
2634
2635         lad->lad_exit = 1;
2636         wake_up_all(&athread->t_ctl_waitq);
2637         l_wait_event(mthread->t_ctl_waitq,
2638                      thread_is_init(athread) ||
2639                      thread_is_stopped(athread),
2640                      &lwi);
2641 }
2642
2643 /* external interfaces */
2644
2645 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2646 {
2647         struct lu_env           env;
2648         struct lfsck_instance  *lfsck;
2649         int                     rc;
2650         ENTRY;
2651
2652         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2653         if (rc != 0)
2654                 RETURN(rc);
2655
2656         lfsck = lfsck_instance_find(key, true, false);
2657         if (likely(lfsck != NULL)) {
2658                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2659                 lfsck_instance_put(&env, lfsck);
2660         } else {
2661                 rc = -ENXIO;
2662         }
2663
2664         lu_env_fini(&env);
2665
2666         RETURN(rc);
2667 }
2668 EXPORT_SYMBOL(lfsck_get_speed);
2669
2670 int lfsck_set_speed(struct dt_device *key, int val)
2671 {
2672         struct lu_env           env;
2673         struct lfsck_instance  *lfsck;
2674         int                     rc;
2675         ENTRY;
2676
2677         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2678         if (rc != 0)
2679                 RETURN(rc);
2680
2681         lfsck = lfsck_instance_find(key, true, false);
2682         if (likely(lfsck != NULL)) {
2683                 mutex_lock(&lfsck->li_mutex);
2684                 if (__lfsck_set_speed(lfsck, val))
2685                         rc = lfsck_bookmark_store(&env, lfsck);
2686                 mutex_unlock(&lfsck->li_mutex);
2687                 lfsck_instance_put(&env, lfsck);
2688         } else {
2689                 rc = -ENXIO;
2690         }
2691
2692         lu_env_fini(&env);
2693
2694         RETURN(rc);
2695 }
2696 EXPORT_SYMBOL(lfsck_set_speed);
2697
2698 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2699 {
2700         struct lu_env           env;
2701         struct lfsck_instance  *lfsck;
2702         int                     rc;
2703         ENTRY;
2704
2705         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2706         if (rc != 0)
2707                 RETURN(rc);
2708
2709         lfsck = lfsck_instance_find(key, true, false);
2710         if (likely(lfsck != NULL)) {
2711                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2712                 lfsck_instance_put(&env, lfsck);
2713         } else {
2714                 rc = -ENXIO;
2715         }
2716
2717         lu_env_fini(&env);
2718
2719         RETURN(rc);
2720 }
2721 EXPORT_SYMBOL(lfsck_get_windows);
2722
2723 int lfsck_set_windows(struct dt_device *key, int val)
2724 {
2725         struct lu_env           env;
2726         struct lfsck_instance  *lfsck;
2727         int                     rc;
2728         ENTRY;
2729
2730         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2731         if (rc != 0)
2732                 RETURN(rc);
2733
2734         lfsck = lfsck_instance_find(key, true, false);
2735         if (likely(lfsck != NULL)) {
2736                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2737                         CWARN("%s: invalid async windows size that may "
2738                               "cause memory issues. The valid range is "
2739                               "[1 - %u].\n",
2740                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2741                         rc = -EINVAL;
2742                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2743                         mutex_lock(&lfsck->li_mutex);
2744                         lfsck->li_bookmark_ram.lb_async_windows = val;
2745                         rc = lfsck_bookmark_store(&env, lfsck);
2746                         mutex_unlock(&lfsck->li_mutex);
2747                 }
2748                 lfsck_instance_put(&env, lfsck);
2749         } else {
2750                 rc = -ENXIO;
2751         }
2752
2753         lu_env_fini(&env);
2754
2755         RETURN(rc);
2756 }
2757 EXPORT_SYMBOL(lfsck_set_windows);
2758
2759 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2760 {
2761         struct lu_env           env;
2762         struct lfsck_instance  *lfsck;
2763         struct lfsck_component *com;
2764         int                     rc;
2765         ENTRY;
2766
2767         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2768         if (rc != 0)
2769                 RETURN(rc);
2770
2771         lfsck = lfsck_instance_find(key, true, false);
2772         if (likely(lfsck != NULL)) {
2773                 com = lfsck_component_find(lfsck, type);
2774                 if (likely(com != NULL)) {
2775                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2776                         lfsck_component_put(&env, com);
2777                 } else {
2778                         rc = -ENOTSUPP;
2779                 }
2780
2781                 lfsck_instance_put(&env, lfsck);
2782         } else {
2783                 rc = -ENXIO;
2784         }
2785
2786         lu_env_fini(&env);
2787
2788         RETURN(rc);
2789 }
2790 EXPORT_SYMBOL(lfsck_dump);
2791
2792 static int lfsck_stop_all(const struct lu_env *env,
2793                           struct lfsck_instance *lfsck,
2794                           struct lfsck_stop *stop)
2795 {
2796         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2797         struct lfsck_request              *lr     = &info->lti_lr;
2798         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2799         struct ptlrpc_request_set         *set;
2800         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2801         struct lfsck_tgt_desc             *ltd;
2802         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2803         __u32                              idx;
2804         int                                rc     = 0;
2805         int                                rc1    = 0;
2806         ENTRY;
2807
2808         LASSERT(stop->ls_flags & LPF_BROADCAST);
2809
2810         set = ptlrpc_prep_set();
2811         if (unlikely(set == NULL))
2812                 RETURN(-ENOMEM);
2813
2814         memset(lr, 0, sizeof(*lr));
2815         lr->lr_event = LE_STOP;
2816         lr->lr_index = lfsck_dev_idx(lfsck);
2817         lr->lr_status = stop->ls_status;
2818         lr->lr_version = bk->lb_version;
2819         lr->lr_active = LFSCK_TYPES_ALL;
2820         lr->lr_param = stop->ls_flags;
2821
2822         memset(laia, 0, sizeof(*laia));
2823         laia->laia_ltds = ltds;
2824         laia->laia_lr = lr;
2825         laia->laia_shared = 1;
2826
2827         down_read(&ltds->ltd_rw_sem);
2828         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2829                 ltd = lfsck_tgt_get(ltds, idx);
2830                 LASSERT(ltd != NULL);
2831
2832                 laia->laia_ltd = ltd;
2833                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2834                                          lfsck_async_interpret, laia,
2835                                          LFSCK_NOTIFY);
2836                 if (rc != 0) {
2837                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2838                         lfsck_tgt_put(ltd);
2839                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2840                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2841                         rc1 = rc;
2842                 }
2843         }
2844         up_read(&ltds->ltd_rw_sem);
2845
2846         rc = ptlrpc_set_wait(set);
2847         ptlrpc_set_destroy(set);
2848
2849         if (rc == 0)
2850                 rc = laia->laia_result;
2851
2852         if (rc == -EALREADY)
2853                 rc = 0;
2854
2855         if (rc != 0)
2856                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2857                        lfsck_lfsck2name(lfsck), rc);
2858
2859         RETURN(rc != 0 ? rc : rc1);
2860 }
2861
2862 static int lfsck_start_all(const struct lu_env *env,
2863                            struct lfsck_instance *lfsck,
2864                            struct lfsck_start *start)
2865 {
2866         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2867         struct lfsck_request              *lr     = &info->lti_lr;
2868         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2869         struct ptlrpc_request_set         *set;
2870         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2871         struct lfsck_tgt_desc             *ltd;
2872         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2873         __u32                              idx;
2874         int                                rc     = 0;
2875         ENTRY;
2876
2877         LASSERT(start->ls_flags & LPF_BROADCAST);
2878
2879         set = ptlrpc_prep_set();
2880         if (unlikely(set == NULL))
2881                 RETURN(-ENOMEM);
2882
2883         memset(lr, 0, sizeof(*lr));
2884         lr->lr_event = LE_START;
2885         lr->lr_index = lfsck_dev_idx(lfsck);
2886         lr->lr_speed = bk->lb_speed_limit;
2887         lr->lr_version = bk->lb_version;
2888         lr->lr_active = start->ls_active;
2889         lr->lr_param = start->ls_flags;
2890         lr->lr_async_windows = bk->lb_async_windows;
2891         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2892                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2893                        LSV_CREATE_MDTOBJ;
2894
2895         memset(laia, 0, sizeof(*laia));
2896         laia->laia_ltds = ltds;
2897         laia->laia_lr = lr;
2898         laia->laia_shared = 1;
2899
2900         down_read(&ltds->ltd_rw_sem);
2901         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2902                 ltd = lfsck_tgt_get(ltds, idx);
2903                 LASSERT(ltd != NULL);
2904
2905                 laia->laia_ltd = ltd;
2906                 ltd->ltd_layout_done = 0;
2907                 ltd->ltd_namespace_done = 0;
2908                 ltd->ltd_synced_failures = 0;
2909                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2910                                          lfsck_async_interpret, laia,
2911                                          LFSCK_NOTIFY);
2912                 if (rc != 0) {
2913                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2914                         lfsck_tgt_put(ltd);
2915                         CERROR("%s: cannot notify MDT %x for LFSCK "
2916                                "start, failout: rc = %d\n",
2917                                lfsck_lfsck2name(lfsck), idx, rc);
2918                         break;
2919                 }
2920         }
2921         up_read(&ltds->ltd_rw_sem);
2922
2923         if (rc != 0) {
2924                 ptlrpc_set_destroy(set);
2925
2926                 RETURN(rc);
2927         }
2928
2929         rc = ptlrpc_set_wait(set);
2930         ptlrpc_set_destroy(set);
2931
2932         if (rc == 0)
2933                 rc = laia->laia_result;
2934
2935         if (rc != 0) {
2936                 struct lfsck_stop *stop = &info->lti_stop;
2937
2938                 CERROR("%s: cannot start LFSCK on some MDTs, "
2939                        "stop all: rc = %d\n",
2940                        lfsck_lfsck2name(lfsck), rc);
2941                 if (rc != -EALREADY) {
2942                         stop->ls_status = LS_FAILED;
2943                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2944                         lfsck_stop_all(env, lfsck, stop);
2945                 }
2946         }
2947
2948         RETURN(rc);
2949 }
2950
/*
 * Start the LFSCK instance found for \a key, or join one that is active.
 *
 * lsp->lsp_start == NULL means an auto trigger of a paused LFSCK.
 *
 * If the master thread is neither in the init nor in the stopped state,
 * no new thread is created: for every bit set in start->ls_active the
 * matching component is looked up on li_list_scan / li_list_double_scan
 * and its lfsck_join() method (if any) is called.
 *
 * Otherwise the requested components are moved from li_list_idle to
 * li_list_scan, parameters are applied via lfsck_set_param(), per-target
 * state is reset, and a kthread running lfsck_master_engine() is created.
 * When LPF_BROADCAST is set, lfsck_start_all() is called afterwards with
 * li_mutex released.
 *
 * Errors set directly in this function: -ENXIO (no instance for \a key),
 * -EAGAIN (li_namespace is NULL), -EALREADY / -EOPNOTSUPP (already active
 * path), -EPERM (LPF_BROADCAST on a non-master), -ENOTSUPP (unsupported
 * bits in ls_active).  Errors from the called helpers are passed through.
 *
 * Returns rc when it is negative, otherwise 0.
 */
2951 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2952                 struct lfsck_start_param *lsp)
2953 {
2954         struct lfsck_start              *start  = lsp->lsp_start;
2955         struct lfsck_instance           *lfsck;
2956         struct lfsck_bookmark           *bk;
2957         struct ptlrpc_thread            *thread;
2958         struct lfsck_component          *com;
2959         struct l_wait_info               lwi    = { 0 };
2960         struct lfsck_thread_args        *lta;
2961         struct task_struct              *task;
2962         struct lfsck_tgt_descs          *ltds;
2963         struct lfsck_tgt_desc           *ltd;
2964         __u32                            idx;
2965         int                              rc     = 0;
2966         __u16                            valid  = 0;
2967         __u16                            flags  = 0;
             /* Single-bit cursor used to walk start->ls_active. */
2968         __u16                            type   = 1;
2969         ENTRY;
2970
2971         lfsck = lfsck_instance_find(key, true, false);
2972         if (unlikely(lfsck == NULL))
2973                 RETURN(-ENXIO);
2974
2975         /* System is not ready, try again later. */
2976         if (unlikely(lfsck->li_namespace == NULL))
2977                 GOTO(put, rc = -EAGAIN);
2978
2979         /* start == NULL means auto trigger paused LFSCK. */
2980         if ((start == NULL) &&
2981             (list_empty(&lfsck->li_list_scan) ||
2982              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2983                 GOTO(put, rc = 0);
2984
2985         bk = &lfsck->li_bookmark_ram;
2986         thread = &lfsck->li_thread;
2987         mutex_lock(&lfsck->li_mutex);
2988         spin_lock(&lfsck->li_lock);
             /* The master thread is neither init nor stopped: do not start a
              * new one, only try to join the requested components. */
2989         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2990                 rc = -EALREADY;
2991                 if (unlikely(start == NULL)) {
2992                         spin_unlock(&lfsck->li_lock);
2993                         GOTO(out, rc);
2994                 }
2995
                     /* Handle ls_active one bit at a time; each handled bit
                      * is cleared from ls_active. */
2996                 while (start->ls_active != 0) {
2997                         if (!(type & start->ls_active)) {
2998                                 type <<= 1;
2999                                 continue;
3000                         }
3001
3002                         com = __lfsck_component_find(lfsck, type,
3003                                                      &lfsck->li_list_scan);
3004                         if (com == NULL)
3005                                 com = __lfsck_component_find(lfsck, type,
3006                                                 &lfsck->li_list_double_scan);
3007                         if (com == NULL) {
3008                                 rc = -EOPNOTSUPP;
3009                                 break;
3010                         }
3011
3012                         if (com->lc_ops->lfsck_join != NULL) {
3013                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
3014                                 if (rc != 0 && rc != -EALREADY)
3015                                         break;
3016                         }
3017                         start->ls_active &= ~type;
3018                         type <<= 1;
3019                 }
3020                 spin_unlock(&lfsck->li_lock);
3021                 GOTO(out, rc);
3022         }
3023         spin_unlock(&lfsck->li_lock);
3024
3025         lfsck->li_status = 0;
3026         lfsck->li_oit_over = 0;
3027         lfsck->li_start_unplug = 0;
3028         lfsck->li_drop_dryrun = 0;
3029         lfsck->li_new_scanned = 0;
3030
3031         /* For auto trigger. */
3032         if (start == NULL)
3033                 goto trigger;
3034
3035         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3036                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3037                        lfsck_lfsck2name(lfsck));
3038
3039                 GOTO(out, rc = -EPERM);
3040         }
3041
3042         start->ls_version = bk->lb_version;
3043
3044         if (start->ls_active != 0) {
3045                 struct lfsck_component *next;
3046
3047                 if (start->ls_active == LFSCK_TYPES_ALL)
3048                         start->ls_active = LFSCK_TYPES_SUPPORTED;
3049
                     /* On unsupported types, ls_active is reduced to just
                      * the unsupported bits before failing. */
3050                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3051                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3052                         GOTO(out, rc = -ENOTSUPP);
3053                 }
3054
                     /* Call lfsck_post() on every component already on the
                      * scan list that was not requested this time. */
3055                 list_for_each_entry_safe(com, next,
3056                                          &lfsck->li_list_scan, lc_link) {
3057                         if (!(com->lc_type & start->ls_active)) {
3058                                 rc = com->lc_ops->lfsck_post(env, com, 0,
3059                                                              false);
3060                                 if (rc != 0)
3061                                         GOTO(out, rc);
3062                         }
3063                 }
3064
                     /* Move each requested component found on the idle list
                      * to the scan list; ls_active ends up as 0 here. */
3065                 while (start->ls_active != 0) {
3066                         if (type & start->ls_active) {
3067                                 com = __lfsck_component_find(lfsck, type,
3068                                                         &lfsck->li_list_idle);
3069                                 if (com != NULL)
3070                                         /* The component status will be updated
3071                                          * when its prep() is called later by
3072                                          * the LFSCK main engine. */
3073                                         list_move_tail(&com->lc_link,
3074                                                        &lfsck->li_list_scan);
3075                                 start->ls_active &= ~type;
3076                         }
3077                         type <<= 1;
3078                 }
3079         }
3080
3081         if (list_empty(&lfsck->li_list_scan)) {
3082                 /* The speed limit will be used to control both the LFSCK and
3083                  * low layer scrub (if applied), need to be handled firstly. */
3084                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3085                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3086                                 rc = lfsck_bookmark_store(env, lfsck);
3087                                 if (rc != 0)
3088                                         GOTO(out, rc);
3089                         }
3090                 }
3091
3092                 goto trigger;
3093         }
3094
3095         if (start->ls_flags & LPF_RESET)
3096                 flags |= DOIF_RESET;
3097
3098         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3099         if (rc != 0)
3100                 GOTO(out, rc);
3101
             /* Rebuild ls_active from the components that will be scanned,
              * resetting each of them when LPF_RESET was given. */
3102         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3103                 start->ls_active |= com->lc_type;
3104                 if (flags & DOIF_RESET) {
3105                         rc = com->lc_ops->lfsck_reset(env, com, false);
3106                         if (rc != 0)
3107                                 GOTO(out, rc);
3108                 }
3109         }
3110
             /* Clear the per-MDT progress flags, status and list linkage. */
3111         ltds = &lfsck->li_mdt_descs;
3112         down_read(&ltds->ltd_rw_sem);
3113         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3114                 ltd = lfsck_ltd2tgt(ltds, idx);
3115                 LASSERT(ltd != NULL);
3116
3117                 ltd->ltd_layout_done = 0;
3118                 ltd->ltd_namespace_done = 0;
3119                 ltd->ltd_synced_failures = 0;
3120                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3121                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3122                 list_del_init(&ltd->ltd_layout_phase_list);
3123                 list_del_init(&ltd->ltd_layout_list);
3124                 list_del_init(&ltd->ltd_namespace_phase_list);
3125                 list_del_init(&ltd->ltd_namespace_list);
3126         }
3127         up_read(&ltds->ltd_rw_sem);
3128
             /* Same for the OSTs, layout state only. */
3129         ltds = &lfsck->li_ost_descs;
3130         down_read(&ltds->ltd_rw_sem);
3131         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
3132                 ltd = lfsck_ltd2tgt(ltds, idx);
3133                 LASSERT(ltd != NULL);
3134
3135                 ltd->ltd_layout_done = 0;
3136                 ltd->ltd_synced_failures = 0;
3137                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3138                 list_del_init(&ltd->ltd_layout_phase_list);
3139                 list_del_init(&ltd->ltd_layout_list);
3140         }
3141         up_read(&ltds->ltd_rw_sem);
3142
3143 trigger:
3144         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3145         if (bk->lb_param & LPF_DRYRUN)
3146                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3147
3148         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3149                 valid |= DOIV_ERROR_HANDLE;
3150                 if (start->ls_flags & LPF_FAILOUT)
3151                         flags |= DOIF_FAILOUT;
3152         }
3153
3154         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3155                 valid |= DOIV_DRYRUN;
3156                 if (start->ls_flags & LPF_DRYRUN)
3157                         flags |= DOIF_DRYRUN;
3158         }
3159
3160         if (!list_empty(&lfsck->li_list_scan))
3161                 flags |= DOIF_OUTUSED;
3162
             /* Pack the iteration flags and the valid mask into one word. */
3163         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3164         thread_set_flags(thread, 0);
3165         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3166         if (IS_ERR(lta))
3167                 GOTO(out, rc = PTR_ERR(lta));
3168
3169         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3170         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3171         if (IS_ERR(task)) {
3172                 rc = PTR_ERR(task);
3173                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3174                        lfsck_lfsck2name(lfsck), rc);
3175                 lfsck_thread_args_fini(lta);
3176
3177                 GOTO(out, rc);
3178         }
3179
             /* Wait until the new thread is either running or stopped. */
3180         l_wait_event(thread->t_ctl_waitq,
3181                      thread_is_running(thread) ||
3182                      thread_is_stopped(thread),
3183                      &lwi);
             /* No broadcast requested: set li_start_unplug, wake the
              * thread's waitq and return success. */
3184         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3185                 lfsck->li_start_unplug = 1;
3186                 wake_up_all(&thread->t_ctl_waitq);
3187
3188                 GOTO(out, rc = 0);
3189         }
3190
3191         /* release lfsck::li_mutex to avoid deadlock. */
3192         mutex_unlock(&lfsck->li_mutex);
3193         rc = lfsck_start_all(env, lfsck, start);
3194         if (rc != 0) {
                     /* The broadcast failed: unless the local thread has
                      * already stopped, mark the instance LS_FAILED, flag
                      * the thread SVC_STOPPING and wait for it to stop. */
3195                 spin_lock(&lfsck->li_lock);
3196                 if (thread_is_stopped(thread)) {
3197                         spin_unlock(&lfsck->li_lock);
3198                 } else {
3199                         lfsck->li_status = LS_FAILED;
3200                         lfsck->li_flags = 0;
3201                         thread_set_flags(thread, SVC_STOPPING);
3202                         spin_unlock(&lfsck->li_lock);
3203
3204                         lfsck->li_start_unplug = 1;
3205                         wake_up_all(&thread->t_ctl_waitq);
3206                         l_wait_event(thread->t_ctl_waitq,
3207                                      thread_is_stopped(thread),
3208                                      &lwi);
3209                 }
3210         } else {
3211                 lfsck->li_start_unplug = 1;
3212                 wake_up_all(&thread->t_ctl_waitq);
3213         }
3214
             /* li_mutex was already released above, so skip the 'out' label. */
3215         GOTO(put, rc);
3216
3217 out:
3218         mutex_unlock(&lfsck->li_mutex);
3219
3220 put:
3221         lfsck_instance_put(env, lfsck);
3222
             /* Only negative results are reported to the caller. */
3223         return rc < 0 ? rc : 0;
3224 }
3225 EXPORT_SYMBOL(lfsck_start);
3226
3227 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3228                struct lfsck_stop *stop)
3229 {
3230         struct lfsck_instance   *lfsck;
3231         struct ptlrpc_thread    *thread;
3232         struct l_wait_info       lwi    = { 0 };
3233         int                      rc     = 0;
3234         int                      rc1    = 0;
3235         ENTRY;
3236
3237         lfsck = lfsck_instance_find(key, true, false);
3238         if (unlikely(lfsck == NULL))
3239                 RETURN(-ENXIO);
3240
3241         thread = &lfsck->li_thread;
3242         /* release lfsck::li_mutex to avoid deadlock. */
3243         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
3244                 if (!lfsck->li_master) {
3245                         CERROR("%s: only allow to specify '-A' via MDS\n",
3246                                lfsck_lfsck2name(lfsck));
3247
3248                         GOTO(out, rc = -EPERM);
3249                 }
3250
3251                 rc1 = lfsck_stop_all(env, lfsck, stop);
3252         }
3253
3254         mutex_lock(&lfsck->li_mutex);
3255         spin_lock(&lfsck->li_lock);
3256         /* no error if LFSCK is already stopped, or was never started */
3257         if (thread_is_init(thread) || thread_is_stopped(thread)) {
3258                 spin_unlock(&lfsck->li_lock);
3259                 GOTO(out, rc = 0);
3260         }
3261
3262         if (stop != NULL) {
3263                 lfsck->li_status = stop->ls_status;
3264                 lfsck->li_flags = stop->ls_flags;
3265         } else {
3266                 lfsck->li_status = LS_STOPPED;
3267                 lfsck->li_flags = 0;
3268         }
3269
3270         thread_set_flags(thread, SVC_STOPPING);
3271
3272         if (lfsck->li_master) {
3273                 struct lfsck_component *com;
3274                 struct lfsck_assistant_data *lad;
3275
3276                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3277                         lad = com->lc_data;
3278                         spin_lock(&lad->lad_lock);
3279                         if (lad->lad_task != NULL)
3280                                 force_sig(SIGINT, lad->lad_task);
3281                         spin_unlock(&lad->lad_lock);
3282                 }
3283
3284                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3285                         lad = com->lc_data;
3286                         spin_lock(&lad->lad_lock);
3287                         if (lad->lad_task != NULL)
3288                                 force_sig(SIGINT, lad->lad_task);
3289                         spin_unlock(&lad->lad_lock);
3290                 }
3291         }
3292
3293         spin_unlock(&lfsck->li_lock);
3294
3295         wake_up_all(&thread->t_ctl_waitq);
3296         l_wait_event(thread->t_ctl_waitq,
3297                      thread_is_stopped(thread),
3298                      &lwi);
3299
3300         GOTO(out, rc = 0);
3301
3302 out:
3303         mutex_unlock(&lfsck->li_mutex);
3304         lfsck_instance_put(env, lfsck);
3305
3306         return rc != 0 ? rc : rc1;
3307 }
3308 EXPORT_SYMBOL(lfsck_stop);
3309
3310 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3311                     struct lfsck_request *lr, struct thandle *th)
3312 {
3313         int rc = -EOPNOTSUPP;
3314         ENTRY;
3315
3316         switch (lr->lr_event) {
3317         case LE_START: {
3318                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3319                 struct lfsck_start_param  lsp;
3320
3321                 memset(start, 0, sizeof(*start));
3322                 start->ls_valid = lr->lr_valid;
3323                 start->ls_speed_limit = lr->lr_speed;
3324                 start->ls_version = lr->lr_version;
3325                 start->ls_active = lr->lr_active;
3326                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3327                 start->ls_async_windows = lr->lr_async_windows;
3328
3329                 lsp.lsp_start = start;
3330                 lsp.lsp_index = lr->lr_index;
3331                 lsp.lsp_index_valid = 1;
3332                 rc = lfsck_start(env, key, &lsp);
3333                 break;
3334         }
3335         case LE_STOP: {
3336                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3337
3338                 memset(stop, 0, sizeof(*stop));
3339                 stop->ls_status = lr->lr_status;
3340                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3341                 rc = lfsck_stop(env, key, stop);
3342                 break;
3343         }
3344         case LE_PHASE1_DONE:
3345         case LE_PHASE2_DONE:
3346         case LE_FID_ACCESSED:
3347         case LE_PEER_EXIT:
3348         case LE_CONDITIONAL_DESTROY:
3349         case LE_SKIP_NLINK_DECLARE:
3350         case LE_SKIP_NLINK:
3351         case LE_SET_LMV_MASTER:
3352         case LE_SET_LMV_SLAVE:
3353         case LE_PAIRS_VERIFY: {
3354                 struct lfsck_instance  *lfsck;
3355                 struct lfsck_component *com;
3356
3357                 lfsck = lfsck_instance_find(key, true, false);
3358                 if (unlikely(lfsck == NULL))
3359                         RETURN(-ENXIO);
3360
3361                 com = lfsck_component_find(lfsck, lr->lr_active);
3362                 if (likely(com != NULL)) {
3363                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3364                         lfsck_component_put(env, com);
3365                 }
3366
3367                 lfsck_instance_put(env, lfsck);
3368                 break;
3369         }
3370         default:
3371                 break;
3372         }
3373
3374         RETURN(rc);
3375 }
3376 EXPORT_SYMBOL(lfsck_in_notify);
3377
3378 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3379                 struct lfsck_request *req, struct lfsck_reply *rep,
3380                 struct lfsck_query *que)
3381 {
3382         struct lfsck_instance  *lfsck;
3383         struct lfsck_component *com;
3384         int                     i;
3385         int                     rc = 0;
3386         __u16                   type;
3387         ENTRY;
3388
3389         lfsck = lfsck_instance_find(key, true, false);
3390         if (unlikely(lfsck == NULL))
3391                 RETURN(-ENXIO);
3392
3393         if (que != NULL) {
3394                 if (que->lu_types == LFSCK_TYPES_ALL)
3395                         que->lu_types =
3396                                 LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB;
3397
3398                 if (que->lu_types & ~LFSCK_TYPES_SUPPORTED) {
3399                         que->lu_types &= ~LFSCK_TYPES_SUPPORTED;
3400
3401                         GOTO(out, rc = -ENOTSUPP);
3402                 }
3403
3404                 for (i = 0, type = 1 << i; i < LFSCK_TYPE_BITS;
3405                      i++, type = 1 << i) {
3406                         if (!(que->lu_types & type))
3407                                 continue;
3408
3409 again:
3410                         com = lfsck_component_find(lfsck, type);
3411                         if (unlikely(com == NULL))
3412                                 GOTO(out, rc = -ENOTSUPP);
3413
3414                         memset(que->lu_mdts_count[i], 0,
3415                                sizeof(__u32) * (LS_MAX + 1));
3416                         memset(que->lu_osts_count[i], 0,
3417                                sizeof(__u32) * (LS_MAX + 1));
3418                         que->lu_repaired[i] = 0;
3419                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3420                                                       que, i);
3421                         lfsck_component_put(env, com);
3422                         if  (rc < 0)
3423                                 GOTO(out, rc);
3424                 }
3425
3426                 if (!(que->lu_flags & LPF_WAIT))
3427                         GOTO(out, rc);
3428
3429                 for (i = 0, type = 1 << i; i < LFSCK_TYPE_BITS;
3430                      i++, type = 1 << i) {
3431                         if (!(que->lu_types & type))
3432                                 continue;
3433
3434                         if (que->lu_mdts_count[i][LS_SCANNING_PHASE1] != 0 ||
3435                             que->lu_mdts_count[i][LS_SCANNING_PHASE2] != 0 ||
3436                             que->lu_osts_count[i][LS_SCANNING_PHASE1] != 0 ||
3437                             que->lu_osts_count[i][LS_SCANNING_PHASE2] != 0) {
3438                                 struct l_wait_info lwi;
3439
3440                                 /* If it is required to wait, then sleep
3441                                  * 3 seconds and try to query again. */
3442                                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(3),
3443                                                        NULL,
3444                                                        LWI_ON_SIGNAL_NOOP,
3445                                                        NULL);
3446                                 rc = l_wait_event(lfsck->li_thread.t_ctl_waitq,
3447                                                   0, &lwi);
3448                                 if (rc == -ETIMEDOUT)
3449                                         goto again;
3450                         }
3451                 }
3452         } else {
3453                 com = lfsck_component_find(lfsck, req->lr_active);
3454                 if (likely(com != NULL)) {
3455                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3456                                                       que, -1);
3457                         lfsck_component_put(env, com);
3458                 } else {
3459                         rc = -ENOTSUPP;
3460                 }
3461         }
3462
3463         GOTO(out, rc);
3464
3465 out:
3466         lfsck_instance_put(env, lfsck);
3467         return rc;
3468 }
3469 EXPORT_SYMBOL(lfsck_query);
3470
3471 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3472                              struct ldlm_namespace *ns)
3473 {
3474         struct lfsck_instance  *lfsck;
3475         int                     rc      = -ENXIO;
3476
3477         lfsck = lfsck_instance_find(key, true, false);
3478         if (likely(lfsck != NULL)) {
3479                 lfsck->li_namespace = ns;
3480                 lfsck_instance_put(env, lfsck);
3481                 rc = 0;
3482         }
3483
3484         return rc;
3485 }
3486 EXPORT_SYMBOL(lfsck_register_namespace);
3487
3488 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3489                    struct dt_device *next, struct obd_device *obd,
3490                    lfsck_out_notify notify, void *notify_data, bool master)
3491 {
3492         struct lfsck_instance   *lfsck;
3493         struct dt_object        *root  = NULL;
3494         struct dt_object        *obj   = NULL;
3495         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3496         int                      rc;
3497         ENTRY;
3498
3499         lfsck = lfsck_instance_find(key, false, false);
3500         if (unlikely(lfsck != NULL))
3501                 RETURN(-EEXIST);
3502
3503         OBD_ALLOC_PTR(lfsck);
3504         if (lfsck == NULL)
3505                 RETURN(-ENOMEM);
3506
3507         mutex_init(&lfsck->li_mutex);
3508         spin_lock_init(&lfsck->li_lock);
3509         INIT_LIST_HEAD(&lfsck->li_link);
3510         INIT_LIST_HEAD(&lfsck->li_list_scan);
3511         INIT_LIST_HEAD(&lfsck->li_list_dir);
3512         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3513         INIT_LIST_HEAD(&lfsck->li_list_idle);
3514         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3515         atomic_set(&lfsck->li_ref, 1);
3516         atomic_set(&lfsck->li_double_scan_count, 0);
3517         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3518         lfsck->li_out_notify = notify;
3519         lfsck->li_out_notify_data = notify_data;
3520         lfsck->li_next = next;
3521         lfsck->li_bottom = key;
3522         lfsck->li_obd = obd;
3523
3524         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3525         if (rc != 0)
3526                 GOTO(out, rc);
3527
3528         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3529         if (rc != 0)
3530                 GOTO(out, rc);
3531
3532         fid->f_seq = FID_SEQ_LOCAL_NAME;
3533         fid->f_oid = 1;
3534         fid->f_ver = 0;
3535         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3536         if (rc != 0)
3537                 GOTO(out, rc);
3538
3539         rc = dt_root_get(env, key, fid);
3540         if (rc != 0)
3541                 GOTO(out, rc);
3542
3543         root = dt_locate(env, key, fid);
3544         if (IS_ERR(root))
3545                 GOTO(out, rc = PTR_ERR(root));
3546
3547         if (unlikely(!dt_try_as_dir(env, root)))
3548                 GOTO(out, rc = -ENOTDIR);
3549
3550         lfsck->li_local_root_fid = *fid;
3551         if (master) {
3552                 lfsck->li_master = 1;
3553                 if (lfsck_dev_idx(lfsck) == 0) {
3554                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3555                         const struct lu_name *cname;
3556
3557                         rc = dt_lookup(env, root,
3558                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3559                                 (const struct dt_key *)"ROOT");
3560                         if (rc != 0)
3561                                 GOTO(out, rc);
3562
3563                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3564                         if (IS_ERR(obj))
3565                                 GOTO(out, rc = PTR_ERR(obj));
3566
3567                         if (unlikely(!dt_try_as_dir(env, obj)))
3568                                 GOTO(out, rc = -ENOTDIR);
3569
3570                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3571                                 (const struct dt_key *)dotlustre);
3572                         if (rc != 0)
3573                                 GOTO(out, rc);
3574
3575                         lfsck_object_put(env, obj);
3576                         obj = dt_locate(env, key, fid);
3577                         if (IS_ERR(obj))
3578                                 GOTO(out, rc = PTR_ERR(obj));
3579
3580                         cname = lfsck_name_get_const(env, dotlustre,
3581                                                      strlen(dotlustre));
3582                         rc = lfsck_verify_linkea(env, obj, cname,
3583                                                  &lfsck->li_global_root_fid);
3584                         if (rc != 0)
3585                                 GOTO(out, rc);
3586
3587                         if (unlikely(!dt_try_as_dir(env, obj)))
3588                                 GOTO(out, rc = -ENOTDIR);
3589
3590                         *pfid = *fid;
3591                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3592                                        (const struct dt_key *)lostfound);
3593                         if (rc != 0)
3594                                 GOTO(out, rc);
3595
3596                         lfsck_object_put(env, obj);
3597                         obj = dt_locate(env, key, fid);
3598                         if (IS_ERR(obj))
3599                                 GOTO(out, rc = PTR_ERR(obj));
3600
3601                         cname = lfsck_name_get_const(env, lostfound,
3602                                                      strlen(lostfound));
3603                         rc = lfsck_verify_linkea(env, obj, cname, pfid);
3604                         if (rc != 0)
3605                                 GOTO(out, rc);
3606
3607                         lfsck_object_put(env, obj);
3608                         obj = NULL;
3609                 }
3610         }
3611
3612         fid->f_seq = FID_SEQ_LOCAL_FILE;
3613         fid->f_oid = OTABLE_IT_OID;
3614         fid->f_ver = 0;
3615         obj = dt_locate(env, key, fid);
3616         if (IS_ERR(obj))
3617                 GOTO(out, rc = PTR_ERR(obj));
3618
3619         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3620         if (rc != 0)
3621                 GOTO(out, rc);
3622
3623         lfsck->li_obj_oit = obj;
3624         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3625                                         S_IFDIR | S_IRUGO | S_IWUSR);
3626         if (IS_ERR(obj))
3627                 GOTO(out, rc = PTR_ERR(obj));
3628
3629         lu_object_get(&obj->do_lu);
3630         lfsck->li_lfsck_dir = obj;
3631         rc = lfsck_bookmark_setup(env, lfsck);
3632         if (rc != 0)
3633                 GOTO(out, rc);
3634
3635         if (master) {
3636                 rc = lfsck_fid_init(lfsck);
3637                 if (rc < 0)
3638                         GOTO(out, rc);
3639
3640                 rc = lfsck_namespace_setup(env, lfsck);
3641                 if (rc < 0)
3642                         GOTO(out, rc);
3643         }
3644
3645         rc = lfsck_layout_setup(env, lfsck);
3646         if (rc < 0)
3647                 GOTO(out, rc);
3648
3649         /* XXX: more LFSCK components initialization to be added here. */
3650
3651         rc = lfsck_instance_add(lfsck);
3652         if (rc == 0)
3653                 rc = lfsck_add_target_from_orphan(env, lfsck);
3654 out:
3655         if (obj != NULL && !IS_ERR(obj))
3656                 lfsck_object_put(env, obj);
3657         if (root != NULL && !IS_ERR(root))
3658                 lfsck_object_put(env, root);
3659         if (rc != 0)
3660                 lfsck_instance_cleanup(env, lfsck);
3661         return rc;
3662 }
3663 EXPORT_SYMBOL(lfsck_register);
3664
3665 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3666 {
3667         struct lfsck_instance *lfsck;
3668
3669         lfsck = lfsck_instance_find(key, false, true);
3670         if (lfsck != NULL)
3671                 lfsck_instance_put(env, lfsck);
3672 }
3673 EXPORT_SYMBOL(lfsck_degister);
3674
3675 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3676                      struct dt_device *tgt, struct obd_export *exp,
3677                      __u32 index, bool for_ost)
3678 {
3679         struct lfsck_instance   *lfsck;
3680         struct lfsck_tgt_desc   *ltd;
3681         int                      rc;
3682         ENTRY;
3683
3684         OBD_ALLOC_PTR(ltd);
3685         if (ltd == NULL)
3686                 RETURN(-ENOMEM);
3687
3688         ltd->ltd_tgt = tgt;
3689         ltd->ltd_key = key;
3690         ltd->ltd_exp = exp;
3691         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3692         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3693         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3694         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3695         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3696         atomic_set(&ltd->ltd_ref, 1);
3697         ltd->ltd_index = index;
3698
3699         spin_lock(&lfsck_instance_lock);
3700         lfsck = __lfsck_instance_find(key, true, false);
3701         if (lfsck == NULL) {
3702                 if (for_ost)
3703                         list_add_tail(&ltd->ltd_orphan_list,
3704                                       &lfsck_ost_orphan_list);
3705                 else
3706                         list_add_tail(&ltd->ltd_orphan_list,
3707                                       &lfsck_mdt_orphan_list);
3708                 spin_unlock(&lfsck_instance_lock);
3709
3710                 RETURN(0);
3711         }
3712         spin_unlock(&lfsck_instance_lock);
3713
3714         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3715         if (rc != 0)
3716                 lfsck_tgt_put(ltd);
3717
3718         lfsck_instance_put(env, lfsck);
3719
3720         RETURN(rc);
3721 }
3722 EXPORT_SYMBOL(lfsck_add_target);
3723
3724 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3725                       struct dt_device *tgt, __u32 index, bool for_ost)
3726 {
3727         struct lfsck_instance   *lfsck;
3728         struct lfsck_tgt_descs  *ltds;
3729         struct lfsck_tgt_desc   *ltd;
3730         struct list_head        *head;
3731
3732         if (for_ost)
3733                 head = &lfsck_ost_orphan_list;
3734         else
3735                 head = &lfsck_mdt_orphan_list;
3736
3737         spin_lock(&lfsck_instance_lock);
3738         list_for_each_entry(ltd, head, ltd_orphan_list) {
3739                 if (ltd->ltd_tgt == tgt) {
3740                         list_del_init(&ltd->ltd_orphan_list);
3741                         spin_unlock(&lfsck_instance_lock);
3742                         lfsck_tgt_put(ltd);
3743
3744                         return;
3745                 }
3746         }
3747
3748         ltd = NULL;
3749         lfsck = __lfsck_instance_find(key, true, false);
3750         spin_unlock(&lfsck_instance_lock);
3751         if (unlikely(lfsck == NULL))
3752                 return;
3753
3754         if (for_ost)
3755                 ltds = &lfsck->li_ost_descs;
3756         else
3757                 ltds = &lfsck->li_mdt_descs;
3758
3759         down_write(&ltds->ltd_rw_sem);
3760         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3761
3762         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3763                 goto unlock;
3764
3765         ltd = lfsck_ltd2tgt(ltds, index);
3766         if (unlikely(ltd == NULL))
3767                 goto unlock;
3768
3769         LASSERT(ltds->ltd_tgtnr > 0);
3770
3771         ltds->ltd_tgtnr--;
3772         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3773         lfsck_assign_tgt(ltds, NULL, index);
3774
3775 unlock:
3776         if (ltd == NULL) {
3777                 if (for_ost)
3778                         head = &lfsck->li_ost_descs.ltd_orphan;
3779                 else
3780                         head = &lfsck->li_mdt_descs.ltd_orphan;
3781
3782                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3783                         if (ltd->ltd_tgt == tgt) {
3784                                 list_del_init(&ltd->ltd_orphan_list);
3785                                 break;
3786                         }
3787                 }
3788         }
3789
3790         up_write(&ltds->ltd_rw_sem);
3791         if (ltd != NULL) {
3792                 spin_lock(&ltds->ltd_lock);
3793                 ltd->ltd_dead = 1;
3794                 spin_unlock(&ltds->ltd_lock);
3795                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3796                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3797                 lfsck_tgt_put(ltd);
3798         }
3799
3800         lfsck_instance_put(env, lfsck);
3801 }
3802 EXPORT_SYMBOL(lfsck_del_target);
3803
3804 static int __init lfsck_init(void)
3805 {
3806         int rc;
3807
3808         INIT_LIST_HEAD(&lfsck_instance_list);
3809         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3810         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3811         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3812         rc = lu_context_key_register(&lfsck_thread_key);
3813         if (rc == 0) {
3814                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3815                 tgt_register_lfsck_query(lfsck_query);
3816         }
3817
3818         return rc;
3819 }
3820
3821 static void __exit lfsck_exit(void)
3822 {
3823         struct lfsck_tgt_desc *ltd;
3824         struct lfsck_tgt_desc *next;
3825
3826         LASSERT(list_empty(&lfsck_instance_list));
3827
3828         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3829                                  ltd_orphan_list) {
3830                 list_del_init(&ltd->ltd_orphan_list);
3831                 lfsck_tgt_put(ltd);
3832         }
3833
3834         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3835                                  ltd_orphan_list) {
3836                 list_del_init(&ltd->ltd_orphan_list);
3837                 lfsck_tgt_put(ltd);
3838         }
3839
3840         lu_context_key_degister(&lfsck_thread_key);
3841 }
3842
/* Module metadata: author, description, version and license. */
3843 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3844 MODULE_DESCRIPTION("Lustre File System Checker");
3845 MODULE_VERSION(LUSTRE_VERSION_STRING);
3846 MODULE_LICENSE("GPL");
3847
/* Register lfsck_init() and lfsck_exit() as the module init/exit functions. */
3848 module_init(lfsck_init);
3849 module_exit(lfsck_exit);