Whamcloud - gitweb
8703cb01edb7966554afb0ccfe016c6c24fbfbb5
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2017, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <linux/list.h>
36 #include <linux/delay.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <md_object.h>
40 #include <lustre_fld.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_lfsck.h>
44 #include <lu_target.h>
45
46 #include "lfsck_internal.h"
47
/* NOTE(review): not referenced in this part of the file; presumably a
 * positive status telling the caller that a checkpoint was skipped --
 * confirm against the checkpoint code. */
#define LFSCK_CHECKPOINT_SKIP   1
49
50 /* define lfsck thread key */
51 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
52
53 static void lfsck_key_fini(const struct lu_context *ctx,
54                            struct lu_context_key *key, void *data)
55 {
56         struct lfsck_thread_info *info = data;
57
58         lu_buf_free(&info->lti_linkea_buf);
59         lu_buf_free(&info->lti_linkea_buf2);
60         lu_buf_free(&info->lti_big_buf);
61         OBD_FREE_PTR(info);
62 }
63
/* The lfsck context key is tagged LCT_MD_THREAD and LCT_DT_THREAD. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);

/* NOTE(review): presumably the list of all registered LFSCK instances;
 * its users are outside this part of the file. */
static LIST_HEAD(lfsck_instance_list);
/* Global OST/MDT orphan lists: lfsck_add_target_from_orphan() moves the
 * entries whose ltd_key matches the instance's bottom device onto the
 * instance's own orphan list and then registers them. */
static LIST_HEAD(lfsck_ost_orphan_list);
static LIST_HEAD(lfsck_mdt_orphan_list);
/* Held while the global orphan lists above are walked. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
71
/*
 * NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): the slot number presumably matches the bit position of
 * the corresponding flag -- confirm against the flag enum.
 */
const char *const lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = "incomplete",
        [4] = "crashed_lastid",
        [5] = NULL
};
80
/*
 * Table of printable LFSCK parameter names.  Slots 0 and 8 are NULL holes
 * (no name for that position) and slot 10 is the terminator, so a consumer
 * cannot simply stop at the first NULL.
 * NOTE(review): the slot number presumably matches the bit position of
 * the corresponding parameter flag -- confirm against the flag enum.
 */
const char *const lfsck_param_names[] = {
        [0]  = NULL,
        [1]  = "failout",
        [2]  = "dryrun",
        [3]  = "all_targets",
        [4]  = "broadcast",
        [5]  = "orphan",
        [6]  = "create_ostobj",
        [7]  = "create_mdtobj",
        [8]  = NULL,
        [9]  = "delay_create_ostobj",
        [10] = NULL
};
94
/*
 * NOTE(review): presumably selects how the lost+found directory to be
 * verified was located (via the bookmark or via a name entry); the users
 * are outside this part of the file.
 */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK,       /* == 0 */
        LVLT_BY_NAMEENTRY,      /* == 1 */
};
99
100 static inline void
101 lfsck_reset_ltd_status(struct lfsck_tgt_desc *ltd, enum lfsck_type type)
102 {
103         if (type == LFSCK_TYPE_LAYOUT) {
104                 ltd->ltd_layout_status = LS_MAX;
105                 ltd->ltd_layout_repaired = 0;
106         } else {
107                 ltd->ltd_namespace_status = LS_MAX;
108                 ltd->ltd_namespace_repaired = 0;
109         }
110 }
111
112 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
113 {
114         spin_lock_init(&ltds->ltd_lock);
115         init_rwsem(&ltds->ltd_rw_sem);
116         INIT_LIST_HEAD(&ltds->ltd_orphan);
117         ltds->ltd_tgts_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
118         if (!ltds->ltd_tgts_bitmap)
119                 return -ENOMEM;
120
121         return 0;
122 }
123
124 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
125 {
126         struct lfsck_tgt_desc *ltd;
127         struct lfsck_tgt_desc *next;
128         int idx;
129
130         down_write(&ltds->ltd_rw_sem);
131
132         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
133                                  ltd_orphan_list) {
134                 list_del_init(&ltd->ltd_orphan_list);
135                 lfsck_tgt_put(ltd);
136         }
137
138         if (unlikely(!ltds->ltd_tgts_bitmap)) {
139                 up_write(&ltds->ltd_rw_sem);
140
141                 return;
142         }
143
144         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
145                 ltd = lfsck_ltd2tgt(ltds, idx);
146                 if (likely(ltd != NULL)) {
147                         LASSERT(list_empty(&ltd->ltd_layout_list));
148                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
149                         LASSERT(list_empty(&ltd->ltd_namespace_list));
150                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
151
152                         ltds->ltd_tgtnr--;
153                         clear_bit(idx, ltds->ltd_tgts_bitmap);
154                         lfsck_assign_tgt(ltds, NULL, idx);
155                         lfsck_tgt_put(ltd);
156                 }
157         }
158
159         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
160                  ltds->ltd_tgtnr);
161
162         for (idx = 0; idx < ARRAY_SIZE(ltds->ltd_tgts_idx); idx++) {
163                 if (ltds->ltd_tgts_idx[idx] != NULL) {
164                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
165                         ltds->ltd_tgts_idx[idx] = NULL;
166                 }
167         }
168
169         bitmap_free(ltds->ltd_tgts_bitmap);
170         ltds->ltd_tgts_bitmap = NULL;
171         up_write(&ltds->ltd_rw_sem);
172 }
173
174 static int __lfsck_add_target(const struct lu_env *env,
175                               struct lfsck_instance *lfsck,
176                               struct lfsck_tgt_desc *ltd,
177                               bool for_ost, bool locked)
178 {
179         struct lfsck_tgt_descs *ltds;
180         __u32                   index = ltd->ltd_index;
181         int                     rc    = 0;
182         ENTRY;
183
184         if (for_ost)
185                 ltds = &lfsck->li_ost_descs;
186         else
187                 ltds = &lfsck->li_mdt_descs;
188
189         if (!locked)
190                 down_write(&ltds->ltd_rw_sem);
191
192         LASSERT(ltds->ltd_tgts_bitmap);
193
194         if (index >= ltds->ltd_tgts_mask_len) {
195                 u32 newsize = max_t(u32, ltds->ltd_tgts_mask_len,
196                                     BITS_PER_LONG);
197                 unsigned long *old_bitmap = ltds->ltd_tgts_bitmap;
198                 unsigned long *new_bitmap;
199
200                 while (newsize < index + 1)
201                         newsize <<= 1;
202
203                 new_bitmap = bitmap_zalloc(newsize, GFP_KERNEL);
204                 if (!new_bitmap)
205                         GOTO(unlock, rc = -ENOMEM);
206
207                 if (ltds->ltd_tgtnr > 0) {
208                         bitmap_copy(new_bitmap, old_bitmap,
209                                     ltds->ltd_tgts_mask_len);
210                 }
211                 ltds->ltd_tgts_bitmap = new_bitmap;
212                 ltds->ltd_tgts_mask_len = newsize;
213                 bitmap_free(old_bitmap);
214         }
215
216         if (test_bit(index, ltds->ltd_tgts_bitmap)) {
217                 CERROR("%s: the device %s (%u) is registered already\n",
218                        lfsck_lfsck2name(lfsck),
219                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
220                 GOTO(unlock, rc = -EEXIST);
221         }
222
223         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
224                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
225                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
226                         GOTO(unlock, rc = -ENOMEM);
227         }
228
229         lfsck_assign_tgt(ltds, ltd, index);
230         set_bit(index, ltds->ltd_tgts_bitmap);
231         ltds->ltd_tgtnr++;
232
233         GOTO(unlock, rc = 0);
234
235 unlock:
236         if (!locked)
237                 up_write(&ltds->ltd_rw_sem);
238
239         return rc;
240 }
241
242 static int lfsck_add_target_from_orphan(const struct lu_env *env,
243                                         struct lfsck_instance *lfsck)
244 {
245         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
246         struct lfsck_tgt_desc   *ltd;
247         struct lfsck_tgt_desc   *next;
248         struct list_head        *head    = &lfsck_ost_orphan_list;
249         int                      rc;
250         bool                     for_ost = true;
251
252 again:
253         spin_lock(&lfsck_instance_lock);
254         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
255                 if (ltd->ltd_key == lfsck->li_bottom)
256                         list_move_tail(&ltd->ltd_orphan_list,
257                                        &ltds->ltd_orphan);
258         }
259         spin_unlock(&lfsck_instance_lock);
260
261         down_write(&ltds->ltd_rw_sem);
262         while (!list_empty(&ltds->ltd_orphan)) {
263                 ltd = list_first_entry(&ltds->ltd_orphan,
264                                        struct lfsck_tgt_desc,
265                                        ltd_orphan_list);
266                 list_del_init(&ltd->ltd_orphan_list);
267                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
268                 /* Do not hold the semaphore for too long time. */
269                 up_write(&ltds->ltd_rw_sem);
270                 if (rc != 0)
271                         return rc;
272
273                 down_write(&ltds->ltd_rw_sem);
274         }
275         up_write(&ltds->ltd_rw_sem);
276
277         if (for_ost) {
278                 ltds = &lfsck->li_mdt_descs;
279                 head = &lfsck_mdt_orphan_list;
280                 for_ost = false;
281                 goto again;
282         }
283
284         return 0;
285 }
286
287 static inline struct lfsck_component *
288 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
289                        struct list_head *list)
290 {
291         struct lfsck_component *com;
292
293         list_for_each_entry(com, list, lc_link) {
294                 if (com->lc_type == type)
295                         return com;
296         }
297         return NULL;
298 }
299
300 struct lfsck_component *
301 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
302 {
303         struct lfsck_component *com;
304
305         spin_lock(&lfsck->li_lock);
306         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
307         if (com != NULL)
308                 goto unlock;
309
310         com = __lfsck_component_find(lfsck, type,
311                                      &lfsck->li_list_double_scan);
312         if (com != NULL)
313                 goto unlock;
314
315         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
316
317 unlock:
318         if (com != NULL)
319                 lfsck_component_get(com);
320         spin_unlock(&lfsck->li_lock);
321         return com;
322 }
323
324 void lfsck_component_cleanup(const struct lu_env *env,
325                              struct lfsck_component *com)
326 {
327         if (!list_empty(&com->lc_link))
328                 list_del_init(&com->lc_link);
329         if (!list_empty(&com->lc_link_dir))
330                 list_del_init(&com->lc_link_dir);
331
332         lfsck_component_put(env, com);
333 }
334
335 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
336                     struct lu_fid *fid, bool locked)
337 {
338         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
339         int                      rc = 0;
340         ENTRY;
341
342         if (!locked)
343                 mutex_lock(&lfsck->li_mutex);
344
345         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
346         if (rc >= 0) {
347                 bk->lb_last_fid = *fid;
348                 /* We do not care about whether the subsequent sub-operations
349                  * failed or not. The worst case is that one FID is lost that
350                  * is not a big issue for the LFSCK since it is relative rare
351                  * for LFSCK create. */
352                 rc = lfsck_bookmark_store(env, lfsck);
353         }
354
355         if (!locked)
356                 mutex_unlock(&lfsck->li_mutex);
357
358         RETURN(rc);
359 }
360
361 static int __lfsck_ibits_lock(const struct lu_env *env,
362                               struct lfsck_instance *lfsck,
363                               struct dt_object *obj, struct ldlm_res_id *resid,
364                               struct lustre_handle *lh, __u64 bits,
365                               enum ldlm_mode mode)
366 {
367         struct lfsck_thread_info        *info   = lfsck_env_info(env);
368         union ldlm_policy_data          *policy = &info->lti_policy;
369         __u64                            flags  = LDLM_FL_ATOMIC_CB;
370         int                              rc;
371
372         LASSERT(lfsck->li_namespace != NULL);
373
374         memset(policy, 0, sizeof(*policy));
375         policy->l_inodebits.bits = bits;
376         if (dt_object_remote(obj)) {
377                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
378
379                 memset(einfo, 0, sizeof(*einfo));
380                 einfo->ei_type = LDLM_IBITS;
381                 einfo->ei_mode = mode;
382                 einfo->ei_cb_bl = ldlm_blocking_ast;
383                 einfo->ei_cb_cp = ldlm_completion_ast;
384                 einfo->ei_res_id = resid;
385                 einfo->ei_req_slot = 1;
386
387                 rc = dt_object_lock(env, obj, lh, einfo, policy);
388                 /* for regular checks LFSCK doesn't use LDLM locking,
389                  * so the state isn't coherent. here we just took LDLM
390                  * lock for coherency and it's time to invalidate
391                  * previous state */
392                 if (rc == ELDLM_OK)
393                         dt_invalidate(env, obj);
394         } else {
395                 rc = ldlm_cli_enqueue_local(env, lfsck->li_namespace, resid,
396                                             LDLM_IBITS, policy, mode,
397                                             &flags, ldlm_blocking_ast,
398                                             ldlm_completion_ast, NULL, NULL,
399                                             0, LVB_T_NONE, NULL, lh);
400         }
401
402         if (rc == ELDLM_OK) {
403                 rc = 0;
404         } else {
405                 memset(lh, 0, sizeof(*lh));
406                 rc = -EIO;
407         }
408
409         return rc;
410 }
411
412 /**
413  * Request the specified ibits lock for the given object.
414  *
415  * Before the LFSCK modifying on the namespace visible object,
416  * it needs to acquire related ibits ldlm lock.
417  *
418  * \param[in] env       pointer to the thread context
419  * \param[in] lfsck     pointer to the lfsck instance
420  * \param[in] obj       pointer to the dt_object to be locked
421  * \param[out] lh       pointer to the lock handle
422  * \param[in] bits      the bits for the ldlm lock to be acquired
423  * \param[in] mode      the mode for the ldlm lock to be acquired
424  *
425  * \retval              0 for success
426  * \retval              negative error number on failure
427  */
428 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
429                      struct dt_object *obj, struct lustre_handle *lh,
430                      __u64 bits, enum ldlm_mode mode)
431 {
432         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
433
434         LASSERT(!lustre_handle_is_used(lh));
435
436         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
437         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
438 }
439
440 /**
441  * Request the remote LOOKUP lock for the given object.
442  *
443  * If \a pobj is remote, the LOOKUP lock of \a obj is on the MDT where
444  * \a pobj is, acquire LOOKUP lock there.
445  *
446  * \param[in] env       pointer to the thread context
447  * \param[in] lfsck     pointer to the lfsck instance
448  * \param[in] pobj      pointer to parent dt_object
449  * \param[in] obj       pointer to the dt_object to be locked
450  * \param[out] lh       pointer to the lock handle
451  * \param[in] mode      the mode for the ldlm lock to be acquired
452  *
453  * \retval              0 for success
454  * \retval              negative error number on failure
455  */
456 int lfsck_remote_lookup_lock(const struct lu_env *env,
457                              struct lfsck_instance *lfsck,
458                              struct dt_object *pobj, struct dt_object *obj,
459                              struct lustre_handle *lh, enum ldlm_mode mode)
460 {
461         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
462
463         LASSERT(!lustre_handle_is_used(lh));
464
465         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
466         return __lfsck_ibits_lock(env, lfsck, pobj, resid, lh,
467                                   MDS_INODELOCK_LOOKUP, mode);
468 }
469
470 /**
471  * Release the the specified ibits lock.
472  *
473  * If the lock has been acquired before, release it
474  * and cleanup the handle. Otherwise, do nothing.
475  *
476  * \param[in] lh        pointer to the lock handle
477  * \param[in] mode      the mode for the ldlm lock to be released
478  */
479 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
480 {
481         if (lustre_handle_is_used(lh)) {
482                 ldlm_lock_decref(lh, mode);
483                 memset(lh, 0, sizeof(*lh));
484         }
485 }
486
487 /**
488  * Request compound ibits locks for the given <obj, name> pairs.
489  *
490  * Before the LFSCK modifying on the namespace visible object, it needs to
491  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
492  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
493  * modificationis (such as insert name entry to the directory) may be too
494  * coarse-grained and not efficient.
495  *
496  * The lfsck_lock() will request compound ibits locks on the specified
497  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
498  * lock on the directory object, and the regular ibits lock on the name hash.
499  *
500  * \param[in] env       pointer to the thread context
501  * \param[in] lfsck     pointer to the lfsck instance
502  * \param[in] obj       pointer to the dt_object to be locked
503  * \param[in] name      used for building the PDO lock resource
504  * \param[out] llh      pointer to the lfsck_lock_handle
505  * \param[in] bits      the bits for the ldlm lock to be acquired
506  * \param[in] mode      the mode for the ldlm lock to be acquired
507  *
508  * \retval              0 for success
509  * \retval              negative error number on failure
510  */
511 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
512                struct dt_object *obj, const char *name,
513                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
514 {
515         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
516         int                 rc;
517
518         LASSERT(S_ISDIR(lfsck_object_type(obj)));
519         LASSERT(name != NULL);
520         LASSERT(name[0] != 0);
521         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
522         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
523
524         switch (mode) {
525         case LCK_EX:
526                 llh->llh_pdo_mode = LCK_EX;
527                 break;
528         case LCK_PW:
529                 llh->llh_pdo_mode = LCK_CW;
530                 break;
531         case LCK_PR:
532                 llh->llh_pdo_mode = LCK_CR;
533                 break;
534         default:
535                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
536                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
537                        PFID(lfsck_dto2fid(obj)));
538                 LBUG();
539         }
540
541         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
542         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
543                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
544         if (rc != 0)
545                 return rc;
546
547         llh->llh_reg_mode = mode;
548         resid->name[LUSTRE_RES_ID_HSH_OFF] = ll_full_name_hash(NULL, name,
549                                                                strlen(name));
550         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
551         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
552                                 bits, llh->llh_reg_mode);
553         if (rc != 0)
554                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
555
556         return rc;
557 }
558
559 /**
560  * Release the the compound ibits locks.
561  *
562  * \param[in] llh       pointer to the lfsck_lock_handle to be released
563  */
564 void lfsck_unlock(struct lfsck_lock_handle *llh)
565 {
566         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
567         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
568 }
569
570 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
571                               struct lfsck_instance *lfsck,
572                               const struct lu_fid *fid)
573 {
574         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
575         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
576         int                      rc;
577
578         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
579                 /* "ROOT" is always on the MDT0. */
580                 if (lu_fid_eq(fid, &lfsck->li_global_root_fid))
581                         return 0;
582
583                 return lfsck_dev_idx(lfsck);
584         }
585
586         fld_range_set_mdt(range);
587         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
588         if (rc == 0)
589                 rc = range->lsr_index;
590
591         return rc;
592 }
593
/* Directory entry names used as dt keys: "." and ".." are inserted into a
 * newly created directory by lfsck_create_lpf_local(). Both are non-static
 * and thus visible to other files. */
const char dot[] = ".";
const char dotdot[] = "..";
/* NOTE(review): not referenced in this part of the file; presumably the
 * two path components of ".lustre/lost+found". */
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
598
599 /**
600  * Remove the name entry from the .lustre/lost+found directory.
601  *
602  * No need to care about the object referenced by the name entry,
603  * either the name entry is invalid or redundant, or the referenced
604  * object has been processed or will be handled by others.
605  *
606  * \param[in] env       pointer to the thread context
607  * \param[in] lfsck     pointer to the lfsck instance
608  * \param[in] name      the name for the name entry to be removed
609  *
610  * \retval              0 for success
611  * \retval              negative error number on failure
612  */
613 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
614                                        struct lfsck_instance *lfsck,
615                                        const char *name)
616 {
617         struct dt_object        *parent = lfsck->li_lpf_root_obj;
618         struct dt_device        *dev    = lfsck_obj2dev(parent);
619         struct thandle          *th;
620         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
621         int                      rc;
622         ENTRY;
623
624         rc = lfsck_lock(env, lfsck, parent, name, llh,
625                         MDS_INODELOCK_UPDATE, LCK_PW);
626         if (rc != 0)
627                 RETURN(rc);
628
629         th = lfsck_trans_create(env, dev, lfsck);
630         if (IS_ERR(th))
631                 GOTO(unlock, rc = PTR_ERR(th));
632
633         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
634         if (rc != 0)
635                 GOTO(stop, rc);
636
637         rc = dt_declare_ref_del(env, parent, th);
638         if (rc != 0)
639                 GOTO(stop, rc);
640
641         rc = dt_trans_start_local(env, dev, th);
642         if (rc != 0)
643                 GOTO(stop, rc);
644
645         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
646         if (rc != 0)
647                 GOTO(stop, rc);
648
649         dt_write_lock(env, parent, 0);
650         rc = dt_ref_del(env, parent, th);
651         dt_write_unlock(env, parent);
652
653         GOTO(stop, rc);
654
655 stop:
656         dt_trans_stop(env, dev, th);
657
658 unlock:
659         lfsck_unlock(llh);
660
661         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
662                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
663
664         return rc;
665 }
666
/**
 * Create a directory under lfsck_instance::li_lpf_root_obj within a single
 * local transaction.
 *
 * The transaction is created on the device of \a child. It first declares
 * (steps 1a-8a) and then executes (steps 1b-8b) the following updates:
 * create \a child, increase its nlink, insert "." (pointing to itself) and
 * ".." (pointing to LU_LPF_FID), set its linkEA to <parent FID, \a name>,
 * insert \a name into the parent, increase the parent nlink, and finally
 * record the FID of \a child as lb_lpf_fid in the bookmark and write the
 * bookmark out.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] child     pointer to the object to be created
 * \param[in] la        the attributes for the object to be created
 * \param[in] dof       the object format for the object to be created
 * \param[in] name      the name of \a child in the parent directory
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
                                  struct dt_object *child,
                                  struct lu_attr *la,
                                  struct dt_object_format *dof,
                                  const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct dt_object        *parent = lfsck->li_lpf_root_obj;
        struct dt_device        *dev    = lfsck_obj2dev(child);
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { NULL };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Build the linkEA <parent FID, name> in the per-thread buffer
         * before the transaction is created. */
        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
                              cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        th = lfsck_trans_create(env, dev, lfsck);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The child must be usable as a directory for the index
         * operations below. */
        if (!dt_try_as_dir(env, child, false))
                GOTO(stop, rc = -ENOTDIR);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert dot into child dir */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
                               (const struct dt_key *)dot, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
                               (const struct dt_key *)dotdot, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5a. insert linkEA for child */
        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. insert name into parent dir */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 7a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 8a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* Steps 1b-5b run under the child's write lock; a failure in
         * 1b-4b leaves through "unlock" to release it. */
        dt_write_lock(env, child, 0);
        /* 1b. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert dot into child dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 4b. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 5b. insert linkEA for child. */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th);
        /* The child lock is dropped here, so from now on errors go to
         * "stop" directly. */
        dt_write_unlock(env, child);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6b. insert name into parent dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 7b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);
        if (rc != 0)
                GOTO(stop, rc);

        /* Record the new directory in the RAM bookmark and refresh the
         * on-disk (little-endian) copy from it. */
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 8b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        GOTO(stop, rc);

unlock:
        dt_write_unlock(env, child);

stop:
        dt_trans_stop(env, dev, th);

        return rc;
}
821
822 static int lfsck_create_lpf_remote(const struct lu_env *env,
823                                    struct lfsck_instance *lfsck,
824                                    struct dt_object *child,
825                                    struct lu_attr *la,
826                                    struct dt_object_format *dof,
827                                    const char *name)
828 {
829         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
830         struct dt_object        *parent = lfsck->li_lpf_root_obj;
831         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
832         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
833         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
834         struct thandle          *th     = NULL;
835         struct linkea_data       ldata  = { NULL };
836         struct lu_buf            linkea_buf;
837         const struct lu_name    *cname;
838         struct dt_device        *dev;
839         loff_t                   pos    = 0;
840         int                      len    = sizeof(struct lfsck_bookmark);
841         int                      rc;
842         ENTRY;
843
844         cname = lfsck_name_get_const(env, name, strlen(name));
845         rc = linkea_links_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf2,
846                               cname, lfsck_dto2fid(parent));
847         if (rc != 0)
848                 RETURN(rc);
849
850         /* Create .lustre/lost+found/MDTxxxx. */
851
852         /* XXX: Currently, cross-MDT create operation needs to create the child
853          *      object firstly, then insert name into the parent directory. For
854          *      this case, the child object resides on current MDT (local), but
855          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
856          *      easy to contain all the sub-modifications orderly within single
857          *      transaction.
858          *
859          *      To avoid more inconsistency, we split the create operation into
860          *      two transactions:
861          *
862          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
863          *         locally.
864          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
865          *         remotely.
866          *
867          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
868          *      repair such inconsistency when LFSCK run next time. */
869
870         /* Transaction I: locally */
871
872         dev = lfsck_obj2dev(child);
873         th = lfsck_trans_create(env, dev, lfsck);
874         if (IS_ERR(th))
875                 RETURN(PTR_ERR(th));
876
877         /* 1a. create child */
878         rc = dt_declare_create(env, child, la, NULL, dof, th);
879         if (rc != 0)
880                 GOTO(stop, rc);
881
882         if (!dt_try_as_dir(env, child, false))
883                 GOTO(stop, rc = -ENOTDIR);
884
885         /* 2a. increase child nlink */
886         rc = dt_declare_ref_add(env, child, th);
887         if (rc != 0)
888                 GOTO(stop, rc);
889
890         /* 3a. insert dot into child dir */
891         rec->rec_type = S_IFDIR;
892         rec->rec_fid = cfid;
893         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
894                                (const struct dt_key *)dot, th);
895         if (rc != 0)
896                 GOTO(stop, rc);
897
898         /* 4a. insert dotdot into child dir */
899         rec->rec_fid = &LU_LPF_FID;
900         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
901                                (const struct dt_key *)dotdot, th);
902         if (rc != 0)
903                 GOTO(stop, rc);
904
905         /* 5a. insert linkEA for child */
906         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
907                        ldata.ld_leh->leh_len);
908         rc = dt_declare_xattr_set(env, child, &linkea_buf,
909                                   XATTR_NAME_LINK, 0, th);
910         if (rc != 0)
911                 GOTO(stop, rc);
912
913         /* 6a. update bookmark */
914         rc = dt_declare_record_write(env, bk_obj,
915                                      lfsck_buf_get(env, bk, len), 0, th);
916         if (rc != 0)
917                 GOTO(stop, rc);
918
919         rc = dt_trans_start_local(env, dev, th);
920         if (rc != 0)
921                 GOTO(stop, rc);
922
923         dt_write_lock(env, child, 0);
924         /* 1b. create child */
925         rc = dt_create(env, child, la, NULL, dof, th);
926         if (rc != 0)
927                 GOTO(unlock, rc);
928
929         /* 2b. increase child nlink */
930         rc = dt_ref_add(env, child, th);
931         if (rc != 0)
932                 GOTO(unlock, rc);
933
934         /* 3b. insert dot into child dir */
935         rec->rec_type = S_IFDIR;
936         rec->rec_fid = cfid;
937         rc = dt_insert(env, child, (const struct dt_rec *)rec,
938                        (const struct dt_key *)dot, th);
939         if (rc != 0)
940                 GOTO(unlock, rc);
941
942         /* 4b. insert dotdot into child dir */
943         rec->rec_fid = &LU_LPF_FID;
944         rc = dt_insert(env, child, (const struct dt_rec *)rec,
945                        (const struct dt_key *)dotdot, th);
946         if (rc != 0)
947                 GOTO(unlock, rc);
948
949         /* 5b. insert linkEA for child */
950         rc = dt_xattr_set(env, child, &linkea_buf,
951                           XATTR_NAME_LINK, 0, th);
952         if (rc != 0)
953                 GOTO(unlock, rc);
954
955         bk->lb_lpf_fid = *cfid;
956         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
957
958         /* 6b. update bookmark */
959         rc = dt_record_write(env, bk_obj,
960                              lfsck_buf_get(env, bk, len), &pos, th);
961
962         dt_write_unlock(env, child);
963         dt_trans_stop(env, dev, th);
964         if (rc != 0)
965                 RETURN(rc);
966
967         /* Transaction II: remotely */
968
969         dev = lfsck_obj2dev(parent);
970         th = lfsck_trans_create(env, dev, lfsck);
971         if (IS_ERR(th))
972                 RETURN(PTR_ERR(th));
973
974         th->th_sync = 1;
975         /* 5a. insert name into parent dir */
976         rec->rec_fid = cfid;
977         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
978                                (const struct dt_key *)name, th);
979         if (rc != 0)
980                 GOTO(stop, rc);
981
982         /* 6a. increase parent nlink */
983         rc = dt_declare_ref_add(env, parent, th);
984         if (rc != 0)
985                 GOTO(stop, rc);
986
987         rc = dt_trans_start_local(env, dev, th);
988         if (rc != 0)
989                 GOTO(stop, rc);
990
991         /* 5b. insert name into parent dir */
992         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
993                        (const struct dt_key *)name, th);
994         if (rc != 0)
995                 GOTO(stop, rc);
996
997         dt_write_lock(env, parent, 0);
998         /* 6b. increase parent nlink */
999         rc = dt_ref_add(env, parent, th);
1000         dt_write_unlock(env, parent);
1001
1002         GOTO(stop, rc);
1003
1004 unlock:
1005         dt_write_unlock(env, child);
1006 stop:
1007         dt_trans_stop(env, dev, th);
1008
1009         if (rc != 0 && dev == lfsck_obj2dev(parent))
1010                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
1011                        "for orphans, but failed to insert the name %s "
1012                        "to the .lustre/lost+found/. Such inconsistency "
1013                        "will be repaired when LFSCK run next time: rc = %d\n",
1014                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
1015
1016         return rc;
1017 }
1018
1019 /**
1020  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
1021  *
1022  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
1023  * orphans and other uncertain inconsistent objects found during the
1024  * LFSCK. Such directory will be created by the LFSCK engine on the
1025  * local MDT before the LFSCK scanning.
1026  *
1027  * \param[in] env       pointer to the thread context
1028  * \param[in] lfsck     pointer to the lfsck instance
1029  *
1030  * \retval              0 for success
1031  * \retval              negative error number on failure
1032  */
1033 static int lfsck_create_lpf(const struct lu_env *env,
1034                             struct lfsck_instance *lfsck)
1035 {
1036         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
1037         struct lfsck_thread_info *info  = lfsck_env_info(env);
1038         struct lu_fid            *cfid  = &info->lti_fid2;
1039         struct lu_attr           *la    = &info->lti_la;
1040         struct dt_object_format  *dof   = &info->lti_dof;
1041         struct dt_object         *parent = lfsck->li_lpf_root_obj;
1042         struct dt_object         *child = NULL;
1043         struct lfsck_lock_handle *llh   = &info->lti_llh;
1044         char                      name[8];
1045         int                       node  = lfsck_dev_idx(lfsck);
1046         int                       rc    = 0;
1047         ENTRY;
1048
1049         LASSERT(lfsck->li_master);
1050         LASSERT(parent != NULL);
1051         LASSERT(lfsck->li_lpf_obj == NULL);
1052
1053         snprintf(name, 8, "MDT%04x", node);
1054         rc = lfsck_lock(env, lfsck, parent, name, llh,
1055                         MDS_INODELOCK_UPDATE, LCK_PW);
1056         if (rc != 0)
1057                 RETURN(rc);
1058
1059         if (fid_is_zero(&bk->lb_lpf_fid)) {
1060                 /* There is corner case that: in former LFSCK scanning we have
1061                  * created the .lustre/lost+found/MDTxxxx but failed to update
1062                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1063                  * it from MDT0 firstly. */
1064                 rc = dt_lookup_dir(env, parent, name, cfid);
1065                 if (rc != 0 && rc != -ENOENT)
1066                         GOTO(unlock, rc);
1067
1068                 if (rc == 0) {
1069                         bk->lb_lpf_fid = *cfid;
1070                         rc = lfsck_bookmark_store(env, lfsck);
1071                 } else {
1072                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1073                 }
1074                 if (rc != 0)
1075                         GOTO(unlock, rc);
1076         } else {
1077                 *cfid = bk->lb_lpf_fid;
1078         }
1079
1080         child = lfsck_object_find_bottom_new(env, lfsck, cfid);
1081         if (IS_ERR(child))
1082                 GOTO(unlock, rc = PTR_ERR(child));
1083
1084         if (dt_object_exists(child)) {
1085                 if (unlikely(!dt_try_as_dir(env, child, true)))
1086                         rc = -ENOTDIR;
1087                 else
1088                         lfsck->li_lpf_obj = child;
1089
1090                 GOTO(unlock, rc);
1091         }
1092
1093         memset(la, 0, sizeof(*la));
1094         la->la_atime = la->la_mtime = la->la_ctime = ktime_get_real_seconds();
1095         la->la_mode = S_IFDIR | S_IRWXU;
1096         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1097                        LA_UID | LA_GID | LA_TYPE;
1098         memset(dof, 0, sizeof(*dof));
1099         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1100
1101         if (node == 0)
1102                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1103         else
1104                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1105         if (rc == 0)
1106                 lfsck->li_lpf_obj = child;
1107
1108         GOTO(unlock, rc);
1109
1110 unlock:
1111         lfsck_unlock(llh);
1112         if (rc != 0 && child != NULL && !IS_ERR(child))
1113                 lfsck_object_put(env, child);
1114
1115         return rc;
1116 }
1117
1118 /**
1119  * Scan .lustre/lost+found for bad name entries and remove them.
1120  *
1121  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1122  * index in the system. Any other formatted name is invalid and should be
1123  * removed.
1124  *
1125  * \param[in] env       pointer to the thread context
1126  * \param[in] lfsck     pointer to the lfsck instance
1127  *
1128  * \retval              0 for success
1129  * \retval              negative error number on failure
1130  */
1131 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1132                                       struct lfsck_instance *lfsck)
1133 {
1134         struct dt_object        *parent = lfsck->li_lpf_root_obj;
1135         struct lu_dirent        *ent    =
1136                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1137         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1138         struct dt_it            *it;
1139         int                      rc;
1140         ENTRY;
1141
1142         it = iops->init(env, parent, LUDA_64BITHASH);
1143         if (IS_ERR(it))
1144                 RETURN(PTR_ERR(it));
1145
1146         rc = iops->load(env, it, 0);
1147         if (rc == 0)
1148                 rc = iops->next(env, it);
1149         else if (rc > 0)
1150                 rc = 0;
1151
1152         while (rc == 0) {
1153                 int off = 3;
1154
1155                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1156                 if (rc != 0)
1157                         break;
1158
1159                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1160                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1161                         goto next;
1162
1163                 /* name length must be strlen("MDTxxxx") */
1164                 if (ent->lde_namelen != 7)
1165                         goto remove;
1166
1167                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1168                         goto remove;
1169
1170                 while (off < 7 && isxdigit(ent->lde_name[off]))
1171                         off++;
1172
1173                 if (off != 7) {
1174
1175 remove:
1176                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1177                                                          ent->lde_name);
1178                         if (rc != 0)
1179                                 break;
1180                 }
1181
1182 next:
1183                 rc = iops->next(env, it);
1184         }
1185
1186         iops->put(env, it);
1187         iops->fini(env, it);
1188
1189         RETURN(rc > 0 ? 0 : rc);
1190 }
1191
1192 static int lfsck_update_lpf_entry(const struct lu_env *env,
1193                                   struct lfsck_instance *lfsck,
1194                                   struct dt_object *parent,
1195                                   struct dt_object *child,
1196                                   const char *name,
1197                                   enum lfsck_verify_lpf_types type)
1198 {
1199         int rc;
1200
1201         if (type == LVLT_BY_BOOKMARK) {
1202                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1203                                              lfsck_dto2fid(child), S_IFDIR);
1204         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1205                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1206                 rc = lfsck_bookmark_store(env, lfsck);
1207
1208                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1209                        " in the bookmark file: rc = %d\n",
1210                        lfsck_lfsck2name(lfsck),
1211                        PFID(lfsck_dto2fid(child)), rc);
1212         }
1213
1214         return rc;
1215 }
1216
1217 /**
1218  * Check whether the @child back references the @parent.
1219  *
1220  * Two cases:
1221  * 1) The child's FID is stored in the bookmark file. If the child back
1222  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1223  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1224  *    the child back references another parent2, then:
1225  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1226  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1227  *      references the child. So keep them there. As the LFSCK processing,
1228  *      the parent3 may be found, then when the LFSCK run next time, the
1229  *      inconsistency can be repaired.
1230  *
1231  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1232  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1233  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1234  *    back references another parent2, then:
1235  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1236  *      from .lustre/lost+found/;
1237  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1238  *      sub-directory name entry and update the child;
1239  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1240  *      or not, then keep them there.
1241  *
1242  * \param[in] env       pointer to the thread context
1243  * \param[in] lfsck     pointer to the lfsck instance
1244  * \param[in] child     pointer to the lost+found sub-directory object
1245  * \param[in] name      the name for lost+found sub-directory object
1246  * \param[out] fid      pointer to the buffer to hold the FID of the object
1247  *                      (called it as parent2) that is referenced via the
1248  *                      child's dotdot entry; it also can be the FID that
1249  *                      is referenced by the name entry under the parent2.
1250  * \param[in] type      to indicate where the child's FID is stored in
1251  *
1252  * \retval              positive number for uncertain inconsistency
1253  * \retval              0 for success
1254  * \retval              negative error number on failure
1255  */
1256 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1257                                   struct lfsck_instance *lfsck,
1258                                   struct dt_object *child, const char *name,
1259                                   struct lu_fid *fid,
1260                                   enum lfsck_verify_lpf_types type)
1261 {
1262         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1263         struct lfsck_thread_info *info    = lfsck_env_info(env);
1264         char                     *name2   = info->lti_key;
1265         struct lu_fid            *fid2    = &info->lti_fid3;
1266         struct dt_object         *parent2 = NULL;
1267         struct lustre_handle      lh      = { 0 };
1268         int                       rc;
1269         ENTRY;
1270
1271         fid_zero(fid);
1272         rc = dt_lookup_dir(env, child, dotdot, fid);
1273         if (rc != 0)
1274                 GOTO(linkea, rc);
1275
1276         if (!fid_is_sane(fid))
1277                 GOTO(linkea, rc = -EINVAL);
1278
1279         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1280                 const struct lu_name *cname;
1281
1282                 if (lfsck->li_lpf_obj == NULL) {
1283                         lu_object_get(&child->do_lu);
1284                         lfsck->li_lpf_obj = child;
1285                 }
1286
1287                 cname = lfsck_name_get_const(env, name, strlen(name));
1288                 rc = lfsck_verify_linkea(env, lfsck, child, cname, &LU_LPF_FID);
1289                 if (rc == 0)
1290                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1291                                                     name, type);
1292
1293                 GOTO(out_done, rc);
1294         }
1295
1296         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1297         if (IS_ERR(parent2))
1298                 GOTO(linkea, parent2);
1299
1300         if (!dt_try_as_dir(env, parent2, true)) {
1301                 lfsck_object_put(env, parent2);
1302
1303                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1304         }
1305
1306 linkea:
1307         /* To prevent rename/unlink race */
1308         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1309                               MDS_INODELOCK_UPDATE, LCK_PR);
1310         if (rc != 0)
1311                 GOTO(out_put, rc);
1312
1313         dt_read_lock(env, child, 0);
1314         rc = lfsck_links_get_first(env, child, name2, fid2);
1315         if (rc != 0) {
1316                 dt_read_unlock(env, child);
1317                 lfsck_ibits_unlock(&lh, LCK_PR);
1318
1319                 GOTO(out_put, rc = 1);
1320         }
1321
1322         /* It is almost impossible that the bookmark file (or the name entry)
1323          * and the linkEA hit the same data corruption. Trust the linkEA. */
1324         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1325                 dt_read_unlock(env, child);
1326                 lfsck_ibits_unlock(&lh, LCK_PR);
1327
1328                 *fid = *fid2;
1329                 if (lfsck->li_lpf_obj == NULL) {
1330                         lu_object_get(&child->do_lu);
1331                         lfsck->li_lpf_obj = child;
1332                 }
1333
1334                 /* Update the child's dotdot entry */
1335                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1336                                              &LU_LPF_FID, S_IFDIR);
1337                 if (rc == 0)
1338                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1339                                                     name, type);
1340
1341                 GOTO(out_put, rc);
1342         }
1343
1344         if (parent2 == NULL || IS_ERR(parent2)) {
1345                 dt_read_unlock(env, child);
1346                 lfsck_ibits_unlock(&lh, LCK_PR);
1347
1348                 GOTO(out_done, rc = 1);
1349         }
1350
1351         rc = dt_lookup_dir(env, parent2, name2, fid);
1352         dt_read_unlock(env, child);
1353         lfsck_ibits_unlock(&lh, LCK_PR);
1354         if (rc != 0 && rc != -ENOENT)
1355                 GOTO(out_put, rc);
1356
1357         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1358                 if (type == LVLT_BY_BOOKMARK)
1359                         GOTO(out_put, rc = 1);
1360
1361                 /* Trust the name entry, update the child's dotdot entry. */
1362                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1363                                              &LU_LPF_FID, S_IFDIR);
1364
1365                 GOTO(out_put, rc);
1366         }
1367
1368         if (type == LVLT_BY_BOOKMARK) {
1369                 /* Invalid FID record in the bookmark file, reset it. */
1370                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1371                 rc = lfsck_bookmark_store(env, lfsck);
1372
1373                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1374                        " in the bookmark file: rc = %d\n",
1375                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1376         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1377                 /* The name entry is wrong, remove it. */
1378                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1379         }
1380
1381         GOTO(out_put, rc);
1382
1383 out_put:
1384         if (parent2 != NULL && !IS_ERR(parent2))
1385                 lfsck_object_put(env, parent2);
1386
1387 out_done:
1388         return rc;
1389 }
1390
1391 /**
1392  * Verify the /ROOT/.lustre/lost+found/ directory.
1393  *
1394  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1395  * the LFSCK does not exactly know how to handle, such as orphans. So before
1396  * the LFSCK scanning the system, the consistency of such directory needs to
1397  * be verified firstly to allow the users to use it during the LFSCK.
1398  *
1399  * \param[in] env       pointer to the thread context
1400  * \param[in] lfsck     pointer to the lfsck instance
1401  *
1402  * \retval              positive number for uncertain inconsistency
1403  * \retval              0 for success
1404  * \retval              negative error number on failure
1405  */
1406 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1407 {
1408         struct lfsck_thread_info *info   = lfsck_env_info(env);
1409         struct lu_fid            *pfid   = &info->lti_fid;
1410         struct lu_fid            *cfid   = &info->lti_fid2;
1411         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1412         struct dt_object         *parent;
1413         /* child1's FID is in the bookmark file. */
1414         struct dt_object         *child1 = NULL;
1415         /* child2's FID is in the name entry MDTxxxx. */
1416         struct dt_object         *child2 = NULL;
1417         const struct lu_name     *cname;
1418         char                      name[8];
1419         int                       node   = lfsck_dev_idx(lfsck);
1420         int                       rc     = 0;
1421         ENTRY;
1422
1423         LASSERT(lfsck->li_master);
1424
1425         if (lfsck_is_dryrun(lfsck))
1426                 RETURN(0);
1427
1428         if (lfsck->li_lpf_root_obj != NULL)
1429                 RETURN(0);
1430
1431         if (node == 0) {
1432                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1433                                                   &LU_LPF_FID);
1434         } else {
1435                 struct lfsck_tgt_desc *ltd;
1436
1437                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1438                 if (unlikely(ltd == NULL))
1439                         RETURN(-ENXIO);
1440
1441                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1442                                                   &LU_LPF_FID);
1443                 lfsck_tgt_put(ltd);
1444         }
1445
1446         if (IS_ERR(parent))
1447                 RETURN(PTR_ERR(parent));
1448
1449         LASSERT(dt_object_exists(parent));
1450
1451         if (unlikely(!dt_try_as_dir(env, parent, true))) {
1452                 lfsck_object_put(env, parent);
1453
1454                 GOTO(put, rc = -ENOTDIR);
1455         }
1456
1457         lfsck->li_lpf_root_obj = parent;
1458         if (node == 0) {
1459                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1460                 if (rc != 0)
1461                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1462                                "for bad sub-directories: rc = %d\n",
1463                                lfsck_lfsck2name(lfsck), rc);
1464         }
1465
1466         /* child2 */
1467         snprintf(name, 8, "MDT%04x", node);
1468         rc = dt_lookup_dir(env, parent, name, cfid);
1469         if (rc == -ENOENT) {
1470                 rc = 0;
1471                 goto find_child1;
1472         }
1473
1474         if (rc != 0)
1475                 GOTO(put, rc);
1476
1477         /* Invalid FID in the name entry, remove the name entry. */
1478         if (!fid_is_norm(cfid)) {
1479                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1480                 if (rc != 0)
1481                         GOTO(put, rc);
1482
1483                 goto find_child1;
1484         }
1485
1486         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1487         if (IS_ERR(child2))
1488                 GOTO(put, rc = PTR_ERR(child2));
1489
1490         if (unlikely(!dt_object_exists(child2) ||
1491                      dt_object_remote(child2)) ||
1492                      !S_ISDIR(lfsck_object_type(child2))) {
1493                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1494                 if (rc != 0)
1495                         GOTO(put, rc);
1496
1497                 goto find_child1;
1498         }
1499
1500         if (unlikely(!dt_try_as_dir(env, child2, true)))
1501                 GOTO(put, rc = -ENOTDIR);
1502
1503 find_child1:
1504         if (fid_is_zero(&bk->lb_lpf_fid))
1505                 goto check_child2;
1506
1507         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1508                 if (lfsck->li_lpf_obj == NULL) {
1509                         lu_object_get(&child2->do_lu);
1510                         lfsck->li_lpf_obj = child2;
1511                 }
1512
1513                 cname = lfsck_name_get_const(env, name, strlen(name));
1514                 rc = lfsck_verify_linkea(env, lfsck, child2, cname,
1515                                          &LU_LPF_FID);
1516
1517                 GOTO(put, rc);
1518         }
1519
1520         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1521                 struct lu_fid tfid = bk->lb_lpf_fid;
1522
1523                 /* Invalid FID record in the bookmark file, reset it. */
1524                 fid_zero(&bk->lb_lpf_fid);
1525                 rc = lfsck_bookmark_store(env, lfsck);
1526
1527                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1528                        " in the bookmark file: rc = %d\n",
1529                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1530
1531                 if (rc != 0)
1532                         GOTO(put, rc);
1533
1534                 goto check_child2;
1535         }
1536
1537         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1538         if (IS_ERR(child1)) {
1539                 child1 = NULL;
1540                 goto check_child2;
1541         }
1542
1543         if (unlikely(!dt_object_exists(child1) ||
1544                      dt_object_remote(child1)) ||
1545                      !S_ISDIR(lfsck_object_type(child1))) {
1546                 /* Invalid FID record in the bookmark file, reset it. */
1547                 fid_zero(&bk->lb_lpf_fid);
1548                 rc = lfsck_bookmark_store(env, lfsck);
1549
1550                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1551                        " in the bookmark file: rc = %d\n",
1552                        lfsck_lfsck2name(lfsck),
1553                        PFID(lfsck_dto2fid(child1)), rc);
1554
1555                 if (rc != 0)
1556                         GOTO(put, rc);
1557
1558                 lfsck_object_put(env, child1);
1559                 child1 = NULL;
1560                 goto check_child2;
1561         }
1562
1563         if (unlikely(!dt_try_as_dir(env, child1, true))) {
1564                 lfsck_object_put(env, child1);
1565                 child1 = NULL;
1566                 rc = -ENOTDIR;
1567                 goto check_child2;
1568         }
1569
1570         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1571                                     LVLT_BY_BOOKMARK);
1572         if (lu_fid_eq(pfid, &LU_LPF_FID))
1573                 GOTO(put, rc);
1574
1575 check_child2:
1576         if (child2 != NULL)
1577                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1578                                             pfid, LVLT_BY_NAMEENTRY);
1579
1580         GOTO(put, rc);
1581
1582 put:
1583         if (lfsck->li_lpf_obj != NULL) {
1584                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj, true))) {
1585                         lfsck_object_put(env, lfsck->li_lpf_obj);
1586                         lfsck->li_lpf_obj = NULL;
1587                         rc = -ENOTDIR;
1588                 }
1589         } else if (rc == 0) {
1590                 rc = lfsck_create_lpf(env, lfsck);
1591         }
1592
1593         if (child2 != NULL && !IS_ERR(child2))
1594                 lfsck_object_put(env, child2);
1595         if (child1 != NULL && !IS_ERR(child1))
1596                 lfsck_object_put(env, child1);
1597
1598         return rc;
1599 }
1600
1601 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1602 {
1603         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1604         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1605         char                    *prefix;
1606         int                      rc     = 0;
1607         ENTRY;
1608
1609         if (unlikely(ss == NULL))
1610                 RETURN(-ENXIO);
1611
1612         OBD_ALLOC_PTR(lfsck->li_seq);
1613         if (lfsck->li_seq == NULL)
1614                 RETURN(-ENOMEM);
1615
1616         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1617         if (prefix == NULL)
1618                 GOTO(out, rc = -ENOMEM);
1619
1620         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1621         seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1622                              ss->ss_server_seq);
1623         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1624
1625         if (fid_is_sane(&bk->lb_last_fid))
1626                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1627
1628         RETURN(0);
1629
1630 out:
1631         OBD_FREE_PTR(lfsck->li_seq);
1632         lfsck->li_seq = NULL;
1633
1634         return rc;
1635 }
1636
1637 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1638 {
1639         if (lfsck->li_seq != NULL) {
1640                 seq_client_fini(lfsck->li_seq);
1641                 OBD_FREE_PTR(lfsck->li_seq);
1642                 lfsck->li_seq = NULL;
1643         }
1644 }
1645
1646 void lfsck_instance_cleanup(const struct lu_env *env,
1647                             struct lfsck_instance *lfsck)
1648 {
1649         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1650         struct lfsck_component  *com;
1651         struct lfsck_component  *next;
1652         struct lfsck_lmv_unit   *llu;
1653         struct lfsck_lmv_unit   *llu_next;
1654         struct lfsck_lmv        *llmv;
1655         ENTRY;
1656
1657         LASSERT(list_empty(&lfsck->li_link));
1658         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1659
1660         if (lfsck->li_obj_oit != NULL) {
1661                 lfsck_object_put(env, lfsck->li_obj_oit);
1662                 lfsck->li_obj_oit = NULL;
1663         }
1664
1665         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1666                 llmv = &llu->llu_lmv;
1667
1668                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1669                          "still in using: %u\n",
1670                          atomic_read(&llmv->ll_ref));
1671
1672                 lfsck_lmv_put(env, llmv);
1673         }
1674
1675         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1676                 lfsck_component_cleanup(env, com);
1677         }
1678
1679         LASSERT(list_empty(&lfsck->li_list_dir));
1680
1681         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1682                                  lc_link) {
1683                 lfsck_component_cleanup(env, com);
1684         }
1685
1686         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1687                 lfsck_component_cleanup(env, com);
1688         }
1689
1690         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1691         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1692
1693         if (lfsck->li_lfsck_dir != NULL) {
1694                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1695                 lfsck->li_lfsck_dir = NULL;
1696         }
1697
1698         if (lfsck->li_bookmark_obj != NULL) {
1699                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1700                 lfsck->li_bookmark_obj = NULL;
1701         }
1702
1703         if (lfsck->li_lpf_obj != NULL) {
1704                 lfsck_object_put(env, lfsck->li_lpf_obj);
1705                 lfsck->li_lpf_obj = NULL;
1706         }
1707
1708         if (lfsck->li_lpf_root_obj != NULL) {
1709                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1710                 lfsck->li_lpf_root_obj = NULL;
1711         }
1712
1713         if (lfsck->li_los != NULL) {
1714                 local_oid_storage_fini(env, lfsck->li_los);
1715                 lfsck->li_los = NULL;
1716         }
1717
1718         lfsck_fid_fini(lfsck);
1719
1720         OBD_FREE_PTR(lfsck);
1721 }
1722
1723 static inline struct lfsck_instance *
1724 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1725 {
1726         struct lfsck_instance *lfsck;
1727
1728         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1729                 if (lfsck->li_bottom == key) {
1730                         if (ref)
1731                                 lfsck_instance_get(lfsck);
1732                         if (unlink)
1733                                 list_del_init(&lfsck->li_link);
1734
1735                         return lfsck;
1736                 }
1737         }
1738
1739         return NULL;
1740 }
1741
1742 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1743                                            bool unlink)
1744 {
1745         struct lfsck_instance *lfsck;
1746
1747         spin_lock(&lfsck_instance_lock);
1748         lfsck = __lfsck_instance_find(key, ref, unlink);
1749         spin_unlock(&lfsck_instance_lock);
1750
1751         return lfsck;
1752 }
1753
1754 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1755 {
1756         struct lfsck_instance *tmp;
1757
1758         spin_lock(&lfsck_instance_lock);
1759         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1760                 if (lfsck->li_bottom == tmp->li_bottom) {
1761                         spin_unlock(&lfsck_instance_lock);
1762                         return -EEXIST;
1763                 }
1764         }
1765
1766         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1767         spin_unlock(&lfsck_instance_lock);
1768         return 0;
1769 }
1770
1771 void lfsck_bits_dump(struct seq_file *m, int bits, const char *const names[],
1772                      const char *prefix)
1773 {
1774         int flag;
1775         int i;
1776         bool newline = (bits != 0 ? false : true);
1777
1778         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1779
1780         for (i = 0, flag = 1; bits != 0; i++, flag = BIT(i)) {
1781                 if (flag & bits) {
1782                         bits &= ~flag;
1783                         if (names[i] != NULL) {
1784                                 if (bits == 0)
1785                                         newline = true;
1786
1787                                 seq_printf(m, "%s%c", names[i],
1788                                            newline ? '\n' : ',');
1789                         }
1790                 }
1791         }
1792
1793         if (!newline)
1794                 seq_putc(m, '\n');
1795 }
1796
1797 void lfsck_time_dump(struct seq_file *m, time64_t time, const char *name)
1798 {
1799         if (time == 0) {
1800                 seq_printf(m, "%s_time: N/A\n", name);
1801                 seq_printf(m, "time_since_%s: N/A\n", name);
1802         } else {
1803                 seq_printf(m, "%s_time: %lld\n", name, time);
1804                 seq_printf(m, "time_since_%s: %lld seconds\n",
1805                            name, ktime_get_real_seconds() - time);
1806         }
1807 }
1808
1809 void lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1810                     const char *prefix)
1811 {
1812         if (fid_is_zero(&pos->lp_dir_parent)) {
1813                 if (pos->lp_oit_cookie == 0) {
1814                         seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1815                         return;
1816                 }
1817                 seq_printf(m, "%s: %llu, N/A, N/A\n",
1818                            prefix, pos->lp_oit_cookie);
1819         } else {
1820                 seq_printf(m, "%s: %llu, "DFID", %#llx\n",
1821                            prefix, pos->lp_oit_cookie,
1822                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1823         }
1824 }
1825
1826 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1827                     struct lfsck_position *pos, bool init)
1828 {
1829         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1830
1831         if (unlikely(lfsck->li_di_oit == NULL)) {
1832                 memset(pos, 0, sizeof(*pos));
1833                 return;
1834         }
1835
1836         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1837         if (!lfsck->li_current_oit_processed && !init)
1838                 pos->lp_oit_cookie--;
1839
1840         if (unlikely(pos->lp_oit_cookie == 0))
1841                 pos->lp_oit_cookie = 1;
1842
1843         spin_lock(&lfsck->li_lock);
1844         if (lfsck->li_di_dir != NULL) {
1845                 struct dt_object *dto = lfsck->li_obj_dir;
1846
1847                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1848                                                         lfsck->li_di_dir);
1849
1850                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1851                         fid_zero(&pos->lp_dir_parent);
1852                         pos->lp_dir_cookie = 0;
1853                 } else {
1854                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1855                 }
1856         } else {
1857                 fid_zero(&pos->lp_dir_parent);
1858                 pos->lp_dir_cookie = 0;
1859         }
1860         spin_unlock(&lfsck->li_lock);
1861 }
1862
1863 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1864 {
1865         bool dirty = false;
1866
1867         if (limit != LFSCK_SPEED_NO_LIMIT) {
1868                 if (limit > cfs_time_seconds(1)) {
1869                         lfsck->li_sleep_rate = limit / cfs_time_seconds(1);
1870                         lfsck->li_sleep_jif = 1;
1871                 } else {
1872                         lfsck->li_sleep_rate = 1;
1873                         lfsck->li_sleep_jif = cfs_time_seconds(1) / limit;
1874                 }
1875         } else {
1876                 lfsck->li_sleep_jif = 0;
1877                 lfsck->li_sleep_rate = 0;
1878         }
1879
1880         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1881                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1882                 dirty = true;
1883         }
1884
1885         return dirty;
1886 }
1887
1888 void lfsck_control_speed(struct lfsck_instance *lfsck)
1889 {
1890         struct ptlrpc_thread *thread = &lfsck->li_thread;
1891
1892         if (lfsck->li_sleep_jif > 0 &&
1893             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1894                 wait_event_idle_timeout(thread->t_ctl_waitq,
1895                                         !thread_is_running(thread),
1896                                         lfsck->li_sleep_jif);
1897                 lfsck->li_new_scanned = 0;
1898         }
1899 }
1900
1901 void lfsck_control_speed_by_self(struct lfsck_component *com)
1902 {
1903         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1904         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1905
1906         if (lfsck->li_sleep_jif > 0 &&
1907             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1908                 wait_event_idle_timeout(thread->t_ctl_waitq,
1909                                         !thread_is_running(thread),
1910                                         lfsck->li_sleep_jif);
1911                 com->lc_new_scanned = 0;
1912         }
1913 }
1914
1915 static struct lfsck_thread_args *
1916 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1917                        struct lfsck_component *com,
1918                        struct lfsck_start_param *lsp)
1919 {
1920         struct lfsck_thread_args *lta;
1921         int                       rc;
1922
1923         OBD_ALLOC_PTR(lta);
1924         if (lta == NULL)
1925                 return ERR_PTR(-ENOMEM);
1926
1927         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1928         if (rc != 0) {
1929                 OBD_FREE_PTR(lta);
1930                 return ERR_PTR(rc);
1931         }
1932
1933         lta->lta_lfsck = lfsck_instance_get(lfsck);
1934         if (com != NULL)
1935                 lta->lta_com = lfsck_component_get(com);
1936
1937         lta->lta_lsp = lsp;
1938
1939         return lta;
1940 }
1941
1942 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1943 {
1944         if (lta->lta_com != NULL)
1945                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1946         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1947         lu_env_fini(&lta->lta_env);
1948         OBD_FREE_PTR(lta);
1949 }
1950
1951 struct lfsck_assistant_data *
1952 lfsck_assistant_data_init(const struct lfsck_assistant_operations *lao,
1953                           const char *name)
1954 {
1955         struct lfsck_assistant_data *lad;
1956
1957         OBD_ALLOC_PTR(lad);
1958         if (lad != NULL) {
1959                 lad->lad_bitmap = bitmap_zalloc(BITS_PER_LONG, GFP_KERNEL);
1960                 if (lad->lad_bitmap == NULL) {
1961                         OBD_FREE_PTR(lad);
1962                         return NULL;
1963                 }
1964                 lad->lad_bitmap_count = BITS_PER_LONG;
1965
1966                 INIT_LIST_HEAD(&lad->lad_req_list);
1967                 spin_lock_init(&lad->lad_lock);
1968                 INIT_LIST_HEAD(&lad->lad_ost_list);
1969                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1970                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1971                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1972                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1973                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1974                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1975                 lad->lad_ops = lao;
1976                 lad->lad_name = name;
1977         }
1978
1979         return lad;
1980 }
1981
1982 struct lfsck_assistant_object *
1983 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1984                             const struct lu_attr *attr, __u64 cookie,
1985                             bool is_dir)
1986 {
1987         struct lfsck_assistant_object   *lso;
1988
1989         OBD_ALLOC_PTR(lso);
1990         if (lso == NULL)
1991                 return ERR_PTR(-ENOMEM);
1992
1993         lso->lso_fid = *fid;
1994         if (attr != NULL)
1995                 lso->lso_attr = *attr;
1996
1997         atomic_set(&lso->lso_ref, 1);
1998         lso->lso_oit_cookie = cookie;
1999         if (is_dir)
2000                 lso->lso_is_dir = 1;
2001
2002         return lso;
2003 }
2004
2005 struct dt_object *
2006 lfsck_assistant_object_load(const struct lu_env *env,
2007                             struct lfsck_instance *lfsck,
2008                             struct lfsck_assistant_object *lso)
2009 {
2010         struct dt_object *obj;
2011
2012         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2013         if (IS_ERR(obj))
2014                 return obj;
2015
2016         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2017                 lso->lso_dead = 1;
2018                 lfsck_object_put(env, obj);
2019
2020                 return ERR_PTR(-ENOENT);
2021         }
2022
2023         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj, true))) {
2024                 lfsck_object_put(env, obj);
2025
2026                 return ERR_PTR(-ENOTDIR);
2027         }
2028
2029         return obj;
2030 }
2031
/**
 * Generic LFSCK asynchronous communication interpreter function.
 * The LFSCK RPC reply for both the event notification and status
 * querying will be handled here.
 *
 * The handling is selected by the event stored in the request
 * (laia_lr->lr_event). Failures are logged and/or recorded in the
 * component or target state; they are not propagated to the caller.
 * Unless the arguments are marked as shared (laia_shared), the target
 * and component references carried by \a args are dropped before return.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] req       pointer to the LFSCK request
 * \param[in] args      pointer to the lfsck_async_interpret_args
 * \param[in] rc        the result for handling the LFSCK request
 *
 * \retval              0 always
 */
int lfsck_async_interpret_common(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *args, int rc)
{
        struct lfsck_async_interpret_args *laia = args;
        struct lfsck_component            *com  = laia->laia_com;
        struct lfsck_assistant_data       *lad  = com->lc_data;
        struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
        struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
        struct lfsck_request              *lr   = laia->laia_lr;

        LASSERT(com->lc_lfsck->li_master);

        switch (lr->lr_event) {
        case LE_START:
                /* -EINPROGRESS only flags the target so the start can be
                 * retried; nothing else is recorded for it. */
                if (unlikely(rc == -EINPROGRESS)) {
                        ltd->ltd_retry_start = 1;
                        break;
                }

                /* Any other failure: log it and mark the scan incomplete
                 * (for a layout LFSCK talking to an OST, record the OST
                 * index in the assistant bitmap instead). */
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
                               "start: rc = %d\n",
                               lfsck_lfsck2name(com->lc_lfsck),
                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
                               ltd->ltd_index, lad->lad_name, rc);

                        if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                                struct lfsck_layout *lo = com->lc_file_ram;

                                if (lr->lr_flags & LEF_TO_OST)
                                        lfsck_lad_set_bitmap(env, com,
                                                             ltd->ltd_index);
                                else
                                        lo->ll_flags |= LF_INCOMPLETE;
                        } else {
                                struct lfsck_namespace *ns = com->lc_file_ram;

                                /* If some MDT does not join the namespace
                                 * LFSCK, then we cannot know whether there
                                 * is some name entry on such MDT that with
                                 * the referenced MDT-object on this MDT or
                                 * not. So the namespace LFSCK on this MDT
                                 * cannot handle orphan MDT-objects properly.
                                 * So we mark the LFSCK as LF_INCOMPLETE and
                                 * skip orphan MDT-objects handling. */
                                ns->ln_flags |= LF_INCOMPLETE;
                        }
                        break;
                }

                /* Success: under ltd_lock, link the target into the
                 * assistant's target list and phase1 list, unless the
                 * target is dead or already done for this LFSCK type. */
                spin_lock(&ltds->ltd_lock);
                if (ltd->ltd_dead) {
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                        struct list_head *list;
                        struct list_head *phase_list;

                        if (ltd->ltd_layout_done) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        if (lr->lr_flags & LEF_TO_OST) {
                                list = &lad->lad_ost_list;
                                phase_list = &lad->lad_ost_phase1_list;
                        } else {
                                list = &lad->lad_mdt_list;
                                phase_list = &lad->lad_mdt_phase1_list;
                        }

                        /* list_empty() on the target's own link means it
                         * is not yet on any list. */
                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list, list);
                        if (list_empty(&ltd->ltd_layout_phase_list))
                                list_add_tail(&ltd->ltd_layout_phase_list,
                                              phase_list);
                } else {
                        if (ltd->ltd_namespace_done) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        if (list_empty(&ltd->ltd_namespace_list))
                                list_add_tail(&ltd->ltd_namespace_list,
                                              &lad->lad_mdt_list);
                        if (list_empty(&ltd->ltd_namespace_phase_list))
                                list_add_tail(&ltd->ltd_namespace_phase_list,
                                              &lad->lad_mdt_phase1_list);
                }
                spin_unlock(&ltds->ltd_lock);
                break;
        case LE_STOP:
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
        case LE_PEER_EXIT:
                /* Pure notifications: only log failures, and treat
                 * -EALREADY as not worth reporting. */
                if (rc != 0 && rc != -EALREADY)
                        CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
                              "event = %d, rc = %d\n",
                              lfsck_lfsck2name(com->lc_lfsck),
                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
                              ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
                break;
        case LE_QUERY: {
                struct lfsck_reply *reply;
                struct list_head *list;
                struct list_head *phase_list;

                /* Here list/phase_list are the target's own link members,
                 * not the assistant's list heads. */
                if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                        list = &ltd->ltd_layout_list;
                        phase_list = &ltd->ltd_layout_phase_list;
                } else {
                        list = &ltd->ltd_namespace_list;
                        phase_list = &ltd->ltd_namespace_phase_list;
                }

                /* Query failed: for a query-all request reset the cached
                 * status of the target, otherwise unlink the target from
                 * both of the lists it may be on. */
                if (rc != 0) {
                        if (lr->lr_flags & LEF_QUERY_ALL) {
                                lfsck_reset_ltd_status(ltd, com->lc_type);
                                break;
                        }

                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                /* A missing reply buffer is handled the same way as a
                 * failed query, after being logged as -EPROTO. */
                reply = req_capsule_server_get(&req->rq_pill,
                                               &RMF_LFSCK_REPLY);
                if (reply == NULL) {
                        rc = -EPROTO;
                        CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
                               "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
                               lad->lad_name, rc);

                        if (lr->lr_flags & LEF_QUERY_ALL) {
                                lfsck_reset_ltd_status(ltd, com->lc_type);
                                break;
                        }

                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                /* Query-all: just cache the reported status and repaired
                 * count on the target; list membership is untouched. */
                if (lr->lr_flags & LEF_QUERY_ALL) {
                        if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                                ltd->ltd_layout_status = reply->lr_status;
                                ltd->ltd_layout_repaired = reply->lr_repaired;
                        } else {
                                ltd->ltd_namespace_status = reply->lr_status;
                                ltd->ltd_namespace_repaired =
                                                        reply->lr_repaired;
                        }
                        break;
                }

                switch (reply->lr_status) {
                case LS_SCANNING_PHASE1:
                        /* Still in phase1: leave the target where it is. */
                        break;
                case LS_SCANNING_PHASE2:
                        /* Take the target off its current phase list and,
                         * unless it is dead or done, put it on the
                         * matching phase2 list. */
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        if (ltd->ltd_dead) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                                if (ltd->ltd_layout_done) {
                                        spin_unlock(&ltds->ltd_lock);
                                        break;
                                }

                                if (lr->lr_flags & LEF_TO_OST)
                                        list_add_tail(phase_list,
                                                &lad->lad_ost_phase2_list);
                                else
                                        list_add_tail(phase_list,
                                                &lad->lad_mdt_phase2_list);
                        } else {
                                if (ltd->ltd_namespace_done) {
                                        spin_unlock(&ltds->ltd_lock);
                                        break;
                                }

                                list_add_tail(phase_list,
                                              &lad->lad_mdt_phase2_list);
                        }
                        spin_unlock(&ltds->ltd_lock);
                        break;
                default:
                        /* Any other status: unlink the target from both
                         * lists. */
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }
                break;
        }
        default:
                /* NOTE(review): the value printed after "rc =" here is
                 * the event number (lr_event), not the rc argument. */
                CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
                break;
        }

        /* Private (non-shared) args own a target and a component
         * reference; release both now that the reply is handled. */
        if (!laia->laia_shared) {
                lfsck_tgt_put(ltd);
                lfsck_component_put(env, com);
        }

        return 0;
}
2264
/*
 * Feed one RPC result to every component on the instance's scan and
 * double-scan lists.
 *
 * \a args must be a shared lfsck_async_interpret_args with no component
 * set yet (both asserted). For each component, laia_com is pointed at it
 * and lfsck_async_interpret_common() is called with li_lock held. Because
 * the args are marked shared, the common interpreter does not drop the
 * target/component references itself.
 */
static void lfsck_interpret(const struct lu_env *env,
                            struct lfsck_instance *lfsck,
                            struct ptlrpc_request *req, void *args, int result)
{
        struct lfsck_async_interpret_args *laia = args;
        struct lfsck_component            *com;

        LASSERT(laia->laia_com == NULL);
        LASSERT(laia->laia_shared);

        spin_lock(&lfsck->li_lock);
        /* Components currently on the scan list. */
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                laia->laia_com = com;
                lfsck_async_interpret_common(env, req, laia, result);
        }

        /* Components currently on the double-scan list. */
        list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
                laia->laia_com = com;
                lfsck_async_interpret_common(env, req, laia, result);
        }
        spin_unlock(&lfsck->li_lock);
}
2287
2288 static int lfsck_stop_notify(const struct lu_env *env,
2289                              struct lfsck_instance *lfsck,
2290                              struct lfsck_tgt_descs *ltds,
2291                              struct lfsck_tgt_desc *ltd, __u16 type)
2292 {
2293         struct lfsck_component *com;
2294         int                     rc = 0;
2295         ENTRY;
2296
2297         LASSERT(lfsck->li_master);
2298
2299         spin_lock(&lfsck->li_lock);
2300         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2301         if (com == NULL)
2302                 com = __lfsck_component_find(lfsck, type,
2303                                              &lfsck->li_list_double_scan);
2304         if (com != NULL)
2305                 lfsck_component_get(com);
2306         spin_unlock(&lfsck->li_lock);
2307
2308         if (com != NULL) {
2309                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2310                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2311                 struct lfsck_request              *lr    = &info->lti_lr;
2312                 struct lfsck_assistant_data       *lad   = com->lc_data;
2313                 struct list_head                  *list;
2314                 struct list_head                  *phase_list;
2315                 struct ptlrpc_request_set         *set;
2316
2317                 set = ptlrpc_prep_set();
2318                 if (set == NULL) {
2319                         lfsck_component_put(env, com);
2320
2321                         RETURN(-ENOMEM);
2322                 }
2323
2324                 if (type == LFSCK_TYPE_LAYOUT) {
2325                         list = &ltd->ltd_layout_list;
2326                         phase_list = &ltd->ltd_layout_phase_list;
2327                 } else {
2328                         list = &ltd->ltd_namespace_list;
2329                         phase_list = &ltd->ltd_namespace_phase_list;
2330                 }
2331
2332                 spin_lock(&ltds->ltd_lock);
2333                 if (list_empty(list)) {
2334                         LASSERT(list_empty(phase_list));
2335                         spin_unlock(&ltds->ltd_lock);
2336                         ptlrpc_set_destroy(set);
2337
2338                         RETURN(0);
2339                 }
2340
2341                 list_del_init(phase_list);
2342                 list_del_init(list);
2343                 spin_unlock(&ltds->ltd_lock);
2344
2345                 memset(lr, 0, sizeof(*lr));
2346                 lr->lr_index = lfsck_dev_idx(lfsck);
2347                 lr->lr_event = LE_PEER_EXIT;
2348                 lr->lr_active = type;
2349                 lr->lr_status = LS_CO_PAUSED;
2350                 if (ltds == &lfsck->li_ost_descs)
2351                         lr->lr_flags = LEF_TO_OST;
2352
2353                 memset(laia, 0, sizeof(*laia));
2354                 laia->laia_com = com;
2355                 laia->laia_ltds = ltds;
2356                 atomic_inc(&ltd->ltd_ref);
2357                 laia->laia_ltd = ltd;
2358                 laia->laia_lr = lr;
2359
2360                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2361                                          lfsck_async_interpret_common,
2362                                          laia, LFSCK_NOTIFY);
2363                 if (rc != 0) {
2364                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2365                                "co-stop for %s: rc = %d\n",
2366                                lfsck_lfsck2name(lfsck),
2367                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2368                                ltd->ltd_index, lad->lad_name, rc);
2369                         lfsck_tgt_put(ltd);
2370                 } else {
2371                         rc = ptlrpc_set_wait(env, set);
2372                 }
2373
2374                 ptlrpc_set_destroy(set);
2375                 lfsck_component_put(env, com);
2376         }
2377
2378         RETURN(rc);
2379 }
2380
/*
 * RPC interpreter for shared-argument requests: dispatches the result to
 * every active component via lfsck_interpret(), drops the target
 * reference carried in the args, and records a non-zero \a rc in
 * laia_result unless laia_result is already -EALREADY.
 *
 * Always returns 0.
 */
static int lfsck_async_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *args, int rc)
{
        struct lfsck_async_interpret_args *laia = args;
        struct lfsck_instance             *lfsck;

        /* Recover the owning instance from the descriptor table pointer.
         * NOTE(review): this assumes laia_ltds is the instance's
         * li_mdt_descs member -- confirm no caller passes li_ost_descs. */
        lfsck = container_of(laia->laia_ltds, struct lfsck_instance,
                             li_mdt_descs);
        lfsck_interpret(env, lfsck, req, laia, rc);
        lfsck_tgt_put(laia->laia_ltd);
        if (rc != 0 && laia->laia_result != -EALREADY)
                laia->laia_result = rc;

        return 0;
}
2397
2398 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2399                         struct lfsck_request *lr,
2400                         struct ptlrpc_request_set *set,
2401                         ptlrpc_interpterer_t interpreter,
2402                         void *args, int request)
2403 {
2404         struct lfsck_async_interpret_args *laia;
2405         struct ptlrpc_request             *req;
2406         struct lfsck_request              *tmp;
2407         struct req_format                 *format;
2408         int                                rc;
2409
2410         switch (request) {
2411         case LFSCK_NOTIFY:
2412                 format = &RQF_LFSCK_NOTIFY;
2413                 break;
2414         case LFSCK_QUERY:
2415                 format = &RQF_LFSCK_QUERY;
2416                 break;
2417         default:
2418                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2419                        exp->exp_obd->obd_name, request, -EINVAL);
2420                 return -EINVAL;
2421         }
2422
2423         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2424         if (req == NULL)
2425                 return -ENOMEM;
2426
2427         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2428         if (rc != 0) {
2429                 ptlrpc_request_free(req);
2430
2431                 return rc;
2432         }
2433
2434         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2435         *tmp = *lr;
2436         ptlrpc_request_set_replen(req);
2437
2438         laia = ptlrpc_req_async_args(laia, req);
2439         *laia = *(struct lfsck_async_interpret_args *)args;
2440         if (laia->laia_com != NULL)
2441                 lfsck_component_get(laia->laia_com);
2442         req->rq_interpret_reply = interpreter;
2443         req->rq_allow_intr = 1;
2444         req->rq_no_delay = 1;
2445         ptlrpc_set_add_req(set, req);
2446
2447         return 0;
2448 }
2449
2450 int lfsck_query_all(const struct lu_env *env, struct lfsck_component *com)
2451 {
2452         struct lfsck_thread_info *info = lfsck_env_info(env);
2453         struct lfsck_request *lr = &info->lti_lr;
2454         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2455         struct lfsck_instance *lfsck = com->lc_lfsck;
2456         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2457         struct lfsck_tgt_desc *ltd;
2458         struct ptlrpc_request_set *set;
2459         int idx;
2460         int rc;
2461
2462         ENTRY;
2463         memset(lr, 0, sizeof(*lr));
2464         lr->lr_event = LE_QUERY;
2465         lr->lr_active = com->lc_type;
2466         lr->lr_flags = LEF_QUERY_ALL;
2467
2468         memset(laia, 0, sizeof(*laia));
2469         laia->laia_com = com;
2470         laia->laia_lr = lr;
2471
2472         set = ptlrpc_prep_set();
2473         if (set == NULL)
2474                 RETURN(-ENOMEM);
2475
2476 again:
2477         laia->laia_ltds = ltds;
2478         down_read(&ltds->ltd_rw_sem);
2479         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2480                 ltd = lfsck_tgt_get(ltds, idx);
2481                 LASSERT(ltd != NULL);
2482
2483                 laia->laia_ltd = ltd;
2484                 up_read(&ltds->ltd_rw_sem);
2485                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2486                                          lfsck_async_interpret_common,
2487                                          laia, LFSCK_QUERY);
2488                 if (rc != 0) {
2489                         struct lfsck_assistant_data *lad = com->lc_data;
2490
2491                         CDEBUG(D_LFSCK, "%s: Fail to query %s %x for stat %s: "
2492                                "rc = %d\n", lfsck_lfsck2name(lfsck),
2493                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2494                                ltd->ltd_index, lad->lad_name, rc);
2495                         lfsck_reset_ltd_status(ltd, com->lc_type);
2496                         lfsck_tgt_put(ltd);
2497                 }
2498                 down_read(&ltds->ltd_rw_sem);
2499         }
2500         up_read(&ltds->ltd_rw_sem);
2501
2502         if (com->lc_type == LFSCK_TYPE_LAYOUT && !(lr->lr_flags & LEF_TO_OST)) {
2503                 ltds = &lfsck->li_ost_descs;
2504                 lr->lr_flags |= LEF_TO_OST;
2505                 goto again;
2506         }
2507
2508         rc = ptlrpc_set_wait(env, set);
2509         ptlrpc_set_destroy(set);
2510
2511         RETURN(rc);
2512 }
2513
2514 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2515                           struct lfsck_start_param *lsp)
2516 {
2517         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2518         struct lfsck_assistant_data     *lad     = com->lc_data;
2519         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2520         struct ptlrpc_thread            *athread = &lad->lad_thread;
2521         struct lfsck_thread_args        *lta;
2522         struct task_struct              *task;
2523         int                              rc;
2524         ENTRY;
2525
2526         lad->lad_assistant_status = 0;
2527         lad->lad_post_result = 0;
2528         lad->lad_flags = 0;
2529         lad->lad_advance_lock = false;
2530         thread_set_flags(athread, 0);
2531
2532         lta = lfsck_thread_args_init(lfsck, com, lsp);
2533         if (IS_ERR(lta))
2534                 RETURN(PTR_ERR(lta));
2535
2536         task = kthread_run(lfsck_assistant_engine, lta, "%s", lad->lad_name);
2537         if (IS_ERR(task)) {
2538                 rc = PTR_ERR(task);
2539                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2540                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2541                 lfsck_thread_args_fini(lta);
2542         } else {
2543                 wait_event_idle(mthread->t_ctl_waitq,
2544                                 thread_is_running(athread) ||
2545                                 thread_is_stopped(athread) ||
2546                                 !thread_is_starting(mthread));
2547                 if (unlikely(!thread_is_starting(mthread)))
2548                         /* stopped by race */
2549                         rc = -ESRCH;
2550                 else if (unlikely(!thread_is_running(athread)))
2551                         rc = lad->lad_assistant_status;
2552                 else
2553                         rc = 0;
2554         }
2555
2556         RETURN(rc);
2557 }
2558
2559 int lfsck_checkpoint_generic(const struct lu_env *env,
2560                              struct lfsck_component *com)
2561 {
2562         struct lfsck_assistant_data     *lad     = com->lc_data;
2563         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2564         struct ptlrpc_thread            *athread = &lad->lad_thread;
2565
2566         wait_event_idle(mthread->t_ctl_waitq,
2567                         list_empty(&lad->lad_req_list) ||
2568                         !thread_is_running(mthread) ||
2569                         thread_is_stopped(athread));
2570
2571         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2572                 return LFSCK_CHECKPOINT_SKIP;
2573
2574         return 0;
2575 }
2576
2577 void lfsck_post_generic(const struct lu_env *env,
2578                         struct lfsck_component *com, int *result)
2579 {
2580         struct lfsck_assistant_data     *lad     = com->lc_data;
2581         struct ptlrpc_thread            *athread = &lad->lad_thread;
2582         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2583
2584         lad->lad_post_result = *result;
2585         if (*result <= 0)
2586                 set_bit(LAD_EXIT, &lad->lad_flags);
2587         set_bit(LAD_TO_POST, &lad->lad_flags);
2588
2589         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2590                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2591
2592         wake_up(&athread->t_ctl_waitq);
2593         wait_event_idle(mthread->t_ctl_waitq,
2594                         (*result > 0 && list_empty(&lad->lad_req_list)) ||
2595                         thread_is_stopped(athread));
2596
2597         if (lad->lad_assistant_status < 0)
2598                 *result = lad->lad_assistant_status;
2599
2600         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2601                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2602 }
2603
2604 int lfsck_double_scan_generic(const struct lu_env *env,
2605                               struct lfsck_component *com, int status)
2606 {
2607         struct lfsck_assistant_data     *lad     = com->lc_data;
2608         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2609         struct ptlrpc_thread            *athread = &lad->lad_thread;
2610
2611         if (status != LS_SCANNING_PHASE2)
2612                 set_bit(LAD_EXIT, &lad->lad_flags);
2613         else
2614                 set_bit(LAD_TO_DOUBLE_SCAN, &lad->lad_flags);
2615
2616         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2617                "status %d\n",
2618                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2619
2620         wake_up(&athread->t_ctl_waitq);
2621         wait_event_idle(mthread->t_ctl_waitq,
2622                         test_bit(LAD_IN_DOUBLE_SCAN, &lad->lad_flags) ||
2623                         thread_is_stopped(athread));
2624
2625         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2626                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2627                lad->lad_assistant_status);
2628
2629         if (lad->lad_assistant_status < 0)
2630                 return lad->lad_assistant_status;
2631
2632         return 0;
2633 }
2634
2635 void lfsck_quit_generic(const struct lu_env *env,
2636                         struct lfsck_component *com)
2637 {
2638         struct lfsck_assistant_data     *lad     = com->lc_data;
2639         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2640         struct ptlrpc_thread            *athread = &lad->lad_thread;
2641
2642         set_bit(LAD_EXIT, &lad->lad_flags);
2643         wake_up(&athread->t_ctl_waitq);
2644         wait_event_idle(mthread->t_ctl_waitq,
2645                         thread_is_init(athread) ||
2646                         thread_is_stopped(athread));
2647 }
2648
2649 int lfsck_load_one_trace_file(const struct lu_env *env,
2650                               struct lfsck_component *com,
2651                               struct dt_object *parent,
2652                               struct dt_object **child,
2653                               const struct dt_index_features *ft,
2654                               const char *name, bool reset)
2655 {
2656         struct lfsck_instance *lfsck = com->lc_lfsck;
2657         struct dt_object *obj;
2658         int rc;
2659         ENTRY;
2660
2661         if (*child != NULL) {
2662                 struct dt_it *it;
2663                 const struct dt_it_ops *iops;
2664                 struct lu_fid *fid = &lfsck_env_info(env)->lti_fid3;
2665
2666                 if (!reset)
2667                         RETURN(0);
2668
2669                 obj = *child;
2670                 rc = obj->do_ops->do_index_try(env, obj, ft);
2671                 if (rc)
2672                         /* unlink by force */
2673                         goto unlink;
2674
2675                 iops = &obj->do_index_ops->dio_it;
2676                 it = iops->init(env, obj, 0);
2677                 if (IS_ERR(it))
2678                         /* unlink by force */
2679                         goto unlink;
2680
2681                 fid_zero(fid);
2682                 rc = iops->get(env, it, (const struct dt_key *)fid);
2683                 if (rc >= 0) {
2684                         rc = iops->next(env, it);
2685                         iops->put(env, it);
2686                 }
2687                 iops->fini(env, it);
2688                 if (rc > 0)
2689                         /* "rc > 0" means the index file is empty. */
2690                         RETURN(0);
2691
2692 unlink:
2693                 /* The old index is not empty, remove it firstly. */
2694                 rc = local_object_unlink(env, lfsck->li_bottom, parent, name);
2695                 CDEBUG_LIMIT(rc ? D_ERROR : D_LFSCK,
2696                              "%s: unlink lfsck sub trace file %s: rc = %d\n",
2697                              lfsck_lfsck2name(com->lc_lfsck), name, rc);
2698                 if (rc)
2699                         RETURN(rc);
2700
2701                 if (*child) {
2702                         lfsck_object_put(env, *child);
2703                         *child = NULL;
2704                 }
2705         } else if (reset) {
2706                 goto unlink;
2707         }
2708
2709         obj = local_index_find_or_create(env, lfsck->li_los, parent, name,
2710                                          S_IFREG | S_IRUGO | S_IWUSR, ft);
2711         if (IS_ERR(obj))
2712                 RETURN(PTR_ERR(obj));
2713
2714         rc = obj->do_ops->do_index_try(env, obj, ft);
2715         if (rc) {
2716                 lfsck_object_put(env, obj);
2717                 CDEBUG(D_LFSCK, "%s: LFSCK fail to load "
2718                        "sub trace file %s: rc = %d\n",
2719                        lfsck_lfsck2name(com->lc_lfsck), name, rc);
2720         } else {
2721                 *child = obj;
2722         }
2723
2724         RETURN(rc);
2725 }
2726
2727 int lfsck_load_sub_trace_files(const struct lu_env *env,
2728                                struct lfsck_component *com,
2729                                const struct dt_index_features *ft,
2730                                const char *prefix, bool reset)
2731 {
2732         char *name = lfsck_env_info(env)->lti_key;
2733         struct lfsck_sub_trace_obj *lsto;
2734         int rc;
2735         int i;
2736
2737         for (i = 0, rc = 0, lsto = &com->lc_sub_trace_objs[0];
2738              i < LFSCK_STF_COUNT && rc == 0; i++, lsto++) {
2739                 snprintf(name, NAME_MAX, "%s_%02d", prefix, i);
2740                 rc = lfsck_load_one_trace_file(env, com,
2741                                 com->lc_lfsck->li_lfsck_dir,
2742                                 &lsto->lsto_obj, ft, name, reset);
2743         }
2744
2745         return rc;
2746 }
2747
2748 /* external interfaces */
2749 int lfsck_get_speed(char *buf, struct dt_device *key)
2750 {
2751         struct lu_env           env;
2752         struct lfsck_instance  *lfsck;
2753         int                     rc;
2754         ENTRY;
2755
2756         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2757         if (rc != 0)
2758                 RETURN(rc);
2759
2760         lfsck = lfsck_instance_find(key, true, false);
2761         if (lfsck && buf) {
2762                 rc = sprintf(buf, "%u\n",
2763                              lfsck->li_bookmark_ram.lb_speed_limit);
2764                 lfsck_instance_put(&env, lfsck);
2765         } else {
2766                 rc = -ENXIO;
2767         }
2768
2769         lu_env_fini(&env);
2770
2771         RETURN(rc);
2772 }
2773 EXPORT_SYMBOL(lfsck_get_speed);
2774
2775 int lfsck_set_speed(struct dt_device *key, __u32 val)
2776 {
2777         struct lu_env           env;
2778         struct lfsck_instance  *lfsck;
2779         int                     rc;
2780         ENTRY;
2781
2782         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2783         if (rc != 0)
2784                 RETURN(rc);
2785
2786         lfsck = lfsck_instance_find(key, true, false);
2787         if (likely(lfsck != NULL)) {
2788                 mutex_lock(&lfsck->li_mutex);
2789                 if (__lfsck_set_speed(lfsck, val))
2790                         rc = lfsck_bookmark_store(&env, lfsck);
2791                 mutex_unlock(&lfsck->li_mutex);
2792                 lfsck_instance_put(&env, lfsck);
2793         } else {
2794                 rc = -ENXIO;
2795         }
2796
2797         lu_env_fini(&env);
2798
2799         RETURN(rc);
2800 }
2801 EXPORT_SYMBOL(lfsck_set_speed);
2802
2803 int lfsck_get_windows(char *buf, struct dt_device *key)
2804 {
2805         struct lu_env           env;
2806         struct lfsck_instance  *lfsck;
2807         int                     rc;
2808         ENTRY;
2809
2810         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2811         if (rc != 0)
2812                 RETURN(rc);
2813
2814         lfsck = lfsck_instance_find(key, true, false);
2815         if (likely(lfsck != NULL)) {
2816                 rc = sprintf(buf, "%u\n",
2817                              lfsck->li_bookmark_ram.lb_async_windows);
2818                 lfsck_instance_put(&env, lfsck);
2819         } else {
2820                 rc = -ENXIO;
2821         }
2822
2823         lu_env_fini(&env);
2824
2825         RETURN(rc);
2826 }
2827 EXPORT_SYMBOL(lfsck_get_windows);
2828
2829 int lfsck_set_windows(struct dt_device *key, unsigned int val)
2830 {
2831         struct lu_env           env;
2832         struct lfsck_instance  *lfsck;
2833         int                     rc;
2834         ENTRY;
2835
2836         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2837         if (rc != 0)
2838                 RETURN(rc);
2839
2840         lfsck = lfsck_instance_find(key, true, false);
2841         if (likely(lfsck != NULL)) {
2842                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2843                         CWARN("%s: invalid async windows size that may "
2844                               "cause memory issues. The valid range is "
2845                               "[1 - %u].\n",
2846                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2847                         rc = -EINVAL;
2848                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2849                         mutex_lock(&lfsck->li_mutex);
2850                         lfsck->li_bookmark_ram.lb_async_windows = val;
2851                         rc = lfsck_bookmark_store(&env, lfsck);
2852                         mutex_unlock(&lfsck->li_mutex);
2853                 }
2854                 lfsck_instance_put(&env, lfsck);
2855         } else {
2856                 rc = -ENXIO;
2857         }
2858
2859         lu_env_fini(&env);
2860
2861         RETURN(rc);
2862 }
2863 EXPORT_SYMBOL(lfsck_set_windows);
2864
2865 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2866 {
2867         struct lu_env           env;
2868         struct lfsck_instance  *lfsck;
2869         struct lfsck_component *com;
2870         int                     rc;
2871         ENTRY;
2872
2873         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2874         if (rc != 0)
2875                 RETURN(rc);
2876
2877         lfsck = lfsck_instance_find(key, true, false);
2878         if (likely(lfsck != NULL)) {
2879                 com = lfsck_component_find(lfsck, type);
2880                 if (likely(com != NULL)) {
2881                         com->lc_ops->lfsck_dump(&env, com, m);
2882                         lfsck_component_put(&env, com);
2883                 } else {
2884                         rc = -ENOTSUPP;
2885                 }
2886
2887                 lfsck_instance_put(&env, lfsck);
2888         } else {
2889                 rc = -ENXIO;
2890         }
2891
2892         lu_env_fini(&env);
2893
2894         RETURN(rc);
2895 }
2896 EXPORT_SYMBOL(lfsck_dump);
2897
2898 static int lfsck_stop_all(const struct lu_env *env,
2899                           struct lfsck_instance *lfsck,
2900                           struct lfsck_stop *stop)
2901 {
2902         struct lfsck_thread_info *info = lfsck_env_info(env);
2903         struct lfsck_request *lr = &info->lti_lr;
2904         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2905         struct ptlrpc_request_set *set;
2906         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2907         struct lfsck_tgt_desc *ltd;
2908         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2909         int idx;
2910         int rc = 0;
2911         int rc1 = 0;
2912
2913         ENTRY;
2914         LASSERT(stop->ls_flags & LPF_BROADCAST);
2915
2916         set = ptlrpc_prep_set();
2917         if (unlikely(set == NULL))
2918                 RETURN(-ENOMEM);
2919
2920         memset(lr, 0, sizeof(*lr));
2921         lr->lr_event = LE_STOP;
2922         lr->lr_index = lfsck_dev_idx(lfsck);
2923         lr->lr_status = stop->ls_status;
2924         lr->lr_version = bk->lb_version;
2925         lr->lr_active = LFSCK_TYPES_ALL;
2926         lr->lr_param = stop->ls_flags;
2927
2928         memset(laia, 0, sizeof(*laia));
2929         laia->laia_ltds = ltds;
2930         laia->laia_lr = lr;
2931         laia->laia_shared = 1;
2932
2933         down_read(&ltds->ltd_rw_sem);
2934         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
2935                 ltd = lfsck_tgt_get(ltds, idx);
2936                 LASSERT(ltd != NULL);
2937
2938                 laia->laia_ltd = ltd;
2939                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2940                                          lfsck_async_interpret, laia,
2941                                          LFSCK_NOTIFY);
2942                 if (rc != 0) {
2943                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2944                         lfsck_tgt_put(ltd);
2945                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2946                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2947                         rc1 = rc;
2948                 }
2949         }
2950         up_read(&ltds->ltd_rw_sem);
2951
2952         rc = ptlrpc_set_wait(env, set);
2953         ptlrpc_set_destroy(set);
2954
2955         if (rc == 0)
2956                 rc = laia->laia_result;
2957
2958         if (rc == -EALREADY)
2959                 rc = 0;
2960
2961         if (rc != 0)
2962                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2963                        lfsck_lfsck2name(lfsck), rc);
2964
2965         RETURN(rc != 0 ? rc : rc1);
2966 }
2967
2968 static int lfsck_start_all(const struct lu_env *env,
2969                            struct lfsck_instance *lfsck,
2970                            struct lfsck_start *start)
2971 {
2972         struct lfsck_thread_info *info = lfsck_env_info(env);
2973         struct lfsck_request *lr = &info->lti_lr;
2974         struct lfsck_async_interpret_args *laia = &info->lti_laia;
2975         struct ptlrpc_request_set *set;
2976         struct lfsck_tgt_descs *ltds = &lfsck->li_mdt_descs;
2977         struct lfsck_tgt_desc *ltd;
2978         struct lfsck_bookmark *bk = &lfsck->li_bookmark_ram;
2979         int idx;
2980         int rc = 0;
2981         bool retry = false;
2982         ENTRY;
2983
2984         LASSERT(start->ls_flags & LPF_BROADCAST);
2985
2986         memset(lr, 0, sizeof(*lr));
2987         lr->lr_event = LE_START;
2988         lr->lr_index = lfsck_dev_idx(lfsck);
2989         lr->lr_speed = bk->lb_speed_limit;
2990         lr->lr_version = bk->lb_version;
2991         lr->lr_active = start->ls_active;
2992         lr->lr_param = start->ls_flags;
2993         lr->lr_async_windows = bk->lb_async_windows;
2994         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2995                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2996                        LSV_CREATE_MDTOBJ;
2997
2998         memset(laia, 0, sizeof(*laia));
2999         laia->laia_ltds = ltds;
3000         laia->laia_lr = lr;
3001         laia->laia_shared = 1;
3002
3003 again:
3004         set = ptlrpc_prep_set();
3005         if (unlikely(!set))
3006                 RETURN(-ENOMEM);
3007
3008         down_read(&ltds->ltd_rw_sem);
3009         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3010                 ltd = lfsck_tgt_get(ltds, idx);
3011                 LASSERT(ltd != NULL);
3012
3013                 if (retry && !ltd->ltd_retry_start) {
3014                         lfsck_tgt_put(ltd);
3015                         continue;
3016                 }
3017
3018                 laia->laia_ltd = ltd;
3019                 ltd->ltd_retry_start = 0;
3020                 ltd->ltd_layout_done = 0;
3021                 ltd->ltd_namespace_done = 0;
3022                 ltd->ltd_synced_failures = 0;
3023                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
3024                                          lfsck_async_interpret, laia,
3025                                          LFSCK_NOTIFY);
3026                 if (rc != 0) {
3027                         lfsck_interpret(env, lfsck, NULL, laia, rc);
3028                         lfsck_tgt_put(ltd);
3029                         CERROR("%s: cannot notify MDT %x for LFSCK "
3030                                "start, failout: rc = %d\n",
3031                                lfsck_lfsck2name(lfsck), idx, rc);
3032                         break;
3033                 }
3034         }
3035         up_read(&ltds->ltd_rw_sem);
3036
3037         if (rc != 0) {
3038                 ptlrpc_set_destroy(set);
3039
3040                 RETURN(rc);
3041         }
3042
3043         rc = ptlrpc_set_wait(env, set);
3044         ptlrpc_set_destroy(set);
3045
3046         if (rc == 0)
3047                 rc = laia->laia_result;
3048
3049         if (unlikely(rc == -EINPROGRESS)) {
3050                 retry = true;
3051                 schedule_timeout_interruptible(cfs_time_seconds(1));
3052                 set_current_state(TASK_RUNNING);
3053                 if (!signal_pending(current) &&
3054                     thread_is_running(&lfsck->li_thread))
3055                         goto again;
3056
3057                 rc = -EINTR;
3058         }
3059
3060         if (rc != 0) {
3061                 struct lfsck_stop *stop = &info->lti_stop;
3062
3063                 CERROR("%s: cannot start LFSCK on some MDTs, "
3064                        "stop all: rc = %d\n",
3065                        lfsck_lfsck2name(lfsck), rc);
3066                 if (rc != -EALREADY) {
3067                         stop->ls_status = LS_FAILED;
3068                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
3069                         lfsck_stop_all(env, lfsck, stop);
3070                 }
3071         }
3072
3073         RETURN(rc);
3074 }
3075
3076 int lfsck_start(const struct lu_env *env, struct dt_device *key,
3077                 struct lfsck_start_param *lsp)
3078 {
3079         struct lfsck_start *start = lsp->lsp_start;
3080         struct lfsck_instance *lfsck;
3081         struct lfsck_bookmark *bk;
3082         struct ptlrpc_thread *thread;
3083         struct lfsck_component *com;
3084         struct lfsck_thread_args *lta;
3085         struct task_struct *task;
3086         struct lfsck_tgt_descs *ltds;
3087         struct lfsck_tgt_desc *ltd;
3088         int idx;
3089         int rc = 0;
3090         __u16 valid  = 0;
3091         __u16 flags  = 0;
3092         __u16 type   = 1;
3093
3094         ENTRY;
3095         if (key->dd_rdonly)
3096                 RETURN(-EROFS);
3097
3098         lfsck = lfsck_instance_find(key, true, false);
3099         if (unlikely(lfsck == NULL))
3100                 RETURN(-ENXIO);
3101
3102         if (unlikely(lfsck->li_stopping))
3103                 GOTO(put, rc = -ENXIO);
3104
3105         /* System is not ready, try again later. */
3106         if (unlikely(lfsck->li_namespace == NULL ||
3107                      lfsck_dev_site(lfsck)->ss_server_fld == NULL))
3108                 GOTO(put, rc = -EINPROGRESS);
3109
3110         /* start == NULL means auto trigger paused LFSCK. */
3111         if (!start) {
3112                 if (list_empty(&lfsck->li_list_scan) ||
3113                     CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO))
3114                         GOTO(put, rc = 0);
3115         } else if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3116                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
3117                        lfsck_lfsck2name(lfsck));
3118
3119                 GOTO(put, rc = -EPERM);
3120         }
3121
3122         bk = &lfsck->li_bookmark_ram;
3123         thread = &lfsck->li_thread;
3124         mutex_lock(&lfsck->li_mutex);
3125         spin_lock(&lfsck->li_lock);
3126         if (unlikely(thread_is_stopping(thread))) {
3127                 /* Someone is stopping the LFSCK. */
3128                 spin_unlock(&lfsck->li_lock);
3129                 GOTO(out, rc = -EBUSY);
3130         }
3131
3132         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
3133                 rc = -EALREADY;
3134                 if (unlikely(start == NULL)) {
3135                         spin_unlock(&lfsck->li_lock);
3136                         GOTO(out, rc);
3137                 }
3138
3139                 while (start->ls_active != 0) {
3140                         if (!(type & start->ls_active)) {
3141                                 type <<= 1;
3142                                 continue;
3143                         }
3144
3145                         com = __lfsck_component_find(lfsck, type,
3146                                                      &lfsck->li_list_scan);
3147                         if (com == NULL)
3148                                 com = __lfsck_component_find(lfsck, type,
3149                                                 &lfsck->li_list_double_scan);
3150                         if (com == NULL) {
3151                                 rc = -EOPNOTSUPP;
3152                                 break;
3153                         }
3154
3155                         if (com->lc_ops->lfsck_join != NULL) {
3156                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
3157                                 if (rc != 0 && rc != -EALREADY)
3158                                         break;
3159                         }
3160                         start->ls_active &= ~type;
3161                         type <<= 1;
3162                 }
3163                 spin_unlock(&lfsck->li_lock);
3164                 GOTO(out, rc);
3165         }
3166         spin_unlock(&lfsck->li_lock);
3167
3168         lfsck->li_status = 0;
3169         lfsck->li_oit_over = 0;
3170         lfsck->li_start_unplug = 0;
3171         lfsck->li_drop_dryrun = 0;
3172         lfsck->li_new_scanned = 0;
3173
3174         /* For auto trigger. */
3175         if (start == NULL)
3176                 goto trigger;
3177
3178         start->ls_version = bk->lb_version;
3179
3180         if (start->ls_active != 0) {
3181                 struct lfsck_component *next;
3182
3183                 if (start->ls_active == LFSCK_TYPES_ALL)
3184                         start->ls_active = LFSCK_TYPES_SUPPORTED;
3185
3186                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
3187                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
3188                         GOTO(out, rc = -ENOTSUPP);
3189                 }
3190
3191                 list_for_each_entry_safe(com, next,
3192                                          &lfsck->li_list_scan, lc_link) {
3193                         if (!(com->lc_type & start->ls_active)) {
3194                                 rc = com->lc_ops->lfsck_post(env, com, 0,
3195                                                              false);
3196                                 if (rc != 0)
3197                                         GOTO(out, rc);
3198                         }
3199                 }
3200
3201                 while (start->ls_active != 0) {
3202                         if (type & start->ls_active) {
3203                                 com = __lfsck_component_find(lfsck, type,
3204                                                         &lfsck->li_list_idle);
3205                                 if (com != NULL)
3206                                         /* The component status will be updated
3207                                          * when its prep() is called later by
3208                                          * the LFSCK main engine. */
3209                                         list_move_tail(&com->lc_link,
3210                                                        &lfsck->li_list_scan);
3211                                 start->ls_active &= ~type;
3212                         }
3213                         type <<= 1;
3214                 }
3215         }
3216
3217         if (list_empty(&lfsck->li_list_scan)) {
3218                 /* The speed limit will be used to control both the LFSCK and
3219                  * low layer scrub (if applied), need to be handled firstly. */
3220                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3221                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3222                                 rc = lfsck_bookmark_store(env, lfsck);
3223                                 if (rc != 0)
3224                                         GOTO(out, rc);
3225                         }
3226                 }
3227
3228                 goto trigger;
3229         }
3230
3231         if (start->ls_flags & LPF_RESET)
3232                 flags |= DOIF_RESET;
3233
3234         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3235         if (rc != 0)
3236                 GOTO(out, rc);
3237
3238         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3239                 start->ls_active |= com->lc_type;
3240                 if (flags & DOIF_RESET) {
3241                         rc = com->lc_ops->lfsck_reset(env, com, false);
3242                         if (rc != 0)
3243                                 GOTO(out, rc);
3244                 }
3245         }
3246
3247         ltds = &lfsck->li_mdt_descs;
3248         down_read(&ltds->ltd_rw_sem);
3249         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3250                 ltd = lfsck_ltd2tgt(ltds, idx);
3251                 LASSERT(ltd != NULL);
3252
3253                 ltd->ltd_layout_done = 0;
3254                 ltd->ltd_namespace_done = 0;
3255                 ltd->ltd_synced_failures = 0;
3256                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_NAMESPACE);
3257                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3258                 list_del_init(&ltd->ltd_layout_phase_list);
3259                 list_del_init(&ltd->ltd_layout_list);
3260                 list_del_init(&ltd->ltd_namespace_phase_list);
3261                 list_del_init(&ltd->ltd_namespace_list);
3262         }
3263         up_read(&ltds->ltd_rw_sem);
3264
3265         ltds = &lfsck->li_ost_descs;
3266         down_read(&ltds->ltd_rw_sem);
3267         for_each_set_bit(idx, ltds->ltd_tgts_bitmap, ltds->ltd_tgts_mask_len) {
3268                 ltd = lfsck_ltd2tgt(ltds, idx);
3269                 LASSERT(ltd != NULL);
3270
3271                 ltd->ltd_layout_done = 0;
3272                 ltd->ltd_synced_failures = 0;
3273                 lfsck_reset_ltd_status(ltd, LFSCK_TYPE_LAYOUT);
3274                 list_del_init(&ltd->ltd_layout_phase_list);
3275                 list_del_init(&ltd->ltd_layout_list);
3276         }
3277         up_read(&ltds->ltd_rw_sem);
3278
3279 trigger:
3280         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3281         if (bk->lb_param & LPF_DRYRUN)
3282                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3283
3284         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3285                 valid |= DOIV_ERROR_HANDLE;
3286                 if (start->ls_flags & LPF_FAILOUT)
3287                         flags |= DOIF_FAILOUT;
3288         }
3289
3290         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3291                 valid |= DOIV_DRYRUN;
3292                 if (start->ls_flags & LPF_DRYRUN)
3293                         flags |= DOIF_DRYRUN;
3294         }
3295
3296         if (!list_empty(&lfsck->li_list_scan))
3297                 flags |= DOIF_OUTUSED;
3298
3299         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3300         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3301         if (IS_ERR(lta))
3302                 GOTO(out, rc = PTR_ERR(lta));
3303
3304         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3305         spin_lock(&lfsck->li_lock);
3306         thread_set_flags(thread, SVC_STARTING);
3307         spin_unlock(&lfsck->li_lock);
3308         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3309         if (IS_ERR(task)) {
3310                 rc = PTR_ERR(task);
3311                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3312                        lfsck_lfsck2name(lfsck), rc);
3313                 lfsck_thread_args_fini(lta);
3314
3315                 GOTO(out, rc);
3316         }
3317
3318         wait_event_idle(thread->t_ctl_waitq,
3319                         thread_is_running(thread) ||
3320                         thread_is_stopped(thread));
3321         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3322                 lfsck->li_start_unplug = 1;
3323                 wake_up(&thread->t_ctl_waitq);
3324
3325                 GOTO(out, rc = 0);
3326         }
3327
3328         /* release lfsck::li_mutex to avoid deadlock. */
3329         mutex_unlock(&lfsck->li_mutex);
3330         rc = lfsck_start_all(env, lfsck, start);
3331         if (rc != 0) {
3332                 spin_lock(&lfsck->li_lock);
3333                 if (thread_is_stopped(thread)) {
3334                         spin_unlock(&lfsck->li_lock);
3335                 } else {
3336                         lfsck->li_status = LS_FAILED;
3337                         lfsck->li_flags = 0;
3338                         thread_set_flags(thread, SVC_STOPPING);
3339                         spin_unlock(&lfsck->li_lock);
3340
3341                         lfsck->li_start_unplug = 1;
3342                         wake_up(&thread->t_ctl_waitq);
3343                         wait_event_idle(thread->t_ctl_waitq,
3344                                         thread_is_stopped(thread));
3345                 }
3346         } else {
3347                 lfsck->li_start_unplug = 1;
3348                 wake_up(&thread->t_ctl_waitq);
3349         }
3350
3351         GOTO(put, rc);
3352
3353 out:
3354         mutex_unlock(&lfsck->li_mutex);
3355
3356 put:
3357         lfsck_instance_put(env, lfsck);
3358
3359         return rc < 0 ? rc : 0;
3360 }
3361 EXPORT_SYMBOL(lfsck_start);
3362
3363 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3364                struct lfsck_stop *stop)
3365 {
3366         struct lfsck_instance   *lfsck;
3367         struct ptlrpc_thread    *thread;
3368         int                      rc     = 0;
3369         int                      rc1    = 0;
3370         ENTRY;
3371
3372         lfsck = lfsck_instance_find(key, true, false);
3373         if (unlikely(lfsck == NULL))
3374                 RETURN(-ENXIO);
3375
3376         thread = &lfsck->li_thread;
3377         if (stop && stop->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
3378                 CERROR("%s: only allow to specify '-A' via MDS\n",
3379                        lfsck_lfsck2name(lfsck));
3380                 GOTO(put, rc = -EPERM);
3381         }
3382
3383         spin_lock(&lfsck->li_lock);
3384         /* The target is umounted */
3385         if (stop && stop->ls_status == LS_PAUSED)
3386                 lfsck->li_stopping = 1;
3387
3388         if (thread_is_init(thread) || thread_is_stopped(thread))
3389                 /* no error if LFSCK stopped already, or not started */
3390                 GOTO(unlock, rc = 0);
3391
3392         if (thread_is_stopping(thread))
3393                 /* Someone is stopping LFSCK. */
3394                 GOTO(unlock, rc = -EINPROGRESS);
3395
3396         if (stop) {
3397                 lfsck->li_status = stop->ls_status;
3398                 lfsck->li_flags = stop->ls_flags;
3399         } else {
3400                 lfsck->li_status = LS_STOPPED;
3401                 lfsck->li_flags = 0;
3402         }
3403
3404         thread_set_flags(thread, SVC_STOPPING);
3405
3406         LASSERT(lfsck->li_task);
3407         send_sig(SIGINT, lfsck->li_task, 1);
3408
3409         if (lfsck->li_master) {
3410                 struct lfsck_component *com;
3411                 struct lfsck_assistant_data *lad;
3412
3413                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3414                         lad = com->lc_data;
3415                         spin_lock(&lad->lad_lock);
3416                         if (lad->lad_task)
3417                                 send_sig(SIGINT, lad->lad_task, 1);
3418                         spin_unlock(&lad->lad_lock);
3419                 }
3420
3421                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3422                         lad = com->lc_data;
3423                         spin_lock(&lad->lad_lock);
3424                         if (lad->lad_task)
3425                                 send_sig(SIGINT, lad->lad_task, 1);
3426                         spin_unlock(&lad->lad_lock);
3427                 }
3428         }
3429
3430         wake_up(&thread->t_ctl_waitq);
3431         spin_unlock(&lfsck->li_lock);
3432         if (stop && stop->ls_flags & LPF_BROADCAST)
3433                 rc1 = lfsck_stop_all(env, lfsck, stop);
3434
3435         /* It was me set the status as 'stopping' just now, if it is not
3436          * 'stopping' now, then either stopped, or re-started by race. */
3437         wait_event_idle(thread->t_ctl_waitq,
3438                         !thread_is_stopping(thread));
3439
3440         GOTO(put, rc = 0);
3441
3442 unlock:
3443         spin_unlock(&lfsck->li_lock);
3444 put:
3445         lfsck_instance_put(env, lfsck);
3446
3447         return rc != 0 ? rc : rc1;
3448 }
3449 EXPORT_SYMBOL(lfsck_stop);
3450
3451 int lfsck_in_notify_local(const struct lu_env *env, struct dt_device *key,
3452                           struct lfsck_req_local *lrl, struct thandle *th)
3453 {
3454         struct lfsck_instance *lfsck;
3455         struct lfsck_component *com;
3456         int rc = -EOPNOTSUPP;
3457         ENTRY;
3458
3459         lfsck = lfsck_instance_find(key, true, false);
3460         if (unlikely(!lfsck))
3461                 RETURN(-ENXIO);
3462
3463         com = lfsck_component_find(lfsck, lrl->lrl_active);
3464         if (likely(com && com->lc_ops->lfsck_in_notify_local)) {
3465                 rc = com->lc_ops->lfsck_in_notify_local(env, com, lrl, th);
3466                 lfsck_component_put(env, com);
3467         }
3468
3469         lfsck_instance_put(env, lfsck);
3470
3471         RETURN(rc);
3472 }
3473 EXPORT_SYMBOL(lfsck_in_notify_local);
3474
3475 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3476                     struct lfsck_request *lr)
3477 {
3478         int rc = -EOPNOTSUPP;
3479         ENTRY;
3480
3481         switch (lr->lr_event) {
3482         case LE_START: {
3483                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3484                 struct lfsck_start_param  lsp;
3485
3486                 memset(start, 0, sizeof(*start));
3487                 start->ls_valid = lr->lr_valid;
3488                 start->ls_speed_limit = lr->lr_speed;
3489                 start->ls_version = lr->lr_version;
3490                 start->ls_active = lr->lr_active;
3491                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3492                 start->ls_async_windows = lr->lr_async_windows;
3493
3494                 lsp.lsp_start = start;
3495                 lsp.lsp_index = lr->lr_index;
3496                 lsp.lsp_index_valid = 1;
3497                 rc = lfsck_start(env, key, &lsp);
3498                 break;
3499         }
3500         case LE_STOP: {
3501                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3502
3503                 memset(stop, 0, sizeof(*stop));
3504                 stop->ls_status = lr->lr_status;
3505                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3506                 rc = lfsck_stop(env, key, stop);
3507                 break;
3508         }
3509         case LE_PHASE1_DONE:
3510         case LE_PHASE2_DONE:
3511         case LE_PEER_EXIT:
3512         case LE_CONDITIONAL_DESTROY:
3513         case LE_SET_LMV_MASTER:
3514         case LE_SET_LMV_SLAVE:
3515         case LE_PAIRS_VERIFY: {
3516                 struct lfsck_instance  *lfsck;
3517                 struct lfsck_component *com;
3518
3519                 lfsck = lfsck_instance_find(key, true, false);
3520                 if (unlikely(lfsck == NULL))
3521                         RETURN(-ENXIO);
3522
3523                 com = lfsck_component_find(lfsck, lr->lr_active);
3524                 if (likely(com)) {
3525                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
3526                         lfsck_component_put(env, com);
3527                 }
3528
3529                 lfsck_instance_put(env, lfsck);
3530                 break;
3531         }
3532         default:
3533                 break;
3534         }
3535
3536         RETURN(rc);
3537 }
3538 EXPORT_SYMBOL(lfsck_in_notify);
3539
3540 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3541                 struct lfsck_request *req, struct lfsck_reply *rep,
3542                 struct lfsck_query *que)
3543 {
3544         struct lfsck_instance  *lfsck;
3545         struct lfsck_component *com;
3546         int                     i;
3547         int                     rc = 0;
3548         __u16                   type;
3549         ENTRY;
3550
3551         lfsck = lfsck_instance_find(key, true, false);
3552         if (unlikely(lfsck == NULL))
3553                 RETURN(-ENXIO);
3554
3555         if (que != NULL) {
3556                 if (que->lu_types == LFSCK_TYPES_ALL)
3557                         que->lu_types =
3558                                 LFSCK_TYPES_SUPPORTED & ~LFSCK_TYPE_SCRUB;
3559
3560                 if (que->lu_types & ~LFSCK_TYPES_SUPPORTED) {
3561                         que->lu_types &= ~LFSCK_TYPES_SUPPORTED;
3562
3563                         GOTO(out, rc = -ENOTSUPP);
3564                 }
3565
3566                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3567                      i++, type = BIT(i)) {
3568                         if (!(que->lu_types & type))
3569                                 continue;
3570
3571 again:
3572                         com = lfsck_component_find(lfsck, type);
3573                         if (unlikely(com == NULL))
3574                                 GOTO(out, rc = -ENOTSUPP);
3575
3576                         memset(que->lu_mdts_count[i], 0,
3577                                sizeof(__u32) * (LS_MAX + 1));
3578                         memset(que->lu_osts_count[i], 0,
3579                                sizeof(__u32) * (LS_MAX + 1));
3580                         que->lu_repaired[i] = 0;
3581                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3582                                                       que, i);
3583                         lfsck_component_put(env, com);
3584                         if  (rc < 0)
3585                                 GOTO(out, rc);
3586                 }
3587
3588                 if (!(que->lu_flags & LPF_WAIT))
3589                         GOTO(out, rc);
3590
3591                 for (i = 0, type = BIT(i); i < LFSCK_TYPE_BITS;
3592                      i++, type = BIT(i)) {
3593                         if (!(que->lu_types & type))
3594                                 continue;
3595
3596                         if (que->lu_mdts_count[i][LS_SCANNING_PHASE1] != 0 ||
3597                             que->lu_mdts_count[i][LS_SCANNING_PHASE2] != 0 ||
3598                             que->lu_osts_count[i][LS_SCANNING_PHASE1] != 0 ||
3599                             que->lu_osts_count[i][LS_SCANNING_PHASE2] != 0) {
3600                                 /* If it is required to wait, then sleep
3601                                  * 3 seconds and try to query again.
3602                                  */
3603                                 unsigned long timeout =
3604                                         msecs_to_jiffies(3000) + 1;
3605                                 while (timeout &&
3606                                        !fatal_signal_pending(current))
3607                                         timeout = schedule_timeout_killable(
3608                                                 timeout);
3609                                 if (timeout == 0)
3610                                         goto again;
3611                         }
3612                 }
3613         } else {
3614                 com = lfsck_component_find(lfsck, req->lr_active);
3615                 if (likely(com != NULL)) {
3616                         rc = com->lc_ops->lfsck_query(env, com, req, rep,
3617                                                       que, -1);
3618                         lfsck_component_put(env, com);
3619                 } else {
3620                         rc = -ENOTSUPP;
3621                 }
3622         }
3623
3624         GOTO(out, rc);
3625
3626 out:
3627         lfsck_instance_put(env, lfsck);
3628         return rc;
3629 }
3630 EXPORT_SYMBOL(lfsck_query);
3631
3632 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3633                              struct ldlm_namespace *ns)
3634 {
3635         struct lfsck_instance  *lfsck;
3636         int                     rc      = -ENXIO;
3637
3638         lfsck = lfsck_instance_find(key, true, false);
3639         if (likely(lfsck != NULL)) {
3640                 lfsck->li_namespace = ns;
3641                 lfsck_instance_put(env, lfsck);
3642                 rc = 0;
3643         }
3644
3645         return rc;
3646 }
3647 EXPORT_SYMBOL(lfsck_register_namespace);
3648
3649 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3650                    struct dt_device *next, struct obd_device *obd,
3651                    lfsck_out_notify notify, void *notify_data, bool master)
3652 {
3653         struct lfsck_instance   *lfsck;
3654         struct dt_object        *root  = NULL;
3655         struct dt_object        *obj   = NULL;
3656         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3657         int                      rc;
3658         ENTRY;
3659
3660         lfsck = lfsck_instance_find(key, false, false);
3661         if (unlikely(lfsck != NULL))
3662                 RETURN(-EEXIST);
3663
3664         OBD_ALLOC_PTR(lfsck);
3665         if (lfsck == NULL)
3666                 RETURN(-ENOMEM);
3667
3668         mutex_init(&lfsck->li_mutex);
3669         spin_lock_init(&lfsck->li_lock);
3670         INIT_LIST_HEAD(&lfsck->li_link);
3671         INIT_LIST_HEAD(&lfsck->li_list_scan);
3672         INIT_LIST_HEAD(&lfsck->li_list_dir);
3673         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3674         INIT_LIST_HEAD(&lfsck->li_list_idle);
3675         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3676         atomic_set(&lfsck->li_ref, 1);
3677         atomic_set(&lfsck->li_double_scan_count, 0);
3678         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3679         lfsck->li_out_notify = notify;
3680         lfsck->li_out_notify_data = notify_data;
3681         lfsck->li_next = next;
3682         lfsck->li_bottom = key;
3683         lfsck->li_obd = obd;
3684
3685         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3686         if (rc != 0)
3687                 GOTO(out, rc);
3688
3689         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3690         if (rc != 0)
3691                 GOTO(out, rc);
3692
3693         fid->f_seq = FID_SEQ_LOCAL_NAME;
3694         fid->f_oid = 1;
3695         fid->f_ver = 0;
3696         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3697         if (rc != 0)
3698                 GOTO(out, rc);
3699
3700         rc = dt_root_get(env, key, fid);
3701         if (rc != 0)
3702                 GOTO(out, rc);
3703
3704         root = dt_locate(env, key, fid);
3705         if (IS_ERR(root))
3706                 GOTO(out, rc = PTR_ERR(root));
3707
3708         lfsck->li_local_root_fid = *fid;
3709         if (master) {
3710                 lfsck->li_master = 1;
3711                 if (lfsck_dev_idx(lfsck) == 0) {
3712                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3713                         const struct lu_name *cname;
3714
3715                         rc = dt_lookup_dir(env, root, "ROOT",
3716                                            &lfsck->li_global_root_fid);
3717                         if (rc != 0)
3718                                 GOTO(out, rc);
3719
3720                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3721                         if (IS_ERR(obj))
3722                                 GOTO(out, rc = PTR_ERR(obj));
3723
3724                         rc = dt_lookup_dir(env, obj, dotlustre, fid);
3725                         if (rc != 0)
3726                                 GOTO(out, rc);
3727
3728                         lfsck_object_put(env, obj);
3729                         obj = dt_locate(env, key, fid);
3730                         if (IS_ERR(obj))
3731                                 GOTO(out, rc = PTR_ERR(obj));
3732
3733                         cname = lfsck_name_get_const(env, dotlustre,
3734                                                      strlen(dotlustre));
3735                         rc = lfsck_verify_linkea(env, lfsck, obj, cname,
3736                                                  &lfsck->li_global_root_fid);
3737                         if (rc != 0)
3738                                 GOTO(out, rc);
3739
3740                         *pfid = *fid;
3741                         rc = dt_lookup_dir(env, obj, lostfound, fid);
3742                         if (rc != 0)
3743                                 GOTO(out, rc);
3744
3745                         lfsck_object_put(env, obj);
3746                         obj = dt_locate(env, key, fid);
3747                         if (IS_ERR(obj))
3748                                 GOTO(out, rc = PTR_ERR(obj));
3749
3750                         cname = lfsck_name_get_const(env, lostfound,
3751                                                      strlen(lostfound));
3752                         rc = lfsck_verify_linkea(env, lfsck, obj, cname, pfid);
3753                         if (rc != 0)
3754                                 GOTO(out, rc);
3755
3756                         lfsck_object_put(env, obj);
3757                         obj = NULL;
3758                 }
3759         }
3760
3761         fid->f_seq = FID_SEQ_LOCAL_FILE;
3762         fid->f_oid = OTABLE_IT_OID;
3763         fid->f_ver = 0;
3764         obj = dt_locate(env, key, fid);
3765         if (IS_ERR(obj))
3766                 GOTO(out, rc = PTR_ERR(obj));
3767
3768         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3769         if (rc != 0)
3770                 GOTO(out, rc);
3771
3772         lfsck->li_obj_oit = obj;
3773         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3774                                         S_IFDIR | S_IRUGO | S_IWUSR);
3775         if (IS_ERR(obj))
3776                 GOTO(out, rc = PTR_ERR(obj));
3777
3778         lu_object_get(&obj->do_lu);
3779         lfsck->li_lfsck_dir = obj;
3780         rc = lfsck_bookmark_setup(env, lfsck);
3781         if (rc != 0)
3782                 GOTO(out, rc);
3783
3784         if (master) {
3785                 rc = lfsck_fid_init(lfsck);
3786                 if (rc < 0)
3787                         GOTO(out, rc);
3788
3789                 rc = lfsck_namespace_setup(env, lfsck);
3790                 if (rc < 0)
3791                         GOTO(out, rc);
3792         }
3793
3794         rc = lfsck_layout_setup(env, lfsck);
3795         if (rc < 0)
3796                 GOTO(out, rc);
3797
3798         /* XXX: more LFSCK components initialization to be added here. */
3799
3800         rc = lfsck_instance_add(lfsck);
3801         if (rc == 0)
3802                 rc = lfsck_add_target_from_orphan(env, lfsck);
3803 out:
3804         if (obj != NULL && !IS_ERR(obj))
3805                 lfsck_object_put(env, obj);
3806         if (root != NULL && !IS_ERR(root))
3807                 lfsck_object_put(env, root);
3808         if (rc != 0)
3809                 lfsck_instance_cleanup(env, lfsck);
3810         return rc;
3811 }
3812 EXPORT_SYMBOL(lfsck_register);
3813
3814 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3815 {
3816         struct lfsck_instance *lfsck;
3817
3818         lfsck = lfsck_instance_find(key, false, true);
3819         if (lfsck != NULL)
3820                 lfsck_instance_put(env, lfsck);
3821 }
3822 EXPORT_SYMBOL(lfsck_degister);
3823
3824 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3825                      struct dt_device *tgt, struct obd_export *exp,
3826                      __u32 index, bool for_ost)
3827 {
3828         struct lfsck_instance   *lfsck;
3829         struct lfsck_tgt_desc   *ltd;
3830         int                      rc;
3831         ENTRY;
3832
3833         OBD_ALLOC_PTR(ltd);
3834         if (ltd == NULL)
3835                 RETURN(-ENOMEM);
3836
3837         ltd->ltd_tgt = tgt;
3838         ltd->ltd_key = key;
3839         ltd->ltd_exp = exp;
3840         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3841         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3842         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3843         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3844         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3845         atomic_set(&ltd->ltd_ref, 1);
3846         ltd->ltd_index = index;
3847
3848         spin_lock(&lfsck_instance_lock);
3849         lfsck = __lfsck_instance_find(key, true, false);
3850         if (lfsck == NULL) {
3851                 if (for_ost)
3852                         list_add_tail(&ltd->ltd_orphan_list,
3853                                       &lfsck_ost_orphan_list);
3854                 else
3855                         list_add_tail(&ltd->ltd_orphan_list,
3856                                       &lfsck_mdt_orphan_list);
3857                 spin_unlock(&lfsck_instance_lock);
3858
3859                 RETURN(0);
3860         }
3861         spin_unlock(&lfsck_instance_lock);
3862
3863         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3864         if (rc != 0)
3865                 lfsck_tgt_put(ltd);
3866
3867         lfsck_instance_put(env, lfsck);
3868
3869         RETURN(rc);
3870 }
3871 EXPORT_SYMBOL(lfsck_add_target);
3872
3873 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3874                       struct dt_device *tgt, __u32 index, bool for_ost)
3875 {
3876         struct lfsck_instance   *lfsck;
3877         struct lfsck_tgt_descs  *ltds;
3878         struct lfsck_tgt_desc   *ltd;
3879         struct list_head        *head;
3880
3881         if (for_ost)
3882                 head = &lfsck_ost_orphan_list;
3883         else
3884                 head = &lfsck_mdt_orphan_list;
3885
3886         spin_lock(&lfsck_instance_lock);
3887         list_for_each_entry(ltd, head, ltd_orphan_list) {
3888                 if (ltd->ltd_tgt == tgt) {
3889                         list_del_init(&ltd->ltd_orphan_list);
3890                         spin_unlock(&lfsck_instance_lock);
3891                         lfsck_tgt_put(ltd);
3892
3893                         return;
3894                 }
3895         }
3896
3897         ltd = NULL;
3898         lfsck = __lfsck_instance_find(key, true, false);
3899         spin_unlock(&lfsck_instance_lock);
3900         if (unlikely(lfsck == NULL))
3901                 return;
3902
3903         if (for_ost)
3904                 ltds = &lfsck->li_ost_descs;
3905         else
3906                 ltds = &lfsck->li_mdt_descs;
3907
3908         down_write(&ltds->ltd_rw_sem);
3909         LASSERT(ltds->ltd_tgts_bitmap);
3910
3911         if (unlikely(index >= ltds->ltd_tgts_mask_len))
3912                 goto unlock;
3913
3914         ltd = lfsck_ltd2tgt(ltds, index);
3915         if (unlikely(ltd == NULL))
3916                 goto unlock;
3917
3918         LASSERT(ltds->ltd_tgtnr > 0);
3919
3920         ltds->ltd_tgtnr--;
3921         set_bit(index, ltds->ltd_tgts_bitmap);
3922         lfsck_assign_tgt(ltds, NULL, index);
3923
3924 unlock:
3925         if (ltd == NULL) {
3926                 if (for_ost)
3927                         head = &lfsck->li_ost_descs.ltd_orphan;
3928                 else
3929                         head = &lfsck->li_mdt_descs.ltd_orphan;
3930
3931                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3932                         if (ltd->ltd_tgt == tgt) {
3933                                 list_del_init(&ltd->ltd_orphan_list);
3934                                 break;
3935                         }
3936                 }
3937         }
3938
3939         up_write(&ltds->ltd_rw_sem);
3940         if (ltd != NULL) {
3941                 spin_lock(&ltds->ltd_lock);
3942                 ltd->ltd_dead = 1;
3943                 spin_unlock(&ltds->ltd_lock);
3944                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3945                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3946                 lfsck_tgt_put(ltd);
3947         }
3948
3949         lfsck_instance_put(env, lfsck);
3950 }
3951 EXPORT_SYMBOL(lfsck_del_target);
3952
3953 static int __init lfsck_init(void)
3954 {
3955         int rc;
3956
3957         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3958         rc = lu_context_key_register(&lfsck_thread_key);
3959         if (!rc) {
3960                 tgt_register_lfsck_in_notify_local(lfsck_in_notify_local);
3961                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3962                 tgt_register_lfsck_query(lfsck_query);
3963         }
3964
3965         return rc;
3966 }
3967
3968 static void __exit lfsck_exit(void)
3969 {
3970         struct lfsck_tgt_desc *ltd;
3971         struct lfsck_tgt_desc *next;
3972
3973         LASSERT(list_empty(&lfsck_instance_list));
3974
3975         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3976                                  ltd_orphan_list) {
3977                 list_del_init(&ltd->ltd_orphan_list);
3978                 lfsck_tgt_put(ltd);
3979         }
3980
3981         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3982                                  ltd_orphan_list) {
3983                 list_del_init(&ltd->ltd_orphan_list);
3984                 lfsck_tgt_put(ltd);
3985         }
3986
3987         lu_context_key_degister(&lfsck_thread_key);
3988 }
3989
/* Kernel module metadata for the LFSCK module. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre File System Checker");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

/* Hook lfsck_init()/lfsck_exit() in as the module load/unload handlers. */
module_init(lfsck_init);
module_exit(lfsck_exit);