Whamcloud - gitweb
LU-5791 lfsck: use bottom device to locate object
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* NOTE(review): no user of this constant is visible in this part of the
 * file; confirm its meaning against the checkpoint code before changing it. */
#define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
/*
 * Module-wide bookkeeping.
 *
 * The two orphan lists hold lfsck_tgt_desc entries linked through
 * ltd_orphan_list; lfsck_add_target_from_orphan() walks them while holding
 * lfsck_instance_lock.
 *
 * NOTE(review): the list heads have no static initializer here, so they are
 * presumably set up with INIT_LIST_HEAD() elsewhere in the file - confirm.
 */
static struct list_head lfsck_instance_list;
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
static DEFINE_SPINLOCK(lfsck_instance_lock);
68
/* Printable name for each enum lfsck_status value, indexed by the status
 * itself; lfsck_status2names() is the accessor. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
83
/* NULL-terminated table of flag names.
 * NOTE(review): presumably entry N names bit N of the LFSCK flags word;
 * the consumer is outside this part of the file - confirm. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
92
/* Table of parameter names; slot 0 is deliberately NULL and the table ends
 * with a second NULL.
 * NOTE(review): presumably entry N names bit N of the LFSCK parameter word,
 * with bit 0 having no printable name - confirm against the consumer. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        "create_mdtobj",
        NULL
};
104
/* NOTE(review): judging by the names only, this selects whether the
 * lost+found directory being verified was located through the bookmark or
 * through a name entry; the users are outside this part of the file. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
109
110 const char *lfsck_status2names(enum lfsck_status status)
111 {
112         if (unlikely(status < 0 || status >= LS_MAX))
113                 return "unknown";
114
115         return lfsck_status_names[status];
116 }
117
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
119 {
120         spin_lock_init(&ltds->ltd_lock);
121         init_rwsem(&ltds->ltd_rw_sem);
122         INIT_LIST_HEAD(&ltds->ltd_orphan);
123         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124         if (ltds->ltd_tgts_bitmap == NULL)
125                 return -ENOMEM;
126
127         return 0;
128 }
129
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
131 {
132         struct lfsck_tgt_desc   *ltd;
133         struct lfsck_tgt_desc   *next;
134         int                      idx;
135
136         down_write(&ltds->ltd_rw_sem);
137
138         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
139                                  ltd_orphan_list) {
140                 list_del_init(&ltd->ltd_orphan_list);
141                 lfsck_tgt_put(ltd);
142         }
143
144         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145                 up_write(&ltds->ltd_rw_sem);
146
147                 return;
148         }
149
150         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151                 ltd = LTD_TGT(ltds, idx);
152                 if (likely(ltd != NULL)) {
153                         LASSERT(list_empty(&ltd->ltd_layout_list));
154                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_list));
156                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
157
158                         ltds->ltd_tgtnr--;
159                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160                         LTD_TGT(ltds, idx) = NULL;
161                         lfsck_tgt_put(ltd);
162                 }
163         }
164
165         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
166                  ltds->ltd_tgtnr);
167
168         for (idx = 0; idx < TGT_PTRS; idx++) {
169                 if (ltds->ltd_tgts_idx[idx] != NULL) {
170                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171                         ltds->ltd_tgts_idx[idx] = NULL;
172                 }
173         }
174
175         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176         ltds->ltd_tgts_bitmap = NULL;
177         up_write(&ltds->ltd_rw_sem);
178 }
179
180 static int __lfsck_add_target(const struct lu_env *env,
181                               struct lfsck_instance *lfsck,
182                               struct lfsck_tgt_desc *ltd,
183                               bool for_ost, bool locked)
184 {
185         struct lfsck_tgt_descs *ltds;
186         __u32                   index = ltd->ltd_index;
187         int                     rc    = 0;
188         ENTRY;
189
190         if (for_ost)
191                 ltds = &lfsck->li_ost_descs;
192         else
193                 ltds = &lfsck->li_mdt_descs;
194
195         if (!locked)
196                 down_write(&ltds->ltd_rw_sem);
197
198         LASSERT(ltds->ltd_tgts_bitmap != NULL);
199
200         if (index >= ltds->ltd_tgts_bitmap->size) {
201                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202                                     (__u32)BITS_PER_LONG);
203                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204                 cfs_bitmap_t *new_bitmap;
205
206                 while (newsize < index + 1)
207                         newsize <<= 1;
208
209                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210                 if (new_bitmap == NULL)
211                         GOTO(unlock, rc = -ENOMEM);
212
213                 if (ltds->ltd_tgtnr > 0)
214                         cfs_bitmap_copy(new_bitmap, old_bitmap);
215                 ltds->ltd_tgts_bitmap = new_bitmap;
216                 CFS_FREE_BITMAP(old_bitmap);
217         }
218
219         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220                 CERROR("%s: the device %s (%u) is registered already\n",
221                        lfsck_lfsck2name(lfsck),
222                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223                 GOTO(unlock, rc = -EEXIST);
224         }
225
226         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229                         GOTO(unlock, rc = -ENOMEM);
230         }
231
232         LTD_TGT(ltds, index) = ltd;
233         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
234         ltds->ltd_tgtnr++;
235
236         GOTO(unlock, rc = 0);
237
238 unlock:
239         if (!locked)
240                 up_write(&ltds->ltd_rw_sem);
241
242         return rc;
243 }
244
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246                                         struct lfsck_instance *lfsck)
247 {
248         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
249         struct lfsck_tgt_desc   *ltd;
250         struct lfsck_tgt_desc   *next;
251         struct list_head        *head    = &lfsck_ost_orphan_list;
252         int                      rc;
253         bool                     for_ost = true;
254
255 again:
256         spin_lock(&lfsck_instance_lock);
257         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258                 if (ltd->ltd_key == lfsck->li_bottom)
259                         list_move_tail(&ltd->ltd_orphan_list,
260                                        &ltds->ltd_orphan);
261         }
262         spin_unlock(&lfsck_instance_lock);
263
264         down_write(&ltds->ltd_rw_sem);
265         while (!list_empty(&ltds->ltd_orphan)) {
266                 ltd = list_entry(ltds->ltd_orphan.next,
267                                  struct lfsck_tgt_desc,
268                                  ltd_orphan_list);
269                 list_del_init(&ltd->ltd_orphan_list);
270                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271                 /* Do not hold the semaphore for too long time. */
272                 up_write(&ltds->ltd_rw_sem);
273                 if (rc != 0)
274                         return rc;
275
276                 down_write(&ltds->ltd_rw_sem);
277         }
278         up_write(&ltds->ltd_rw_sem);
279
280         if (for_ost) {
281                 ltds = &lfsck->li_mdt_descs;
282                 head = &lfsck_mdt_orphan_list;
283                 for_ost = false;
284                 goto again;
285         }
286
287         return 0;
288 }
289
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292                        struct list_head *list)
293 {
294         struct lfsck_component *com;
295
296         list_for_each_entry(com, list, lc_link) {
297                 if (com->lc_type == type)
298                         return com;
299         }
300         return NULL;
301 }
302
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
305 {
306         struct lfsck_component *com;
307
308         spin_lock(&lfsck->li_lock);
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
310         if (com != NULL)
311                 goto unlock;
312
313         com = __lfsck_component_find(lfsck, type,
314                                      &lfsck->li_list_double_scan);
315         if (com != NULL)
316                 goto unlock;
317
318         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
319
320 unlock:
321         if (com != NULL)
322                 lfsck_component_get(com);
323         spin_unlock(&lfsck->li_lock);
324         return com;
325 }
326
327 void lfsck_component_cleanup(const struct lu_env *env,
328                              struct lfsck_component *com)
329 {
330         if (!list_empty(&com->lc_link))
331                 list_del_init(&com->lc_link);
332         if (!list_empty(&com->lc_link_dir))
333                 list_del_init(&com->lc_link_dir);
334
335         lfsck_component_put(env, com);
336 }
337
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lu_fid *fid, bool locked)
340 {
341         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
342         int                      rc = 0;
343         ENTRY;
344
345         if (!locked)
346                 mutex_lock(&lfsck->li_mutex);
347
348         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
349         if (rc >= 0) {
350                 bk->lb_last_fid = *fid;
351                 /* We do not care about whether the subsequent sub-operations
352                  * failed or not. The worst case is that one FID is lost that
353                  * is not a big issue for the LFSCK since it is relative rare
354                  * for LFSCK create. */
355                 rc = lfsck_bookmark_store(env, lfsck);
356         }
357
358         if (!locked)
359                 mutex_unlock(&lfsck->li_mutex);
360
361         RETURN(rc);
362 }
363
364 /**
365  * Request the specified ibits lock for the given object.
366  *
367  * Before the LFSCK modifying on the namespace visible object,
368  * it needs to acquire related ibits ldlm lock.
369  *
370  * \param[in] env       pointer to the thread context
371  * \param[in] lfsck     pointer to the lfsck instance
372  * \param[in] obj       pointer to the dt_object to be locked
373  * \param[out] lh       pointer to the lock handle
374  * \param[in] ibits     the bits for the ldlm lock to be acquired
375  * \param[in] mode      the mode for the ldlm lock to be acquired
376  *
377  * \retval              0 for success
378  * \retval              negative error number on failure
379  */
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381                      struct dt_object *obj, struct lustre_handle *lh,
382                      __u64 bits, ldlm_mode_t mode)
383 {
384         struct lfsck_thread_info        *info   = lfsck_env_info(env);
385         ldlm_policy_data_t              *policy = &info->lti_policy;
386         struct ldlm_res_id              *resid  = &info->lti_resid;
387         __u64                            flags  = LDLM_FL_ATOMIC_CB;
388         int                              rc;
389
390         LASSERT(lfsck->li_namespace != NULL);
391
392         memset(policy, 0, sizeof(*policy));
393         policy->l_inodebits.bits = bits;
394         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
395         if (dt_object_remote(obj)) {
396                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
397
398                 memset(einfo, 0, sizeof(*einfo));
399                 einfo->ei_type = LDLM_IBITS;
400                 einfo->ei_mode = mode;
401                 einfo->ei_cb_bl = ldlm_blocking_ast;
402                 einfo->ei_cb_cp = ldlm_completion_ast;
403                 einfo->ei_res_id = resid;
404
405                 rc = dt_object_lock(env, obj, lh, einfo, policy);
406         } else {
407                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408                                             LDLM_IBITS, policy, mode,
409                                             &flags, ldlm_blocking_ast,
410                                             ldlm_completion_ast, NULL, NULL,
411                                             0, LVB_T_NONE, NULL, lh);
412         }
413
414         if (rc == ELDLM_OK) {
415                 rc = 0;
416         } else {
417                 memset(lh, 0, sizeof(*lh));
418                 rc = -EIO;
419         }
420
421         return rc;
422 }
423
424 /**
425  * Release the the specified ibits lock.
426  *
427  * If the lock has been acquired before, release it
428  * and cleanup the handle. Otherwise, do nothing.
429  *
430  * \param[in] lh        pointer to the lock handle
431  * \param[in] mode      the mode for the ldlm lock to be released
432  */
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
434 {
435         if (lustre_handle_is_used(lh)) {
436                 ldlm_lock_decref(lh, mode);
437                 memset(lh, 0, sizeof(*lh));
438         }
439 }
440
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442                               struct lfsck_instance *lfsck,
443                               const struct lu_fid *fid)
444 {
445         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
446         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
447         int                      rc;
448
449         fld_range_set_mdt(range);
450         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
451         if (rc == 0)
452                 rc = range->lsr_index;
453
454         return rc;
455 }
456
/* Directory entry names. "dot" and "dotdot" are used below as the keys of
 * the "." and ".." entries inserted into a newly created directory; the
 * users of "dotlustre" and "lostfound" are outside this part of the file. */
const char dot[] = ".";
const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
461
462 /**
463  * Remove the name entry from the .lustre/lost+found directory.
464  *
465  * No need to care about the object referenced by the name entry,
466  * either the name entry is invalid or redundant, or the referenced
467  * object has been processed or will be handled by others.
468  *
469  * \param[in] env       pointer to the thread context
470  * \param[in] lfsck     pointer to the lfsck instance
471  * \param[in] name      the name for the name entry to be removed
472  *
473  * \retval              0 for success
474  * \retval              negative error number on failure
475  */
476 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
477                                        struct lfsck_instance *lfsck,
478                                        const char *name)
479 {
480         struct dt_object        *parent = lfsck->li_lpf_root_obj;
481         struct dt_device        *dev    = lfsck_obj2dev(parent);
482         struct thandle          *th;
483         struct lustre_handle     lh     = { 0 };
484         int                      rc;
485         ENTRY;
486
487         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
488                               MDS_INODELOCK_UPDATE, LCK_EX);
489         if (rc != 0)
490                 RETURN(rc);
491
492         th = dt_trans_create(env, dev);
493         if (IS_ERR(th))
494                 GOTO(unlock, rc = PTR_ERR(th));
495
496         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
497         if (rc != 0)
498                 GOTO(stop, rc);
499
500         rc = dt_declare_ref_del(env, parent, th);
501         if (rc != 0)
502                 GOTO(stop, rc);
503
504         rc = dt_trans_start_local(env, dev, th);
505         if (rc != 0)
506                 GOTO(stop, rc);
507
508         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
509                        BYPASS_CAPA);
510         if (rc != 0)
511                 GOTO(stop, rc);
512
513         dt_write_lock(env, parent, 0);
514         rc = dt_ref_del(env, parent, th);
515         dt_write_unlock(env, parent);
516
517         GOTO(stop, rc);
518
519 stop:
520         dt_trans_stop(env, dev, th);
521
522 unlock:
523         lfsck_ibits_unlock(&lh, LCK_EX);
524
525         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
526                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
527
528         return rc;
529 }
530
531 static int lfsck_create_lpf_local(const struct lu_env *env,
532                                   struct lfsck_instance *lfsck,
533                                   struct dt_object *child,
534                                   struct lu_attr *la,
535                                   struct dt_object_format *dof,
536                                   const char *name)
537 {
538         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
539         struct dt_object        *parent = lfsck->li_lpf_root_obj;
540         struct dt_device        *dev    = lfsck_obj2dev(child);
541         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
542         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
543         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
544         struct thandle          *th     = NULL;
545         struct linkea_data       ldata  = { NULL };
546         struct lu_buf            linkea_buf;
547         const struct lu_name    *cname;
548         loff_t                   pos    = 0;
549         int                      len    = sizeof(struct lfsck_bookmark);
550         int                      rc;
551         ENTRY;
552
553         rc = linkea_data_new(&ldata,
554                              &lfsck_env_info(env)->lti_linkea_buf2);
555         if (rc != 0)
556                 RETURN(rc);
557
558         cname = lfsck_name_get_const(env, name, strlen(name));
559         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
560         if (rc != 0)
561                 RETURN(rc);
562
563         th = dt_trans_create(env, dev);
564         if (IS_ERR(th))
565                 RETURN(PTR_ERR(th));
566
567         /* 1a. create child */
568         rc = dt_declare_create(env, child, la, NULL, dof, th);
569         if (rc != 0)
570                 GOTO(stop, rc);
571
572         /* 2a. increase child nlink */
573         rc = dt_declare_ref_add(env, child, th);
574         if (rc != 0)
575                 GOTO(stop, rc);
576
577         /* 3a. insert linkEA for child */
578         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
579                        ldata.ld_leh->leh_len);
580         rc = dt_declare_xattr_set(env, child, &linkea_buf,
581                                   XATTR_NAME_LINK, 0, th);
582         if (rc != 0)
583                 GOTO(stop, rc);
584
585         /* 4a. insert name into parent dir */
586         rec->rec_type = S_IFDIR;
587         rec->rec_fid = cfid;
588         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
589                                (const struct dt_key *)name, th);
590         if (rc != 0)
591                 GOTO(stop, rc);
592
593         /* 5a. increase parent nlink */
594         rc = dt_declare_ref_add(env, parent, th);
595         if (rc != 0)
596                 GOTO(stop, rc);
597
598         /* 6a. update bookmark */
599         rc = dt_declare_record_write(env, bk_obj,
600                                      lfsck_buf_get(env, bk, len), 0, th);
601         if (rc != 0)
602                 GOTO(stop, rc);
603
604         rc = dt_trans_start_local(env, dev, th);
605         if (rc != 0)
606                 GOTO(stop, rc);
607
608         dt_write_lock(env, child, 0);
609         /* 1b.1. create child */
610         rc = dt_create(env, child, la, NULL, dof, th);
611         if (rc != 0)
612                 GOTO(unlock, rc);
613
614         if (unlikely(!dt_try_as_dir(env, child)))
615                 GOTO(unlock, rc = -ENOTDIR);
616
617         /* 1b.2. insert dot into child dir */
618         rec->rec_fid = cfid;
619         rc = dt_insert(env, child, (const struct dt_rec *)rec,
620                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
621         if (rc != 0)
622                 GOTO(unlock, rc);
623
624         /* 1b.3. insert dotdot into child dir */
625         rec->rec_fid = &LU_LPF_FID;
626         rc = dt_insert(env, child, (const struct dt_rec *)rec,
627                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
628         if (rc != 0)
629                 GOTO(unlock, rc);
630
631         /* 2b. increase child nlink */
632         rc = dt_ref_add(env, child, th);
633         if (rc != 0)
634                 GOTO(unlock, rc);
635
636         /* 3b. insert linkEA for child. */
637         rc = dt_xattr_set(env, child, &linkea_buf,
638                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
639         dt_write_unlock(env, child);
640         if (rc != 0)
641                 GOTO(stop, rc);
642
643         /* 4b. insert name into parent dir */
644         rec->rec_fid = cfid;
645         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
646                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
647         if (rc != 0)
648                 GOTO(stop, rc);
649
650         dt_write_lock(env, parent, 0);
651         /* 5b. increase parent nlink */
652         rc = dt_ref_add(env, parent, th);
653         dt_write_unlock(env, parent);
654         if (rc != 0)
655                 GOTO(stop, rc);
656
657         bk->lb_lpf_fid = *cfid;
658         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
659
660         /* 6b. update bookmark */
661         rc = dt_record_write(env, bk_obj,
662                              lfsck_buf_get(env, bk, len), &pos, th);
663
664         GOTO(stop, rc);
665
666 unlock:
667         dt_write_unlock(env, child);
668
669 stop:
670         dt_trans_stop(env, dev, th);
671
672         return rc;
673 }
674
675 static int lfsck_create_lpf_remote(const struct lu_env *env,
676                                    struct lfsck_instance *lfsck,
677                                    struct dt_object *child,
678                                    struct lu_attr *la,
679                                    struct dt_object_format *dof,
680                                    const char *name)
681 {
682         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
683         struct dt_object        *parent = lfsck->li_lpf_root_obj;
684         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
685         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
686         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
687         struct thandle          *th     = NULL;
688         struct linkea_data       ldata  = { NULL };
689         struct lu_buf            linkea_buf;
690         const struct lu_name    *cname;
691         struct dt_device        *dev;
692         loff_t                   pos    = 0;
693         int                      len    = sizeof(struct lfsck_bookmark);
694         int                      rc;
695         ENTRY;
696
697         rc = linkea_data_new(&ldata,
698                              &lfsck_env_info(env)->lti_linkea_buf2);
699         if (rc != 0)
700                 RETURN(rc);
701
702         cname = lfsck_name_get_const(env, name, strlen(name));
703         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
704         if (rc != 0)
705                 RETURN(rc);
706
707         /* Create .lustre/lost+found/MDTxxxx. */
708
709         /* XXX: Currently, cross-MDT create operation needs to create the child
710          *      object firstly, then insert name into the parent directory. For
711          *      this case, the child object resides on current MDT (local), but
712          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
713          *      easy to contain all the sub-modifications orderly within single
714          *      transaction.
715          *
716          *      To avoid more inconsistency, we split the create operation into
717          *      two transactions:
718          *
719          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
720          *         locally.
721          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
722          *         remotely.
723          *
724          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
725          *      repair such inconsistency when LFSCK run next time. */
726
727         /* Transaction I: locally */
728
729         dev = lfsck_obj2dev(child);
730         th = dt_trans_create(env, dev);
731         if (IS_ERR(th))
732                 RETURN(PTR_ERR(th));
733
734         /* 1a. create child */
735         rc = dt_declare_create(env, child, la, NULL, dof, th);
736         if (rc != 0)
737                 GOTO(stop, rc);
738
739         /* 2a. increase child nlink */
740         rc = dt_declare_ref_add(env, child, th);
741         if (rc != 0)
742                 GOTO(stop, rc);
743
744         /* 3a. insert linkEA for child */
745         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
746                        ldata.ld_leh->leh_len);
747         rc = dt_declare_xattr_set(env, child, &linkea_buf,
748                                   XATTR_NAME_LINK, 0, th);
749         if (rc != 0)
750                 GOTO(stop, rc);
751
752         /* 4a. update bookmark */
753         rc = dt_declare_record_write(env, bk_obj,
754                                      lfsck_buf_get(env, bk, len), 0, th);
755         if (rc != 0)
756                 GOTO(stop, rc);
757
758         rc = dt_trans_start_local(env, dev, th);
759         if (rc != 0)
760                 GOTO(stop, rc);
761
762         dt_write_lock(env, child, 0);
763         /* 1b.1. create child */
764         rc = dt_create(env, child, la, NULL, dof, th);
765         if (rc != 0)
766                 GOTO(unlock, rc);
767
768         if (unlikely(!dt_try_as_dir(env, child)))
769                 GOTO(unlock, rc = -ENOTDIR);
770
771         /* 1b.2. insert dot into child dir */
772         rec->rec_type = S_IFDIR;
773         rec->rec_fid = cfid;
774         rc = dt_insert(env, child, (const struct dt_rec *)rec,
775                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
776         if (rc != 0)
777                 GOTO(unlock, rc);
778
779         /* 1b.3. insert dotdot into child dir */
780         rec->rec_fid = &LU_LPF_FID;
781         rc = dt_insert(env, child, (const struct dt_rec *)rec,
782                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
783         if (rc != 0)
784                 GOTO(unlock, rc);
785
786         /* 2b. increase child nlink */
787         rc = dt_ref_add(env, child, th);
788         if (rc != 0)
789                 GOTO(unlock, rc);
790
791         /* 3b. insert linkEA for child */
792         rc = dt_xattr_set(env, child, &linkea_buf,
793                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
794         if (rc != 0)
795                 GOTO(unlock, rc);
796
797         bk->lb_lpf_fid = *cfid;
798         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
799
800         /* 4b. update bookmark */
801         rc = dt_record_write(env, bk_obj,
802                              lfsck_buf_get(env, bk, len), &pos, th);
803
804         dt_write_unlock(env, child);
805         dt_trans_stop(env, dev, th);
806         if (rc != 0)
807                 RETURN(rc);
808
809         /* Transaction II: remotely */
810
811         dev = lfsck_obj2dev(parent);
812         th = dt_trans_create(env, dev);
813         if (IS_ERR(th))
814                 RETURN(PTR_ERR(th));
815
816         th->th_sync = 1;
817         /* 5a. insert name into parent dir */
818         rec->rec_fid = cfid;
819         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
820                                (const struct dt_key *)name, th);
821         if (rc != 0)
822                 GOTO(stop, rc);
823
824         /* 6a. increase parent nlink */
825         rc = dt_declare_ref_add(env, parent, th);
826         if (rc != 0)
827                 GOTO(stop, rc);
828
829         rc = dt_trans_start_local(env, dev, th);
830         if (rc != 0)
831                 GOTO(stop, rc);
832
833         /* 5b. insert name into parent dir */
834         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
835                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
836         if (rc != 0)
837                 GOTO(stop, rc);
838
839         dt_write_lock(env, parent, 0);
840         /* 6b. increase parent nlink */
841         rc = dt_ref_add(env, parent, th);
842         dt_write_unlock(env, parent);
843
844         GOTO(stop, rc);
845
846 unlock:
847         dt_write_unlock(env, child);
848 stop:
849         dt_trans_stop(env, dev, th);
850
851         if (rc != 0 && dev == lfsck_obj2dev(parent))
852                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
853                        "for orphans, but failed to insert the name %s "
854                        "to the .lustre/lost+found/. Such inconsistency "
855                        "will be repaired when LFSCK run next time: rc = %d\n",
856                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
857
858         return rc;
859 }
860
861 /**
862  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
863  *
864  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
865  * orphans and other uncertain inconsistent objects found during the
866  * LFSCK. Such directory will be created by the LFSCK engine on the
867  * local MDT before the LFSCK scanning.
868  *
869  * \param[in] env       pointer to the thread context
870  * \param[in] lfsck     pointer to the lfsck instance
871  *
872  * \retval              0 for success
873  * \retval              negative error number on failure
874  */
875 static int lfsck_create_lpf(const struct lu_env *env,
876                             struct lfsck_instance *lfsck)
877 {
878         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
879         struct lfsck_thread_info *info  = lfsck_env_info(env);
880         struct lu_fid            *cfid  = &info->lti_fid2;
881         struct lu_attr           *la    = &info->lti_la;
882         struct dt_object_format  *dof   = &info->lti_dof;
883         struct dt_object         *parent = lfsck->li_lpf_root_obj;
884         struct dt_object         *child = NULL;
885         struct lustre_handle      lh    = { 0 };
886         char                      name[8];
887         int                       node  = lfsck_dev_idx(lfsck);
888         int                       rc    = 0;
889         ENTRY;
890
891         LASSERT(lfsck->li_master);
892         LASSERT(parent != NULL);
893         LASSERT(lfsck->li_lpf_obj == NULL);
894
895         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
896                               MDS_INODELOCK_UPDATE, LCK_EX);
897         if (rc != 0)
898                 RETURN(rc);
899
900         snprintf(name, 8, "MDT%04x", node);
901         if (fid_is_zero(&bk->lb_lpf_fid)) {
902                 /* There is corner case that: in former LFSCK scanning we have
903                  * created the .lustre/lost+found/MDTxxxx but failed to update
904                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
905                  * it from MDT0 firstly. */
906                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
907                                (const struct dt_key *)name, BYPASS_CAPA);
908                 if (rc != 0 && rc != -ENOENT)
909                         GOTO(unlock, rc);
910
911                 if (rc == 0) {
912                         bk->lb_lpf_fid = *cfid;
913                         rc = lfsck_bookmark_store(env, lfsck);
914                 } else {
915                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
916                 }
917                 if (rc != 0)
918                         GOTO(unlock, rc);
919         } else {
920                 *cfid = bk->lb_lpf_fid;
921         }
922
923         child = lfsck_object_find_bottom(env, lfsck, cfid);
924         if (IS_ERR(child))
925                 GOTO(unlock, rc = PTR_ERR(child));
926
927         if (dt_object_exists(child) != 0) {
928                 if (unlikely(!dt_try_as_dir(env, child)))
929                         rc = -ENOTDIR;
930                 else
931                         lfsck->li_lpf_obj = child;
932
933                 GOTO(unlock, rc);
934         }
935
936         memset(la, 0, sizeof(*la));
937         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
938         la->la_mode = S_IFDIR | S_IRWXU;
939         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
940                        LA_UID | LA_GID;
941         memset(dof, 0, sizeof(*dof));
942         dof->dof_type = dt_mode_to_dft(S_IFDIR);
943
944         if (node == 0)
945                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
946         else
947                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
948         if (rc == 0)
949                 lfsck->li_lpf_obj = child;
950
951         GOTO(unlock, rc);
952
953 unlock:
954         lfsck_ibits_unlock(&lh, LCK_EX);
955         if (rc != 0 && child != NULL && !IS_ERR(child))
956                 lfsck_object_put(env, child);
957
958         return rc;
959 }
960
961 /**
962  * Scan .lustre/lost+found for bad name entries and remove them.
963  *
964  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
965  * index in the system. Any other formatted name is invalid and should be
966  * removed.
967  *
968  * \param[in] env       pointer to the thread context
969  * \param[in] lfsck     pointer to the lfsck instance
970  *
971  * \retval              0 for success
972  * \retval              negative error number on failure
973  */
974 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
975                                       struct lfsck_instance *lfsck)
976 {
977         struct dt_object        *parent = lfsck->li_lpf_root_obj;
978         struct lu_dirent        *ent    =
979                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
980         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
981         struct dt_it            *it;
982         int                      rc;
983         ENTRY;
984
985         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
986         if (IS_ERR(it))
987                 RETURN(PTR_ERR(it));
988
989         rc = iops->load(env, it, 0);
990         if (rc == 0)
991                 rc = iops->next(env, it);
992         else if (rc > 0)
993                 rc = 0;
994
995         while (rc == 0) {
996                 int off = 3;
997
998                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
999                 if (rc != 0)
1000                         break;
1001
1002                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1003                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1004                         goto next;
1005
1006                 /* name length must be strlen("MDTxxxx") */
1007                 if (ent->lde_namelen != 7)
1008                         goto remove;
1009
1010                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1011                         goto remove;
1012
1013                 while (off < 7 && isxdigit(ent->lde_name[off]))
1014                         off++;
1015
1016                 if (off != 7) {
1017
1018 remove:
1019                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1020                                                          ent->lde_name);
1021                         if (rc != 0)
1022                                 break;
1023                 }
1024
1025 next:
1026                 rc = iops->next(env, it);
1027         }
1028
1029         iops->put(env, it);
1030         iops->fini(env, it);
1031
1032         RETURN(rc > 0 ? 0 : rc);
1033 }
1034
1035 static int lfsck_update_lpf_entry(const struct lu_env *env,
1036                                   struct lfsck_instance *lfsck,
1037                                   struct dt_object *parent,
1038                                   struct dt_object *child,
1039                                   const char *name,
1040                                   enum lfsck_verify_lpf_types type)
1041 {
1042         int rc;
1043
1044         if (type == LVLT_BY_BOOKMARK) {
1045                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1046                                              lfsck_dto2fid(child), S_IFDIR);
1047         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1048                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1049                 rc = lfsck_bookmark_store(env, lfsck);
1050
1051                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1052                        " in the bookmark file: rc = %d\n",
1053                        lfsck_lfsck2name(lfsck),
1054                        PFID(lfsck_dto2fid(child)), rc);
1055         }
1056
1057         return rc;
1058 }
1059
1060 /**
1061  * Check whether the @child back references the @parent.
1062  *
1063  * Two cases:
1064  * 1) The child's FID is stored in the bookmark file. If the child back
1065  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1066  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1067  *    the child back references another parent2, then:
1068  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1069  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1070  *      references the child. So keep them there. As the LFSCK processing,
1071  *      the parent3 may be found, then when the LFSCK run next time, the
1072  *      inconsistency can be repaired.
1073  *
1074  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1075  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1076  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1077  *    back references another parent2, then:
1078  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1079  *      from .lustre/lost+found/;
1080  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1081  *      sub-directory name entry and update the child;
1082  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1083  *      or not, then keep them there.
1084  *
1085  * \param[in] env       pointer to the thread context
1086  * \param[in] lfsck     pointer to the lfsck instance
1087  * \param[in] child     pointer to the lost+found sub-directory object
1088  * \param[in] name      the name for lost+found sub-directory object
1089  * \param[out] fid      pointer to the buffer to hold the FID of the object
1090  *                      (called it as parent2) that is referenced via the
1091  *                      child's dotdot entry; it also can be the FID that
1092  *                      is referenced by the name entry under the parent2.
1093  * \param[in] type      to indicate where the child's FID is stored in
1094  *
1095  * \retval              positive number for uncertain inconsistency
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1100                                   struct lfsck_instance *lfsck,
1101                                   struct dt_object *child, const char *name,
1102                                   struct lu_fid *fid,
1103                                   enum lfsck_verify_lpf_types type)
1104 {
1105         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1106         struct lfsck_thread_info *info    = lfsck_env_info(env);
1107         char                     *name2   = info->lti_key;
1108         struct lu_fid            *fid2    = &info->lti_fid3;
1109         struct dt_object         *parent2 = NULL;
1110         struct lustre_handle      lh      = { 0 };
1111         int                       rc;
1112         ENTRY;
1113
1114         fid_zero(fid);
1115         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1116                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1117         if (rc != 0)
1118                 GOTO(linkea, rc);
1119
1120         if (!fid_is_sane(fid))
1121                 GOTO(linkea, rc = -EINVAL);
1122
1123         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1124                 const struct lu_name *cname;
1125
1126                 if (lfsck->li_lpf_obj == NULL) {
1127                         lu_object_get(&child->do_lu);
1128                         lfsck->li_lpf_obj = child;
1129                 }
1130
1131                 cname = lfsck_name_get_const(env, name, strlen(name));
1132                 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1133                 if (rc == 0)
1134                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1135                                                     name, type);
1136
1137                 GOTO(out_done, rc);
1138         }
1139
1140         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1141         if (IS_ERR(parent2))
1142                 GOTO(linkea, parent2);
1143
1144         if (!dt_object_exists(parent2)) {
1145                 lfsck_object_put(env, parent2);
1146
1147                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1148         }
1149
1150         if (!dt_try_as_dir(env, parent2)) {
1151                 lfsck_object_put(env, parent2);
1152
1153                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1154         }
1155
1156 linkea:
1157         /* To prevent rename/unlink race */
1158         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1159                               MDS_INODELOCK_UPDATE, LCK_PR);
1160         if (rc != 0)
1161                 GOTO(out_put, rc);
1162
1163         dt_read_lock(env, child, 0);
1164         rc = lfsck_links_get_first(env, child, name2, fid2);
1165         if (rc != 0) {
1166                 dt_read_unlock(env, child);
1167                 lfsck_ibits_unlock(&lh, LCK_PR);
1168
1169                 GOTO(out_put, rc = 1);
1170         }
1171
1172         /* It is almost impossible that the bookmark file (or the name entry)
1173          * and the linkEA hit the same data corruption. Trust the linkEA. */
1174         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1175                 dt_read_unlock(env, child);
1176                 lfsck_ibits_unlock(&lh, LCK_PR);
1177
1178                 *fid = *fid2;
1179                 if (lfsck->li_lpf_obj == NULL) {
1180                         lu_object_get(&child->do_lu);
1181                         lfsck->li_lpf_obj = child;
1182                 }
1183
1184                 /* Update the child's dotdot entry */
1185                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1186                                              &LU_LPF_FID, S_IFDIR);
1187                 if (rc == 0)
1188                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1189                                                     name, type);
1190
1191                 GOTO(out_put, rc);
1192         }
1193
1194         if (parent2 == NULL || IS_ERR(parent2)) {
1195                 dt_read_unlock(env, child);
1196                 lfsck_ibits_unlock(&lh, LCK_PR);
1197
1198                 GOTO(out_done, rc = 1);
1199         }
1200
1201         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1202                        (const struct dt_key *)name2, BYPASS_CAPA);
1203         dt_read_unlock(env, child);
1204         lfsck_ibits_unlock(&lh, LCK_PR);
1205         if (rc != 0 && rc != -ENOENT)
1206                 GOTO(out_put, rc);
1207
1208         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1209                 if (type == LVLT_BY_BOOKMARK)
1210                         GOTO(out_put, rc = 1);
1211
1212                 /* Trust the name entry, update the child's dotdot entry. */
1213                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1214                                              &LU_LPF_FID, S_IFDIR);
1215
1216                 GOTO(out_put, rc);
1217         }
1218
1219         if (type == LVLT_BY_BOOKMARK) {
1220                 /* Invalid FID record in the bookmark file, reset it. */
1221                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1222                 rc = lfsck_bookmark_store(env, lfsck);
1223
1224                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1225                        " in the bookmark file: rc = %d\n",
1226                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1227         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1228                 /* The name entry is wrong, remove it. */
1229                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1230         }
1231
1232         GOTO(out_put, rc);
1233
1234 out_put:
1235         if (parent2 != NULL && !IS_ERR(parent2))
1236                 lfsck_object_put(env, parent2);
1237
1238 out_done:
1239         return rc;
1240 }
1241
1242 /**
1243  * Verify the /ROOT/.lustre/lost+found/ directory.
1244  *
1245  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1246  * the LFSCK does not exactly know how to handle, such as orphans. So before
1247  * the LFSCK scanning the system, the consistency of such directory needs to
1248  * be verified firstly to allow the users to use it during the LFSCK.
1249  *
1250  * \param[in] env       pointer to the thread context
1251  * \param[in] lfsck     pointer to the lfsck instance
1252  *
1253  * \retval              positive number for uncertain inconsistency
1254  * \retval              0 for success
1255  * \retval              negative error number on failure
1256  */
1257 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1258 {
1259         struct lfsck_thread_info *info   = lfsck_env_info(env);
1260         struct lu_fid            *pfid   = &info->lti_fid;
1261         struct lu_fid            *cfid   = &info->lti_fid2;
1262         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1263         struct dt_object         *parent;
1264         /* child1's FID is in the bookmark file. */
1265         struct dt_object         *child1 = NULL;
1266         /* child2's FID is in the name entry MDTxxxx. */
1267         struct dt_object         *child2 = NULL;
1268         const struct lu_name     *cname;
1269         char                      name[8];
1270         int                       node   = lfsck_dev_idx(lfsck);
1271         int                       rc     = 0;
1272         ENTRY;
1273
1274         LASSERT(lfsck->li_master);
1275
1276         if (lfsck->li_lpf_root_obj != NULL)
1277                 RETURN(0);
1278
1279         if (node == 0) {
1280                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1281                                                   &LU_LPF_FID);
1282         } else {
1283                 struct lfsck_tgt_desc *ltd;
1284
1285                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1286                 if (unlikely(ltd == NULL))
1287                         RETURN(-ENXIO);
1288
1289                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1290                                                   &LU_LPF_FID);
1291                 lfsck_tgt_put(ltd);
1292         }
1293
1294         if (IS_ERR(parent))
1295                 RETURN(PTR_ERR(parent));
1296
1297         LASSERT(dt_object_exists(parent));
1298
1299         if (unlikely(!dt_try_as_dir(env, parent))) {
1300                 lfsck_object_put(env, parent);
1301
1302                 GOTO(put, rc = -ENOTDIR);
1303         }
1304
1305         lfsck->li_lpf_root_obj = parent;
1306         if (node == 0) {
1307                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1308                 if (rc != 0)
1309                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1310                                "for bad sub-directories: rc = %d\n",
1311                                lfsck_lfsck2name(lfsck), rc);
1312         }
1313
1314         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1315                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1316                         struct lu_fid tfid = bk->lb_lpf_fid;
1317
1318                         /* Invalid FID record in the bookmark file, reset it. */
1319                         fid_zero(&bk->lb_lpf_fid);
1320                         rc = lfsck_bookmark_store(env, lfsck);
1321
1322                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1323                                " in the bookmark file: rc = %d\n",
1324                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1325
1326                         if (rc != 0)
1327                                 GOTO(put, rc);
1328                 } else {
1329                         child1 = lfsck_object_find_bottom(env, lfsck,
1330                                                           &bk->lb_lpf_fid);
1331                         if (IS_ERR(child1)) {
1332                                 child1 = NULL;
1333                                 goto find_child2;
1334                         }
1335
1336                         if (unlikely(!dt_object_exists(child1) ||
1337                                      dt_object_remote(child1)) ||
1338                                      !S_ISDIR(lfsck_object_type(child1))) {
1339                                 /* Invalid FID record in the bookmark file,
1340                                  * reset it. */
1341                                 fid_zero(&bk->lb_lpf_fid);
1342                                 rc = lfsck_bookmark_store(env, lfsck);
1343
1344                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1345                                        " in the bookmark file: rc = %d\n",
1346                                        lfsck_lfsck2name(lfsck),
1347                                        PFID(lfsck_dto2fid(child1)), rc);
1348
1349                                 if (rc != 0)
1350                                         GOTO(put, rc);
1351
1352                                 lfsck_object_put(env, child1);
1353                                 child1 = NULL;
1354                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1355                                 GOTO(put, rc = -ENOTDIR);
1356                         }
1357                 }
1358         }
1359
1360 find_child2:
1361         snprintf(name, 8, "MDT%04x", node);
1362         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1363                        (const struct dt_key *)name, BYPASS_CAPA);
1364         if (rc == -ENOENT) {
1365                 if (!fid_is_zero(&bk->lb_lpf_fid))
1366                         goto check_child1;
1367
1368                 GOTO(put, rc = 0);
1369         }
1370
1371         if (rc != 0)
1372                 GOTO(put, rc);
1373
1374         /* Invalid FID in the name entry, remove the name entry. */
1375         if (!fid_is_norm(cfid)) {
1376                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1377                 if (rc != 0)
1378                         GOTO(put, rc);
1379
1380                 goto check_child1;
1381         }
1382
1383         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1384         if (IS_ERR(child2))
1385                 GOTO(put, rc = PTR_ERR(child2));
1386
1387         if (unlikely(!dt_object_exists(child2) ||
1388                      dt_object_remote(child2)) ||
1389                      !S_ISDIR(lfsck_object_type(child2))) {
1390                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1391                 if (rc != 0)
1392                         GOTO(put, rc);
1393
1394                 goto check_child1;
1395         }
1396
1397         if (unlikely(!dt_try_as_dir(env, child2)))
1398                 GOTO(put, rc = -ENOTDIR);
1399
1400         if (child1 == NULL) {
1401                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1402                                             pfid, LVLT_BY_NAMEENTRY);
1403         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1404                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1405                                             pfid, LVLT_BY_BOOKMARK);
1406                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1407                         rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
1408                                                     name, pfid,
1409                                                     LVLT_BY_NAMEENTRY);
1410         } else {
1411                 if (lfsck->li_lpf_obj == NULL) {
1412                         lu_object_get(&child2->do_lu);
1413                         lfsck->li_lpf_obj = child2;
1414                 }
1415
1416                 cname = lfsck_name_get_const(env, name, strlen(name));
1417                 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1418         }
1419
1420         GOTO(put, rc);
1421
1422 check_child1:
1423         if (child1 != NULL)
1424                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1425                                             pfid, LVLT_BY_BOOKMARK);
1426
1427         GOTO(put, rc);
1428
1429 put:
1430         if (lfsck->li_lpf_obj != NULL) {
1431                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1432                         lfsck_object_put(env, lfsck->li_lpf_obj);
1433                         lfsck->li_lpf_obj = NULL;
1434                         rc = -ENOTDIR;
1435                 }
1436         } else if (rc == 0) {
1437                 rc = lfsck_create_lpf(env, lfsck);
1438         }
1439
1440         if (child2 != NULL && !IS_ERR(child2))
1441                 lfsck_object_put(env, child2);
1442         if (child1 != NULL && !IS_ERR(child1))
1443                 lfsck_object_put(env, child1);
1444
1445         return rc;
1446 }
1447
1448 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1449 {
1450         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1451         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1452         char                    *prefix;
1453         int                      rc     = 0;
1454         ENTRY;
1455
1456         if (unlikely(ss == NULL))
1457                 RETURN(-ENXIO);
1458
1459         OBD_ALLOC_PTR(lfsck->li_seq);
1460         if (lfsck->li_seq == NULL)
1461                 RETURN(-ENOMEM);
1462
1463         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1464         if (prefix == NULL)
1465                 GOTO(out, rc = -ENOMEM);
1466
1467         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1468         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1469                              ss->ss_server_seq);
1470         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1471         if (rc != 0)
1472                 GOTO(out, rc);
1473
1474         if (fid_is_sane(&bk->lb_last_fid))
1475                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1476
1477         RETURN(0);
1478
1479 out:
1480         OBD_FREE_PTR(lfsck->li_seq);
1481         lfsck->li_seq = NULL;
1482
1483         return rc;
1484 }
1485
1486 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1487 {
1488         if (lfsck->li_seq != NULL) {
1489                 seq_client_fini(lfsck->li_seq);
1490                 OBD_FREE_PTR(lfsck->li_seq);
1491                 lfsck->li_seq = NULL;
1492         }
1493 }
1494
1495 void lfsck_instance_cleanup(const struct lu_env *env,
1496                             struct lfsck_instance *lfsck)
1497 {
1498         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1499         struct lfsck_component  *com;
1500         struct lfsck_component  *next;
1501         struct lfsck_lmv_unit   *llu;
1502         struct lfsck_lmv_unit   *llu_next;
1503         struct lfsck_lmv        *llmv;
1504         ENTRY;
1505
1506         LASSERT(list_empty(&lfsck->li_link));
1507         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1508
1509         if (lfsck->li_obj_oit != NULL) {
1510                 lfsck_object_put(env, lfsck->li_obj_oit);
1511                 lfsck->li_obj_oit = NULL;
1512         }
1513
1514         LASSERT(lfsck->li_obj_dir == NULL);
1515         LASSERT(lfsck->li_lmv == NULL);
1516
1517         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1518                 llmv = &llu->llu_lmv;
1519
1520                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1521                          "still in using: %u\n",
1522                          atomic_read(&llmv->ll_ref));
1523
1524                 lfsck_lmv_put(env, llmv);
1525         }
1526
1527         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1528                 lfsck_component_cleanup(env, com);
1529         }
1530
1531         LASSERT(list_empty(&lfsck->li_list_dir));
1532
1533         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1534                                  lc_link) {
1535                 lfsck_component_cleanup(env, com);
1536         }
1537
1538         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1539                 lfsck_component_cleanup(env, com);
1540         }
1541
1542         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1543         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1544
1545         if (lfsck->li_lfsck_dir != NULL) {
1546                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1547                 lfsck->li_lfsck_dir = NULL;
1548         }
1549
1550         if (lfsck->li_bookmark_obj != NULL) {
1551                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1552                 lfsck->li_bookmark_obj = NULL;
1553         }
1554
1555         if (lfsck->li_lpf_obj != NULL) {
1556                 lfsck_object_put(env, lfsck->li_lpf_obj);
1557                 lfsck->li_lpf_obj = NULL;
1558         }
1559
1560         if (lfsck->li_lpf_root_obj != NULL) {
1561                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1562                 lfsck->li_lpf_root_obj = NULL;
1563         }
1564
1565         if (lfsck->li_los != NULL) {
1566                 local_oid_storage_fini(env, lfsck->li_los);
1567                 lfsck->li_los = NULL;
1568         }
1569
1570         lfsck_fid_fini(lfsck);
1571
1572         OBD_FREE_PTR(lfsck);
1573 }
1574
1575 static inline struct lfsck_instance *
1576 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1577 {
1578         struct lfsck_instance *lfsck;
1579
1580         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1581                 if (lfsck->li_bottom == key) {
1582                         if (ref)
1583                                 lfsck_instance_get(lfsck);
1584                         if (unlink)
1585                                 list_del_init(&lfsck->li_link);
1586
1587                         return lfsck;
1588                 }
1589         }
1590
1591         return NULL;
1592 }
1593
1594 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1595                                            bool unlink)
1596 {
1597         struct lfsck_instance *lfsck;
1598
1599         spin_lock(&lfsck_instance_lock);
1600         lfsck = __lfsck_instance_find(key, ref, unlink);
1601         spin_unlock(&lfsck_instance_lock);
1602
1603         return lfsck;
1604 }
1605
1606 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1607 {
1608         struct lfsck_instance *tmp;
1609
1610         spin_lock(&lfsck_instance_lock);
1611         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1612                 if (lfsck->li_bottom == tmp->li_bottom) {
1613                         spin_unlock(&lfsck_instance_lock);
1614                         return -EEXIST;
1615                 }
1616         }
1617
1618         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1619         spin_unlock(&lfsck_instance_lock);
1620         return 0;
1621 }
1622
/*
 * Print "<prefix>: name1,name2,...\n" for the bits set in @bits.
 *
 * names[] is indexed by bit position; a set bit whose names[] slot is NULL
 * is silently skipped. If @bits is zero only "<prefix>:\n" is printed.
 * No bound of names[] is checked here, so the caller has to provide a
 * slot for every bit that may be set.
 *
 * The bits are walked on an unsigned copy with an unsigned mask, because
 * shifting a signed 1 into the sign bit (bit 31) is undefined behaviour.
 *
 * Always returns 0.
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
		    const char *prefix)
{
	unsigned int	mask	= bits;
	unsigned int	flag;
	int		i;
	bool		newline = (mask == 0);

	seq_printf(m, "%s:%c", prefix, mask != 0 ? ' ' : '\n');

	for (i = 0; mask != 0; i++) {
		flag = 1U << i;
		if ((mask & flag) == 0)
			continue;

		mask &= ~flag;
		if (names[i] == NULL)
			continue;

		/* The last set bit ends the line, the others are
		 * followed by a comma. */
		if (mask == 0)
			newline = true;

		seq_printf(m, "%s%c", names[i], newline ? '\n' : ',');
	}

	/* The last set bit had no name: still terminate the line. */
	if (!newline)
		seq_printf(m, "\n");
	return 0;
}
1649
1650 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1651 {
1652         if (time != 0)
1653                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1654                           cfs_time_current_sec() - time);
1655         else
1656                 seq_printf(m, "%s: N/A\n", prefix);
1657         return 0;
1658 }
1659
1660 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1661                    const char *prefix)
1662 {
1663         if (fid_is_zero(&pos->lp_dir_parent)) {
1664                 if (pos->lp_oit_cookie == 0)
1665                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1666                                    prefix);
1667                 else
1668                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1669                                    prefix, pos->lp_oit_cookie);
1670         } else {
1671                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1672                            prefix, pos->lp_oit_cookie,
1673                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1674         }
1675         return 0;
1676 }
1677
1678 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1679                     struct lfsck_position *pos, bool init)
1680 {
1681         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1682
1683         if (unlikely(lfsck->li_di_oit == NULL)) {
1684                 memset(pos, 0, sizeof(*pos));
1685                 return;
1686         }
1687
1688         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1689         if (!lfsck->li_current_oit_processed && !init)
1690                 pos->lp_oit_cookie--;
1691
1692         LASSERT(pos->lp_oit_cookie > 0);
1693
1694         if (lfsck->li_di_dir != NULL) {
1695                 struct dt_object *dto = lfsck->li_obj_dir;
1696
1697                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1698                                                         lfsck->li_di_dir);
1699
1700                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1701                         fid_zero(&pos->lp_dir_parent);
1702                         pos->lp_dir_cookie = 0;
1703                 } else {
1704                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1705                 }
1706         } else {
1707                 fid_zero(&pos->lp_dir_parent);
1708                 pos->lp_dir_cookie = 0;
1709         }
1710 }
1711
1712 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1713 {
1714         bool dirty = false;
1715
1716         if (limit != LFSCK_SPEED_NO_LIMIT) {
1717                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1718                         lfsck->li_sleep_rate = limit /
1719                                                msecs_to_jiffies(MSEC_PER_SEC);
1720                         lfsck->li_sleep_jif = 1;
1721                 } else {
1722                         lfsck->li_sleep_rate = 1;
1723                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1724                                               limit;
1725                 }
1726         } else {
1727                 lfsck->li_sleep_jif = 0;
1728                 lfsck->li_sleep_rate = 0;
1729         }
1730
1731         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1732                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1733                 dirty = true;
1734         }
1735
1736         return dirty;
1737 }
1738
1739 void lfsck_control_speed(struct lfsck_instance *lfsck)
1740 {
1741         struct ptlrpc_thread *thread = &lfsck->li_thread;
1742         struct l_wait_info    lwi;
1743
1744         if (lfsck->li_sleep_jif > 0 &&
1745             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1746                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1747                                        LWI_ON_SIGNAL_NOOP, NULL);
1748
1749                 l_wait_event(thread->t_ctl_waitq,
1750                              !thread_is_running(thread),
1751                              &lwi);
1752                 lfsck->li_new_scanned = 0;
1753         }
1754 }
1755
1756 void lfsck_control_speed_by_self(struct lfsck_component *com)
1757 {
1758         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1759         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1760         struct l_wait_info       lwi;
1761
1762         if (lfsck->li_sleep_jif > 0 &&
1763             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1764                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1765                                        LWI_ON_SIGNAL_NOOP, NULL);
1766
1767                 l_wait_event(thread->t_ctl_waitq,
1768                              !thread_is_running(thread),
1769                              &lwi);
1770                 com->lc_new_scanned = 0;
1771         }
1772 }
1773
1774 static struct lfsck_thread_args *
1775 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1776                        struct lfsck_component *com,
1777                        struct lfsck_start_param *lsp)
1778 {
1779         struct lfsck_thread_args *lta;
1780         int                       rc;
1781
1782         OBD_ALLOC_PTR(lta);
1783         if (lta == NULL)
1784                 return ERR_PTR(-ENOMEM);
1785
1786         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1787         if (rc != 0) {
1788                 OBD_FREE_PTR(lta);
1789                 return ERR_PTR(rc);
1790         }
1791
1792         lta->lta_lfsck = lfsck_instance_get(lfsck);
1793         if (com != NULL)
1794                 lta->lta_com = lfsck_component_get(com);
1795
1796         lta->lta_lsp = lsp;
1797
1798         return lta;
1799 }
1800
1801 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1802 {
1803         if (lta->lta_com != NULL)
1804                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1805         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1806         lu_env_fini(&lta->lta_env);
1807         OBD_FREE_PTR(lta);
1808 }
1809
1810 struct lfsck_assistant_data *
1811 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1812                           const char *name)
1813 {
1814         struct lfsck_assistant_data *lad;
1815
1816         OBD_ALLOC_PTR(lad);
1817         if (lad != NULL) {
1818                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1819                 if (lad->lad_bitmap == NULL) {
1820                         OBD_FREE_PTR(lad);
1821                         return NULL;
1822                 }
1823
1824                 INIT_LIST_HEAD(&lad->lad_req_list);
1825                 spin_lock_init(&lad->lad_lock);
1826                 INIT_LIST_HEAD(&lad->lad_ost_list);
1827                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1828                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1829                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1830                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1831                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1832                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1833                 lad->lad_ops = lao;
1834                 lad->lad_name = name;
1835         }
1836
1837         return lad;
1838 }
1839
/**
 * Generic LFSCK asynchronous communication interpreter function.
 * The LFSCK RPC reply for both the event notification and status
 * querying will be handled here.
 *
 * Depending on the event carried by the request (laia_lr->lr_event) the
 * target descriptor is linked to, moved between, or removed from the
 * assistant's per-type target lists. Failures are only logged and/or
 * recorded (LF_INCOMPLETE, the assistant bitmap); they are not returned.
 *
 * If the arguments are not shared (laia_shared is not set), the target
 * and the component are put before returning.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] req       pointer to the LFSCK request
 * \param[in] args      pointer to the lfsck_async_interpret_args
 * \param[in] rc        the result for handling the LFSCK request
 *
 * \retval              0 always
 */
int lfsck_async_interpret_common(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *args, int rc)
{
        struct lfsck_async_interpret_args *laia = args;
        struct lfsck_component            *com  = laia->laia_com;
        struct lfsck_assistant_data       *lad  = com->lc_data;
        struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
        struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
        struct lfsck_request              *lr   = laia->laia_lr;

        LASSERT(com->lc_lfsck->li_master);

        switch (lr->lr_event) {
        case LE_START:
                /* The start notification failed: remember that the scan
                 * cannot be complete and do not track the target. */
                if (rc != 0) {
                        CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
                               "start: rc = %d\n",
                               lfsck_lfsck2name(com->lc_lfsck),
                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
                               ltd->ltd_index, lad->lad_name, rc);

                        if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                                struct lfsck_layout *lo = com->lc_file_ram;

                                /* A failed OST is recorded in the assistant
                                 * bitmap, a failed MDT marks the whole
                                 * layout LFSCK as incomplete. */
                                if (lr->lr_flags & LEF_TO_OST)
                                        lfsck_lad_set_bitmap(env, com,
                                                             ltd->ltd_index);
                                else
                                        lo->ll_flags |= LF_INCOMPLETE;
                        } else {
                                struct lfsck_namespace *ns = com->lc_file_ram;

                                /* If some MDT does not join the namespace
                                 * LFSCK, then we cannot know whether there
                                 * is some name entry on such MDT that with
                                 * the referenced MDT-object on this MDT or
                                 * not. So the namespace LFSCK on this MDT
                                 * cannot handle orphan MDT-objects properly.
                                 * So we mark the LFSCK as LF_INCOMPLETE and
                                 * skip orphan MDT-objects handling. */
                                ns->ln_flags |= LF_INCOMPLETE;
                        }
                        break;
                }

                /* The target has been notified: link it, under ltd_lock,
                 * to the assistant's target list and phase1 list unless it
                 * is dead, already done, or already linked. */
                spin_lock(&ltds->ltd_lock);
                if (ltd->ltd_dead) {
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                        struct list_head *list;
                        struct list_head *phase_list;

                        if (ltd->ltd_layout_done) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        /* Layout LFSCK tracks OSTs and MDTs separately. */
                        if (lr->lr_flags & LEF_TO_OST) {
                                list = &lad->lad_ost_list;
                                phase_list = &lad->lad_ost_phase1_list;
                        } else {
                                list = &lad->lad_mdt_list;
                                phase_list = &lad->lad_mdt_phase1_list;
                        }

                        if (list_empty(&ltd->ltd_layout_list))
                                list_add_tail(&ltd->ltd_layout_list, list);
                        if (list_empty(&ltd->ltd_layout_phase_list))
                                list_add_tail(&ltd->ltd_layout_phase_list,
                                              phase_list);
                } else {
                        if (ltd->ltd_namespace_done) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        if (list_empty(&ltd->ltd_namespace_list))
                                list_add_tail(&ltd->ltd_namespace_list,
                                              &lad->lad_mdt_list);
                        if (list_empty(&ltd->ltd_namespace_phase_list))
                                list_add_tail(&ltd->ltd_namespace_phase_list,
                                              &lad->lad_mdt_phase1_list);
                }
                spin_unlock(&ltds->ltd_lock);
                break;
        case LE_STOP:
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
        case LE_PEER_EXIT:
                /* Pure notifications: a failure is only logged, and
                 * -EALREADY is not treated as a failure at all. */
                if (rc != 0 && rc != -EALREADY)
                        CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
                              "event = %d, rc = %d\n",
                              lfsck_lfsck2name(com->lc_lfsck),
                              (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
                              ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
                break;
        case LE_QUERY: {
                struct lfsck_reply *reply;
                struct list_head *list;
                struct list_head *phase_list;

                /* Pick the target's own list linkage for this LFSCK type;
                 * note that these are the entries embedded in the target,
                 * not the assistant's list heads. */
                if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                        list = &ltd->ltd_layout_list;
                        phase_list = &ltd->ltd_layout_phase_list;
                } else {
                        list = &ltd->ltd_namespace_list;
                        phase_list = &ltd->ltd_namespace_phase_list;
                }

                /* The query failed: stop tracking the target. */
                if (rc != 0) {
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                reply = req_capsule_server_get(&req->rq_pill,
                                               &RMF_LFSCK_REPLY);
                /* A reply without the expected body is handled like a
                 * failed query. */
                if (reply == NULL) {
                        rc = -EPROTO;
                        CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
                               "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
                               lad->lad_name, rc);
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }

                switch (reply->lr_status) {
                case LS_SCANNING_PHASE1:
                        /* Still in phase1: leave the target where it is. */
                        break;
                case LS_SCANNING_PHASE2:
                        /* The peer reached phase2: take the target off its
                         * current phase list and, unless it is dead or
                         * already done, put it on the phase2 list. */
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        if (ltd->ltd_dead) {
                                spin_unlock(&ltds->ltd_lock);
                                break;
                        }

                        if (com->lc_type == LFSCK_TYPE_LAYOUT) {
                                if (ltd->ltd_layout_done) {
                                        spin_unlock(&ltds->ltd_lock);
                                        break;
                                }

                                if (lr->lr_flags & LEF_TO_OST)
                                        list_add_tail(phase_list,
                                                &lad->lad_ost_phase2_list);
                                else
                                        list_add_tail(phase_list,
                                                &lad->lad_mdt_phase2_list);
                        } else {
                                if (ltd->ltd_namespace_done) {
                                        spin_unlock(&ltds->ltd_lock);
                                        break;
                                }

                                list_add_tail(phase_list,
                                              &lad->lad_mdt_phase2_list);
                        }
                        spin_unlock(&ltds->ltd_lock);
                        break;
                default:
                        /* Any other status: stop tracking the target. */
                        spin_lock(&ltds->ltd_lock);
                        list_del_init(phase_list);
                        list_del_init(list);
                        spin_unlock(&ltds->ltd_lock);
                        break;
                }
                break;
        }
        default:
                /* NOTE: the value printed as "rc" here is the event. */
                CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
                       lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
                break;
        }

        /* Private (non-shared) arguments carry their own target and
         * component references; release them now that the reply has been
         * handled. */
        if (!laia->laia_shared) {
                lfsck_tgt_put(ltd);
                lfsck_component_put(env, com);
        }

        return 0;
}
2044
2045 static void lfsck_interpret(const struct lu_env *env,
2046                             struct lfsck_instance *lfsck,
2047                             struct ptlrpc_request *req, void *args, int result)
2048 {
2049         struct lfsck_async_interpret_args *laia = args;
2050         struct lfsck_component            *com;
2051
2052         LASSERT(laia->laia_com == NULL);
2053         LASSERT(laia->laia_shared);
2054
2055         spin_lock(&lfsck->li_lock);
2056         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2057                 laia->laia_com = com;
2058                 lfsck_async_interpret_common(env, req, laia, result);
2059         }
2060
2061         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2062                 laia->laia_com = com;
2063                 lfsck_async_interpret_common(env, req, laia, result);
2064         }
2065         spin_unlock(&lfsck->li_lock);
2066 }
2067
2068 static int lfsck_stop_notify(const struct lu_env *env,
2069                              struct lfsck_instance *lfsck,
2070                              struct lfsck_tgt_descs *ltds,
2071                              struct lfsck_tgt_desc *ltd, __u16 type)
2072 {
2073         struct lfsck_component *com;
2074         int                     rc = 0;
2075         ENTRY;
2076
2077         LASSERT(lfsck->li_master);
2078
2079         spin_lock(&lfsck->li_lock);
2080         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2081         if (com == NULL)
2082                 com = __lfsck_component_find(lfsck, type,
2083                                              &lfsck->li_list_double_scan);
2084         if (com != NULL)
2085                 lfsck_component_get(com);
2086         spin_unlock(&lfsck->li_lock);
2087
2088         if (com != NULL) {
2089                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2090                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2091                 struct lfsck_request              *lr    = &info->lti_lr;
2092                 struct lfsck_assistant_data       *lad   = com->lc_data;
2093                 struct list_head                  *list;
2094                 struct list_head                  *phase_list;
2095                 struct ptlrpc_request_set         *set;
2096
2097                 set = ptlrpc_prep_set();
2098                 if (set == NULL) {
2099                         lfsck_component_put(env, com);
2100
2101                         RETURN(-ENOMEM);
2102                 }
2103
2104                 if (type == LFSCK_TYPE_LAYOUT) {
2105                         list = &ltd->ltd_layout_list;
2106                         phase_list = &ltd->ltd_layout_phase_list;
2107                 } else {
2108                         list = &ltd->ltd_namespace_list;
2109                         phase_list = &ltd->ltd_namespace_phase_list;
2110                 }
2111
2112                 spin_lock(&ltds->ltd_lock);
2113                 if (list_empty(list)) {
2114                         LASSERT(list_empty(phase_list));
2115                         spin_unlock(&ltds->ltd_lock);
2116                         ptlrpc_set_destroy(set);
2117
2118                         RETURN(0);
2119                 }
2120
2121                 list_del_init(phase_list);
2122                 list_del_init(list);
2123                 spin_unlock(&ltds->ltd_lock);
2124
2125                 memset(lr, 0, sizeof(*lr));
2126                 lr->lr_index = lfsck_dev_idx(lfsck);
2127                 lr->lr_event = LE_PEER_EXIT;
2128                 lr->lr_active = type;
2129                 lr->lr_status = LS_CO_PAUSED;
2130                 if (ltds == &lfsck->li_ost_descs)
2131                         lr->lr_flags = LEF_TO_OST;
2132
2133                 laia->laia_com = com;
2134                 laia->laia_ltds = ltds;
2135                 atomic_inc(&ltd->ltd_ref);
2136                 laia->laia_ltd = ltd;
2137                 laia->laia_lr = lr;
2138                 laia->laia_shared = 0;
2139
2140                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2141                                          lfsck_async_interpret_common,
2142                                          laia, LFSCK_NOTIFY);
2143                 if (rc != 0) {
2144                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2145                                "co-stop for %s: rc = %d\n",
2146                                lfsck_lfsck2name(lfsck),
2147                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2148                                ltd->ltd_index, lad->lad_name, rc);
2149                         lfsck_tgt_put(ltd);
2150                 } else {
2151                         rc = ptlrpc_set_wait(set);
2152                 }
2153
2154                 ptlrpc_set_destroy(set);
2155                 lfsck_component_put(env, com);
2156         }
2157
2158         RETURN(rc);
2159 }
2160
2161 static int lfsck_async_interpret(const struct lu_env *env,
2162                                  struct ptlrpc_request *req,
2163                                  void *args, int rc)
2164 {
2165         struct lfsck_async_interpret_args *laia = args;
2166         struct lfsck_instance             *lfsck;
2167
2168         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2169                               li_mdt_descs);
2170         lfsck_interpret(env, lfsck, req, laia, rc);
2171         lfsck_tgt_put(laia->laia_ltd);
2172         if (rc != 0 && laia->laia_result != -EALREADY)
2173                 laia->laia_result = rc;
2174
2175         return 0;
2176 }
2177
2178 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2179                         struct lfsck_request *lr,
2180                         struct ptlrpc_request_set *set,
2181                         ptlrpc_interpterer_t interpreter,
2182                         void *args, int request)
2183 {
2184         struct lfsck_async_interpret_args *laia;
2185         struct ptlrpc_request             *req;
2186         struct lfsck_request              *tmp;
2187         struct req_format                 *format;
2188         int                                rc;
2189
2190         switch (request) {
2191         case LFSCK_NOTIFY:
2192                 format = &RQF_LFSCK_NOTIFY;
2193                 break;
2194         case LFSCK_QUERY:
2195                 format = &RQF_LFSCK_QUERY;
2196                 break;
2197         default:
2198                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2199                        exp->exp_obd->obd_name, request, -EINVAL);
2200                 return -EINVAL;
2201         }
2202
2203         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2204         if (req == NULL)
2205                 return -ENOMEM;
2206
2207         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2208         if (rc != 0) {
2209                 ptlrpc_request_free(req);
2210
2211                 return rc;
2212         }
2213
2214         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2215         *tmp = *lr;
2216         ptlrpc_request_set_replen(req);
2217
2218         laia = ptlrpc_req_async_args(req);
2219         *laia = *(struct lfsck_async_interpret_args *)args;
2220         if (laia->laia_com != NULL)
2221                 lfsck_component_get(laia->laia_com);
2222         req->rq_interpret_reply = interpreter;
2223         ptlrpc_set_add_req(set, req);
2224
2225         return 0;
2226 }
2227
2228 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2229                           struct lfsck_start_param *lsp)
2230 {
2231         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2232         struct lfsck_assistant_data     *lad     = com->lc_data;
2233         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2234         struct ptlrpc_thread            *athread = &lad->lad_thread;
2235         struct lfsck_thread_args        *lta;
2236         struct task_struct              *task;
2237         int                              rc;
2238         ENTRY;
2239
2240         lad->lad_assistant_status = 0;
2241         lad->lad_post_result = 0;
2242         lad->lad_to_post = 0;
2243         lad->lad_to_double_scan = 0;
2244         lad->lad_in_double_scan = 0;
2245         lad->lad_exit = 0;
2246         thread_set_flags(athread, 0);
2247
2248         lta = lfsck_thread_args_init(lfsck, com, lsp);
2249         if (IS_ERR(lta))
2250                 RETURN(PTR_ERR(lta));
2251
2252         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2253         if (IS_ERR(task)) {
2254                 rc = PTR_ERR(task);
2255                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2256                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2257                 lfsck_thread_args_fini(lta);
2258         } else {
2259                 struct l_wait_info lwi = { 0 };
2260
2261                 l_wait_event(mthread->t_ctl_waitq,
2262                              thread_is_running(athread) ||
2263                              thread_is_stopped(athread),
2264                              &lwi);
2265                 if (unlikely(!thread_is_running(athread)))
2266                         rc = lad->lad_assistant_status;
2267                 else
2268                         rc = 0;
2269         }
2270
2271         RETURN(rc);
2272 }
2273
2274 int lfsck_checkpoint_generic(const struct lu_env *env,
2275                              struct lfsck_component *com)
2276 {
2277         struct lfsck_assistant_data     *lad     = com->lc_data;
2278         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2279         struct ptlrpc_thread            *athread = &lad->lad_thread;
2280         struct l_wait_info               lwi     = { 0 };
2281
2282         if (com->lc_new_checked == 0)
2283                 return LFSCK_CHECKPOINT_SKIP;
2284
2285         l_wait_event(mthread->t_ctl_waitq,
2286                      list_empty(&lad->lad_req_list) ||
2287                      !thread_is_running(mthread) ||
2288                      thread_is_stopped(athread),
2289                      &lwi);
2290
2291         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2292                 return LFSCK_CHECKPOINT_SKIP;
2293
2294         return 0;
2295 }
2296
2297 void lfsck_post_generic(const struct lu_env *env,
2298                         struct lfsck_component *com, int *result)
2299 {
2300         struct lfsck_assistant_data     *lad     = com->lc_data;
2301         struct ptlrpc_thread            *athread = &lad->lad_thread;
2302         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2303         struct l_wait_info               lwi     = { 0 };
2304
2305         lad->lad_post_result = *result;
2306         if (*result <= 0)
2307                 lad->lad_exit = 1;
2308         lad->lad_to_post = 1;
2309
2310         wake_up_all(&athread->t_ctl_waitq);
2311         l_wait_event(mthread->t_ctl_waitq,
2312                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2313                      thread_is_stopped(athread),
2314                      &lwi);
2315
2316         if (lad->lad_assistant_status < 0)
2317                 *result = lad->lad_assistant_status;
2318 }
2319
2320 int lfsck_double_scan_generic(const struct lu_env *env,
2321                               struct lfsck_component *com, int status)
2322 {
2323         struct lfsck_assistant_data     *lad     = com->lc_data;
2324         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2325         struct ptlrpc_thread            *athread = &lad->lad_thread;
2326         struct l_wait_info               lwi     = { 0 };
2327
2328         if (status != LS_SCANNING_PHASE2)
2329                 lad->lad_exit = 1;
2330         else
2331                 lad->lad_to_double_scan = 1;
2332
2333         wake_up_all(&athread->t_ctl_waitq);
2334         l_wait_event(mthread->t_ctl_waitq,
2335                      lad->lad_in_double_scan ||
2336                      thread_is_stopped(athread),
2337                      &lwi);
2338
2339         if (lad->lad_assistant_status < 0)
2340                 return lad->lad_assistant_status;
2341
2342         return 0;
2343 }
2344
2345 void lfsck_quit_generic(const struct lu_env *env,
2346                         struct lfsck_component *com)
2347 {
2348         struct lfsck_assistant_data     *lad     = com->lc_data;
2349         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2350         struct ptlrpc_thread            *athread = &lad->lad_thread;
2351         struct l_wait_info               lwi     = { 0 };
2352
2353         lad->lad_exit = 1;
2354         wake_up_all(&athread->t_ctl_waitq);
2355         l_wait_event(mthread->t_ctl_waitq,
2356                      thread_is_init(athread) ||
2357                      thread_is_stopped(athread),
2358                      &lwi);
2359 }
2360
2361 /* external interfaces */
2362
2363 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2364 {
2365         struct lu_env           env;
2366         struct lfsck_instance  *lfsck;
2367         int                     rc;
2368         ENTRY;
2369
2370         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2371         if (rc != 0)
2372                 RETURN(rc);
2373
2374         lfsck = lfsck_instance_find(key, true, false);
2375         if (likely(lfsck != NULL)) {
2376                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2377                 lfsck_instance_put(&env, lfsck);
2378         } else {
2379                 rc = -ENXIO;
2380         }
2381
2382         lu_env_fini(&env);
2383
2384         RETURN(rc);
2385 }
2386 EXPORT_SYMBOL(lfsck_get_speed);
2387
2388 int lfsck_set_speed(struct dt_device *key, int val)
2389 {
2390         struct lu_env           env;
2391         struct lfsck_instance  *lfsck;
2392         int                     rc;
2393         ENTRY;
2394
2395         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2396         if (rc != 0)
2397                 RETURN(rc);
2398
2399         lfsck = lfsck_instance_find(key, true, false);
2400         if (likely(lfsck != NULL)) {
2401                 mutex_lock(&lfsck->li_mutex);
2402                 if (__lfsck_set_speed(lfsck, val))
2403                         rc = lfsck_bookmark_store(&env, lfsck);
2404                 mutex_unlock(&lfsck->li_mutex);
2405                 lfsck_instance_put(&env, lfsck);
2406         } else {
2407                 rc = -ENXIO;
2408         }
2409
2410         lu_env_fini(&env);
2411
2412         RETURN(rc);
2413 }
2414 EXPORT_SYMBOL(lfsck_set_speed);
2415
2416 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2417 {
2418         struct lu_env           env;
2419         struct lfsck_instance  *lfsck;
2420         int                     rc;
2421         ENTRY;
2422
2423         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2424         if (rc != 0)
2425                 RETURN(rc);
2426
2427         lfsck = lfsck_instance_find(key, true, false);
2428         if (likely(lfsck != NULL)) {
2429                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2430                 lfsck_instance_put(&env, lfsck);
2431         } else {
2432                 rc = -ENXIO;
2433         }
2434
2435         lu_env_fini(&env);
2436
2437         RETURN(rc);
2438 }
2439 EXPORT_SYMBOL(lfsck_get_windows);
2440
2441 int lfsck_set_windows(struct dt_device *key, int val)
2442 {
2443         struct lu_env           env;
2444         struct lfsck_instance  *lfsck;
2445         int                     rc;
2446         ENTRY;
2447
2448         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2449         if (rc != 0)
2450                 RETURN(rc);
2451
2452         lfsck = lfsck_instance_find(key, true, false);
2453         if (likely(lfsck != NULL)) {
2454                 if (val > LFSCK_ASYNC_WIN_MAX) {
2455                         CWARN("%s: Too large async window size, which "
2456                               "may cause memory issues. The valid range "
2457                               "is [0 - %u]. If you do not want to restrict "
2458                               "the window size for async requests pipeline, "
2459                               "just set it as 0.\n",
2460                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2461                         rc = -EINVAL;
2462                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2463                         mutex_lock(&lfsck->li_mutex);
2464                         lfsck->li_bookmark_ram.lb_async_windows = val;
2465                         rc = lfsck_bookmark_store(&env, lfsck);
2466                         mutex_unlock(&lfsck->li_mutex);
2467                 }
2468                 lfsck_instance_put(&env, lfsck);
2469         } else {
2470                 rc = -ENXIO;
2471         }
2472
2473         lu_env_fini(&env);
2474
2475         RETURN(rc);
2476 }
2477 EXPORT_SYMBOL(lfsck_set_windows);
2478
2479 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2480 {
2481         struct lu_env           env;
2482         struct lfsck_instance  *lfsck;
2483         struct lfsck_component *com;
2484         int                     rc;
2485         ENTRY;
2486
2487         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2488         if (rc != 0)
2489                 RETURN(rc);
2490
2491         lfsck = lfsck_instance_find(key, true, false);
2492         if (likely(lfsck != NULL)) {
2493                 com = lfsck_component_find(lfsck, type);
2494                 if (likely(com != NULL)) {
2495                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2496                         lfsck_component_put(&env, com);
2497                 } else {
2498                         rc = -ENOTSUPP;
2499                 }
2500
2501                 lfsck_instance_put(&env, lfsck);
2502         } else {
2503                 rc = -ENXIO;
2504         }
2505
2506         lu_env_fini(&env);
2507
2508         RETURN(rc);
2509 }
2510 EXPORT_SYMBOL(lfsck_dump);
2511
2512 static int lfsck_stop_all(const struct lu_env *env,
2513                           struct lfsck_instance *lfsck,
2514                           struct lfsck_stop *stop)
2515 {
2516         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2517         struct lfsck_request              *lr     = &info->lti_lr;
2518         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2519         struct ptlrpc_request_set         *set;
2520         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2521         struct lfsck_tgt_desc             *ltd;
2522         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2523         __u32                              idx;
2524         int                                rc     = 0;
2525         int                                rc1    = 0;
2526         ENTRY;
2527
2528         LASSERT(stop->ls_flags & LPF_BROADCAST);
2529
2530         set = ptlrpc_prep_set();
2531         if (unlikely(set == NULL))
2532                 RETURN(-ENOMEM);
2533
2534         memset(lr, 0, sizeof(*lr));
2535         lr->lr_event = LE_STOP;
2536         lr->lr_index = lfsck_dev_idx(lfsck);
2537         lr->lr_status = stop->ls_status;
2538         lr->lr_version = bk->lb_version;
2539         lr->lr_active = LFSCK_TYPES_ALL;
2540         lr->lr_param = stop->ls_flags;
2541
2542         laia->laia_com = NULL;
2543         laia->laia_ltds = ltds;
2544         laia->laia_lr = lr;
2545         laia->laia_result = 0;
2546         laia->laia_shared = 1;
2547
2548         down_read(&ltds->ltd_rw_sem);
2549         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2550                 ltd = lfsck_tgt_get(ltds, idx);
2551                 LASSERT(ltd != NULL);
2552
2553                 laia->laia_ltd = ltd;
2554                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2555                                          lfsck_async_interpret, laia,
2556                                          LFSCK_NOTIFY);
2557                 if (rc != 0) {
2558                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2559                         lfsck_tgt_put(ltd);
2560                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2561                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2562                         rc1 = rc;
2563                 }
2564         }
2565         up_read(&ltds->ltd_rw_sem);
2566
2567         rc = ptlrpc_set_wait(set);
2568         ptlrpc_set_destroy(set);
2569
2570         if (rc == 0)
2571                 rc = laia->laia_result;
2572
2573         if (rc == -EALREADY)
2574                 rc = 0;
2575
2576         if (rc != 0)
2577                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2578                        lfsck_lfsck2name(lfsck), rc);
2579
2580         RETURN(rc != 0 ? rc : rc1);
2581 }
2582
2583 static int lfsck_start_all(const struct lu_env *env,
2584                            struct lfsck_instance *lfsck,
2585                            struct lfsck_start *start)
2586 {
2587         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2588         struct lfsck_request              *lr     = &info->lti_lr;
2589         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2590         struct ptlrpc_request_set         *set;
2591         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2592         struct lfsck_tgt_desc             *ltd;
2593         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2594         __u32                              idx;
2595         int                                rc     = 0;
2596         ENTRY;
2597
2598         LASSERT(start->ls_flags & LPF_BROADCAST);
2599
2600         set = ptlrpc_prep_set();
2601         if (unlikely(set == NULL))
2602                 RETURN(-ENOMEM);
2603
2604         memset(lr, 0, sizeof(*lr));
2605         lr->lr_event = LE_START;
2606         lr->lr_index = lfsck_dev_idx(lfsck);
2607         lr->lr_speed = bk->lb_speed_limit;
2608         lr->lr_version = bk->lb_version;
2609         lr->lr_active = start->ls_active;
2610         lr->lr_param = start->ls_flags;
2611         lr->lr_async_windows = bk->lb_async_windows;
2612         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2613                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2614                        LSV_CREATE_MDTOBJ;
2615
2616         laia->laia_com = NULL;
2617         laia->laia_ltds = ltds;
2618         laia->laia_lr = lr;
2619         laia->laia_result = 0;
2620         laia->laia_shared = 1;
2621
2622         down_read(&ltds->ltd_rw_sem);
2623         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2624                 ltd = lfsck_tgt_get(ltds, idx);
2625                 LASSERT(ltd != NULL);
2626
2627                 laia->laia_ltd = ltd;
2628                 ltd->ltd_layout_done = 0;
2629                 ltd->ltd_namespace_done = 0;
2630                 ltd->ltd_synced_failures = 0;
2631                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2632                                          lfsck_async_interpret, laia,
2633                                          LFSCK_NOTIFY);
2634                 if (rc != 0) {
2635                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2636                         lfsck_tgt_put(ltd);
2637                         CERROR("%s: cannot notify MDT %x for LFSCK "
2638                                "start, failout: rc = %d\n",
2639                                lfsck_lfsck2name(lfsck), idx, rc);
2640                         break;
2641                 }
2642         }
2643         up_read(&ltds->ltd_rw_sem);
2644
2645         if (rc != 0) {
2646                 ptlrpc_set_destroy(set);
2647
2648                 RETURN(rc);
2649         }
2650
2651         rc = ptlrpc_set_wait(set);
2652         ptlrpc_set_destroy(set);
2653
2654         if (rc == 0)
2655                 rc = laia->laia_result;
2656
2657         if (rc != 0) {
2658                 struct lfsck_stop *stop = &info->lti_stop;
2659
2660                 CERROR("%s: cannot start LFSCK on some MDTs, "
2661                        "stop all: rc = %d\n",
2662                        lfsck_lfsck2name(lfsck), rc);
2663                 if (rc != -EALREADY) {
2664                         stop->ls_status = LS_FAILED;
2665                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2666                         lfsck_stop_all(env, lfsck, stop);
2667                 }
2668         }
2669
2670         RETURN(rc);
2671 }
2672
2673 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2674                 struct lfsck_start_param *lsp)
2675 {
2676         struct lfsck_start              *start  = lsp->lsp_start;
2677         struct lfsck_instance           *lfsck;
2678         struct lfsck_bookmark           *bk;
2679         struct ptlrpc_thread            *thread;
2680         struct lfsck_component          *com;
2681         struct l_wait_info               lwi    = { 0 };
2682         struct lfsck_thread_args        *lta;
2683         struct task_struct              *task;
2684         int                              rc     = 0;
2685         __u16                            valid  = 0;
2686         __u16                            flags  = 0;
2687         __u16                            type   = 1;
2688         ENTRY;
2689
2690         lfsck = lfsck_instance_find(key, true, false);
2691         if (unlikely(lfsck == NULL))
2692                 RETURN(-ENXIO);
2693
2694         /* System is not ready, try again later. */
2695         if (unlikely(lfsck->li_namespace == NULL))
2696                 GOTO(put, rc = -EAGAIN);
2697
2698         /* start == NULL means auto trigger paused LFSCK. */
2699         if ((start == NULL) &&
2700             (list_empty(&lfsck->li_list_scan) ||
2701              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2702                 GOTO(put, rc = 0);
2703
2704         bk = &lfsck->li_bookmark_ram;
2705         thread = &lfsck->li_thread;
2706         mutex_lock(&lfsck->li_mutex);
2707         spin_lock(&lfsck->li_lock);
2708         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2709                 rc = -EALREADY;
2710                 if (unlikely(start == NULL)) {
2711                         spin_unlock(&lfsck->li_lock);
2712                         GOTO(out, rc);
2713                 }
2714
2715                 while (start->ls_active != 0) {
2716                         if (!(type & start->ls_active)) {
2717                                 type <<= 1;
2718                                 continue;
2719                         }
2720
2721                         com = __lfsck_component_find(lfsck, type,
2722                                                      &lfsck->li_list_scan);
2723                         if (com == NULL)
2724                                 com = __lfsck_component_find(lfsck, type,
2725                                                 &lfsck->li_list_double_scan);
2726                         if (com == NULL) {
2727                                 rc = -EOPNOTSUPP;
2728                                 break;
2729                         }
2730
2731                         if (com->lc_ops->lfsck_join != NULL) {
2732                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2733                                 if (rc != 0 && rc != -EALREADY)
2734                                         break;
2735                         }
2736                         start->ls_active &= ~type;
2737                         type <<= 1;
2738                 }
2739                 spin_unlock(&lfsck->li_lock);
2740                 GOTO(out, rc);
2741         }
2742         spin_unlock(&lfsck->li_lock);
2743
2744         lfsck->li_status = 0;
2745         lfsck->li_oit_over = 0;
2746         lfsck->li_start_unplug = 0;
2747         lfsck->li_drop_dryrun = 0;
2748         lfsck->li_new_scanned = 0;
2749
2750         /* For auto trigger. */
2751         if (start == NULL)
2752                 goto trigger;
2753
2754         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2755                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2756                        lfsck_lfsck2name(lfsck));
2757
2758                 GOTO(out, rc = -EPERM);
2759         }
2760
2761         start->ls_version = bk->lb_version;
2762
2763         if (start->ls_active != 0) {
2764                 struct lfsck_component *next;
2765
2766                 if (start->ls_active == LFSCK_TYPES_ALL)
2767                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2768
2769                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2770                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2771                         GOTO(out, rc = -ENOTSUPP);
2772                 }
2773
2774                 list_for_each_entry_safe(com, next,
2775                                          &lfsck->li_list_scan, lc_link) {
2776                         if (!(com->lc_type & start->ls_active)) {
2777                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2778                                                              false);
2779                                 if (rc != 0)
2780                                         GOTO(out, rc);
2781                         }
2782                 }
2783
2784                 while (start->ls_active != 0) {
2785                         if (type & start->ls_active) {
2786                                 com = __lfsck_component_find(lfsck, type,
2787                                                         &lfsck->li_list_idle);
2788                                 if (com != NULL)
2789                                         /* The component status will be updated
2790                                          * when its prep() is called later by
2791                                          * the LFSCK main engine. */
2792                                         list_move_tail(&com->lc_link,
2793                                                        &lfsck->li_list_scan);
2794                                 start->ls_active &= ~type;
2795                         }
2796                         type <<= 1;
2797                 }
2798         }
2799
2800         if (list_empty(&lfsck->li_list_scan)) {
2801                 /* The speed limit will be used to control both the LFSCK and
2802                  * low layer scrub (if applied), need to be handled firstly. */
2803                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2804                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2805                                 rc = lfsck_bookmark_store(env, lfsck);
2806                                 if (rc != 0)
2807                                         GOTO(out, rc);
2808                         }
2809                 }
2810
2811                 goto trigger;
2812         }
2813
2814         if (start->ls_flags & LPF_RESET)
2815                 flags |= DOIF_RESET;
2816
2817         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2818         if (rc != 0)
2819                 GOTO(out, rc);
2820
2821         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2822                 start->ls_active |= com->lc_type;
2823                 if (flags & DOIF_RESET) {
2824                         rc = com->lc_ops->lfsck_reset(env, com, false);
2825                         if (rc != 0)
2826                                 GOTO(out, rc);
2827                 }
2828         }
2829
2830 trigger:
2831         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2832         if (bk->lb_param & LPF_DRYRUN)
2833                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2834
2835         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2836                 valid |= DOIV_ERROR_HANDLE;
2837                 if (start->ls_flags & LPF_FAILOUT)
2838                         flags |= DOIF_FAILOUT;
2839         }
2840
2841         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2842                 valid |= DOIV_DRYRUN;
2843                 if (start->ls_flags & LPF_DRYRUN)
2844                         flags |= DOIF_DRYRUN;
2845         }
2846
2847         if (!list_empty(&lfsck->li_list_scan))
2848                 flags |= DOIF_OUTUSED;
2849
2850         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2851         thread_set_flags(thread, 0);
2852         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2853         if (IS_ERR(lta))
2854                 GOTO(out, rc = PTR_ERR(lta));
2855
2856         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2857         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2858         if (IS_ERR(task)) {
2859                 rc = PTR_ERR(task);
2860                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2861                        lfsck_lfsck2name(lfsck), rc);
2862                 lfsck_thread_args_fini(lta);
2863
2864                 GOTO(out, rc);
2865         }
2866
2867         l_wait_event(thread->t_ctl_waitq,
2868                      thread_is_running(thread) ||
2869                      thread_is_stopped(thread),
2870                      &lwi);
2871         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2872                 lfsck->li_start_unplug = 1;
2873                 wake_up_all(&thread->t_ctl_waitq);
2874
2875                 GOTO(out, rc = 0);
2876         }
2877
2878         /* release lfsck::li_mutex to avoid deadlock. */
2879         mutex_unlock(&lfsck->li_mutex);
2880         rc = lfsck_start_all(env, lfsck, start);
2881         if (rc != 0) {
2882                 spin_lock(&lfsck->li_lock);
2883                 if (thread_is_stopped(thread)) {
2884                         spin_unlock(&lfsck->li_lock);
2885                 } else {
2886                         lfsck->li_status = LS_FAILED;
2887                         lfsck->li_flags = 0;
2888                         thread_set_flags(thread, SVC_STOPPING);
2889                         spin_unlock(&lfsck->li_lock);
2890
2891                         lfsck->li_start_unplug = 1;
2892                         wake_up_all(&thread->t_ctl_waitq);
2893                         l_wait_event(thread->t_ctl_waitq,
2894                                      thread_is_stopped(thread),
2895                                      &lwi);
2896                 }
2897         } else {
2898                 lfsck->li_start_unplug = 1;
2899                 wake_up_all(&thread->t_ctl_waitq);
2900         }
2901
2902         GOTO(put, rc);
2903
2904 out:
2905         mutex_unlock(&lfsck->li_mutex);
2906
2907 put:
2908         lfsck_instance_put(env, lfsck);
2909
2910         return rc < 0 ? rc : 0;
2911 }
2912 EXPORT_SYMBOL(lfsck_start);
2913
2914 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2915                struct lfsck_stop *stop)
2916 {
2917         struct lfsck_instance   *lfsck;
2918         struct ptlrpc_thread    *thread;
2919         struct l_wait_info       lwi    = { 0 };
2920         int                      rc     = 0;
2921         int                      rc1    = 0;
2922         ENTRY;
2923
2924         lfsck = lfsck_instance_find(key, true, false);
2925         if (unlikely(lfsck == NULL))
2926                 RETURN(-ENXIO);
2927
2928         thread = &lfsck->li_thread;
2929         /* release lfsck::li_mutex to avoid deadlock. */
2930         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2931                 if (!lfsck->li_master) {
2932                         CERROR("%s: only allow to specify '-A' via MDS\n",
2933                                lfsck_lfsck2name(lfsck));
2934
2935                         GOTO(out, rc = -EPERM);
2936                 }
2937
2938                 rc1 = lfsck_stop_all(env, lfsck, stop);
2939         }
2940
2941         mutex_lock(&lfsck->li_mutex);
2942         spin_lock(&lfsck->li_lock);
2943         /* no error if LFSCK is already stopped, or was never started */
2944         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2945                 spin_unlock(&lfsck->li_lock);
2946                 GOTO(out, rc = 0);
2947         }
2948
2949         if (stop != NULL) {
2950                 lfsck->li_status = stop->ls_status;
2951                 lfsck->li_flags = stop->ls_flags;
2952         } else {
2953                 lfsck->li_status = LS_STOPPED;
2954                 lfsck->li_flags = 0;
2955         }
2956
2957         thread_set_flags(thread, SVC_STOPPING);
2958         spin_unlock(&lfsck->li_lock);
2959
2960         wake_up_all(&thread->t_ctl_waitq);
2961         l_wait_event(thread->t_ctl_waitq,
2962                      thread_is_stopped(thread),
2963                      &lwi);
2964
2965         GOTO(out, rc = 0);
2966
2967 out:
2968         mutex_unlock(&lfsck->li_mutex);
2969         lfsck_instance_put(env, lfsck);
2970
2971         return rc != 0 ? rc : rc1;
2972 }
2973 EXPORT_SYMBOL(lfsck_stop);
2974
2975 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2976                     struct lfsck_request *lr, struct thandle *th)
2977 {
2978         int rc = -EOPNOTSUPP;
2979         ENTRY;
2980
2981         switch (lr->lr_event) {
2982         case LE_START: {
2983                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2984                 struct lfsck_start_param  lsp;
2985
2986                 memset(start, 0, sizeof(*start));
2987                 start->ls_valid = lr->lr_valid;
2988                 start->ls_speed_limit = lr->lr_speed;
2989                 start->ls_version = lr->lr_version;
2990                 start->ls_active = lr->lr_active;
2991                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2992                 start->ls_async_windows = lr->lr_async_windows;
2993
2994                 lsp.lsp_start = start;
2995                 lsp.lsp_index = lr->lr_index;
2996                 lsp.lsp_index_valid = 1;
2997                 rc = lfsck_start(env, key, &lsp);
2998                 break;
2999         }
3000         case LE_STOP: {
3001                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3002
3003                 memset(stop, 0, sizeof(*stop));
3004                 stop->ls_status = lr->lr_status;
3005                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3006                 rc = lfsck_stop(env, key, stop);
3007                 break;
3008         }
3009         case LE_PHASE1_DONE:
3010         case LE_PHASE2_DONE:
3011         case LE_FID_ACCESSED:
3012         case LE_PEER_EXIT:
3013         case LE_CONDITIONAL_DESTROY:
3014         case LE_SKIP_NLINK_DECLARE:
3015         case LE_SKIP_NLINK:
3016         case LE_SET_LMV_MASTER:
3017         case LE_SET_LMV_SLAVE:
3018         case LE_PAIRS_VERIFY: {
3019                 struct lfsck_instance  *lfsck;
3020                 struct lfsck_component *com;
3021
3022                 lfsck = lfsck_instance_find(key, true, false);
3023                 if (unlikely(lfsck == NULL))
3024                         RETURN(-ENXIO);
3025
3026                 com = lfsck_component_find(lfsck, lr->lr_active);
3027                 if (likely(com != NULL)) {
3028                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3029                         lfsck_component_put(env, com);
3030                 }
3031
3032                 lfsck_instance_put(env, lfsck);
3033                 break;
3034         }
3035         default:
3036                 break;
3037         }
3038
3039         RETURN(rc);
3040 }
3041 EXPORT_SYMBOL(lfsck_in_notify);
3042
3043 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3044                 struct lfsck_request *lr)
3045 {
3046         struct lfsck_instance  *lfsck;
3047         struct lfsck_component *com;
3048         int                     rc;
3049         ENTRY;
3050
3051         lfsck = lfsck_instance_find(key, true, false);
3052         if (unlikely(lfsck == NULL))
3053                 RETURN(-ENXIO);
3054
3055         com = lfsck_component_find(lfsck, lr->lr_active);
3056         if (likely(com != NULL)) {
3057                 rc = com->lc_ops->lfsck_query(env, com);
3058                 lfsck_component_put(env, com);
3059         } else {
3060                 rc = -ENOTSUPP;
3061         }
3062
3063         lfsck_instance_put(env, lfsck);
3064
3065         RETURN(rc);
3066 }
3067
3068 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3069                              struct ldlm_namespace *ns)
3070 {
3071         struct lfsck_instance  *lfsck;
3072         int                     rc      = -ENXIO;
3073
3074         lfsck = lfsck_instance_find(key, true, false);
3075         if (likely(lfsck != NULL)) {
3076                 lfsck->li_namespace = ns;
3077                 lfsck_instance_put(env, lfsck);
3078                 rc = 0;
3079         }
3080
3081         return rc;
3082 }
3083 EXPORT_SYMBOL(lfsck_register_namespace);
3084
3085 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3086                    struct dt_device *next, struct obd_device *obd,
3087                    lfsck_out_notify notify, void *notify_data, bool master)
3088 {
3089         struct lfsck_instance   *lfsck;
3090         struct dt_object        *root  = NULL;
3091         struct dt_object        *obj   = NULL;
3092         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3093         int                      rc;
3094         ENTRY;
3095
3096         lfsck = lfsck_instance_find(key, false, false);
3097         if (unlikely(lfsck != NULL))
3098                 RETURN(-EEXIST);
3099
3100         OBD_ALLOC_PTR(lfsck);
3101         if (lfsck == NULL)
3102                 RETURN(-ENOMEM);
3103
3104         mutex_init(&lfsck->li_mutex);
3105         spin_lock_init(&lfsck->li_lock);
3106         INIT_LIST_HEAD(&lfsck->li_link);
3107         INIT_LIST_HEAD(&lfsck->li_list_scan);
3108         INIT_LIST_HEAD(&lfsck->li_list_dir);
3109         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3110         INIT_LIST_HEAD(&lfsck->li_list_idle);
3111         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3112         atomic_set(&lfsck->li_ref, 1);
3113         atomic_set(&lfsck->li_double_scan_count, 0);
3114         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3115         lfsck->li_out_notify = notify;
3116         lfsck->li_out_notify_data = notify_data;
3117         lfsck->li_next = next;
3118         lfsck->li_bottom = key;
3119         lfsck->li_obd = obd;
3120
3121         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3122         if (rc != 0)
3123                 GOTO(out, rc);
3124
3125         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3126         if (rc != 0)
3127                 GOTO(out, rc);
3128
3129         fid->f_seq = FID_SEQ_LOCAL_NAME;
3130         fid->f_oid = 1;
3131         fid->f_ver = 0;
3132         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3133         if (rc != 0)
3134                 GOTO(out, rc);
3135
3136         rc = dt_root_get(env, key, fid);
3137         if (rc != 0)
3138                 GOTO(out, rc);
3139
3140         root = dt_locate(env, key, fid);
3141         if (IS_ERR(root))
3142                 GOTO(out, rc = PTR_ERR(root));
3143
3144         if (unlikely(!dt_try_as_dir(env, root)))
3145                 GOTO(out, rc = -ENOTDIR);
3146
3147         lfsck->li_local_root_fid = *fid;
3148         if (master) {
3149                 lfsck->li_master = 1;
3150                 if (lfsck_dev_idx(lfsck) == 0) {
3151                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3152                         const struct lu_name *cname;
3153
3154                         rc = dt_lookup(env, root,
3155                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3156                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3157                         if (rc != 0)
3158                                 GOTO(out, rc);
3159
3160                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3161                         if (IS_ERR(obj))
3162                                 GOTO(out, rc = PTR_ERR(obj));
3163
3164                         if (unlikely(!dt_try_as_dir(env, obj)))
3165                                 GOTO(out, rc = -ENOTDIR);
3166
3167                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3168                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3169                         if (rc != 0)
3170                                 GOTO(out, rc);
3171
3172                         lfsck_object_put(env, obj);
3173                         obj = dt_locate(env, key, fid);
3174                         if (IS_ERR(obj))
3175                                 GOTO(out, rc = PTR_ERR(obj));
3176
3177                         cname = lfsck_name_get_const(env, dotlustre,
3178                                                      strlen(dotlustre));
3179                         rc = lfsck_verify_linkea(env, obj, cname,
3180                                                  &lfsck->li_global_root_fid);
3181                         if (rc != 0)
3182                                 GOTO(out, rc);
3183
3184                         if (unlikely(!dt_try_as_dir(env, obj)))
3185                                 GOTO(out, rc = -ENOTDIR);
3186
3187                         *pfid = *fid;
3188                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3189                                        (const struct dt_key *)lostfound,
3190                                        BYPASS_CAPA);
3191                         if (rc != 0)
3192                                 GOTO(out, rc);
3193
3194                         lfsck_object_put(env, obj);
3195                         obj = dt_locate(env, key, fid);
3196                         if (IS_ERR(obj))
3197                                 GOTO(out, rc = PTR_ERR(obj));
3198
3199                         cname = lfsck_name_get_const(env, lostfound,
3200                                                      strlen(lostfound));
3201                         rc = lfsck_verify_linkea(env, obj, cname, pfid);
3202                         if (rc != 0)
3203                                 GOTO(out, rc);
3204
3205                         lfsck_object_put(env, obj);
3206                         obj = NULL;
3207                 }
3208         }
3209
3210         fid->f_seq = FID_SEQ_LOCAL_FILE;
3211         fid->f_oid = OTABLE_IT_OID;
3212         fid->f_ver = 0;
3213         obj = dt_locate(env, key, fid);
3214         if (IS_ERR(obj))
3215                 GOTO(out, rc = PTR_ERR(obj));
3216
3217         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3218         if (rc != 0)
3219                 GOTO(out, rc);
3220
3221         lfsck->li_obj_oit = obj;
3222         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3223                                         S_IFDIR | S_IRUGO | S_IWUSR);
3224         if (IS_ERR(obj))
3225                 GOTO(out, rc = PTR_ERR(obj));
3226
3227         lu_object_get(&obj->do_lu);
3228         lfsck->li_lfsck_dir = obj;
3229         rc = lfsck_bookmark_setup(env, lfsck);
3230         if (rc != 0)
3231                 GOTO(out, rc);
3232
3233         if (master) {
3234                 rc = lfsck_fid_init(lfsck);
3235                 if (rc < 0)
3236                         GOTO(out, rc);
3237
3238                 rc = lfsck_namespace_setup(env, lfsck);
3239                 if (rc < 0)
3240                         GOTO(out, rc);
3241         }
3242
3243         rc = lfsck_layout_setup(env, lfsck);
3244         if (rc < 0)
3245                 GOTO(out, rc);
3246
3247         /* XXX: more LFSCK components initialization to be added here. */
3248
3249         rc = lfsck_instance_add(lfsck);
3250         if (rc == 0)
3251                 rc = lfsck_add_target_from_orphan(env, lfsck);
3252 out:
3253         if (obj != NULL && !IS_ERR(obj))
3254                 lfsck_object_put(env, obj);
3255         if (root != NULL && !IS_ERR(root))
3256                 lfsck_object_put(env, root);
3257         if (rc != 0)
3258                 lfsck_instance_cleanup(env, lfsck);
3259         return rc;
3260 }
3261 EXPORT_SYMBOL(lfsck_register);
3262
3263 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3264 {
3265         struct lfsck_instance *lfsck;
3266
3267         lfsck = lfsck_instance_find(key, false, true);
3268         if (lfsck != NULL)
3269                 lfsck_instance_put(env, lfsck);
3270 }
3271 EXPORT_SYMBOL(lfsck_degister);
3272
3273 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3274                      struct dt_device *tgt, struct obd_export *exp,
3275                      __u32 index, bool for_ost)
3276 {
3277         struct lfsck_instance   *lfsck;
3278         struct lfsck_tgt_desc   *ltd;
3279         int                      rc;
3280         ENTRY;
3281
3282         OBD_ALLOC_PTR(ltd);
3283         if (ltd == NULL)
3284                 RETURN(-ENOMEM);
3285
3286         ltd->ltd_tgt = tgt;
3287         ltd->ltd_key = key;
3288         ltd->ltd_exp = exp;
3289         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3290         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3291         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3292         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3293         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3294         atomic_set(&ltd->ltd_ref, 1);
3295         ltd->ltd_index = index;
3296
3297         spin_lock(&lfsck_instance_lock);
3298         lfsck = __lfsck_instance_find(key, true, false);
3299         if (lfsck == NULL) {
3300                 if (for_ost)
3301                         list_add_tail(&ltd->ltd_orphan_list,
3302                                       &lfsck_ost_orphan_list);
3303                 else
3304                         list_add_tail(&ltd->ltd_orphan_list,
3305                                       &lfsck_mdt_orphan_list);
3306                 spin_unlock(&lfsck_instance_lock);
3307
3308                 RETURN(0);
3309         }
3310         spin_unlock(&lfsck_instance_lock);
3311
3312         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3313         if (rc != 0)
3314                 lfsck_tgt_put(ltd);
3315
3316         lfsck_instance_put(env, lfsck);
3317
3318         RETURN(rc);
3319 }
3320 EXPORT_SYMBOL(lfsck_add_target);
3321
3322 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3323                       struct dt_device *tgt, __u32 index, bool for_ost)
3324 {
3325         struct lfsck_instance   *lfsck;
3326         struct lfsck_tgt_descs  *ltds;
3327         struct lfsck_tgt_desc   *ltd;
3328         struct list_head        *head;
3329
3330         if (for_ost)
3331                 head = &lfsck_ost_orphan_list;
3332         else
3333                 head = &lfsck_mdt_orphan_list;
3334
3335         spin_lock(&lfsck_instance_lock);
3336         list_for_each_entry(ltd, head, ltd_orphan_list) {
3337                 if (ltd->ltd_tgt == tgt) {
3338                         list_del_init(&ltd->ltd_orphan_list);
3339                         spin_unlock(&lfsck_instance_lock);
3340                         lfsck_tgt_put(ltd);
3341
3342                         return;
3343                 }
3344         }
3345
3346         ltd = NULL;
3347         lfsck = __lfsck_instance_find(key, true, false);
3348         spin_unlock(&lfsck_instance_lock);
3349         if (unlikely(lfsck == NULL))
3350                 return;
3351
3352         if (for_ost)
3353                 ltds = &lfsck->li_ost_descs;
3354         else
3355                 ltds = &lfsck->li_mdt_descs;
3356
3357         down_write(&ltds->ltd_rw_sem);
3358         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3359
3360         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3361                 goto unlock;
3362
3363         ltd = LTD_TGT(ltds, index);
3364         if (unlikely(ltd == NULL))
3365                 goto unlock;
3366
3367         LASSERT(ltds->ltd_tgtnr > 0);
3368
3369         ltds->ltd_tgtnr--;
3370         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3371         LTD_TGT(ltds, index) = NULL;
3372
3373 unlock:
3374         if (ltd == NULL) {
3375                 if (for_ost)
3376                         head = &lfsck->li_ost_descs.ltd_orphan;
3377                 else
3378                         head = &lfsck->li_mdt_descs.ltd_orphan;
3379
3380                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3381                         if (ltd->ltd_tgt == tgt) {
3382                                 list_del_init(&ltd->ltd_orphan_list);
3383                                 break;
3384                         }
3385                 }
3386         }
3387
3388         up_write(&ltds->ltd_rw_sem);
3389         if (ltd != NULL) {
3390                 spin_lock(&ltds->ltd_lock);
3391                 ltd->ltd_dead = 1;
3392                 spin_unlock(&ltds->ltd_lock);
3393                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3394                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3395                 lfsck_tgt_put(ltd);
3396         }
3397
3398         lfsck_instance_put(env, lfsck);
3399 }
3400 EXPORT_SYMBOL(lfsck_del_target);
3401
3402 static int __init lfsck_init(void)
3403 {
3404         int rc;
3405
3406         INIT_LIST_HEAD(&lfsck_instance_list);
3407         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3408         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3409         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3410         rc = lu_context_key_register(&lfsck_thread_key);
3411         if (rc == 0) {
3412                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3413                 tgt_register_lfsck_query(lfsck_query);
3414         }
3415
3416         return rc;
3417 }
3418
3419 static void __exit lfsck_exit(void)
3420 {
3421         struct lfsck_tgt_desc *ltd;
3422         struct lfsck_tgt_desc *next;
3423
3424         LASSERT(list_empty(&lfsck_instance_list));
3425
3426         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3427                                  ltd_orphan_list) {
3428                 list_del_init(&ltd->ltd_orphan_list);
3429                 lfsck_tgt_put(ltd);
3430         }
3431
3432         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3433                                  ltd_orphan_list) {
3434                 list_del_init(&ltd->ltd_orphan_list);
3435                 lfsck_tgt_put(ltd);
3436         }
3437
3438         lu_context_key_degister(&lfsck_thread_key);
3439 }
3440
/* Module metadata: author, description and license. */
MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
MODULE_DESCRIPTION("LFSCK");
MODULE_LICENSE("GPL");

/*
 * Declare the "lfsck" module, passing lfsck_init() and lfsck_exit() as its
 * init and exit routines.
 */
cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);