Whamcloud - gitweb
1603099df99967bb505acc6ebc8a148671670c6c
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
84 const char *lfsck_flags_names[] = {
85         "scanned-once",
86         "inconsistent",
87         "upgrade",
88         "incomplete",
89         "crashed_lastid",
90         NULL
91 };
92
93 const char *lfsck_param_names[] = {
94         NULL,
95         "failout",
96         "dryrun",
97         "all_targets",
98         "broadcast",
99         "orphan",
100         "create_ostobj",
101         "create_mdtobj",
102         NULL
103 };
104
/* Selector for how the lost+found directory is verified.
 * NOTE(review): judging by the names, either via the FID recorded in the
 * bookmark or via the name entry -- confirm against the users. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK  = 0,
        LVLT_BY_NAMEENTRY = 1,
};
109
110 const char *lfsck_status2names(enum lfsck_status status)
111 {
112         if (unlikely(status < 0 || status >= LS_MAX))
113                 return "unknown";
114
115         return lfsck_status_names[status];
116 }
117
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
119 {
120         spin_lock_init(&ltds->ltd_lock);
121         init_rwsem(&ltds->ltd_rw_sem);
122         INIT_LIST_HEAD(&ltds->ltd_orphan);
123         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124         if (ltds->ltd_tgts_bitmap == NULL)
125                 return -ENOMEM;
126
127         return 0;
128 }
129
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
131 {
132         struct lfsck_tgt_desc   *ltd;
133         struct lfsck_tgt_desc   *next;
134         int                      idx;
135
136         down_write(&ltds->ltd_rw_sem);
137
138         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
139                                  ltd_orphan_list) {
140                 list_del_init(&ltd->ltd_orphan_list);
141                 lfsck_tgt_put(ltd);
142         }
143
144         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145                 up_write(&ltds->ltd_rw_sem);
146
147                 return;
148         }
149
150         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151                 ltd = LTD_TGT(ltds, idx);
152                 if (likely(ltd != NULL)) {
153                         LASSERT(list_empty(&ltd->ltd_layout_list));
154                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_list));
156                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
157
158                         ltds->ltd_tgtnr--;
159                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160                         LTD_TGT(ltds, idx) = NULL;
161                         lfsck_tgt_put(ltd);
162                 }
163         }
164
165         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
166                  ltds->ltd_tgtnr);
167
168         for (idx = 0; idx < TGT_PTRS; idx++) {
169                 if (ltds->ltd_tgts_idx[idx] != NULL) {
170                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171                         ltds->ltd_tgts_idx[idx] = NULL;
172                 }
173         }
174
175         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176         ltds->ltd_tgts_bitmap = NULL;
177         up_write(&ltds->ltd_rw_sem);
178 }
179
180 static int __lfsck_add_target(const struct lu_env *env,
181                               struct lfsck_instance *lfsck,
182                               struct lfsck_tgt_desc *ltd,
183                               bool for_ost, bool locked)
184 {
185         struct lfsck_tgt_descs *ltds;
186         __u32                   index = ltd->ltd_index;
187         int                     rc    = 0;
188         ENTRY;
189
190         if (for_ost)
191                 ltds = &lfsck->li_ost_descs;
192         else
193                 ltds = &lfsck->li_mdt_descs;
194
195         if (!locked)
196                 down_write(&ltds->ltd_rw_sem);
197
198         LASSERT(ltds->ltd_tgts_bitmap != NULL);
199
200         if (index >= ltds->ltd_tgts_bitmap->size) {
201                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202                                     (__u32)BITS_PER_LONG);
203                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204                 cfs_bitmap_t *new_bitmap;
205
206                 while (newsize < index + 1)
207                         newsize <<= 1;
208
209                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210                 if (new_bitmap == NULL)
211                         GOTO(unlock, rc = -ENOMEM);
212
213                 if (ltds->ltd_tgtnr > 0)
214                         cfs_bitmap_copy(new_bitmap, old_bitmap);
215                 ltds->ltd_tgts_bitmap = new_bitmap;
216                 CFS_FREE_BITMAP(old_bitmap);
217         }
218
219         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220                 CERROR("%s: the device %s (%u) is registered already\n",
221                        lfsck_lfsck2name(lfsck),
222                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223                 GOTO(unlock, rc = -EEXIST);
224         }
225
226         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229                         GOTO(unlock, rc = -ENOMEM);
230         }
231
232         LTD_TGT(ltds, index) = ltd;
233         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
234         ltds->ltd_tgtnr++;
235
236         GOTO(unlock, rc = 0);
237
238 unlock:
239         if (!locked)
240                 up_write(&ltds->ltd_rw_sem);
241
242         return rc;
243 }
244
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246                                         struct lfsck_instance *lfsck)
247 {
248         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
249         struct lfsck_tgt_desc   *ltd;
250         struct lfsck_tgt_desc   *next;
251         struct list_head        *head    = &lfsck_ost_orphan_list;
252         int                      rc;
253         bool                     for_ost = true;
254
255 again:
256         spin_lock(&lfsck_instance_lock);
257         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258                 if (ltd->ltd_key == lfsck->li_bottom)
259                         list_move_tail(&ltd->ltd_orphan_list,
260                                        &ltds->ltd_orphan);
261         }
262         spin_unlock(&lfsck_instance_lock);
263
264         down_write(&ltds->ltd_rw_sem);
265         while (!list_empty(&ltds->ltd_orphan)) {
266                 ltd = list_entry(ltds->ltd_orphan.next,
267                                  struct lfsck_tgt_desc,
268                                  ltd_orphan_list);
269                 list_del_init(&ltd->ltd_orphan_list);
270                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271                 /* Do not hold the semaphore for too long time. */
272                 up_write(&ltds->ltd_rw_sem);
273                 if (rc != 0)
274                         return rc;
275
276                 down_write(&ltds->ltd_rw_sem);
277         }
278         up_write(&ltds->ltd_rw_sem);
279
280         if (for_ost) {
281                 ltds = &lfsck->li_mdt_descs;
282                 head = &lfsck_mdt_orphan_list;
283                 for_ost = false;
284                 goto again;
285         }
286
287         return 0;
288 }
289
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292                        struct list_head *list)
293 {
294         struct lfsck_component *com;
295
296         list_for_each_entry(com, list, lc_link) {
297                 if (com->lc_type == type)
298                         return com;
299         }
300         return NULL;
301 }
302
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
305 {
306         struct lfsck_component *com;
307
308         spin_lock(&lfsck->li_lock);
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
310         if (com != NULL)
311                 goto unlock;
312
313         com = __lfsck_component_find(lfsck, type,
314                                      &lfsck->li_list_double_scan);
315         if (com != NULL)
316                 goto unlock;
317
318         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
319
320 unlock:
321         if (com != NULL)
322                 lfsck_component_get(com);
323         spin_unlock(&lfsck->li_lock);
324         return com;
325 }
326
327 void lfsck_component_cleanup(const struct lu_env *env,
328                              struct lfsck_component *com)
329 {
330         if (!list_empty(&com->lc_link))
331                 list_del_init(&com->lc_link);
332         if (!list_empty(&com->lc_link_dir))
333                 list_del_init(&com->lc_link_dir);
334
335         lfsck_component_put(env, com);
336 }
337
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lu_fid *fid, bool locked)
340 {
341         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
342         int                      rc = 0;
343         ENTRY;
344
345         if (!locked)
346                 mutex_lock(&lfsck->li_mutex);
347
348         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
349         if (rc >= 0) {
350                 bk->lb_last_fid = *fid;
351                 /* We do not care about whether the subsequent sub-operations
352                  * failed or not. The worst case is that one FID is lost that
353                  * is not a big issue for the LFSCK since it is relative rare
354                  * for LFSCK create. */
355                 rc = lfsck_bookmark_store(env, lfsck);
356         }
357
358         if (!locked)
359                 mutex_unlock(&lfsck->li_mutex);
360
361         RETURN(rc);
362 }
363
364 /**
365  * Request the specified ibits lock for the given object.
366  *
367  * Before the LFSCK modifying on the namespace visible object,
368  * it needs to acquire related ibits ldlm lock.
369  *
370  * \param[in] env       pointer to the thread context
371  * \param[in] lfsck     pointer to the lfsck instance
372  * \param[in] obj       pointer to the dt_object to be locked
373  * \param[out] lh       pointer to the lock handle
374  * \param[in] ibits     the bits for the ldlm lock to be acquired
375  * \param[in] mode      the mode for the ldlm lock to be acquired
376  *
377  * \retval              0 for success
378  * \retval              negative error number on failure
379  */
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381                      struct dt_object *obj, struct lustre_handle *lh,
382                      __u64 bits, ldlm_mode_t mode)
383 {
384         struct lfsck_thread_info        *info   = lfsck_env_info(env);
385         ldlm_policy_data_t              *policy = &info->lti_policy;
386         struct ldlm_res_id              *resid  = &info->lti_resid;
387         __u64                            flags  = LDLM_FL_ATOMIC_CB;
388         int                              rc;
389
390         LASSERT(lfsck->li_namespace != NULL);
391
392         memset(policy, 0, sizeof(*policy));
393         policy->l_inodebits.bits = bits;
394         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
395         if (dt_object_remote(obj)) {
396                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
397
398                 memset(einfo, 0, sizeof(*einfo));
399                 einfo->ei_type = LDLM_IBITS;
400                 einfo->ei_mode = mode;
401                 einfo->ei_cb_bl = ldlm_blocking_ast;
402                 einfo->ei_cb_cp = ldlm_completion_ast;
403                 einfo->ei_res_id = resid;
404
405                 rc = dt_object_lock(env, obj, lh, einfo, policy);
406         } else {
407                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408                                             LDLM_IBITS, policy, mode,
409                                             &flags, ldlm_blocking_ast,
410                                             ldlm_completion_ast, NULL, NULL,
411                                             0, LVB_T_NONE, NULL, lh);
412         }
413
414         if (rc == ELDLM_OK) {
415                 rc = 0;
416         } else {
417                 memset(lh, 0, sizeof(*lh));
418                 rc = -EIO;
419         }
420
421         return rc;
422 }
423
424 /**
425  * Release the the specified ibits lock.
426  *
427  * If the lock has been acquired before, release it
428  * and cleanup the handle. Otherwise, do nothing.
429  *
430  * \param[in] lh        pointer to the lock handle
431  * \param[in] mode      the mode for the ldlm lock to be released
432  */
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
434 {
435         if (lustre_handle_is_used(lh)) {
436                 ldlm_lock_decref(lh, mode);
437                 memset(lh, 0, sizeof(*lh));
438         }
439 }
440
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442                               struct lfsck_instance *lfsck,
443                               const struct lu_fid *fid)
444 {
445         struct seq_server_site  *ss     =
446                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
447         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
448         int                      rc;
449
450         fld_range_set_mdt(range);
451         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
452         if (rc == 0)
453                 rc = range->lsr_index;
454
455         return rc;
456 }
457
/* Well-known directory entry names. "." and ".." have external linkage;
 * the .lustre and lost+found names are private to this file. */
const char dot[] = ".";
const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
462
463 /**
464  * Remove the name entry from the .lustre/lost+found directory.
465  *
466  * No need to care about the object referenced by the name entry,
467  * either the name entry is invalid or redundant, or the referenced
468  * object has been processed or will be handled by others.
469  *
470  * \param[in] env       pointer to the thread context
471  * \param[in] lfsck     pointer to the lfsck instance
472  * \param[in] name      the name for the name entry to be removed
473  *
474  * \retval              0 for success
475  * \retval              negative error number on failure
476  */
477 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
478                                        struct lfsck_instance *lfsck,
479                                        const char *name)
480 {
481         struct dt_object        *parent = lfsck->li_lpf_root_obj;
482         struct dt_device        *dev    = lfsck->li_next;
483         struct thandle          *th;
484         struct lustre_handle     lh     = { 0 };
485         int                      rc;
486         ENTRY;
487
488         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
489                               MDS_INODELOCK_UPDATE, LCK_EX);
490         if (rc != 0)
491                 RETURN(rc);
492
493         th = dt_trans_create(env, dev);
494         if (IS_ERR(th))
495                 GOTO(unlock, rc = PTR_ERR(th));
496
497         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
498         if (rc != 0)
499                 GOTO(stop, rc);
500
501         rc = dt_declare_ref_del(env, parent, th);
502         if (rc != 0)
503                 GOTO(stop, rc);
504
505         rc = dt_trans_start(env, dev, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
510                        BYPASS_CAPA);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         dt_write_lock(env, parent, 0);
515         rc = dt_ref_del(env, parent, th);
516         dt_write_unlock(env, parent);
517
518         GOTO(stop, rc);
519
520 stop:
521         dt_trans_stop(env, dev, th);
522
523 unlock:
524         lfsck_ibits_unlock(&lh, LCK_EX);
525
526         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
527                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
528
529         return rc;
530 }
531
532 static int lfsck_create_lpf_local(const struct lu_env *env,
533                                   struct lfsck_instance *lfsck,
534                                   struct dt_object *child,
535                                   struct lu_attr *la,
536                                   struct dt_object_format *dof,
537                                   const char *name)
538 {
539         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
540         struct dt_object        *parent = lfsck->li_lpf_root_obj;
541         struct dt_device        *dev    = lfsck->li_bottom;
542         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
543         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
544         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
545         struct thandle          *th     = NULL;
546         struct linkea_data       ldata  = { NULL };
547         struct lu_buf            linkea_buf;
548         const struct lu_name    *cname;
549         loff_t                   pos    = 0;
550         int                      len    = sizeof(struct lfsck_bookmark);
551         int                      rc;
552         ENTRY;
553
554         rc = linkea_data_new(&ldata,
555                              &lfsck_env_info(env)->lti_linkea_buf2);
556         if (rc != 0)
557                 RETURN(rc);
558
559         cname = lfsck_name_get_const(env, name, strlen(name));
560         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
561         if (rc != 0)
562                 RETURN(rc);
563
564         th = dt_trans_create(env, dev);
565         if (IS_ERR(th))
566                 RETURN(PTR_ERR(th));
567
568         /* 1a. create child */
569         rc = dt_declare_create(env, child, la, NULL, dof, th);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         /* 2a. increase child nlink */
574         rc = dt_declare_ref_add(env, child, th);
575         if (rc != 0)
576                 GOTO(stop, rc);
577
578         /* 3a. insert linkEA for child */
579         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
580                        ldata.ld_leh->leh_len);
581         rc = dt_declare_xattr_set(env, child, &linkea_buf,
582                                   XATTR_NAME_LINK, 0, th);
583         if (rc != 0)
584                 GOTO(stop, rc);
585
586         /* 4a. insert name into parent dir */
587         rec->rec_type = S_IFDIR;
588         rec->rec_fid = cfid;
589         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
590                                (const struct dt_key *)name, th);
591         if (rc != 0)
592                 GOTO(stop, rc);
593
594         /* 5a. increase parent nlink */
595         rc = dt_declare_ref_add(env, parent, th);
596         if (rc != 0)
597                 GOTO(stop, rc);
598
599         /* 6a. update bookmark */
600         rc = dt_declare_record_write(env, bk_obj,
601                                      lfsck_buf_get(env, bk, len), 0, th);
602         if (rc != 0)
603                 GOTO(stop, rc);
604
605         rc = dt_trans_start_local(env, dev, th);
606         if (rc != 0)
607                 GOTO(stop, rc);
608
609         dt_write_lock(env, child, 0);
610         /* 1b.1. create child */
611         rc = dt_create(env, child, la, NULL, dof, th);
612         if (rc != 0)
613                 GOTO(unlock, rc);
614
615         if (unlikely(!dt_try_as_dir(env, child)))
616                 GOTO(unlock, rc = -ENOTDIR);
617
618         /* 1b.2. insert dot into child dir */
619         rec->rec_fid = cfid;
620         rc = dt_insert(env, child, (const struct dt_rec *)rec,
621                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
622         if (rc != 0)
623                 GOTO(unlock, rc);
624
625         /* 1b.3. insert dotdot into child dir */
626         rec->rec_fid = &LU_LPF_FID;
627         rc = dt_insert(env, child, (const struct dt_rec *)rec,
628                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
629         if (rc != 0)
630                 GOTO(unlock, rc);
631
632         /* 2b. increase child nlink */
633         rc = dt_ref_add(env, child, th);
634         if (rc != 0)
635                 GOTO(unlock, rc);
636
637         /* 3b. insert linkEA for child. */
638         rc = dt_xattr_set(env, child, &linkea_buf,
639                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
640         dt_write_unlock(env, child);
641         if (rc != 0)
642                 GOTO(stop, rc);
643
644         /* 4b. insert name into parent dir */
645         rec->rec_fid = cfid;
646         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
647                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
648         if (rc != 0)
649                 GOTO(stop, rc);
650
651         dt_write_lock(env, parent, 0);
652         /* 5b. increase parent nlink */
653         rc = dt_ref_add(env, parent, th);
654         dt_write_unlock(env, parent);
655         if (rc != 0)
656                 GOTO(stop, rc);
657
658         bk->lb_lpf_fid = *cfid;
659         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
660
661         /* 6b. update bookmark */
662         rc = dt_record_write(env, bk_obj,
663                              lfsck_buf_get(env, bk, len), &pos, th);
664
665         GOTO(stop, rc);
666
667 unlock:
668         dt_write_unlock(env, child);
669
670 stop:
671         dt_trans_stop(env, dev, th);
672
673         return rc;
674 }
675
676 static int lfsck_create_lpf_remote(const struct lu_env *env,
677                                    struct lfsck_instance *lfsck,
678                                    struct dt_object *child,
679                                    struct lu_attr *la,
680                                    struct dt_object_format *dof,
681                                    const char *name)
682 {
683         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
684         struct dt_object        *parent = lfsck->li_lpf_root_obj;
685         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
686         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
687         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
688         struct thandle          *th     = NULL;
689         struct linkea_data       ldata  = { NULL };
690         struct lu_buf            linkea_buf;
691         const struct lu_name    *cname;
692         struct dt_device        *dev;
693         loff_t                   pos    = 0;
694         int                      len    = sizeof(struct lfsck_bookmark);
695         int                      rc;
696         ENTRY;
697
698         rc = linkea_data_new(&ldata,
699                              &lfsck_env_info(env)->lti_linkea_buf2);
700         if (rc != 0)
701                 RETURN(rc);
702
703         cname = lfsck_name_get_const(env, name, strlen(name));
704         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
705         if (rc != 0)
706                 RETURN(rc);
707
708         /* Create .lustre/lost+found/MDTxxxx. */
709
710         /* XXX: Currently, cross-MDT create operation needs to create the child
711          *      object firstly, then insert name into the parent directory. For
712          *      this case, the child object resides on current MDT (local), but
713          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
714          *      easy to contain all the sub-modifications orderly within single
715          *      transaction.
716          *
717          *      To avoid more inconsistency, we split the create operation into
718          *      two transactions:
719          *
720          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
721          *         locally.
722          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
723          *         remotely.
724          *
725          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
726          *      repair such inconsistency when LFSCK run next time. */
727
728         /* Transaction I: locally */
729
730         dev = lfsck->li_bottom;
731         th = dt_trans_create(env, dev);
732         if (IS_ERR(th))
733                 RETURN(PTR_ERR(th));
734
735         /* 1a. create child */
736         rc = dt_declare_create(env, child, la, NULL, dof, th);
737         if (rc != 0)
738                 GOTO(stop, rc);
739
740         /* 2a. increase child nlink */
741         rc = dt_declare_ref_add(env, child, th);
742         if (rc != 0)
743                 GOTO(stop, rc);
744
745         /* 3a. insert linkEA for child */
746         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
747                        ldata.ld_leh->leh_len);
748         rc = dt_declare_xattr_set(env, child, &linkea_buf,
749                                   XATTR_NAME_LINK, 0, th);
750         if (rc != 0)
751                 GOTO(stop, rc);
752
753         /* 4a. update bookmark */
754         rc = dt_declare_record_write(env, bk_obj,
755                                      lfsck_buf_get(env, bk, len), 0, th);
756         if (rc != 0)
757                 GOTO(stop, rc);
758
759         rc = dt_trans_start_local(env, dev, th);
760         if (rc != 0)
761                 GOTO(stop, rc);
762
763         dt_write_lock(env, child, 0);
764         /* 1b.1. create child */
765         rc = dt_create(env, child, la, NULL, dof, th);
766         if (rc != 0)
767                 GOTO(unlock, rc);
768
769         if (unlikely(!dt_try_as_dir(env, child)))
770                 GOTO(unlock, rc = -ENOTDIR);
771
772         /* 1b.2. insert dot into child dir */
773         rec->rec_type = S_IFDIR;
774         rec->rec_fid = cfid;
775         rc = dt_insert(env, child, (const struct dt_rec *)rec,
776                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
777         if (rc != 0)
778                 GOTO(unlock, rc);
779
780         /* 1b.3. insert dotdot into child dir */
781         rec->rec_fid = &LU_LPF_FID;
782         rc = dt_insert(env, child, (const struct dt_rec *)rec,
783                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
784         if (rc != 0)
785                 GOTO(unlock, rc);
786
787         /* 2b. increase child nlink */
788         rc = dt_ref_add(env, child, th);
789         if (rc != 0)
790                 GOTO(unlock, rc);
791
792         /* 3b. insert linkEA for child */
793         rc = dt_xattr_set(env, child, &linkea_buf,
794                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
795         if (rc != 0)
796                 GOTO(unlock, rc);
797
798         bk->lb_lpf_fid = *cfid;
799         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
800
801         /* 4b. update bookmark */
802         rc = dt_record_write(env, bk_obj,
803                              lfsck_buf_get(env, bk, len), &pos, th);
804
805         dt_write_unlock(env, child);
806         dt_trans_stop(env, dev, th);
807         if (rc != 0)
808                 RETURN(rc);
809
810         /* Transaction II: remotely */
811
812         dev = lfsck->li_next;
813         th = dt_trans_create(env, dev);
814         if (IS_ERR(th))
815                 RETURN(PTR_ERR(th));
816
817         /* 5a. insert name into parent dir */
818         rec->rec_fid = cfid;
819         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
820                                (const struct dt_key *)name, th);
821         if (rc != 0)
822                 GOTO(stop, rc);
823
824         /* 6a. increase parent nlink */
825         rc = dt_declare_ref_add(env, parent, th);
826         if (rc != 0)
827                 GOTO(stop, rc);
828
829         rc = dt_trans_start(env, dev, th);
830         if (rc != 0)
831                 GOTO(stop, rc);
832
833         /* 5b. insert name into parent dir */
834         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
835                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
836         if (rc != 0)
837                 GOTO(stop, rc);
838
839         dt_write_lock(env, parent, 0);
840         /* 6b. increase parent nlink */
841         rc = dt_ref_add(env, parent, th);
842         dt_write_unlock(env, parent);
843
844         GOTO(stop, rc);
845
846 unlock:
847         dt_write_unlock(env, child);
848 stop:
849         dt_trans_stop(env, dev, th);
850
851         if (rc != 0 && dev == lfsck->li_next)
852                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
853                        "for orphans, but failed to insert the name %s "
854                        "to the .lustre/lost+found/. Such inconsistency "
855                        "will be repaired when LFSCK run next time: rc = %d\n",
856                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
857
858         return rc;
859 }
860
861 /**
862  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
863  *
864  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
865  * orphans and other uncertain inconsistent objects found during the
866  * LFSCK. Such directory will be created by the LFSCK engine on the
867  * local MDT before the LFSCK scanning.
868  *
869  * \param[in] env       pointer to the thread context
870  * \param[in] lfsck     pointer to the lfsck instance
871  *
872  * \retval              0 for success
873  * \retval              negative error number on failure
874  */
875 static int lfsck_create_lpf(const struct lu_env *env,
876                             struct lfsck_instance *lfsck)
877 {
878         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
879         struct lfsck_thread_info *info  = lfsck_env_info(env);
880         struct lu_fid            *cfid  = &info->lti_fid2;
881         struct lu_attr           *la    = &info->lti_la;
882         struct dt_object_format  *dof   = &info->lti_dof;
883         struct dt_object         *parent = lfsck->li_lpf_root_obj;
884         struct dt_object         *child = NULL;
885         struct lustre_handle      lh    = { 0 };
886         char                      name[8];
887         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
888         int                       rc    = 0;
889         ENTRY;
890
891         LASSERT(lfsck->li_master);
892         LASSERT(parent != NULL);
893         LASSERT(lfsck->li_lpf_obj == NULL);
894
895         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
896                               MDS_INODELOCK_UPDATE, LCK_EX);
897         if (rc != 0)
898                 RETURN(rc);
899
900         snprintf(name, 8, "MDT%04x", node);
901         if (fid_is_zero(&bk->lb_lpf_fid)) {
902                 /* There is corner case that: in former LFSCK scanning we have
903                  * created the .lustre/lost+found/MDTxxxx but failed to update
904                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
905                  * it from MDT0 firstly. */
906                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
907                                (const struct dt_key *)name, BYPASS_CAPA);
908                 if (rc != 0 && rc != -ENOENT)
909                         GOTO(unlock, rc);
910
911                 if (rc == 0) {
912                         bk->lb_lpf_fid = *cfid;
913                         rc = lfsck_bookmark_store(env, lfsck);
914                 } else {
915                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
916                 }
917                 if (rc != 0)
918                         GOTO(unlock, rc);
919         } else {
920                 *cfid = bk->lb_lpf_fid;
921         }
922
923         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
924         if (IS_ERR(child))
925                 GOTO(unlock, rc = PTR_ERR(child));
926
927         if (dt_object_exists(child) != 0) {
928                 if (unlikely(!dt_try_as_dir(env, child)))
929                         rc = -ENOTDIR;
930                 else
931                         lfsck->li_lpf_obj = child;
932
933                 GOTO(unlock, rc);
934         }
935
936         memset(la, 0, sizeof(*la));
937         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
938         la->la_mode = S_IFDIR | S_IRWXU;
939         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
940                        LA_UID | LA_GID;
941         memset(dof, 0, sizeof(*dof));
942         dof->dof_type = dt_mode_to_dft(S_IFDIR);
943
944         if (node == 0)
945                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
946         else
947                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
948         if (rc == 0)
949                 lfsck->li_lpf_obj = child;
950
951         GOTO(unlock, rc);
952
953 unlock:
954         lfsck_ibits_unlock(&lh, LCK_EX);
955         if (rc != 0 && child != NULL && !IS_ERR(child))
956                 lu_object_put(env, &child->do_lu);
957
958         return rc;
959 }
960
961 /**
962  * Scan .lustre/lost+found for bad name entries and remove them.
963  *
964  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
965  * index in the system. Any other formatted name is invalid and should be
966  * removed.
967  *
968  * \param[in] env       pointer to the thread context
969  * \param[in] lfsck     pointer to the lfsck instance
970  *
971  * \retval              0 for success
972  * \retval              negative error number on failure
973  */
974 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
975                                       struct lfsck_instance *lfsck)
976 {
977         struct dt_object        *parent = lfsck->li_lpf_root_obj;
978         struct lu_dirent        *ent    =
979                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
980         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
981         struct dt_it            *it;
982         int                      rc;
983         ENTRY;
984
985         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
986         if (IS_ERR(it))
987                 RETURN(PTR_ERR(it));
988
989         rc = iops->load(env, it, 0);
990         if (rc == 0)
991                 rc = iops->next(env, it);
992         else if (rc > 0)
993                 rc = 0;
994
995         while (rc == 0) {
996                 int off = 3;
997
998                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
999                 if (rc != 0)
1000                         break;
1001
1002                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1003                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1004                         goto next;
1005
1006                 /* name length must be strlen("MDTxxxx") */
1007                 if (ent->lde_namelen != 7)
1008                         goto remove;
1009
1010                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1011                         goto remove;
1012
1013                 while (off < 7 && isxdigit(ent->lde_name[off]))
1014                         off++;
1015
1016                 if (off != 7) {
1017
1018 remove:
1019                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1020                                                          ent->lde_name);
1021                         if (rc != 0)
1022                                 break;
1023                 }
1024
1025 next:
1026                 rc = iops->next(env, it);
1027         }
1028
1029         iops->put(env, it);
1030         iops->fini(env, it);
1031
1032         RETURN(rc > 0 ? 0 : rc);
1033 }
1034
1035 static int lfsck_update_lpf_entry(const struct lu_env *env,
1036                                   struct lfsck_instance *lfsck,
1037                                   struct dt_object *parent,
1038                                   struct dt_object *child,
1039                                   const char *name,
1040                                   enum lfsck_verify_lpf_types type)
1041 {
1042         int rc;
1043
1044         if (type == LVLT_BY_BOOKMARK) {
1045                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1046                                              lfsck_dto2fid(child), S_IFDIR);
1047         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1048                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1049                 rc = lfsck_bookmark_store(env, lfsck);
1050
1051                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1052                        " in the bookmark file: rc = %d\n",
1053                        lfsck_lfsck2name(lfsck),
1054                        PFID(lfsck_dto2fid(child)), rc);
1055         }
1056
1057         return rc;
1058 }
1059
1060 /**
1061  * Check whether the @child back references the @parent.
1062  *
1063  * Two cases:
1064  * 1) The child's FID is stored in the bookmark file. If the child back
1065  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1066  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1067  *    the child back references another parent2, then:
1068  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1069  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1070  *      references the child. So keep them there. As the LFSCK processing,
1071  *      the parent3 may be found, then when the LFSCK run next time, the
1072  *      inconsistency can be repaired.
1073  *
1074  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1075  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1076  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1077  *    back references another parent2, then:
1078  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1079  *      from .lustre/lost+found/;
1080  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1081  *      sub-directory name entry and update the child;
1082  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1083  *      or not, then keep them there.
1084  *
1085  * \param[in] env       pointer to the thread context
1086  * \param[in] lfsck     pointer to the lfsck instance
1087  * \param[in] child     pointer to the lost+found sub-directory object
1088  * \param[in] name      the name for lost+found sub-directory object
1089  * \param[out] fid      pointer to the buffer to hold the FID of the object
1090  *                      (called it as parent2) that is referenced via the
1091  *                      child's dotdot entry; it also can be the FID that
1092  *                      is referenced by the name entry under the parent2.
1093  * \param[in] type      to indicate where the child's FID is stored in
1094  *
1095  * \retval              positive number for uncertain inconsistency
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1100                                   struct lfsck_instance *lfsck,
1101                                   struct dt_object *child, const char *name,
1102                                   struct lu_fid *fid,
1103                                   enum lfsck_verify_lpf_types type)
1104 {
1105         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1106         struct lfsck_thread_info *info    = lfsck_env_info(env);
1107         char                     *name2   = info->lti_key;
1108         struct lu_fid            *fid2    = &info->lti_fid3;
1109         struct dt_object         *parent2 = NULL;
1110         struct lustre_handle      lh      = { 0 };
1111         int                       rc;
1112         ENTRY;
1113
1114         fid_zero(fid);
1115         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1116                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1117         if (rc != 0)
1118                 GOTO(linkea, rc);
1119
1120         if (!fid_is_sane(fid))
1121                 GOTO(linkea, rc = -EINVAL);
1122
1123         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1124                 const struct lu_name *cname;
1125
1126                 if (lfsck->li_lpf_obj == NULL) {
1127                         lu_object_get(&child->do_lu);
1128                         lfsck->li_lpf_obj = child;
1129                 }
1130
1131                 cname = lfsck_name_get_const(env, name, strlen(name));
1132                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1133                                          &LU_LPF_FID);
1134                 if (rc == 0)
1135                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1136                                                     name, type);
1137
1138                 GOTO(out_done, rc);
1139         }
1140
1141         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1142         if (IS_ERR(parent2))
1143                 GOTO(linkea, parent2);
1144
1145         if (!dt_object_exists(parent2)) {
1146                 lu_object_put(env, &parent2->do_lu);
1147
1148                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1149         }
1150
1151         if (!dt_try_as_dir(env, parent2)) {
1152                 lu_object_put(env, &parent2->do_lu);
1153
1154                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1155         }
1156
1157 linkea:
1158         /* To prevent rename/unlink race */
1159         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1160                               MDS_INODELOCK_UPDATE, LCK_PR);
1161         if (rc != 0)
1162                 GOTO(out_put, rc);
1163
1164         dt_read_lock(env, child, 0);
1165         rc = lfsck_links_get_first(env, child, name2, fid2);
1166         if (rc != 0) {
1167                 dt_read_unlock(env, child);
1168                 lfsck_ibits_unlock(&lh, LCK_PR);
1169
1170                 GOTO(out_put, rc = 1);
1171         }
1172
1173         /* It is almost impossible that the bookmark file (or the name entry)
1174          * and the linkEA hit the same data corruption. Trust the linkEA. */
1175         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1176                 dt_read_unlock(env, child);
1177                 lfsck_ibits_unlock(&lh, LCK_PR);
1178
1179                 *fid = *fid2;
1180                 if (lfsck->li_lpf_obj == NULL) {
1181                         lu_object_get(&child->do_lu);
1182                         lfsck->li_lpf_obj = child;
1183                 }
1184
1185                 /* Update the child's dotdot entry */
1186                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1187                                              &LU_LPF_FID, S_IFDIR);
1188                 if (rc == 0)
1189                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1190                                                     name, type);
1191
1192                 GOTO(out_put, rc);
1193         }
1194
1195         if (parent2 == NULL || IS_ERR(parent2)) {
1196                 dt_read_unlock(env, child);
1197                 lfsck_ibits_unlock(&lh, LCK_PR);
1198
1199                 GOTO(out_done, rc = 1);
1200         }
1201
1202         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1203                        (const struct dt_key *)name2, BYPASS_CAPA);
1204         dt_read_unlock(env, child);
1205         lfsck_ibits_unlock(&lh, LCK_PR);
1206         if (rc != 0 && rc != -ENOENT)
1207                 GOTO(out_put, rc);
1208
1209         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1210                 if (type == LVLT_BY_BOOKMARK)
1211                         GOTO(out_put, rc = 1);
1212
1213                 /* Trust the name entry, update the child's dotdot entry. */
1214                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1215                                              &LU_LPF_FID, S_IFDIR);
1216
1217                 GOTO(out_put, rc);
1218         }
1219
1220         if (type == LVLT_BY_BOOKMARK) {
1221                 /* Invalid FID record in the bookmark file, reset it. */
1222                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1223                 rc = lfsck_bookmark_store(env, lfsck);
1224
1225                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1226                        " in the bookmark file: rc = %d\n",
1227                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1228         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1229                 /* The name entry is wrong, remove it. */
1230                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1231         }
1232
1233         GOTO(out_put, rc);
1234
1235 out_put:
1236         if (parent2 != NULL && !IS_ERR(parent2))
1237                 lu_object_put(env, &parent2->do_lu);
1238
1239 out_done:
1240         return rc;
1241 }
1242
1243 /**
1244  * Verify the /ROOT/.lustre/lost+found/ directory.
1245  *
1246  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1247  * the LFSCK does not exactly know how to handle, such as orphans. So before
1248  * the LFSCK scanning the system, the consistency of such directory needs to
1249  * be verified firstly to allow the users to use it during the LFSCK.
1250  *
1251  * \param[in] env       pointer to the thread context
1252  * \param[in] lfsck     pointer to the lfsck instance
1253  *
1254  * \retval              positive number for uncertain inconsistency
1255  * \retval              0 for success
1256  * \retval              negative error number on failure
1257  */
1258 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1259 {
1260         struct lfsck_thread_info *info   = lfsck_env_info(env);
1261         struct lu_fid            *pfid   = &info->lti_fid;
1262         struct lu_fid            *cfid   = &info->lti_fid2;
1263         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1264         struct dt_object         *parent;
1265         /* child1's FID is in the bookmark file. */
1266         struct dt_object         *child1 = NULL;
1267         /* child2's FID is in the name entry MDTxxxx. */
1268         struct dt_object         *child2 = NULL;
1269         struct dt_device         *dev    = lfsck->li_bottom;
1270         const struct lu_name     *cname;
1271         char                      name[8];
1272         int                       node   = lfsck_dev_idx(dev);
1273         int                       rc     = 0;
1274         ENTRY;
1275
1276         LASSERT(lfsck->li_master);
1277
1278         if (lfsck->li_lpf_root_obj != NULL)
1279                 RETURN(0);
1280
1281         if (node == 0) {
1282                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1283         } else {
1284                 struct lfsck_tgt_desc *ltd;
1285
1286                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1287                 if (unlikely(ltd == NULL))
1288                         RETURN(-ENXIO);
1289
1290                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1291                                                   &LU_LPF_FID);
1292                 lfsck_tgt_put(ltd);
1293         }
1294
1295         if (IS_ERR(parent))
1296                 RETURN(PTR_ERR(parent));
1297
1298         LASSERT(dt_object_exists(parent));
1299
1300         if (unlikely(!dt_try_as_dir(env, parent))) {
1301                 lu_object_put(env, &parent->do_lu);
1302
1303                 GOTO(put, rc = -ENOTDIR);
1304         }
1305
1306         lfsck->li_lpf_root_obj = parent;
1307         if (node == 0) {
1308                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1309                 if (rc != 0)
1310                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1311                                "for bad sub-directories: rc = %d\n",
1312                                lfsck_lfsck2name(lfsck), rc);
1313         }
1314
1315         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1316                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1317                         struct lu_fid tfid = bk->lb_lpf_fid;
1318
1319                         /* Invalid FID record in the bookmark file, reset it. */
1320                         fid_zero(&bk->lb_lpf_fid);
1321                         rc = lfsck_bookmark_store(env, lfsck);
1322
1323                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1324                                " in the bookmark file: rc = %d\n",
1325                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1326
1327                         if (rc != 0)
1328                                 GOTO(put, rc);
1329                 } else {
1330                         child1 = lfsck_object_find_by_dev(env, dev,
1331                                                           &bk->lb_lpf_fid);
1332                         if (IS_ERR(child1))
1333                                 GOTO(put, rc = PTR_ERR(child1));
1334
1335                         if (unlikely(!dt_object_exists(child1) ||
1336                                      dt_object_remote(child1)) ||
1337                                      !S_ISDIR(lfsck_object_type(child1))) {
1338                                 /* Invalid FID record in the bookmark file,
1339                                  * reset it. */
1340                                 fid_zero(&bk->lb_lpf_fid);
1341                                 rc = lfsck_bookmark_store(env, lfsck);
1342
1343                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1344                                        " in the bookmark file: rc = %d\n",
1345                                        lfsck_lfsck2name(lfsck),
1346                                        PFID(lfsck_dto2fid(child1)), rc);
1347
1348                                 if (rc != 0)
1349                                         GOTO(put, rc);
1350
1351                                 lu_object_put(env, &child1->do_lu);
1352                                 child1 = NULL;
1353                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1354                                 GOTO(put, rc = -ENOTDIR);
1355                         }
1356                 }
1357         }
1358
1359         snprintf(name, 8, "MDT%04x", node);
1360         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1361                        (const struct dt_key *)name, BYPASS_CAPA);
1362         if (rc == -ENOENT) {
1363                 if (!fid_is_zero(&bk->lb_lpf_fid))
1364                         goto check_child1;
1365
1366                 GOTO(put, rc = 0);
1367         }
1368
1369         if (rc != 0)
1370                 GOTO(put, rc);
1371
1372         /* Invalid FID in the name entry, remove the name entry. */
1373         if (!fid_is_norm(cfid)) {
1374                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1375                 if (rc != 0)
1376                         GOTO(put, rc);
1377
1378                 goto check_child1;
1379         }
1380
1381         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1382         if (IS_ERR(child2))
1383                 GOTO(put, rc = PTR_ERR(child2));
1384
1385         if (unlikely(!dt_object_exists(child2) ||
1386                      dt_object_remote(child2)) ||
1387                      !S_ISDIR(lfsck_object_type(child2))) {
1388                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1389                 if (rc != 0)
1390                         GOTO(put, rc);
1391
1392                 goto check_child1;
1393         }
1394
1395         if (unlikely(!dt_try_as_dir(env, child2)))
1396                 GOTO(put, rc = -ENOTDIR);
1397
1398         if (child1 == NULL) {
1399                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1400                                             pfid, LVLT_BY_NAMEENTRY);
1401         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1402                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1403                                             pfid, LVLT_BY_BOOKMARK);
1404                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1405                         rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
1406                                                     name, pfid,
1407                                                     LVLT_BY_NAMEENTRY);
1408         } else {
1409                 if (lfsck->li_lpf_obj == NULL) {
1410                         lu_object_get(&child2->do_lu);
1411                         lfsck->li_lpf_obj = child2;
1412                 }
1413
1414                 cname = lfsck_name_get_const(env, name, strlen(name));
1415                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1416         }
1417
1418         GOTO(put, rc);
1419
1420 check_child1:
1421         if (child1 != NULL)
1422                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1423                                             pfid, LVLT_BY_BOOKMARK);
1424
1425         GOTO(put, rc);
1426
1427 put:
1428         if (lfsck->li_lpf_obj != NULL) {
1429                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1430                         lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1431                         lfsck->li_lpf_obj = NULL;
1432                         rc = -ENOTDIR;
1433                 }
1434         } else if (rc == 0) {
1435                 rc = lfsck_create_lpf(env, lfsck);
1436         }
1437
1438         if (child2 != NULL && !IS_ERR(child2))
1439                 lu_object_put(env, &child2->do_lu);
1440         if (child1 != NULL && !IS_ERR(child1))
1441                 lu_object_put(env, &child1->do_lu);
1442
1443         return rc;
1444 }
1445
1446 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1447 {
1448         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1449         struct seq_server_site  *ss;
1450         char                    *prefix;
1451         int                      rc     = 0;
1452         ENTRY;
1453
1454         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1455         if (unlikely(ss == NULL))
1456                 RETURN(-ENXIO);
1457
1458         OBD_ALLOC_PTR(lfsck->li_seq);
1459         if (lfsck->li_seq == NULL)
1460                 RETURN(-ENOMEM);
1461
1462         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1463         if (prefix == NULL)
1464                 GOTO(out, rc = -ENOMEM);
1465
1466         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1467         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1468                              ss->ss_server_seq);
1469         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1470         if (rc != 0)
1471                 GOTO(out, rc);
1472
1473         if (fid_is_sane(&bk->lb_last_fid))
1474                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1475
1476         RETURN(0);
1477
1478 out:
1479         OBD_FREE_PTR(lfsck->li_seq);
1480         lfsck->li_seq = NULL;
1481
1482         return rc;
1483 }
1484
1485 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1486 {
1487         if (lfsck->li_seq != NULL) {
1488                 seq_client_fini(lfsck->li_seq);
1489                 OBD_FREE_PTR(lfsck->li_seq);
1490                 lfsck->li_seq = NULL;
1491         }
1492 }
1493
1494 void lfsck_instance_cleanup(const struct lu_env *env,
1495                             struct lfsck_instance *lfsck)
1496 {
1497         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1498         struct lfsck_component  *com;
1499         struct lfsck_component  *next;
1500         struct lfsck_lmv_unit   *llu;
1501         struct lfsck_lmv_unit   *llu_next;
1502         struct lfsck_lmv        *llmv;
1503         ENTRY;
1504
1505         LASSERT(list_empty(&lfsck->li_link));
1506         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1507
1508         if (lfsck->li_obj_oit != NULL) {
1509                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1510                 lfsck->li_obj_oit = NULL;
1511         }
1512
1513         LASSERT(lfsck->li_obj_dir == NULL);
1514         LASSERT(lfsck->li_lmv == NULL);
1515
1516         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1517                 llmv = &llu->llu_lmv;
1518
1519                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1520                          "still in using: %u\n",
1521                          atomic_read(&llmv->ll_ref));
1522
1523                 lfsck_lmv_put(env, llmv);
1524         }
1525
1526         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1527                 lfsck_component_cleanup(env, com);
1528         }
1529
1530         LASSERT(list_empty(&lfsck->li_list_dir));
1531
1532         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1533                                  lc_link) {
1534                 lfsck_component_cleanup(env, com);
1535         }
1536
1537         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1538                 lfsck_component_cleanup(env, com);
1539         }
1540
1541         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1542         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1543
1544         if (lfsck->li_bookmark_obj != NULL) {
1545                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1546                 lfsck->li_bookmark_obj = NULL;
1547         }
1548
1549         if (lfsck->li_lpf_obj != NULL) {
1550                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1551                 lfsck->li_lpf_obj = NULL;
1552         }
1553
1554         if (lfsck->li_lpf_root_obj != NULL) {
1555                 lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu);
1556                 lfsck->li_lpf_root_obj = NULL;
1557         }
1558
1559         if (lfsck->li_los != NULL) {
1560                 local_oid_storage_fini(env, lfsck->li_los);
1561                 lfsck->li_los = NULL;
1562         }
1563
1564         lfsck_fid_fini(lfsck);
1565
1566         OBD_FREE_PTR(lfsck);
1567 }
1568
1569 static inline struct lfsck_instance *
1570 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1571 {
1572         struct lfsck_instance *lfsck;
1573
1574         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1575                 if (lfsck->li_bottom == key) {
1576                         if (ref)
1577                                 lfsck_instance_get(lfsck);
1578                         if (unlink)
1579                                 list_del_init(&lfsck->li_link);
1580
1581                         return lfsck;
1582                 }
1583         }
1584
1585         return NULL;
1586 }
1587
1588 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1589                                            bool unlink)
1590 {
1591         struct lfsck_instance *lfsck;
1592
1593         spin_lock(&lfsck_instance_lock);
1594         lfsck = __lfsck_instance_find(key, ref, unlink);
1595         spin_unlock(&lfsck_instance_lock);
1596
1597         return lfsck;
1598 }
1599
1600 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1601 {
1602         struct lfsck_instance *tmp;
1603
1604         spin_lock(&lfsck_instance_lock);
1605         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1606                 if (lfsck->li_bottom == tmp->li_bottom) {
1607                         spin_unlock(&lfsck_instance_lock);
1608                         return -EEXIST;
1609                 }
1610         }
1611
1612         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1613         spin_unlock(&lfsck_instance_lock);
1614         return 0;
1615 }
1616
/**
 * Print "<prefix>: name,name,...\n" for the bits set in @bits.
 *
 * Bit i is printed as names[i]; a set bit whose names[i] is NULL is
 * skipped silently. If no bit is set, only "<prefix>:\n" is printed.
 *
 * \param[in] m         the seq_file to print into
 * \param[in] bits      the bitmap to be dumped
 * \param[in] names     the name table indexed by bit number
 * \param[in] prefix    the label printed in front of the names
 *
 * \retval              always 0
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        bool ended = (bits == 0);
        int  i;

        seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');

        for (i = 0; bits != 0; i++) {
                int mask = 1 << i;

                if ((bits & mask) == 0)
                        continue;

                bits &= ~mask;
                if (names[i] == NULL)
                        continue;

                /* The name of the last set bit ends the line. */
                if (bits == 0)
                        ended = true;

                seq_printf(m, "%s%c", names[i], ended ? '\n' : ',');
        }

        /* The last set bit had no name, so the line is still open. */
        if (!ended)
                seq_printf(m, "\n");
        return 0;
}
1643
1644 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1645 {
1646         if (time != 0)
1647                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1648                           cfs_time_current_sec() - time);
1649         else
1650                 seq_printf(m, "%s: N/A\n", prefix);
1651         return 0;
1652 }
1653
1654 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1655                    const char *prefix)
1656 {
1657         if (fid_is_zero(&pos->lp_dir_parent)) {
1658                 if (pos->lp_oit_cookie == 0)
1659                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1660                                    prefix);
1661                 else
1662                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1663                                    prefix, pos->lp_oit_cookie);
1664         } else {
1665                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1666                            prefix, pos->lp_oit_cookie,
1667                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1668         }
1669         return 0;
1670 }
1671
1672 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1673                     struct lfsck_position *pos, bool init)
1674 {
1675         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1676
1677         if (unlikely(lfsck->li_di_oit == NULL)) {
1678                 memset(pos, 0, sizeof(*pos));
1679                 return;
1680         }
1681
1682         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1683         if (!lfsck->li_current_oit_processed && !init)
1684                 pos->lp_oit_cookie--;
1685
1686         LASSERT(pos->lp_oit_cookie > 0);
1687
1688         if (lfsck->li_di_dir != NULL) {
1689                 struct dt_object *dto = lfsck->li_obj_dir;
1690
1691                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1692                                                         lfsck->li_di_dir);
1693
1694                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1695                         fid_zero(&pos->lp_dir_parent);
1696                         pos->lp_dir_cookie = 0;
1697                 } else {
1698                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1699                 }
1700         } else {
1701                 fid_zero(&pos->lp_dir_parent);
1702                 pos->lp_dir_cookie = 0;
1703         }
1704 }
1705
1706 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1707 {
1708         bool dirty = false;
1709
1710         if (limit != LFSCK_SPEED_NO_LIMIT) {
1711                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1712                         lfsck->li_sleep_rate = limit /
1713                                                msecs_to_jiffies(MSEC_PER_SEC);
1714                         lfsck->li_sleep_jif = 1;
1715                 } else {
1716                         lfsck->li_sleep_rate = 1;
1717                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1718                                               limit;
1719                 }
1720         } else {
1721                 lfsck->li_sleep_jif = 0;
1722                 lfsck->li_sleep_rate = 0;
1723         }
1724
1725         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1726                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1727                 dirty = true;
1728         }
1729
1730         return dirty;
1731 }
1732
1733 void lfsck_control_speed(struct lfsck_instance *lfsck)
1734 {
1735         struct ptlrpc_thread *thread = &lfsck->li_thread;
1736         struct l_wait_info    lwi;
1737
1738         if (lfsck->li_sleep_jif > 0 &&
1739             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1740                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1741                                        LWI_ON_SIGNAL_NOOP, NULL);
1742
1743                 l_wait_event(thread->t_ctl_waitq,
1744                              !thread_is_running(thread),
1745                              &lwi);
1746                 lfsck->li_new_scanned = 0;
1747         }
1748 }
1749
1750 void lfsck_control_speed_by_self(struct lfsck_component *com)
1751 {
1752         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1753         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1754         struct l_wait_info       lwi;
1755
1756         if (lfsck->li_sleep_jif > 0 &&
1757             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1758                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1759                                        LWI_ON_SIGNAL_NOOP, NULL);
1760
1761                 l_wait_event(thread->t_ctl_waitq,
1762                              !thread_is_running(thread),
1763                              &lwi);
1764                 com->lc_new_scanned = 0;
1765         }
1766 }
1767
1768 static struct lfsck_thread_args *
1769 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1770                        struct lfsck_component *com,
1771                        struct lfsck_start_param *lsp)
1772 {
1773         struct lfsck_thread_args *lta;
1774         int                       rc;
1775
1776         OBD_ALLOC_PTR(lta);
1777         if (lta == NULL)
1778                 return ERR_PTR(-ENOMEM);
1779
1780         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1781         if (rc != 0) {
1782                 OBD_FREE_PTR(lta);
1783                 return ERR_PTR(rc);
1784         }
1785
1786         lta->lta_lfsck = lfsck_instance_get(lfsck);
1787         if (com != NULL)
1788                 lta->lta_com = lfsck_component_get(com);
1789
1790         lta->lta_lsp = lsp;
1791
1792         return lta;
1793 }
1794
1795 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1796 {
1797         if (lta->lta_com != NULL)
1798                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1799         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1800         lu_env_fini(&lta->lta_env);
1801         OBD_FREE_PTR(lta);
1802 }
1803
1804 struct lfsck_assistant_data *
1805 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1806                           const char *name)
1807 {
1808         struct lfsck_assistant_data *lad;
1809
1810         OBD_ALLOC_PTR(lad);
1811         if (lad != NULL) {
1812                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1813                 if (lad->lad_bitmap == NULL) {
1814                         OBD_FREE_PTR(lad);
1815                         return NULL;
1816                 }
1817
1818                 INIT_LIST_HEAD(&lad->lad_req_list);
1819                 spin_lock_init(&lad->lad_lock);
1820                 INIT_LIST_HEAD(&lad->lad_ost_list);
1821                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1822                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1823                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1824                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1825                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1826                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1827                 lad->lad_ops = lao;
1828                 lad->lad_name = name;
1829         }
1830
1831         return lad;
1832 }
1833
1834 /**
1835  * Generic LFSCK asynchronous communication interpretor function.
1836  * The LFSCK RPC reply for both the event notification and status
1837  * querying will be handled here.
1838  *
1839  * \param[in] env       pointer to the thread context
1840  * \param[in] req       pointer to the LFSCK request
1841  * \param[in] args      pointer to the lfsck_async_interpret_args
1842  * \param[in] rc        the result for handling the LFSCK request
1843  *
1844  * \retval              0 for success
1845  * \retval              negative error number on failure
1846  */
1847 int lfsck_async_interpret_common(const struct lu_env *env,
1848                                  struct ptlrpc_request *req,
1849                                  void *args, int rc)
1850 {
1851         struct lfsck_async_interpret_args *laia = args;
1852         struct lfsck_component            *com  = laia->laia_com;
1853         struct lfsck_assistant_data       *lad  = com->lc_data;
1854         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1855         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1856         struct lfsck_request              *lr   = laia->laia_lr;
1857
1858         LASSERT(com->lc_lfsck->li_master);
1859
1860         switch (lr->lr_event) {
1861         case LE_START:
1862                 if (rc != 0) {
1863                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1864                                "start: rc = %d\n",
1865                                lfsck_lfsck2name(com->lc_lfsck),
1866                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1867                                ltd->ltd_index, lad->lad_name, rc);
1868
1869                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1870                                 struct lfsck_layout *lo = com->lc_file_ram;
1871
1872                                 if (lr->lr_flags & LEF_TO_OST)
1873                                         lfsck_lad_set_bitmap(env, com,
1874                                                              ltd->ltd_index);
1875                                 else
1876                                         lo->ll_flags |= LF_INCOMPLETE;
1877                         } else {
1878                                 struct lfsck_namespace *ns = com->lc_file_ram;
1879
1880                                 /* If some MDT does not join the namespace
1881                                  * LFSCK, then we cannot know whether there
1882                                  * is some name entry on such MDT that with
1883                                  * the referenced MDT-object on this MDT or
1884                                  * not. So the namespace LFSCK on this MDT
1885                                  * cannot handle orphan MDT-objects properly.
1886                                  * So we mark the LFSCK as LF_INCOMPLETE and
1887                                  * skip orphan MDT-objects handling. */
1888                                 ns->ln_flags |= LF_INCOMPLETE;
1889                         }
1890                         break;
1891                 }
1892
1893                 spin_lock(&ltds->ltd_lock);
1894                 if (ltd->ltd_dead) {
1895                         spin_unlock(&ltds->ltd_lock);
1896                         break;
1897                 }
1898
1899                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1900                         struct list_head *list;
1901                         struct list_head *phase_list;
1902
1903                         if (ltd->ltd_layout_done) {
1904                                 spin_unlock(&ltds->ltd_lock);
1905                                 break;
1906                         }
1907
1908                         if (lr->lr_flags & LEF_TO_OST) {
1909                                 list = &lad->lad_ost_list;
1910                                 phase_list = &lad->lad_ost_phase1_list;
1911                         } else {
1912                                 list = &lad->lad_mdt_list;
1913                                 phase_list = &lad->lad_mdt_phase1_list;
1914                         }
1915
1916                         if (list_empty(&ltd->ltd_layout_list))
1917                                 list_add_tail(&ltd->ltd_layout_list, list);
1918                         if (list_empty(&ltd->ltd_layout_phase_list))
1919                                 list_add_tail(&ltd->ltd_layout_phase_list,
1920                                               phase_list);
1921                 } else {
1922                         if (ltd->ltd_namespace_done) {
1923                                 spin_unlock(&ltds->ltd_lock);
1924                                 break;
1925                         }
1926
1927                         if (list_empty(&ltd->ltd_namespace_list))
1928                                 list_add_tail(&ltd->ltd_namespace_list,
1929                                               &lad->lad_mdt_list);
1930                         if (list_empty(&ltd->ltd_namespace_phase_list))
1931                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1932                                               &lad->lad_mdt_phase1_list);
1933                 }
1934                 spin_unlock(&ltds->ltd_lock);
1935                 break;
1936         case LE_STOP:
1937         case LE_PHASE1_DONE:
1938         case LE_PHASE2_DONE:
1939         case LE_PEER_EXIT:
1940                 if (rc != 0 && rc != -EALREADY)
1941                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1942                               "event = %d, rc = %d\n",
1943                               lfsck_lfsck2name(com->lc_lfsck),
1944                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1945                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1946                 break;
1947         case LE_QUERY: {
1948                 struct lfsck_reply *reply;
1949                 struct list_head *list;
1950                 struct list_head *phase_list;
1951
1952                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1953                         list = &ltd->ltd_layout_list;
1954                         phase_list = &ltd->ltd_layout_phase_list;
1955                 } else {
1956                         list = &ltd->ltd_namespace_list;
1957                         phase_list = &ltd->ltd_namespace_phase_list;
1958                 }
1959
1960                 if (rc != 0) {
1961                         spin_lock(&ltds->ltd_lock);
1962                         list_del_init(phase_list);
1963                         list_del_init(list);
1964                         spin_unlock(&ltds->ltd_lock);
1965                         break;
1966                 }
1967
1968                 reply = req_capsule_server_get(&req->rq_pill,
1969                                                &RMF_LFSCK_REPLY);
1970                 if (reply == NULL) {
1971                         rc = -EPROTO;
1972                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1973                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1974                                lad->lad_name, rc);
1975                         spin_lock(&ltds->ltd_lock);
1976                         list_del_init(phase_list);
1977                         list_del_init(list);
1978                         spin_unlock(&ltds->ltd_lock);
1979                         break;
1980                 }
1981
1982                 switch (reply->lr_status) {
1983                 case LS_SCANNING_PHASE1:
1984                         break;
1985                 case LS_SCANNING_PHASE2:
1986                         spin_lock(&ltds->ltd_lock);
1987                         list_del_init(phase_list);
1988                         if (ltd->ltd_dead) {
1989                                 spin_unlock(&ltds->ltd_lock);
1990                                 break;
1991                         }
1992
1993                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1994                                 if (ltd->ltd_layout_done) {
1995                                         spin_unlock(&ltds->ltd_lock);
1996                                         break;
1997                                 }
1998
1999                                 if (lr->lr_flags & LEF_TO_OST)
2000                                         list_add_tail(phase_list,
2001                                                 &lad->lad_ost_phase2_list);
2002                                 else
2003                                         list_add_tail(phase_list,
2004                                                 &lad->lad_mdt_phase2_list);
2005                         } else {
2006                                 if (ltd->ltd_namespace_done) {
2007                                         spin_unlock(&ltds->ltd_lock);
2008                                         break;
2009                                 }
2010
2011                                 list_add_tail(phase_list,
2012                                               &lad->lad_mdt_phase2_list);
2013                         }
2014                         spin_unlock(&ltds->ltd_lock);
2015                         break;
2016                 default:
2017                         spin_lock(&ltds->ltd_lock);
2018                         list_del_init(phase_list);
2019                         list_del_init(list);
2020                         spin_unlock(&ltds->ltd_lock);
2021                         break;
2022                 }
2023                 break;
2024         }
2025         default:
2026                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2027                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2028                 break;
2029         }
2030
2031         if (!laia->laia_shared) {
2032                 lfsck_tgt_put(ltd);
2033                 lfsck_component_put(env, com);
2034         }
2035
2036         return 0;
2037 }
2038
2039 static void lfsck_interpret(const struct lu_env *env,
2040                             struct lfsck_instance *lfsck,
2041                             struct ptlrpc_request *req, void *args, int result)
2042 {
2043         struct lfsck_async_interpret_args *laia = args;
2044         struct lfsck_component            *com;
2045
2046         LASSERT(laia->laia_com == NULL);
2047         LASSERT(laia->laia_shared);
2048
2049         spin_lock(&lfsck->li_lock);
2050         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2051                 laia->laia_com = com;
2052                 lfsck_async_interpret_common(env, req, laia, result);
2053         }
2054
2055         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2056                 laia->laia_com = com;
2057                 lfsck_async_interpret_common(env, req, laia, result);
2058         }
2059         spin_unlock(&lfsck->li_lock);
2060 }
2061
2062 static int lfsck_stop_notify(const struct lu_env *env,
2063                              struct lfsck_instance *lfsck,
2064                              struct lfsck_tgt_descs *ltds,
2065                              struct lfsck_tgt_desc *ltd, __u16 type)
2066 {
2067         struct lfsck_component *com;
2068         int                     rc = 0;
2069         ENTRY;
2070
2071         LASSERT(lfsck->li_master);
2072
2073         spin_lock(&lfsck->li_lock);
2074         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2075         if (com == NULL)
2076                 com = __lfsck_component_find(lfsck, type,
2077                                              &lfsck->li_list_double_scan);
2078         if (com != NULL)
2079                 lfsck_component_get(com);
2080         spin_unlock(&lfsck->li_lock);
2081
2082         if (com != NULL) {
2083                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2084                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2085                 struct lfsck_request              *lr    = &info->lti_lr;
2086                 struct lfsck_assistant_data       *lad   = com->lc_data;
2087                 struct list_head                  *list;
2088                 struct list_head                  *phase_list;
2089                 struct ptlrpc_request_set         *set;
2090
2091                 set = ptlrpc_prep_set();
2092                 if (set == NULL) {
2093                         lfsck_component_put(env, com);
2094
2095                         RETURN(-ENOMEM);
2096                 }
2097
2098                 if (type == LFSCK_TYPE_LAYOUT) {
2099                         list = &ltd->ltd_layout_list;
2100                         phase_list = &ltd->ltd_layout_phase_list;
2101                 } else {
2102                         list = &ltd->ltd_namespace_list;
2103                         phase_list = &ltd->ltd_namespace_phase_list;
2104                 }
2105
2106                 spin_lock(&ltds->ltd_lock);
2107                 if (list_empty(list)) {
2108                         LASSERT(list_empty(phase_list));
2109                         spin_unlock(&ltds->ltd_lock);
2110                         ptlrpc_set_destroy(set);
2111
2112                         RETURN(0);
2113                 }
2114
2115                 list_del_init(phase_list);
2116                 list_del_init(list);
2117                 spin_unlock(&ltds->ltd_lock);
2118
2119                 memset(lr, 0, sizeof(*lr));
2120                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2121                 lr->lr_event = LE_PEER_EXIT;
2122                 lr->lr_active = type;
2123                 lr->lr_status = LS_CO_PAUSED;
2124                 if (ltds == &lfsck->li_ost_descs)
2125                         lr->lr_flags = LEF_TO_OST;
2126
2127                 laia->laia_com = com;
2128                 laia->laia_ltds = ltds;
2129                 atomic_inc(&ltd->ltd_ref);
2130                 laia->laia_ltd = ltd;
2131                 laia->laia_lr = lr;
2132                 laia->laia_shared = 0;
2133
2134                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2135                                          lfsck_async_interpret_common,
2136                                          laia, LFSCK_NOTIFY);
2137                 if (rc != 0) {
2138                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2139                                "co-stop for %s: rc = %d\n",
2140                                lfsck_lfsck2name(lfsck),
2141                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2142                                ltd->ltd_index, lad->lad_name, rc);
2143                         lfsck_tgt_put(ltd);
2144                 } else {
2145                         rc = ptlrpc_set_wait(set);
2146                 }
2147
2148                 ptlrpc_set_destroy(set);
2149                 lfsck_component_put(env, com);
2150         }
2151
2152         RETURN(rc);
2153 }
2154
2155 static int lfsck_async_interpret(const struct lu_env *env,
2156                                  struct ptlrpc_request *req,
2157                                  void *args, int rc)
2158 {
2159         struct lfsck_async_interpret_args *laia = args;
2160         struct lfsck_instance             *lfsck;
2161
2162         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2163                               li_mdt_descs);
2164         lfsck_interpret(env, lfsck, req, laia, rc);
2165         lfsck_tgt_put(laia->laia_ltd);
2166         if (rc != 0 && laia->laia_result != -EALREADY)
2167                 laia->laia_result = rc;
2168
2169         return 0;
2170 }
2171
2172 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2173                         struct lfsck_request *lr,
2174                         struct ptlrpc_request_set *set,
2175                         ptlrpc_interpterer_t interpreter,
2176                         void *args, int request)
2177 {
2178         struct lfsck_async_interpret_args *laia;
2179         struct ptlrpc_request             *req;
2180         struct lfsck_request              *tmp;
2181         struct req_format                 *format;
2182         int                                rc;
2183
2184         switch (request) {
2185         case LFSCK_NOTIFY:
2186                 format = &RQF_LFSCK_NOTIFY;
2187                 break;
2188         case LFSCK_QUERY:
2189                 format = &RQF_LFSCK_QUERY;
2190                 break;
2191         default:
2192                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2193                        exp->exp_obd->obd_name, request, -EINVAL);
2194                 return -EINVAL;
2195         }
2196
2197         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2198         if (req == NULL)
2199                 return -ENOMEM;
2200
2201         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2202         if (rc != 0) {
2203                 ptlrpc_request_free(req);
2204
2205                 return rc;
2206         }
2207
2208         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2209         *tmp = *lr;
2210         ptlrpc_request_set_replen(req);
2211
2212         laia = ptlrpc_req_async_args(req);
2213         *laia = *(struct lfsck_async_interpret_args *)args;
2214         if (laia->laia_com != NULL)
2215                 lfsck_component_get(laia->laia_com);
2216         req->rq_interpret_reply = interpreter;
2217         ptlrpc_set_add_req(set, req);
2218
2219         return 0;
2220 }
2221
2222 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2223                           struct lfsck_start_param *lsp)
2224 {
2225         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2226         struct lfsck_assistant_data     *lad     = com->lc_data;
2227         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2228         struct ptlrpc_thread            *athread = &lad->lad_thread;
2229         struct lfsck_thread_args        *lta;
2230         struct task_struct              *task;
2231         int                              rc;
2232         ENTRY;
2233
2234         lad->lad_assistant_status = 0;
2235         lad->lad_post_result = 0;
2236         lad->lad_to_post = 0;
2237         lad->lad_to_double_scan = 0;
2238         lad->lad_in_double_scan = 0;
2239         lad->lad_exit = 0;
2240         thread_set_flags(athread, 0);
2241
2242         lta = lfsck_thread_args_init(lfsck, com, lsp);
2243         if (IS_ERR(lta))
2244                 RETURN(PTR_ERR(lta));
2245
2246         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2247         if (IS_ERR(task)) {
2248                 rc = PTR_ERR(task);
2249                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2250                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2251                 lfsck_thread_args_fini(lta);
2252         } else {
2253                 struct l_wait_info lwi = { 0 };
2254
2255                 l_wait_event(mthread->t_ctl_waitq,
2256                              thread_is_running(athread) ||
2257                              thread_is_stopped(athread),
2258                              &lwi);
2259                 if (unlikely(!thread_is_running(athread)))
2260                         rc = lad->lad_assistant_status;
2261                 else
2262                         rc = 0;
2263         }
2264
2265         RETURN(rc);
2266 }
2267
2268 int lfsck_checkpoint_generic(const struct lu_env *env,
2269                              struct lfsck_component *com)
2270 {
2271         struct lfsck_assistant_data     *lad     = com->lc_data;
2272         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2273         struct ptlrpc_thread            *athread = &lad->lad_thread;
2274         struct l_wait_info               lwi     = { 0 };
2275
2276         if (com->lc_new_checked == 0)
2277                 return LFSCK_CHECKPOINT_SKIP;
2278
2279         l_wait_event(mthread->t_ctl_waitq,
2280                      list_empty(&lad->lad_req_list) ||
2281                      !thread_is_running(mthread) ||
2282                      thread_is_stopped(athread),
2283                      &lwi);
2284
2285         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2286                 return LFSCK_CHECKPOINT_SKIP;
2287
2288         return 0;
2289 }
2290
2291 void lfsck_post_generic(const struct lu_env *env,
2292                         struct lfsck_component *com, int *result)
2293 {
2294         struct lfsck_assistant_data     *lad     = com->lc_data;
2295         struct ptlrpc_thread            *athread = &lad->lad_thread;
2296         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2297         struct l_wait_info               lwi     = { 0 };
2298
2299         lad->lad_post_result = *result;
2300         if (*result <= 0)
2301                 lad->lad_exit = 1;
2302         lad->lad_to_post = 1;
2303
2304         wake_up_all(&athread->t_ctl_waitq);
2305         l_wait_event(mthread->t_ctl_waitq,
2306                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2307                      thread_is_stopped(athread),
2308                      &lwi);
2309
2310         if (lad->lad_assistant_status < 0)
2311                 *result = lad->lad_assistant_status;
2312 }
2313
2314 int lfsck_double_scan_generic(const struct lu_env *env,
2315                               struct lfsck_component *com, int status)
2316 {
2317         struct lfsck_assistant_data     *lad     = com->lc_data;
2318         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2319         struct ptlrpc_thread            *athread = &lad->lad_thread;
2320         struct l_wait_info               lwi     = { 0 };
2321
2322         if (status != LS_SCANNING_PHASE2)
2323                 lad->lad_exit = 1;
2324         else
2325                 lad->lad_to_double_scan = 1;
2326
2327         wake_up_all(&athread->t_ctl_waitq);
2328         l_wait_event(mthread->t_ctl_waitq,
2329                      lad->lad_in_double_scan ||
2330                      thread_is_stopped(athread),
2331                      &lwi);
2332
2333         if (lad->lad_assistant_status < 0)
2334                 return lad->lad_assistant_status;
2335
2336         return 0;
2337 }
2338
2339 void lfsck_quit_generic(const struct lu_env *env,
2340                         struct lfsck_component *com)
2341 {
2342         struct lfsck_assistant_data     *lad     = com->lc_data;
2343         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2344         struct ptlrpc_thread            *athread = &lad->lad_thread;
2345         struct l_wait_info               lwi     = { 0 };
2346
2347         lad->lad_exit = 1;
2348         wake_up_all(&athread->t_ctl_waitq);
2349         l_wait_event(mthread->t_ctl_waitq,
2350                      thread_is_init(athread) ||
2351                      thread_is_stopped(athread),
2352                      &lwi);
2353 }
2354
2355 /* external interfaces */
2356
2357 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2358 {
2359         struct lu_env           env;
2360         struct lfsck_instance  *lfsck;
2361         int                     rc;
2362         ENTRY;
2363
2364         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2365         if (rc != 0)
2366                 RETURN(rc);
2367
2368         lfsck = lfsck_instance_find(key, true, false);
2369         if (likely(lfsck != NULL)) {
2370                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2371                 lfsck_instance_put(&env, lfsck);
2372         } else {
2373                 rc = -ENXIO;
2374         }
2375
2376         lu_env_fini(&env);
2377
2378         RETURN(rc);
2379 }
2380 EXPORT_SYMBOL(lfsck_get_speed);
2381
2382 int lfsck_set_speed(struct dt_device *key, int val)
2383 {
2384         struct lu_env           env;
2385         struct lfsck_instance  *lfsck;
2386         int                     rc;
2387         ENTRY;
2388
2389         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2390         if (rc != 0)
2391                 RETURN(rc);
2392
2393         lfsck = lfsck_instance_find(key, true, false);
2394         if (likely(lfsck != NULL)) {
2395                 mutex_lock(&lfsck->li_mutex);
2396                 if (__lfsck_set_speed(lfsck, val))
2397                         rc = lfsck_bookmark_store(&env, lfsck);
2398                 mutex_unlock(&lfsck->li_mutex);
2399                 lfsck_instance_put(&env, lfsck);
2400         } else {
2401                 rc = -ENXIO;
2402         }
2403
2404         lu_env_fini(&env);
2405
2406         RETURN(rc);
2407 }
2408 EXPORT_SYMBOL(lfsck_set_speed);
2409
2410 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2411 {
2412         struct lu_env           env;
2413         struct lfsck_instance  *lfsck;
2414         int                     rc;
2415         ENTRY;
2416
2417         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2418         if (rc != 0)
2419                 RETURN(rc);
2420
2421         lfsck = lfsck_instance_find(key, true, false);
2422         if (likely(lfsck != NULL)) {
2423                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2424                 lfsck_instance_put(&env, lfsck);
2425         } else {
2426                 rc = -ENXIO;
2427         }
2428
2429         lu_env_fini(&env);
2430
2431         RETURN(rc);
2432 }
2433 EXPORT_SYMBOL(lfsck_get_windows);
2434
2435 int lfsck_set_windows(struct dt_device *key, int val)
2436 {
2437         struct lu_env           env;
2438         struct lfsck_instance  *lfsck;
2439         int                     rc;
2440         ENTRY;
2441
2442         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2443         if (rc != 0)
2444                 RETURN(rc);
2445
2446         lfsck = lfsck_instance_find(key, true, false);
2447         if (likely(lfsck != NULL)) {
2448                 if (val > LFSCK_ASYNC_WIN_MAX) {
2449                         CWARN("%s: Too large async window size, which "
2450                               "may cause memory issues. The valid range "
2451                               "is [0 - %u]. If you do not want to restrict "
2452                               "the window size for async requests pipeline, "
2453                               "just set it as 0.\n",
2454                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2455                         rc = -EINVAL;
2456                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2457                         mutex_lock(&lfsck->li_mutex);
2458                         lfsck->li_bookmark_ram.lb_async_windows = val;
2459                         rc = lfsck_bookmark_store(&env, lfsck);
2460                         mutex_unlock(&lfsck->li_mutex);
2461                 }
2462                 lfsck_instance_put(&env, lfsck);
2463         } else {
2464                 rc = -ENXIO;
2465         }
2466
2467         lu_env_fini(&env);
2468
2469         RETURN(rc);
2470 }
2471 EXPORT_SYMBOL(lfsck_set_windows);
2472
2473 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2474 {
2475         struct lu_env           env;
2476         struct lfsck_instance  *lfsck;
2477         struct lfsck_component *com;
2478         int                     rc;
2479         ENTRY;
2480
2481         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2482         if (rc != 0)
2483                 RETURN(rc);
2484
2485         lfsck = lfsck_instance_find(key, true, false);
2486         if (likely(lfsck != NULL)) {
2487                 com = lfsck_component_find(lfsck, type);
2488                 if (likely(com != NULL)) {
2489                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2490                         lfsck_component_put(&env, com);
2491                 } else {
2492                         rc = -ENOTSUPP;
2493                 }
2494
2495                 lfsck_instance_put(&env, lfsck);
2496         } else {
2497                 rc = -ENXIO;
2498         }
2499
2500         lu_env_fini(&env);
2501
2502         RETURN(rc);
2503 }
2504 EXPORT_SYMBOL(lfsck_dump);
2505
2506 static int lfsck_stop_all(const struct lu_env *env,
2507                           struct lfsck_instance *lfsck,
2508                           struct lfsck_stop *stop)
2509 {
2510         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2511         struct lfsck_request              *lr     = &info->lti_lr;
2512         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2513         struct ptlrpc_request_set         *set;
2514         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2515         struct lfsck_tgt_desc             *ltd;
2516         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2517         __u32                              idx;
2518         int                                rc     = 0;
2519         int                                rc1    = 0;
2520         ENTRY;
2521
2522         LASSERT(stop->ls_flags & LPF_BROADCAST);
2523
2524         set = ptlrpc_prep_set();
2525         if (unlikely(set == NULL))
2526                 RETURN(-ENOMEM);
2527
2528         memset(lr, 0, sizeof(*lr));
2529         lr->lr_event = LE_STOP;
2530         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2531         lr->lr_status = stop->ls_status;
2532         lr->lr_version = bk->lb_version;
2533         lr->lr_active = LFSCK_TYPES_ALL;
2534         lr->lr_param = stop->ls_flags;
2535
2536         laia->laia_com = NULL;
2537         laia->laia_ltds = ltds;
2538         laia->laia_lr = lr;
2539         laia->laia_result = 0;
2540         laia->laia_shared = 1;
2541
2542         down_read(&ltds->ltd_rw_sem);
2543         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2544                 ltd = lfsck_tgt_get(ltds, idx);
2545                 LASSERT(ltd != NULL);
2546
2547                 laia->laia_ltd = ltd;
2548                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2549                                          lfsck_async_interpret, laia,
2550                                          LFSCK_NOTIFY);
2551                 if (rc != 0) {
2552                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2553                         lfsck_tgt_put(ltd);
2554                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2555                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2556                         rc1 = rc;
2557                 }
2558         }
2559         up_read(&ltds->ltd_rw_sem);
2560
2561         rc = ptlrpc_set_wait(set);
2562         ptlrpc_set_destroy(set);
2563
2564         if (rc == 0)
2565                 rc = laia->laia_result;
2566
2567         if (rc == -EALREADY)
2568                 rc = 0;
2569
2570         if (rc != 0)
2571                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2572                        lfsck_lfsck2name(lfsck), rc);
2573
2574         RETURN(rc != 0 ? rc : rc1);
2575 }
2576
2577 static int lfsck_start_all(const struct lu_env *env,
2578                            struct lfsck_instance *lfsck,
2579                            struct lfsck_start *start)
2580 {
2581         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2582         struct lfsck_request              *lr     = &info->lti_lr;
2583         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2584         struct ptlrpc_request_set         *set;
2585         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2586         struct lfsck_tgt_desc             *ltd;
2587         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2588         __u32                              idx;
2589         int                                rc     = 0;
2590         ENTRY;
2591
2592         LASSERT(start->ls_flags & LPF_BROADCAST);
2593
2594         set = ptlrpc_prep_set();
2595         if (unlikely(set == NULL))
2596                 RETURN(-ENOMEM);
2597
2598         memset(lr, 0, sizeof(*lr));
2599         lr->lr_event = LE_START;
2600         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2601         lr->lr_speed = bk->lb_speed_limit;
2602         lr->lr_version = bk->lb_version;
2603         lr->lr_active = start->ls_active;
2604         lr->lr_param = start->ls_flags;
2605         lr->lr_async_windows = bk->lb_async_windows;
2606         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2607                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2608                        LSV_CREATE_MDTOBJ;
2609
2610         laia->laia_com = NULL;
2611         laia->laia_ltds = ltds;
2612         laia->laia_lr = lr;
2613         laia->laia_result = 0;
2614         laia->laia_shared = 1;
2615
2616         down_read(&ltds->ltd_rw_sem);
2617         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2618                 ltd = lfsck_tgt_get(ltds, idx);
2619                 LASSERT(ltd != NULL);
2620
2621                 laia->laia_ltd = ltd;
2622                 ltd->ltd_layout_done = 0;
2623                 ltd->ltd_namespace_done = 0;
2624                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2625                                          lfsck_async_interpret, laia,
2626                                          LFSCK_NOTIFY);
2627                 if (rc != 0) {
2628                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2629                         lfsck_tgt_put(ltd);
2630                         CERROR("%s: cannot notify MDT %x for LFSCK "
2631                                "start, failout: rc = %d\n",
2632                                lfsck_lfsck2name(lfsck), idx, rc);
2633                         break;
2634                 }
2635         }
2636         up_read(&ltds->ltd_rw_sem);
2637
2638         if (rc != 0) {
2639                 ptlrpc_set_destroy(set);
2640
2641                 RETURN(rc);
2642         }
2643
2644         rc = ptlrpc_set_wait(set);
2645         ptlrpc_set_destroy(set);
2646
2647         if (rc == 0)
2648                 rc = laia->laia_result;
2649
2650         if (rc != 0) {
2651                 struct lfsck_stop *stop = &info->lti_stop;
2652
2653                 CERROR("%s: cannot start LFSCK on some MDTs, "
2654                        "stop all: rc = %d\n",
2655                        lfsck_lfsck2name(lfsck), rc);
2656                 if (rc != -EALREADY) {
2657                         stop->ls_status = LS_FAILED;
2658                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2659                         lfsck_stop_all(env, lfsck, stop);
2660                 }
2661         }
2662
2663         RETURN(rc);
2664 }
2665
/**
 * Start an LFSCK run on the instance registered for \a key, or, when the
 * LFSCK thread is already active, try to join the requested component
 * types to the run in progress.
 *
 * \param[in] env       execution environment
 * \param[in] key       device used to look up the LFSCK instance
 * \param[in] lsp       start parameters; lsp->lsp_start may be NULL, which
 *                      means "auto trigger a paused LFSCK"
 *
 * \retval              0 on success; positive internal results are also
 *                      reported as 0
 * \retval              -ENXIO if no LFSCK instance is found for \a key
 * \retval              -EAGAIN if li_namespace has not been set yet
 * \retval              -EALREADY may be returned when the LFSCK thread is
 *                      already active
 * \retval              -EPERM if LPF_BROADCAST is requested on an instance
 *                      that is not the master
 * \retval              -ENOTSUPP or -EOPNOTSUPP for component types that
 *                      are not available
 * \retval              other negative errors from the helpers called here
 */
2666 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2667                 struct lfsck_start_param *lsp)
2668 {
2669         struct lfsck_start              *start  = lsp->lsp_start;
2670         struct lfsck_instance           *lfsck;
2671         struct lfsck_bookmark           *bk;
2672         struct ptlrpc_thread            *thread;
2673         struct lfsck_component          *com;
2674         struct l_wait_info               lwi    = { 0 };
2675         struct lfsck_thread_args        *lta;
2676         struct task_struct              *task;
2677         int                              rc     = 0;
2678         __u16                            valid  = 0;
2679         __u16                            flags  = 0;
2680         __u16                            type   = 1;
2681         ENTRY;
2682
2683         lfsck = lfsck_instance_find(key, true, false);
2684         if (unlikely(lfsck == NULL))
2685                 RETURN(-ENXIO);
2686
2687         /* System is not ready, try again later. */
2688         if (unlikely(lfsck->li_namespace == NULL))
2689                 GOTO(put, rc = -EAGAIN);
2690
2691         /* start == NULL means auto trigger paused LFSCK. */
2692         if ((start == NULL) &&
2693             (list_empty(&lfsck->li_list_scan) ||
2694              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2695                 GOTO(put, rc = 0);
2696
2697         bk = &lfsck->li_bookmark_ram;
2698         thread = &lfsck->li_thread;
             /* li_mutex stays held until "out" (or until it is dropped
              * explicitly before the broadcast below); li_lock only covers
              * the thread state check and the join loop. */
2699         mutex_lock(&lfsck->li_mutex);
2700         spin_lock(&lfsck->li_lock);
2701         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
                     /* The LFSCK thread is neither in the init state nor
                      * stopped: do not start a second one, only try to
                      * join the requested types. */
2702                 rc = -EALREADY;
2703                 if (unlikely(start == NULL)) {
2704                         spin_unlock(&lfsck->li_lock);
2705                         GOTO(out, rc);
2706                 }
2707
                     /* Walk the bits of ls_active from the lowest one
                      * upward; each handled bit is cleared from ls_active. */
2708                 while (start->ls_active != 0) {
2709                         if (!(type & start->ls_active)) {
2710                                 type <<= 1;
2711                                 continue;
2712                         }
2713
2714                         com = __lfsck_component_find(lfsck, type,
2715                                                      &lfsck->li_list_scan);
2716                         if (com == NULL)
2717                                 com = __lfsck_component_find(lfsck, type,
2718                                                 &lfsck->li_list_double_scan);
2719                         if (com == NULL) {
2720                                 rc = -EOPNOTSUPP;
2721                                 break;
2722                         }
2723
2724                         if (com->lc_ops->lfsck_join != NULL) {
2725                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2726                                 if (rc != 0 && rc != -EALREADY)
2727                                         break;
2728                         }
2729                         start->ls_active &= ~type;
2730                         type <<= 1;
2731                 }
2732                 spin_unlock(&lfsck->li_lock);
2733                 GOTO(out, rc);
2734         }
2735         spin_unlock(&lfsck->li_lock);
2736
             /* No thread is active: reset the per-run state. */
2737         lfsck->li_status = 0;
2738         lfsck->li_oit_over = 0;
2739         lfsck->li_start_unplug = 0;
2740         lfsck->li_drop_dryrun = 0;
2741         lfsck->li_new_scanned = 0;
2742
2743         /* For auto trigger. */
2744         if (start == NULL)
2745                 goto trigger;
2746
2747         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2748                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2749                        lfsck_lfsck2name(lfsck));
2750
2751                 GOTO(out, rc = -EPERM);
2752         }
2753
2754         start->ls_version = bk->lb_version;
2755
2756         if (start->ls_active != 0) {
2757                 struct lfsck_component *next;
2758
2759                 if (start->ls_active == LFSCK_TYPES_ALL)
2760                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2761
2762                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
                             /* Leave only the unsupported bits in ls_active
                              * before failing; presumably the caller reports
                              * them -- TODO confirm. */
2763                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2764                         GOTO(out, rc = -ENOTSUPP);
2765                 }
2766
                     /* Components on the scan list that were not requested
                      * this time get their lfsck_post() method called. */
2767                 list_for_each_entry_safe(com, next,
2768                                          &lfsck->li_list_scan, lc_link) {
2769                         if (!(com->lc_type & start->ls_active)) {
2770                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2771                                                              false);
2772                                 if (rc != 0)
2773                                         GOTO(out, rc);
2774                         }
2775                 }
2776
                     /* Move every requested component found on the idle
                      * list to the scan list, clearing its bit in
                      * ls_active. */
2777                 while (start->ls_active != 0) {
2778                         if (type & start->ls_active) {
2779                                 com = __lfsck_component_find(lfsck, type,
2780                                                         &lfsck->li_list_idle);
2781                                 if (com != NULL)
2782                                         /* The component status will be updated
2783                                          * when its prep() is called later by
2784                                          * the LFSCK main engine. */
2785                                         list_move_tail(&com->lc_link,
2786                                                        &lfsck->li_list_scan);
2787                                 start->ls_active &= ~type;
2788                         }
2789                         type <<= 1;
2790                 }
2791         }
2792
2793         if (list_empty(&lfsck->li_list_scan)) {
2794                 /* The speed limit will be used to control both the LFSCK and
2795                  * low layer scrub (if applied), need to be handled firstly. */
2796                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2797                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2798                                 rc = lfsck_bookmark_store(env, lfsck);
2799                                 if (rc != 0)
2800                                         GOTO(out, rc);
2801                         }
2802                 }
2803
2804                 goto trigger;
2805         }
2806
2807         if (start->ls_flags & LPF_RESET)
2808                 flags |= DOIF_RESET;
2809
2810         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2811         if (rc != 0)
2812                 GOTO(out, rc);
2813
             /* Rebuild ls_active from the components that will be scanned,
              * resetting each of them when LPF_RESET was requested. */
2814         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2815                 start->ls_active |= com->lc_type;
2816                 if (flags & DOIF_RESET) {
2817                         rc = com->lc_ops->lfsck_reset(env, com, false);
2818                         if (rc != 0)
2819                                 GOTO(out, rc);
2820                 }
2821         }
2822
2823 trigger:
2824         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2825         if (bk->lb_param & LPF_DRYRUN)
2826                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2827
2828         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2829                 valid |= DOIV_ERROR_HANDLE;
2830                 if (start->ls_flags & LPF_FAILOUT)
2831                         flags |= DOIF_FAILOUT;
2832         }
2833
2834         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2835                 valid |= DOIV_DRYRUN;
2836                 if (start->ls_flags & LPF_DRYRUN)
2837                         flags |= DOIF_DRYRUN;
2838         }
2839
2840         if (!list_empty(&lfsck->li_list_scan))
2841                 flags |= DOIF_OUTUSED;
2842
             /* Pack the DOIF_* flags and the DOIV_* valid mask into one
              * word for the object table iteration. */
2843         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2844         thread_set_flags(thread, 0);
2845         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2846         if (IS_ERR(lta))
2847                 GOTO(out, rc = PTR_ERR(lta));
2848
2849         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2850         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2851         if (IS_ERR(task)) {
2852                 rc = PTR_ERR(task);
2853                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2854                        lfsck_lfsck2name(lfsck), rc);
2855                 lfsck_thread_args_fini(lta);
2856
2857                 GOTO(out, rc);
2858         }
2859
             /* Wait until the new thread is either running or stopped. */
2860         l_wait_event(thread->t_ctl_waitq,
2861                      thread_is_running(thread) ||
2862                      thread_is_stopped(thread),
2863                      &lwi);
2864         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
                     /* No broadcast requested: set li_start_unplug and wake
                      * the waiters right away (presumably this lets the
                      * engine proceed -- TODO confirm). */
2865                 lfsck->li_start_unplug = 1;
2866                 wake_up_all(&thread->t_ctl_waitq);
2867
2868                 GOTO(out, rc = 0);
2869         }
2870
2871         /* release lfsck::li_mutex to avoid deadlock. */
2872         mutex_unlock(&lfsck->li_mutex);
2873         rc = lfsck_start_all(env, lfsck, start);
2874         if (rc != 0) {
                     /* The broadcast failed: unless the local thread has
                      * already stopped, mark it failed, ask it to stop and
                      * wait until it has done so. */
2875                 spin_lock(&lfsck->li_lock);
2876                 if (thread_is_stopped(thread)) {
2877                         spin_unlock(&lfsck->li_lock);
2878                 } else {
2879                         lfsck->li_status = LS_FAILED;
2880                         lfsck->li_flags = 0;
2881                         thread_set_flags(thread, SVC_STOPPING);
2882                         spin_unlock(&lfsck->li_lock);
2883
2884                         lfsck->li_start_unplug = 1;
2885                         wake_up_all(&thread->t_ctl_waitq);
2886                         l_wait_event(thread->t_ctl_waitq,
2887                                      thread_is_stopped(thread),
2888                                      &lwi);
2889                 }
2890         } else {
2891                 lfsck->li_start_unplug = 1;
2892                 wake_up_all(&thread->t_ctl_waitq);
2893         }
2894
             /* li_mutex was already dropped above, so skip "out". */
2895         GOTO(put, rc);
2896
2897 out:
2898         mutex_unlock(&lfsck->li_mutex);
2899
2900 put:
2901         lfsck_instance_put(env, lfsck);
2902
             /* Only negative values are reported as errors. */
2903         return rc < 0 ? rc : 0;
2904 }
2905 EXPORT_SYMBOL(lfsck_start);
2906
2907 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2908                struct lfsck_stop *stop)
2909 {
2910         struct lfsck_instance   *lfsck;
2911         struct ptlrpc_thread    *thread;
2912         struct l_wait_info       lwi    = { 0 };
2913         int                      rc     = 0;
2914         int                      rc1    = 0;
2915         ENTRY;
2916
2917         lfsck = lfsck_instance_find(key, true, false);
2918         if (unlikely(lfsck == NULL))
2919                 RETURN(-ENXIO);
2920
2921         thread = &lfsck->li_thread;
2922         /* release lfsck::li_mutex to avoid deadlock. */
2923         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2924                 if (!lfsck->li_master) {
2925                         CERROR("%s: only allow to specify '-A' via MDS\n",
2926                                lfsck_lfsck2name(lfsck));
2927
2928                         GOTO(out, rc = -EPERM);
2929                 }
2930
2931                 rc1 = lfsck_stop_all(env, lfsck, stop);
2932         }
2933
2934         mutex_lock(&lfsck->li_mutex);
2935         spin_lock(&lfsck->li_lock);
2936         /* no error if LFSCK is already stopped, or was never started */
2937         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2938                 spin_unlock(&lfsck->li_lock);
2939                 GOTO(out, rc = 0);
2940         }
2941
2942         if (stop != NULL) {
2943                 lfsck->li_status = stop->ls_status;
2944                 lfsck->li_flags = stop->ls_flags;
2945         } else {
2946                 lfsck->li_status = LS_STOPPED;
2947                 lfsck->li_flags = 0;
2948         }
2949
2950         thread_set_flags(thread, SVC_STOPPING);
2951         spin_unlock(&lfsck->li_lock);
2952
2953         wake_up_all(&thread->t_ctl_waitq);
2954         l_wait_event(thread->t_ctl_waitq,
2955                      thread_is_stopped(thread),
2956                      &lwi);
2957
2958         GOTO(out, rc = 0);
2959
2960 out:
2961         mutex_unlock(&lfsck->li_mutex);
2962         lfsck_instance_put(env, lfsck);
2963
2964         return rc != 0 ? rc : rc1;
2965 }
2966 EXPORT_SYMBOL(lfsck_stop);
2967
2968 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2969                     struct lfsck_request *lr, struct thandle *th)
2970 {
2971         int rc = -EOPNOTSUPP;
2972         ENTRY;
2973
2974         switch (lr->lr_event) {
2975         case LE_START: {
2976                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2977                 struct lfsck_start_param  lsp;
2978
2979                 memset(start, 0, sizeof(*start));
2980                 start->ls_valid = lr->lr_valid;
2981                 start->ls_speed_limit = lr->lr_speed;
2982                 start->ls_version = lr->lr_version;
2983                 start->ls_active = lr->lr_active;
2984                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2985                 start->ls_async_windows = lr->lr_async_windows;
2986
2987                 lsp.lsp_start = start;
2988                 lsp.lsp_index = lr->lr_index;
2989                 lsp.lsp_index_valid = 1;
2990                 rc = lfsck_start(env, key, &lsp);
2991                 break;
2992         }
2993         case LE_STOP: {
2994                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2995
2996                 memset(stop, 0, sizeof(*stop));
2997                 stop->ls_status = lr->lr_status;
2998                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2999                 rc = lfsck_stop(env, key, stop);
3000                 break;
3001         }
3002         case LE_PHASE1_DONE:
3003         case LE_PHASE2_DONE:
3004         case LE_FID_ACCESSED:
3005         case LE_PEER_EXIT:
3006         case LE_CONDITIONAL_DESTROY:
3007         case LE_SKIP_NLINK_DECLARE:
3008         case LE_SKIP_NLINK:
3009         case LE_SET_LMV_MASTER:
3010         case LE_SET_LMV_SLAVE:
3011         case LE_PAIRS_VERIFY: {
3012                 struct lfsck_instance  *lfsck;
3013                 struct lfsck_component *com;
3014
3015                 lfsck = lfsck_instance_find(key, true, false);
3016                 if (unlikely(lfsck == NULL))
3017                         RETURN(-ENXIO);
3018
3019                 com = lfsck_component_find(lfsck, lr->lr_active);
3020                 if (likely(com != NULL)) {
3021                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3022                         lfsck_component_put(env, com);
3023                 }
3024
3025                 lfsck_instance_put(env, lfsck);
3026                 break;
3027         }
3028         default:
3029                 break;
3030         }
3031
3032         RETURN(rc);
3033 }
3034 EXPORT_SYMBOL(lfsck_in_notify);
3035
3036 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3037                 struct lfsck_request *lr)
3038 {
3039         struct lfsck_instance  *lfsck;
3040         struct lfsck_component *com;
3041         int                     rc;
3042         ENTRY;
3043
3044         lfsck = lfsck_instance_find(key, true, false);
3045         if (unlikely(lfsck == NULL))
3046                 RETURN(-ENXIO);
3047
3048         com = lfsck_component_find(lfsck, lr->lr_active);
3049         if (likely(com != NULL)) {
3050                 rc = com->lc_ops->lfsck_query(env, com);
3051                 lfsck_component_put(env, com);
3052         } else {
3053                 rc = -ENOTSUPP;
3054         }
3055
3056         lfsck_instance_put(env, lfsck);
3057
3058         RETURN(rc);
3059 }
3060 EXPORT_SYMBOL(lfsck_query);
3061
3062 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3063                              struct ldlm_namespace *ns)
3064 {
3065         struct lfsck_instance  *lfsck;
3066         int                     rc      = -ENXIO;
3067
3068         lfsck = lfsck_instance_find(key, true, false);
3069         if (likely(lfsck != NULL)) {
3070                 lfsck->li_namespace = ns;
3071                 lfsck_instance_put(env, lfsck);
3072                 rc = 0;
3073         }
3074
3075         return rc;
3076 }
3077 EXPORT_SYMBOL(lfsck_register_namespace);
3078
3079 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3080                    struct dt_device *next, struct obd_device *obd,
3081                    lfsck_out_notify notify, void *notify_data, bool master)
3082 {
3083         struct lfsck_instance   *lfsck;
3084         struct dt_object        *root  = NULL;
3085         struct dt_object        *obj   = NULL;
3086         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3087         int                      rc;
3088         ENTRY;
3089
3090         lfsck = lfsck_instance_find(key, false, false);
3091         if (unlikely(lfsck != NULL))
3092                 RETURN(-EEXIST);
3093
3094         OBD_ALLOC_PTR(lfsck);
3095         if (lfsck == NULL)
3096                 RETURN(-ENOMEM);
3097
3098         mutex_init(&lfsck->li_mutex);
3099         spin_lock_init(&lfsck->li_lock);
3100         INIT_LIST_HEAD(&lfsck->li_link);
3101         INIT_LIST_HEAD(&lfsck->li_list_scan);
3102         INIT_LIST_HEAD(&lfsck->li_list_dir);
3103         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3104         INIT_LIST_HEAD(&lfsck->li_list_idle);
3105         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3106         atomic_set(&lfsck->li_ref, 1);
3107         atomic_set(&lfsck->li_double_scan_count, 0);
3108         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3109         lfsck->li_out_notify = notify;
3110         lfsck->li_out_notify_data = notify_data;
3111         lfsck->li_next = next;
3112         lfsck->li_bottom = key;
3113         lfsck->li_obd = obd;
3114
3115         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3116         if (rc != 0)
3117                 GOTO(out, rc);
3118
3119         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3120         if (rc != 0)
3121                 GOTO(out, rc);
3122
3123         fid->f_seq = FID_SEQ_LOCAL_NAME;
3124         fid->f_oid = 1;
3125         fid->f_ver = 0;
3126         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3127         if (rc != 0)
3128                 GOTO(out, rc);
3129
3130         rc = dt_root_get(env, key, fid);
3131         if (rc != 0)
3132                 GOTO(out, rc);
3133
3134         root = dt_locate(env, key, fid);
3135         if (IS_ERR(root))
3136                 GOTO(out, rc = PTR_ERR(root));
3137
3138         if (unlikely(!dt_try_as_dir(env, root)))
3139                 GOTO(out, rc = -ENOTDIR);
3140
3141         lfsck->li_local_root_fid = *fid;
3142         if (master) {
3143                 lfsck->li_master = 1;
3144                 if (lfsck_dev_idx(key) == 0) {
3145                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3146                         const struct lu_name *cname;
3147
3148                         rc = dt_lookup(env, root,
3149                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3150                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3151                         if (rc != 0)
3152                                 GOTO(out, rc);
3153
3154                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3155                         if (IS_ERR(obj))
3156                                 GOTO(out, rc = PTR_ERR(obj));
3157
3158                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3159                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3160                         if (rc != 0)
3161                                 GOTO(out, rc);
3162
3163                         lu_object_put(env, &obj->do_lu);
3164                         obj = dt_locate(env, key, fid);
3165                         if (IS_ERR(obj))
3166                                 GOTO(out, rc = PTR_ERR(obj));
3167
3168                         cname = lfsck_name_get_const(env, dotlustre,
3169                                                      strlen(dotlustre));
3170                         rc = lfsck_verify_linkea(env, key, obj, cname,
3171                                                  &lfsck->li_global_root_fid);
3172                         if (rc != 0)
3173                                 GOTO(out, rc);
3174
3175                         *pfid = *fid;
3176                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3177                                        (const struct dt_key *)lostfound,
3178                                        BYPASS_CAPA);
3179                         if (rc != 0)
3180                                 GOTO(out, rc);
3181
3182                         lu_object_put(env, &obj->do_lu);
3183                         obj = dt_locate(env, key, fid);
3184                         if (IS_ERR(obj))
3185                                 GOTO(out, rc = PTR_ERR(obj));
3186
3187                         cname = lfsck_name_get_const(env, lostfound,
3188                                                      strlen(lostfound));
3189                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3190                         if (rc != 0)
3191                                 GOTO(out, rc);
3192
3193                         lu_object_put(env, &obj->do_lu);
3194                         obj = NULL;
3195                 }
3196         }
3197
3198         fid->f_seq = FID_SEQ_LOCAL_FILE;
3199         fid->f_oid = OTABLE_IT_OID;
3200         fid->f_ver = 0;
3201         obj = dt_locate(env, key, fid);
3202         if (IS_ERR(obj))
3203                 GOTO(out, rc = PTR_ERR(obj));
3204
3205         lu_object_get(&obj->do_lu);
3206         lfsck->li_obj_oit = obj;
3207         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3208         if (rc != 0)
3209                 GOTO(out, rc);
3210
3211         rc = lfsck_bookmark_setup(env, lfsck);
3212         if (rc != 0)
3213                 GOTO(out, rc);
3214
3215         if (master) {
3216                 rc = lfsck_fid_init(lfsck);
3217                 if (rc < 0)
3218                         GOTO(out, rc);
3219
3220                 rc = lfsck_namespace_setup(env, lfsck);
3221                 if (rc < 0)
3222                         GOTO(out, rc);
3223         }
3224
3225         rc = lfsck_layout_setup(env, lfsck);
3226         if (rc < 0)
3227                 GOTO(out, rc);
3228
3229         /* XXX: more LFSCK components initialization to be added here. */
3230
3231         rc = lfsck_instance_add(lfsck);
3232         if (rc == 0)
3233                 rc = lfsck_add_target_from_orphan(env, lfsck);
3234 out:
3235         if (obj != NULL && !IS_ERR(obj))
3236                 lu_object_put(env, &obj->do_lu);
3237         if (root != NULL && !IS_ERR(root))
3238                 lu_object_put(env, &root->do_lu);
3239         if (rc != 0)
3240                 lfsck_instance_cleanup(env, lfsck);
3241         return rc;
3242 }
3243 EXPORT_SYMBOL(lfsck_register);
3244
/*
 * Tear down the LFSCK instance associated with the device \a key.
 *
 * The instance is looked up with lfsck_instance_find(key, false, true); if
 * one is returned it is handed to lfsck_instance_put(). Nothing happens when
 * no instance is found.
 *
 * NOTE(review): the two boolean arguments presumably mean "do not take an
 * extra reference" and "unlink from the instance list" -- confirm against
 * lfsck_instance_find().
 *
 * \param env   execution environment passed through to lfsck_instance_put()
 * \param key   device used as the lookup key for the instance
 */
void lfsck_degister(const struct lu_env *env, struct dt_device *key)
{
        struct lfsck_instance *instance = lfsck_instance_find(key, false, true);

        if (instance == NULL)
                return;

        lfsck_instance_put(env, instance);
}
EXPORT_SYMBOL(lfsck_degister);
3254
3255 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3256                      struct dt_device *tgt, struct obd_export *exp,
3257                      __u32 index, bool for_ost)
3258 {
3259         struct lfsck_instance   *lfsck;
3260         struct lfsck_tgt_desc   *ltd;
3261         int                      rc;
3262         ENTRY;
3263
3264         OBD_ALLOC_PTR(ltd);
3265         if (ltd == NULL)
3266                 RETURN(-ENOMEM);
3267
3268         ltd->ltd_tgt = tgt;
3269         ltd->ltd_key = key;
3270         ltd->ltd_exp = exp;
3271         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3272         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3273         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3274         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3275         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3276         atomic_set(&ltd->ltd_ref, 1);
3277         ltd->ltd_index = index;
3278
3279         spin_lock(&lfsck_instance_lock);
3280         lfsck = __lfsck_instance_find(key, true, false);
3281         if (lfsck == NULL) {
3282                 if (for_ost)
3283                         list_add_tail(&ltd->ltd_orphan_list,
3284                                       &lfsck_ost_orphan_list);
3285                 else
3286                         list_add_tail(&ltd->ltd_orphan_list,
3287                                       &lfsck_mdt_orphan_list);
3288                 spin_unlock(&lfsck_instance_lock);
3289
3290                 RETURN(0);
3291         }
3292         spin_unlock(&lfsck_instance_lock);
3293
3294         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3295         if (rc != 0)
3296                 lfsck_tgt_put(ltd);
3297
3298         lfsck_instance_put(env, lfsck);
3299
3300         RETURN(rc);
3301 }
3302 EXPORT_SYMBOL(lfsck_add_target);
3303
3304 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3305                       struct dt_device *tgt, __u32 index, bool for_ost)
3306 {
3307         struct lfsck_instance   *lfsck;
3308         struct lfsck_tgt_descs  *ltds;
3309         struct lfsck_tgt_desc   *ltd;
3310         struct list_head        *head;
3311
3312         if (for_ost)
3313                 head = &lfsck_ost_orphan_list;
3314         else
3315                 head = &lfsck_mdt_orphan_list;
3316
3317         spin_lock(&lfsck_instance_lock);
3318         list_for_each_entry(ltd, head, ltd_orphan_list) {
3319                 if (ltd->ltd_tgt == tgt) {
3320                         list_del_init(&ltd->ltd_orphan_list);
3321                         spin_unlock(&lfsck_instance_lock);
3322                         lfsck_tgt_put(ltd);
3323
3324                         return;
3325                 }
3326         }
3327
3328         ltd = NULL;
3329         lfsck = __lfsck_instance_find(key, true, false);
3330         spin_unlock(&lfsck_instance_lock);
3331         if (unlikely(lfsck == NULL))
3332                 return;
3333
3334         if (for_ost)
3335                 ltds = &lfsck->li_ost_descs;
3336         else
3337                 ltds = &lfsck->li_mdt_descs;
3338
3339         down_write(&ltds->ltd_rw_sem);
3340         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3341
3342         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3343                 goto unlock;
3344
3345         ltd = LTD_TGT(ltds, index);
3346         if (unlikely(ltd == NULL))
3347                 goto unlock;
3348
3349         LASSERT(ltds->ltd_tgtnr > 0);
3350
3351         ltds->ltd_tgtnr--;
3352         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3353         LTD_TGT(ltds, index) = NULL;
3354
3355 unlock:
3356         if (ltd == NULL) {
3357                 if (for_ost)
3358                         head = &lfsck->li_ost_descs.ltd_orphan;
3359                 else
3360                         head = &lfsck->li_mdt_descs.ltd_orphan;
3361
3362                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3363                         if (ltd->ltd_tgt == tgt) {
3364                                 list_del_init(&ltd->ltd_orphan_list);
3365                                 break;
3366                         }
3367                 }
3368         }
3369
3370         up_write(&ltds->ltd_rw_sem);
3371         if (ltd != NULL) {
3372                 spin_lock(&ltds->ltd_lock);
3373                 ltd->ltd_dead = 1;
3374                 spin_unlock(&ltds->ltd_lock);
3375                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3376                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3377                 lfsck_tgt_put(ltd);
3378         }
3379
3380         lfsck_instance_put(env, lfsck);
3381 }
3382 EXPORT_SYMBOL(lfsck_del_target);
3383
3384 static int __init lfsck_init(void)
3385 {
3386         int rc;
3387
3388         INIT_LIST_HEAD(&lfsck_instance_list);
3389         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3390         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3391         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3392         rc = lu_context_key_register(&lfsck_thread_key);
3393         if (rc == 0) {
3394                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3395                 tgt_register_lfsck_query(lfsck_query);
3396         }
3397
3398         return rc;
3399 }
3400
3401 static void __exit lfsck_exit(void)
3402 {
3403         struct lfsck_tgt_desc *ltd;
3404         struct lfsck_tgt_desc *next;
3405
3406         LASSERT(list_empty(&lfsck_instance_list));
3407
3408         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3409                                  ltd_orphan_list) {
3410                 list_del_init(&ltd->ltd_orphan_list);
3411                 lfsck_tgt_put(ltd);
3412         }
3413
3414         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3415                                  ltd_orphan_list) {
3416                 list_del_init(&ltd->ltd_orphan_list);
3417                 lfsck_tgt_put(ltd);
3418         }
3419
3420         lu_context_key_degister(&lfsck_thread_key);
3421 }
3422
/* Kernel module metadata: author, short description and license. */
3423 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3424 MODULE_DESCRIPTION("LFSCK");
3425 MODULE_LICENSE("GPL");
3426
/*
 * Declare the "lfsck" module with lfsck_init() and lfsck_exit() as its
 * callbacks (presumably the load/unload entry points -- confirm against the
 * cfs_module() definition).
 */
3427 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);