Whamcloud - gitweb
12dbeb5c92f40979f991d699775b758060b16f3d
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
/*
 * NULL-terminated table of LFSCK flag names.
 * NOTE(review): presumably slot N names flag bit N -- confirm against
 * the users of this table.
 */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = "incomplete",
        [4] = "crashed_lastid",
        [5] = NULL
};
92
/*
 * Table of LFSCK parameter names.  Slot 0 is deliberately empty and the
 * table ends with a NULL sentinel.
 * NOTE(review): presumably slot N names parameter bit N -- confirm against
 * the users of this table.
 */
const char *lfsck_param_names[] = {
        [0] = NULL,
        [1] = "failout",
        [2] = "dryrun",
        [3] = "all_targets",
        [4] = "broadcast",
        [5] = "orphan",
        [6] = "create_ostobj",
        [7] = NULL
};
103
/*
 * Selector values for lost+found verification.
 * NOTE(review): not used in the visible part of the file; judging by the
 * names they choose between the bookmark and the name entry as the
 * reference -- confirm at the call sites.
 */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK = 0,
        LVLT_BY_NAMEENTRY,      /* == 1 */
};
108
109 const char *lfsck_status2names(enum lfsck_status status)
110 {
111         if (unlikely(status < 0 || status >= LS_MAX))
112                 return "unknown";
113
114         return lfsck_status_names[status];
115 }
116
117 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
118 {
119         spin_lock_init(&ltds->ltd_lock);
120         init_rwsem(&ltds->ltd_rw_sem);
121         INIT_LIST_HEAD(&ltds->ltd_orphan);
122         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
123         if (ltds->ltd_tgts_bitmap == NULL)
124                 return -ENOMEM;
125
126         return 0;
127 }
128
129 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
130 {
131         struct lfsck_tgt_desc   *ltd;
132         struct lfsck_tgt_desc   *next;
133         int                      idx;
134
135         down_write(&ltds->ltd_rw_sem);
136
137         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
138                                  ltd_orphan_list) {
139                 list_del_init(&ltd->ltd_orphan_list);
140                 lfsck_tgt_put(ltd);
141         }
142
143         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
144                 up_write(&ltds->ltd_rw_sem);
145
146                 return;
147         }
148
149         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
150                 ltd = LTD_TGT(ltds, idx);
151                 if (likely(ltd != NULL)) {
152                         LASSERT(list_empty(&ltd->ltd_layout_list));
153                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
154                         LASSERT(list_empty(&ltd->ltd_namespace_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
156
157                         ltds->ltd_tgtnr--;
158                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
159                         LTD_TGT(ltds, idx) = NULL;
160                         lfsck_tgt_put(ltd);
161                 }
162         }
163
164         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
165                  ltds->ltd_tgtnr);
166
167         for (idx = 0; idx < TGT_PTRS; idx++) {
168                 if (ltds->ltd_tgts_idx[idx] != NULL) {
169                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
170                         ltds->ltd_tgts_idx[idx] = NULL;
171                 }
172         }
173
174         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
175         ltds->ltd_tgts_bitmap = NULL;
176         up_write(&ltds->ltd_rw_sem);
177 }
178
179 static int __lfsck_add_target(const struct lu_env *env,
180                               struct lfsck_instance *lfsck,
181                               struct lfsck_tgt_desc *ltd,
182                               bool for_ost, bool locked)
183 {
184         struct lfsck_tgt_descs *ltds;
185         __u32                   index = ltd->ltd_index;
186         int                     rc    = 0;
187         ENTRY;
188
189         if (for_ost)
190                 ltds = &lfsck->li_ost_descs;
191         else
192                 ltds = &lfsck->li_mdt_descs;
193
194         if (!locked)
195                 down_write(&ltds->ltd_rw_sem);
196
197         LASSERT(ltds->ltd_tgts_bitmap != NULL);
198
199         if (index >= ltds->ltd_tgts_bitmap->size) {
200                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
201                                     (__u32)BITS_PER_LONG);
202                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
203                 cfs_bitmap_t *new_bitmap;
204
205                 while (newsize < index + 1)
206                         newsize <<= 1;
207
208                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
209                 if (new_bitmap == NULL)
210                         GOTO(unlock, rc = -ENOMEM);
211
212                 if (ltds->ltd_tgtnr > 0)
213                         cfs_bitmap_copy(new_bitmap, old_bitmap);
214                 ltds->ltd_tgts_bitmap = new_bitmap;
215                 CFS_FREE_BITMAP(old_bitmap);
216         }
217
218         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
219                 CERROR("%s: the device %s (%u) is registered already\n",
220                        lfsck_lfsck2name(lfsck),
221                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
222                 GOTO(unlock, rc = -EEXIST);
223         }
224
225         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
226                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
227                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
228                         GOTO(unlock, rc = -ENOMEM);
229         }
230
231         LTD_TGT(ltds, index) = ltd;
232         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
233         ltds->ltd_tgtnr++;
234
235         GOTO(unlock, rc = 0);
236
237 unlock:
238         if (!locked)
239                 up_write(&ltds->ltd_rw_sem);
240
241         return rc;
242 }
243
244 static int lfsck_add_target_from_orphan(const struct lu_env *env,
245                                         struct lfsck_instance *lfsck)
246 {
247         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
248         struct lfsck_tgt_desc   *ltd;
249         struct lfsck_tgt_desc   *next;
250         struct list_head        *head    = &lfsck_ost_orphan_list;
251         int                      rc;
252         bool                     for_ost = true;
253
254 again:
255         spin_lock(&lfsck_instance_lock);
256         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
257                 if (ltd->ltd_key == lfsck->li_bottom)
258                         list_move_tail(&ltd->ltd_orphan_list,
259                                        &ltds->ltd_orphan);
260         }
261         spin_unlock(&lfsck_instance_lock);
262
263         down_write(&ltds->ltd_rw_sem);
264         while (!list_empty(&ltds->ltd_orphan)) {
265                 ltd = list_entry(ltds->ltd_orphan.next,
266                                  struct lfsck_tgt_desc,
267                                  ltd_orphan_list);
268                 list_del_init(&ltd->ltd_orphan_list);
269                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
270                 /* Do not hold the semaphore for too long time. */
271                 up_write(&ltds->ltd_rw_sem);
272                 if (rc != 0)
273                         return rc;
274
275                 down_write(&ltds->ltd_rw_sem);
276         }
277         up_write(&ltds->ltd_rw_sem);
278
279         if (for_ost) {
280                 ltds = &lfsck->li_mdt_descs;
281                 head = &lfsck_mdt_orphan_list;
282                 for_ost = false;
283                 goto again;
284         }
285
286         return 0;
287 }
288
289 static inline struct lfsck_component *
290 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
291                        struct list_head *list)
292 {
293         struct lfsck_component *com;
294
295         list_for_each_entry(com, list, lc_link) {
296                 if (com->lc_type == type)
297                         return com;
298         }
299         return NULL;
300 }
301
302 struct lfsck_component *
303 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
304 {
305         struct lfsck_component *com;
306
307         spin_lock(&lfsck->li_lock);
308         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
309         if (com != NULL)
310                 goto unlock;
311
312         com = __lfsck_component_find(lfsck, type,
313                                      &lfsck->li_list_double_scan);
314         if (com != NULL)
315                 goto unlock;
316
317         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
318
319 unlock:
320         if (com != NULL)
321                 lfsck_component_get(com);
322         spin_unlock(&lfsck->li_lock);
323         return com;
324 }
325
326 void lfsck_component_cleanup(const struct lu_env *env,
327                              struct lfsck_component *com)
328 {
329         if (!list_empty(&com->lc_link))
330                 list_del_init(&com->lc_link);
331         if (!list_empty(&com->lc_link_dir))
332                 list_del_init(&com->lc_link_dir);
333
334         lfsck_component_put(env, com);
335 }
336
337 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
338                     struct lu_fid *fid, bool locked)
339 {
340         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
341         int                      rc = 0;
342         ENTRY;
343
344         if (!locked)
345                 mutex_lock(&lfsck->li_mutex);
346
347         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
348         if (rc >= 0) {
349                 bk->lb_last_fid = *fid;
350                 /* We do not care about whether the subsequent sub-operations
351                  * failed or not. The worst case is that one FID is lost that
352                  * is not a big issue for the LFSCK since it is relative rare
353                  * for LFSCK create. */
354                 rc = lfsck_bookmark_store(env, lfsck);
355         }
356
357         if (!locked)
358                 mutex_unlock(&lfsck->li_mutex);
359
360         RETURN(rc);
361 }
362
363 /**
364  * Request the specified ibits lock for the given object.
365  *
366  * Before the LFSCK modifying on the namespace visible object,
367  * it needs to acquire related ibits ldlm lock.
368  *
369  * \param[in] env       pointer to the thread context
370  * \param[in] lfsck     pointer to the lfsck instance
371  * \param[in] obj       pointer to the dt_object to be locked
372  * \param[out] lh       pointer to the lock handle
373  * \param[in] ibits     the bits for the ldlm lock to be acquired
374  * \param[in] mode      the mode for the ldlm lock to be acquired
375  *
376  * \retval              0 for success
377  * \retval              negative error number on failure
378  */
379 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
380                      struct dt_object *obj, struct lustre_handle *lh,
381                      __u64 bits, ldlm_mode_t mode)
382 {
383         struct lfsck_thread_info        *info   = lfsck_env_info(env);
384         ldlm_policy_data_t              *policy = &info->lti_policy;
385         struct ldlm_res_id              *resid  = &info->lti_resid;
386         __u64                            flags  = LDLM_FL_ATOMIC_CB;
387         int                              rc;
388
389         LASSERT(lfsck->li_namespace != NULL);
390
391         memset(policy, 0, sizeof(*policy));
392         policy->l_inodebits.bits = bits;
393         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
394         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
395                                     policy, mode, &flags, ldlm_blocking_ast,
396                                     ldlm_completion_ast, NULL, NULL, 0,
397                                     LVB_T_NONE, NULL, lh);
398         if (rc == ELDLM_OK) {
399                 rc = 0;
400         } else {
401                 memset(lh, 0, sizeof(*lh));
402                 rc = -EIO;
403         }
404
405         return rc;
406 }
407
408 /**
409  * Release the the specified ibits lock.
410  *
411  * If the lock has been acquired before, release it
412  * and cleanup the handle. Otherwise, do nothing.
413  *
414  * \param[in] lh        pointer to the lock handle
415  * \param[in] mode      the mode for the ldlm lock to be released
416  */
417 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
418 {
419         if (lustre_handle_is_used(lh)) {
420                 ldlm_lock_decref(lh, mode);
421                 memset(lh, 0, sizeof(*lh));
422         }
423 }
424
425 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
426                               struct lfsck_instance *lfsck,
427                               const struct lu_fid *fid)
428 {
429         struct seq_server_site  *ss     =
430                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
431         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
432         int                      rc;
433
434         fld_range_set_mdt(range);
435         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
436         if (rc == 0)
437                 rc = range->lsr_index;
438
439         return rc;
440 }
441
/* Directory entry names for "self" and "parent"; both have external
 * linkage, so other files may refer to them. */
const char dot[]    = ".";
const char dotdot[] = "..";

/* File-local path component names. */
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
446
447 static int lfsck_create_lpf_local(const struct lu_env *env,
448                                   struct lfsck_instance *lfsck,
449                                   struct dt_object *parent,
450                                   struct dt_object *child,
451                                   struct lu_attr *la,
452                                   struct dt_object_format *dof,
453                                   const char *name)
454 {
455         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
456         struct dt_device        *dev    = lfsck->li_bottom;
457         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
458         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
459         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
460         struct thandle          *th     = NULL;
461         struct linkea_data       ldata  = { 0 };
462         struct lu_buf            linkea_buf;
463         const struct lu_name    *cname;
464         loff_t                   pos    = 0;
465         int                      len    = sizeof(struct lfsck_bookmark);
466         int                      rc;
467         ENTRY;
468
469         rc = linkea_data_new(&ldata,
470                              &lfsck_env_info(env)->lti_linkea_buf2);
471         if (rc != 0)
472                 RETURN(rc);
473
474         cname = lfsck_name_get_const(env, name, strlen(name));
475         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
476         if (rc != 0)
477                 RETURN(rc);
478
479         th = dt_trans_create(env, dev);
480         if (IS_ERR(th))
481                 RETURN(PTR_ERR(th));
482
483         /* 1a. create child */
484         rc = dt_declare_create(env, child, la, NULL, dof, th);
485         if (rc != 0)
486                 GOTO(stop, rc);
487
488         /* 2a. increase child nlink */
489         rc = dt_declare_ref_add(env, child, th);
490         if (rc != 0)
491                 GOTO(stop, rc);
492
493         /* 3a. insert linkEA for child */
494         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
495                        ldata.ld_leh->leh_len);
496         rc = dt_declare_xattr_set(env, child, &linkea_buf,
497                                   XATTR_NAME_LINK, 0, th);
498         if (rc != 0)
499                 GOTO(stop, rc);
500
501         /* 4a. insert name into parent dir */
502         rec->rec_type = S_IFDIR;
503         rec->rec_fid = cfid;
504         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
505                                (const struct dt_key *)name, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         /* 5a. increase parent nlink */
510         rc = dt_declare_ref_add(env, parent, th);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         /* 6a. update bookmark */
515         rc = dt_declare_record_write(env, bk_obj,
516                                      lfsck_buf_get(env, bk, len), 0, th);
517         if (rc != 0)
518                 GOTO(stop, rc);
519
520         rc = dt_trans_start_local(env, dev, th);
521         if (rc != 0)
522                 GOTO(stop, rc);
523
524         dt_write_lock(env, child, 0);
525         /* 1b.1. create child */
526         rc = dt_create(env, child, la, NULL, dof, th);
527         if (rc != 0)
528                 GOTO(unlock, rc);
529
530         if (unlikely(!dt_try_as_dir(env, child)))
531                 GOTO(unlock, rc = -ENOTDIR);
532
533         /* 1b.2. insert dot into child dir */
534         rec->rec_fid = cfid;
535         rc = dt_insert(env, child, (const struct dt_rec *)rec,
536                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
537         if (rc != 0)
538                 GOTO(unlock, rc);
539
540         /* 1b.3. insert dotdot into child dir */
541         rec->rec_fid = &LU_LPF_FID;
542         rc = dt_insert(env, child, (const struct dt_rec *)rec,
543                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
544         if (rc != 0)
545                 GOTO(unlock, rc);
546
547         /* 2b. increase child nlink */
548         rc = dt_ref_add(env, child, th);
549         if (rc != 0)
550                 GOTO(unlock, rc);
551
552         /* 3b. insert linkEA for child. */
553         rc = dt_xattr_set(env, child, &linkea_buf,
554                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
555         dt_write_unlock(env, child);
556         if (rc != 0)
557                 GOTO(stop, rc);
558
559         /* 4b. insert name into parent dir */
560         rec->rec_fid = cfid;
561         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
562                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
563         if (rc != 0)
564                 GOTO(stop, rc);
565
566         dt_write_lock(env, parent, 0);
567         /* 5b. increase parent nlink */
568         rc = dt_ref_add(env, parent, th);
569         dt_write_unlock(env, parent);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         bk->lb_lpf_fid = *cfid;
574         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
575
576         /* 6b. update bookmark */
577         rc = dt_record_write(env, bk_obj,
578                              lfsck_buf_get(env, bk, len), &pos, th);
579
580         GOTO(stop, rc);
581
582 unlock:
583         dt_write_unlock(env, child);
584
585 stop:
586         dt_trans_stop(env, dev, th);
587
588         return rc;
589 }
590
591 static int lfsck_create_lpf_remote(const struct lu_env *env,
592                                    struct lfsck_instance *lfsck,
593                                    struct dt_object *parent,
594                                    struct dt_object *child,
595                                    struct lu_attr *la,
596                                    struct dt_object_format *dof,
597                                    const char *name)
598 {
599         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
600         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
601         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
602         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
603         struct thandle          *th     = NULL;
604         struct linkea_data       ldata  = { 0 };
605         struct lu_buf            linkea_buf;
606         const struct lu_name    *cname;
607         struct dt_device        *dev;
608         loff_t                   pos    = 0;
609         int                      len    = sizeof(struct lfsck_bookmark);
610         int                      rc;
611         ENTRY;
612
613         rc = linkea_data_new(&ldata,
614                              &lfsck_env_info(env)->lti_linkea_buf2);
615         if (rc != 0)
616                 RETURN(rc);
617
618         cname = lfsck_name_get_const(env, name, strlen(name));
619         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
620         if (rc != 0)
621                 RETURN(rc);
622
623         /* Create .lustre/lost+found/MDTxxxx. */
624
625         /* XXX: Currently, cross-MDT create operation needs to create the child
626          *      object firstly, then insert name into the parent directory. For
627          *      this case, the child object resides on current MDT (local), but
628          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
629          *      easy to contain all the sub-modifications orderly within single
630          *      transaction.
631          *
632          *      To avoid more inconsistency, we split the create operation into
633          *      two transactions:
634          *
635          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
636          *         locally.
637          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
638          *         remotely.
639          *
640          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
641          *      repair such inconsistency when LFSCK run next time. */
642
643         /* Transaction I: locally */
644
645         dev = lfsck->li_bottom;
646         th = dt_trans_create(env, dev);
647         if (IS_ERR(th))
648                 RETURN(PTR_ERR(th));
649
650         /* 1a. create child */
651         rc = dt_declare_create(env, child, la, NULL, dof, th);
652         if (rc != 0)
653                 GOTO(stop, rc);
654
655         /* 2a. increase child nlink */
656         rc = dt_declare_ref_add(env, child, th);
657         if (rc != 0)
658                 GOTO(stop, rc);
659
660         /* 3a. insert linkEA for child */
661         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
662                        ldata.ld_leh->leh_len);
663         rc = dt_declare_xattr_set(env, child, &linkea_buf,
664                                   XATTR_NAME_LINK, 0, th);
665         if (rc != 0)
666                 GOTO(stop, rc);
667
668         /* 4a. update bookmark */
669         rc = dt_declare_record_write(env, bk_obj,
670                                      lfsck_buf_get(env, bk, len), 0, th);
671         if (rc != 0)
672                 GOTO(stop, rc);
673
674         rc = dt_trans_start_local(env, dev, th);
675         if (rc != 0)
676                 GOTO(stop, rc);
677
678         dt_write_lock(env, child, 0);
679         /* 1b.1. create child */
680         rc = dt_create(env, child, la, NULL, dof, th);
681         if (rc != 0)
682                 GOTO(unlock, rc);
683
684         if (unlikely(!dt_try_as_dir(env, child)))
685                 GOTO(unlock, rc = -ENOTDIR);
686
687         /* 1b.2. insert dot into child dir */
688         rec->rec_type = S_IFDIR;
689         rec->rec_fid = cfid;
690         rc = dt_insert(env, child, (const struct dt_rec *)rec,
691                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
692         if (rc != 0)
693                 GOTO(unlock, rc);
694
695         /* 1b.3. insert dotdot into child dir */
696         rec->rec_fid = &LU_LPF_FID;
697         rc = dt_insert(env, child, (const struct dt_rec *)rec,
698                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
699         if (rc != 0)
700                 GOTO(unlock, rc);
701
702         /* 2b. increase child nlink */
703         rc = dt_ref_add(env, child, th);
704         if (rc != 0)
705                 GOTO(unlock, rc);
706
707         /* 3b. insert linkEA for child */
708         rc = dt_xattr_set(env, child, &linkea_buf,
709                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
710         if (rc != 0)
711                 GOTO(unlock, rc);
712
713         bk->lb_lpf_fid = *cfid;
714         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
715
716         /* 4b. update bookmark */
717         rc = dt_record_write(env, bk_obj,
718                              lfsck_buf_get(env, bk, len), &pos, th);
719
720         dt_write_unlock(env, child);
721         dt_trans_stop(env, dev, th);
722         if (rc != 0)
723                 RETURN(rc);
724
725         /* Transaction II: remotely */
726
727         dev = lfsck->li_next;
728         th = dt_trans_create(env, dev);
729         if (IS_ERR(th))
730                 RETURN(PTR_ERR(th));
731
732         /* 5a. insert name into parent dir */
733         rec->rec_fid = cfid;
734         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
735                                (const struct dt_key *)name, th);
736         if (rc != 0)
737                 GOTO(stop, rc);
738
739         /* 6a. increase parent nlink */
740         rc = dt_declare_ref_add(env, parent, th);
741         if (rc != 0)
742                 GOTO(stop, rc);
743
744         rc = dt_trans_start(env, dev, th);
745         if (rc != 0)
746                 GOTO(stop, rc);
747
748         /* 5b. insert name into parent dir */
749         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
750                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
751         if (rc != 0)
752                 GOTO(stop, rc);
753
754         dt_write_lock(env, parent, 0);
755         /* 6b. increase parent nlink */
756         rc = dt_ref_add(env, parent, th);
757         dt_write_unlock(env, parent);
758
759         GOTO(stop, rc);
760
761 unlock:
762         dt_write_unlock(env, child);
763 stop:
764         dt_trans_stop(env, dev, th);
765
766         if (rc != 0 && dev == lfsck->li_next)
767                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
768                        "for orphans, but failed to insert the name %s "
769                        "to the .lustre/lost+found/. Such inconsistency "
770                        "will be repaired when LFSCK run next time: rc = %d\n",
771                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
772
773         return rc;
774 }
775
/* Do NOT create .lustre/lost+found/MDTxxxx when registering the lfsck
 * instance, because MDT0 may not be ready for sequence allocation yet.
 * We do that only when it is required, such as when repairing orphan
 * OST-objects. */
779 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
780 {
781         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
782         struct lfsck_thread_info *info  = lfsck_env_info(env);
783         struct lu_fid            *cfid  = &info->lti_fid2;
784         struct lu_attr           *la    = &info->lti_la;
785         struct dt_object_format  *dof   = &info->lti_dof;
786         struct dt_object         *parent = NULL;
787         struct dt_object         *child = NULL;
788         struct lustre_handle      lh    = { 0 };
789         char                      name[8];
790         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
791         int                       rc    = 0;
792         ENTRY;
793
794         LASSERT(lfsck->li_master);
795
796         sprintf(name, "MDT%04x", node);
797         if (node == 0) {
798                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
799                                                   &LU_LPF_FID);
800         } else {
801                 struct lfsck_tgt_desc *ltd;
802
803                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
804                 if (unlikely(ltd == NULL))
805                         RETURN(-ENXIO);
806
807                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
808                                                   &LU_LPF_FID);
809                 lfsck_tgt_put(ltd);
810         }
811         if (IS_ERR(parent))
812                 RETURN(PTR_ERR(parent));
813
814         if (lfsck->li_lpf_obj != NULL)
815                 GOTO(out, rc = 0);
816
817         if (unlikely(!dt_try_as_dir(env, parent)))
818                 GOTO(out, rc = -ENOTDIR);
819
820         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
821                               MDS_INODELOCK_UPDATE, LCK_EX);
822         if (rc != 0)
823                 GOTO(out, rc);
824
825         mutex_lock(&lfsck->li_mutex);
826         if (lfsck->li_lpf_obj != NULL)
827                 GOTO(unlock, rc = 0);
828
829         if (fid_is_zero(&bk->lb_lpf_fid)) {
830                 /* There is corner case that: in former LFSCK scanning we have
831                  * created the .lustre/lost+found/MDTxxxx but failed to update
832                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
833                  * it from MDT0 firstly. */
834                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
835                                (const struct dt_key *)name, BYPASS_CAPA);
836                 if (rc != 0 && rc != -ENOENT)
837                         GOTO(unlock, rc);
838
839                 if (rc == 0) {
840                         bk->lb_lpf_fid = *cfid;
841                         rc = lfsck_bookmark_store(env, lfsck);
842                 } else {
843                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
844                 }
845                 if (rc != 0)
846                         GOTO(unlock, rc);
847         } else {
848                 *cfid = bk->lb_lpf_fid;
849         }
850
851         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
852         if (IS_ERR(child))
853                 GOTO(unlock, rc = PTR_ERR(child));
854
855         if (dt_object_exists(child) != 0) {
856                 if (unlikely(!dt_try_as_dir(env, child)))
857                         rc = -ENOTDIR;
858                 else
859                         lfsck->li_lpf_obj = child;
860
861                 GOTO(unlock, rc);
862         }
863
864         memset(la, 0, sizeof(*la));
865         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
866         la->la_mode = S_IFDIR | S_IRWXU;
867         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
868                        LA_UID | LA_GID;
869         memset(dof, 0, sizeof(*dof));
870         dof->dof_type = dt_mode_to_dft(S_IFDIR);
871
872         if (node == 0)
873                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
874                                             dof, name);
875         else
876                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
877                                              dof, name);
878         if (rc == 0)
879                 lfsck->li_lpf_obj = child;
880
881         GOTO(unlock, rc);
882
883 unlock:
884         mutex_unlock(&lfsck->li_mutex);
885         lfsck_ibits_unlock(&lh, LCK_EX);
886         if (rc != 0 && child != NULL && !IS_ERR(child))
887                 lu_object_put(env, &child->do_lu);
888 out:
889         if (parent != NULL && !IS_ERR(parent))
890                 lu_object_put(env, &parent->do_lu);
891
892         return rc;
893 }
894
895 /**
896  * Scan .lustre/lost+found for bad name entries and remove them.
897  *
898  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
899  * index in the system. Any other formatted name is invalid and should be
900  * removed.
901  *
902  * \param[in] env       pointer to the thread context
903  * \param[in] lfsck     pointer to the lfsck instance
904  * \param[in] parent    pointer to the lost+found object
905  *
906  * \retval              0 for success
907  * \retval              negative error number on failure
908  */
909 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
910                                       struct lfsck_instance *lfsck,
911                                       struct dt_object *parent)
912 {
913         struct lu_dirent        *ent    =
914                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
915         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
916         struct dt_it            *it;
917         int                      rc;
918         ENTRY;
919
920         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
921         if (IS_ERR(it))
922                 RETURN(PTR_ERR(it));
923
924         rc = iops->load(env, it, 0);
925         if (rc == 0)
926                 rc = iops->next(env, it);
927         else if (rc > 0)
928                 rc = 0;
929
930         while (rc == 0) {
931                 int off = 3;
932
933                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
934                 if (rc != 0)
935                         break;
936
937                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
938                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
939                         goto next;
940
941                 /* name length must be strlen("MDTxxxx") */
942                 if (ent->lde_namelen != 7)
943                         goto remove;
944
945                 if (memcmp(ent->lde_name, "MDT", off) != 0)
946                         goto remove;
947
948                 while (off < 7 && isxdigit(ent->lde_name[off]))
949                         off++;
950
951                 if (off != 7) {
952
953 remove:
954                         rc = lfsck_remove_name_entry(env, lfsck, parent,
955                                                      ent->lde_name, S_IFDIR);
956                         if (rc != 0)
957                                 break;
958                 }
959
960 next:
961                 rc = iops->next(env, it);
962         }
963
964         iops->put(env, it);
965         iops->fini(env, it);
966
967         RETURN(rc > 0 ? 0 : rc);
968 }
969
970 static int lfsck_update_lpf_entry(const struct lu_env *env,
971                                   struct lfsck_instance *lfsck,
972                                   struct dt_object *parent,
973                                   struct dt_object *child,
974                                   const char *name,
975                                   enum lfsck_verify_lpf_types type)
976 {
977         int rc;
978
979         if (type == LVLT_BY_BOOKMARK) {
980                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
981                                              lfsck_dto2fid(child), S_IFDIR);
982         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
983                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
984                 rc = lfsck_bookmark_store(env, lfsck);
985
986                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
987                        " in the bookmark file: rc = %d\n",
988                        lfsck_lfsck2name(lfsck),
989                        PFID(lfsck_dto2fid(child)), rc);
990         }
991
992         return rc;
993 }
994
995 /**
996  * Check whether the @child back references the @parent.
997  *
998  * Two cases:
999  * 1) The child's FID is stored in the bookmark file. If the child back
1000  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1001  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1002  *    the child back references another parent2, then:
1003  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1004  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1005  *      references the child. So keep them there. As the LFSCK processing,
1006  *      the parent3 may be found, then when the LFSCK run next time, the
1007  *      inconsistency can be repaired.
1008  *
1009  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1010  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1011  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1012  *    back references another parent2, then:
1013  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1014  *      from .lustre/lost+found/;
1015  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1016  *      sub-directory name entry and update the child;
1017  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1018  *      or not, then keep them there.
1019  *
1020  * \param[in] env       pointer to the thread context
1021  * \param[in] lfsck     pointer to the lfsck instance
1022  * \param[in] parent    pointer to the lost+found object
1023  * \param[in] child     pointer to the lost+found sub-directory object
1024  * \param[in] name      the name for lost+found sub-directory object
1025  * \param[out] fid      pointer to the buffer to hold the FID of the object
1026  *                      (called it as parent2) that is referenced via the
1027  *                      child's dotdot entry; it also can be the FID that
1028  *                      is referenced by the name entry under the parent2.
1029  * \param[in] type      to indicate where the child's FID is stored in
1030  *
1031  * \retval              positive number for uncertain inconsistency
1032  * \retval              0 for success
1033  * \retval              negative error number on failure
1034  */
1035 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1036                                   struct lfsck_instance *lfsck,
1037                                   struct dt_object *parent,
1038                                   struct dt_object *child, const char *name,
1039                                   struct lu_fid *fid,
1040                                   enum lfsck_verify_lpf_types type)
1041 {
1042         struct lfsck_thread_info *info    = lfsck_env_info(env);
1043         char                     *name2   = info->lti_key;
1044         struct lu_fid            *fid2    = &info->lti_fid3;
1045         struct dt_object         *parent2 = NULL;
1046         struct lustre_handle      lh      = { 0 };
1047         int                       rc;
1048         ENTRY;
1049
1050         fid_zero(fid);
1051         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1052                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1053         if (rc != 0)
1054                 GOTO(linkea, rc);
1055
1056         if (!fid_is_sane(fid))
1057                 GOTO(linkea, rc = -EINVAL);
1058
1059         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1060                 const struct lu_name *cname;
1061
1062                 if (lfsck->li_lpf_obj == NULL) {
1063                         lu_object_get(&child->do_lu);
1064                         lfsck->li_lpf_obj = child;
1065                 }
1066
1067                 cname = lfsck_name_get_const(env, name, strlen(name));
1068                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1069                                          &LU_LPF_FID);
1070                 if (rc == 0)
1071                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1072                                                     name, type);
1073
1074                 GOTO(out_done, rc);
1075         }
1076
1077         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1078         if (IS_ERR(parent2))
1079                 GOTO(linkea, parent2);
1080
1081         if (!dt_object_exists(parent2)) {
1082                 lu_object_put(env, &parent2->do_lu);
1083
1084                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1085         }
1086
1087         if (!dt_try_as_dir(env, parent2)) {
1088                 lu_object_put(env, &parent2->do_lu);
1089
1090                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1091         }
1092
1093 linkea:
1094         /* To prevent rename/unlink race */
1095         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1096                               MDS_INODELOCK_UPDATE, LCK_PR);
1097         if (rc != 0)
1098                 GOTO(out_put, rc);
1099
1100         dt_read_lock(env, child, 0);
1101         rc = lfsck_links_get_first(env, child, name2, fid2);
1102         if (rc != 0) {
1103                 dt_read_unlock(env, child);
1104                 lfsck_ibits_unlock(&lh, LCK_PR);
1105
1106                 GOTO(out_put, rc = 1);
1107         }
1108
1109         /* It is almost impossible that the bookmark file (or the name entry)
1110          * and the linkEA hit the same data corruption. Trust the linkEA. */
1111         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1112                 dt_read_unlock(env, child);
1113                 lfsck_ibits_unlock(&lh, LCK_PR);
1114
1115                 *fid = *fid2;
1116                 if (lfsck->li_lpf_obj == NULL) {
1117                         lu_object_get(&child->do_lu);
1118                         lfsck->li_lpf_obj = child;
1119                 }
1120
1121                 /* Update the child's dotdot entry */
1122                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1123                                              &LU_LPF_FID, S_IFDIR);
1124                 if (rc == 0)
1125                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1126                                                     name, type);
1127
1128                 GOTO(out_put, rc);
1129         }
1130
1131         if (parent2 == NULL || IS_ERR(parent2)) {
1132                 dt_read_unlock(env, child);
1133                 lfsck_ibits_unlock(&lh, LCK_PR);
1134
1135                 GOTO(out_done, rc = 1);
1136         }
1137
1138         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1139                        (const struct dt_key *)name2, BYPASS_CAPA);
1140         dt_read_unlock(env, child);
1141         lfsck_ibits_unlock(&lh, LCK_PR);
1142         if (rc != 0 && rc != -ENOENT)
1143                 GOTO(out_put, rc);
1144
1145         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1146                 if (type == LVLT_BY_BOOKMARK)
1147                         GOTO(out_put, rc = 1);
1148
1149                 /* Trust the name entry, update the child's dotdot entry. */
1150                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1151                                              &LU_LPF_FID, S_IFDIR);
1152
1153                 GOTO(out_put, rc);
1154         }
1155
1156         if (type == LVLT_BY_BOOKMARK) {
1157                 /* Invalid FID record in the bookmark file, reset it. */
1158                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1159                 rc = lfsck_bookmark_store(env, lfsck);
1160
1161                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1162                        " in the bookmark file: rc = %d\n",
1163                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1164         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1165                 /* The name entry is wrong, remove it. */
1166                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1167         }
1168
1169         GOTO(out_put, rc);
1170
1171 out_put:
1172         if (parent2 != NULL && !IS_ERR(parent2))
1173                 lu_object_put(env, &parent2->do_lu);
1174
1175 out_done:
1176         return rc;
1177 }
1178
1179 /**
1180  * Verify the /ROOT/.lustre/lost+found/ directory.
1181  *
1182  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1183  * the LFSCK does not exactly know how to handle, such as orphans. So before
1184  * the LFSCK scanning the system, the consistency of such directory needs to
1185  * be verified firstly to allow the users to use it during the LFSCK.
1186  *
1187  * \param[in] env       pointer to the thread context
1188  * \param[in] lfsck     pointer to the lfsck instance
1189  *
1190  * \retval              positive number for uncertain inconsistency
1191  * \retval              0 for success
1192  * \retval              negative error number on failure
1193  */
1194 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1195 {
1196         struct lfsck_thread_info *info   = lfsck_env_info(env);
1197         struct lu_fid            *pfid   = &info->lti_fid;
1198         struct lu_fid            *cfid   = &info->lti_fid2;
1199         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1200         struct dt_object         *parent = NULL;
1201         /* child1's FID is in the bookmark file. */
1202         struct dt_object         *child1 = NULL;
1203         /* child2's FID is in the name entry MDTxxxx. */
1204         struct dt_object         *child2 = NULL;
1205         struct dt_device         *dev    = lfsck->li_bottom;
1206         const struct lu_name     *cname;
1207         char                      name[8];
1208         int                       node   = lfsck_dev_idx(dev);
1209         int                       rc     = 0;
1210         ENTRY;
1211
1212         LASSERT(lfsck->li_master);
1213
1214         if (node == 0) {
1215                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1216         } else {
1217                 struct lfsck_tgt_desc *ltd;
1218
1219                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1220                 if (unlikely(ltd == NULL))
1221                         RETURN(-ENXIO);
1222
1223                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1224                                                   &LU_LPF_FID);
1225                 lfsck_tgt_put(ltd);
1226         }
1227
1228         if (IS_ERR(parent))
1229                 RETURN(PTR_ERR(parent));
1230
1231         LASSERT(dt_object_exists(parent));
1232
1233         if (unlikely(!dt_try_as_dir(env, parent)))
1234                 GOTO(put, rc = -ENOTDIR);
1235
1236         if (node == 0) {
1237                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1238                 if (rc != 0)
1239                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1240                                "for bad sub-directories: rc = %d\n",
1241                                lfsck_lfsck2name(lfsck), rc);
1242         }
1243
1244         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1245                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1246                         struct lu_fid tfid = bk->lb_lpf_fid;
1247
1248                         /* Invalid FID record in the bookmark file, reset it. */
1249                         fid_zero(&bk->lb_lpf_fid);
1250                         rc = lfsck_bookmark_store(env, lfsck);
1251
1252                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1253                                " in the bookmark file: rc = %d\n",
1254                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1255
1256                         if (rc != 0)
1257                                 GOTO(put, rc);
1258                 } else {
1259                         child1 = lfsck_object_find_by_dev(env, dev,
1260                                                           &bk->lb_lpf_fid);
1261                         if (IS_ERR(child1))
1262                                 GOTO(put, rc = PTR_ERR(child1));
1263
1264                         if (unlikely(!dt_object_exists(child1) ||
1265                                      dt_object_remote(child1)) ||
1266                                      !S_ISDIR(lfsck_object_type(child1))) {
1267                                 /* Invalid FID record in the bookmark file,
1268                                  * reset it. */
1269                                 fid_zero(&bk->lb_lpf_fid);
1270                                 rc = lfsck_bookmark_store(env, lfsck);
1271
1272                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1273                                        " in the bookmark file: rc = %d\n",
1274                                        lfsck_lfsck2name(lfsck),
1275                                        PFID(lfsck_dto2fid(child1)), rc);
1276
1277                                 if (rc != 0)
1278                                         GOTO(put, rc);
1279
1280                                 lu_object_put(env, &child1->do_lu);
1281                                 child1 = NULL;
1282                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1283                                 GOTO(put, rc = -ENOTDIR);
1284                         }
1285                 }
1286         }
1287
1288         snprintf(name, 8, "MDT%04x", node);
1289         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1290                        (const struct dt_key *)name, BYPASS_CAPA);
1291         if (rc == -ENOENT) {
1292                 if (!fid_is_zero(&bk->lb_lpf_fid))
1293                         goto check_child1;
1294
1295                 GOTO(put, rc = 0);
1296         }
1297
1298         if (rc != 0)
1299                 GOTO(put, rc);
1300
1301         /* Invalid FID in the name entry, remove the name entry. */
1302         if (!fid_is_norm(cfid)) {
1303                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1304                 if (rc != 0)
1305                         GOTO(put, rc);
1306
1307                 goto check_child1;
1308         }
1309
1310         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1311         if (IS_ERR(child2))
1312                 GOTO(put, rc = PTR_ERR(child2));
1313
1314         if (unlikely(!dt_object_exists(child2) ||
1315                      dt_object_remote(child2)) ||
1316                      !S_ISDIR(lfsck_object_type(child2))) {
1317                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1318                                              S_IFDIR);
1319                 if (rc != 0)
1320                         GOTO(put, rc);
1321
1322                 goto check_child1;
1323         }
1324
1325         if (unlikely(!dt_try_as_dir(env, child2)))
1326                 GOTO(put, rc = -ENOTDIR);
1327
1328         if (child1 == NULL) {
1329                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1330                                             pfid, LVLT_BY_NAMEENTRY);
1331         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1332                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1333                                             pfid, LVLT_BY_BOOKMARK);
1334                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1335                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1336                                                     name, pfid,
1337                                                     LVLT_BY_NAMEENTRY);
1338         } else {
1339                 if (lfsck->li_lpf_obj == NULL) {
1340                         lu_object_get(&child2->do_lu);
1341                         lfsck->li_lpf_obj = child2;
1342                 }
1343
1344                 cname = lfsck_name_get_const(env, name, strlen(name));
1345                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1346         }
1347
1348         GOTO(put, rc);
1349
1350 check_child1:
1351         if (child1 != NULL)
1352                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1353                                             pfid, LVLT_BY_BOOKMARK);
1354
1355         GOTO(put, rc);
1356
1357 put:
1358         if (lfsck->li_lpf_obj != NULL &&
1359             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1360                 rc = -ENOTDIR;
1361
1362         if (child2 != NULL && !IS_ERR(child2))
1363                 lu_object_put(env, &child2->do_lu);
1364         if (child1 != NULL && !IS_ERR(child1))
1365                 lu_object_put(env, &child1->do_lu);
1366         if (parent != NULL && !IS_ERR(parent))
1367                 lu_object_put(env, &parent->do_lu);
1368
1369         return rc;
1370 }
1371
1372 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1373 {
1374         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1375         struct seq_server_site  *ss;
1376         char                    *prefix;
1377         int                      rc     = 0;
1378         ENTRY;
1379
1380         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1381         if (unlikely(ss == NULL))
1382                 RETURN(-ENXIO);
1383
1384         OBD_ALLOC_PTR(lfsck->li_seq);
1385         if (lfsck->li_seq == NULL)
1386                 RETURN(-ENOMEM);
1387
1388         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1389         if (prefix == NULL)
1390                 GOTO(out, rc = -ENOMEM);
1391
1392         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1393         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1394                              ss->ss_server_seq);
1395         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1396         if (rc != 0)
1397                 GOTO(out, rc);
1398
1399         if (fid_is_sane(&bk->lb_last_fid))
1400                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1401
1402         RETURN(0);
1403
1404 out:
1405         OBD_FREE_PTR(lfsck->li_seq);
1406         lfsck->li_seq = NULL;
1407
1408         return rc;
1409 }
1410
1411 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1412 {
1413         if (lfsck->li_seq != NULL) {
1414                 seq_client_fini(lfsck->li_seq);
1415                 OBD_FREE_PTR(lfsck->li_seq);
1416                 lfsck->li_seq = NULL;
1417         }
1418 }
1419
1420 void lfsck_instance_cleanup(const struct lu_env *env,
1421                             struct lfsck_instance *lfsck)
1422 {
1423         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1424         struct lfsck_component  *com;
1425         struct lfsck_component  *next;
1426         ENTRY;
1427
1428         LASSERT(list_empty(&lfsck->li_link));
1429         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1430
1431         if (lfsck->li_obj_oit != NULL) {
1432                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1433                 lfsck->li_obj_oit = NULL;
1434         }
1435
1436         LASSERT(lfsck->li_obj_dir == NULL);
1437
1438         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1439                 lfsck_component_cleanup(env, com);
1440         }
1441
1442         LASSERT(list_empty(&lfsck->li_list_dir));
1443
1444         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1445                                  lc_link) {
1446                 lfsck_component_cleanup(env, com);
1447         }
1448
1449         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1450                 lfsck_component_cleanup(env, com);
1451         }
1452
1453         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1454         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1455
1456         if (lfsck->li_bookmark_obj != NULL) {
1457                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1458                 lfsck->li_bookmark_obj = NULL;
1459         }
1460
1461         if (lfsck->li_lpf_obj != NULL) {
1462                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1463                 lfsck->li_lpf_obj = NULL;
1464         }
1465
1466         if (lfsck->li_los != NULL) {
1467                 local_oid_storage_fini(env, lfsck->li_los);
1468                 lfsck->li_los = NULL;
1469         }
1470
1471         lfsck_fid_fini(lfsck);
1472
1473         OBD_FREE_PTR(lfsck);
1474 }
1475
1476 static inline struct lfsck_instance *
1477 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1478 {
1479         struct lfsck_instance *lfsck;
1480
1481         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1482                 if (lfsck->li_bottom == key) {
1483                         if (ref)
1484                                 lfsck_instance_get(lfsck);
1485                         if (unlink)
1486                                 list_del_init(&lfsck->li_link);
1487
1488                         return lfsck;
1489                 }
1490         }
1491
1492         return NULL;
1493 }
1494
1495 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1496                                            bool unlink)
1497 {
1498         struct lfsck_instance *lfsck;
1499
1500         spin_lock(&lfsck_instance_lock);
1501         lfsck = __lfsck_instance_find(key, ref, unlink);
1502         spin_unlock(&lfsck_instance_lock);
1503
1504         return lfsck;
1505 }
1506
1507 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1508 {
1509         struct lfsck_instance *tmp;
1510
1511         spin_lock(&lfsck_instance_lock);
1512         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1513                 if (lfsck->li_bottom == tmp->li_bottom) {
1514                         spin_unlock(&lfsck_instance_lock);
1515                         return -EEXIST;
1516                 }
1517         }
1518
1519         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1520         spin_unlock(&lfsck_instance_lock);
1521         return 0;
1522 }
1523
/*
 * Print the names of the bits set in \a bits as "<prefix>: name,name\n".
 *
 * \a names is indexed by bit number; the caller must provide an entry
 * (possibly NULL) for every bit that may be set in \a bits. Set bits whose
 * name is NULL are skipped. When no bit is set only "<prefix>:\n" is
 * printed. The output always ends with a newline.
 *
 * The bits are walked with an unsigned mask, so that testing the highest
 * bit does not rely on shifting a signed 1 into the sign bit.
 *
 * Always returns 0.
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int left = bits;       /* bits not visited yet */
        unsigned int flag;
        int i;
        bool newline = (left == 0);

        seq_printf(m, "%s:%c", prefix, left != 0 ? ' ' : '\n');

        for (i = 0; left != 0; i++) {
                flag = 1U << i;
                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The last set bit terminates the line itself. */
                if (left == 0)
                        newline = true;

                seq_printf(m, "%s%c", names[i], newline ? '\n' : ',');
        }

        /* The last set bit had no name: the line is still open. */
        if (!newline)
                seq_printf(m, "\n");
        return 0;
}
1550
1551 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1552 {
1553         if (time != 0)
1554                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1555                           cfs_time_current_sec() - time);
1556         else
1557                 seq_printf(m, "%s: N/A\n", prefix);
1558         return 0;
1559 }
1560
1561 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1562                    const char *prefix)
1563 {
1564         if (fid_is_zero(&pos->lp_dir_parent)) {
1565                 if (pos->lp_oit_cookie == 0)
1566                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1567                                    prefix);
1568                 else
1569                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1570                                    prefix, pos->lp_oit_cookie);
1571         } else {
1572                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1573                            prefix, pos->lp_oit_cookie,
1574                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1575         }
1576         return 0;
1577 }
1578
1579 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1580                     struct lfsck_position *pos, bool init)
1581 {
1582         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1583
1584         if (unlikely(lfsck->li_di_oit == NULL)) {
1585                 memset(pos, 0, sizeof(*pos));
1586                 return;
1587         }
1588
1589         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1590         if (!lfsck->li_current_oit_processed && !init)
1591                 pos->lp_oit_cookie--;
1592
1593         LASSERT(pos->lp_oit_cookie > 0);
1594
1595         if (lfsck->li_di_dir != NULL) {
1596                 struct dt_object *dto = lfsck->li_obj_dir;
1597
1598                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1599                                                         lfsck->li_di_dir);
1600
1601                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1602                         fid_zero(&pos->lp_dir_parent);
1603                         pos->lp_dir_cookie = 0;
1604                 } else {
1605                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1606                 }
1607         } else {
1608                 fid_zero(&pos->lp_dir_parent);
1609                 pos->lp_dir_cookie = 0;
1610         }
1611 }
1612
1613 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1614 {
1615         bool dirty = false;
1616
1617         if (limit != LFSCK_SPEED_NO_LIMIT) {
1618                 if (limit > HZ) {
1619                         lfsck->li_sleep_rate = limit / HZ;
1620                         lfsck->li_sleep_jif = 1;
1621                 } else {
1622                         lfsck->li_sleep_rate = 1;
1623                         lfsck->li_sleep_jif = HZ / limit;
1624                 }
1625         } else {
1626                 lfsck->li_sleep_jif = 0;
1627                 lfsck->li_sleep_rate = 0;
1628         }
1629
1630         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1631                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1632                 dirty = true;
1633         }
1634
1635         return dirty;
1636 }
1637
1638 void lfsck_control_speed(struct lfsck_instance *lfsck)
1639 {
1640         struct ptlrpc_thread *thread = &lfsck->li_thread;
1641         struct l_wait_info    lwi;
1642
1643         if (lfsck->li_sleep_jif > 0 &&
1644             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1645                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1646                                        LWI_ON_SIGNAL_NOOP, NULL);
1647
1648                 l_wait_event(thread->t_ctl_waitq,
1649                              !thread_is_running(thread),
1650                              &lwi);
1651                 lfsck->li_new_scanned = 0;
1652         }
1653 }
1654
1655 void lfsck_control_speed_by_self(struct lfsck_component *com)
1656 {
1657         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1658         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1659         struct l_wait_info       lwi;
1660
1661         if (lfsck->li_sleep_jif > 0 &&
1662             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1663                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1664                                        LWI_ON_SIGNAL_NOOP, NULL);
1665
1666                 l_wait_event(thread->t_ctl_waitq,
1667                              !thread_is_running(thread),
1668                              &lwi);
1669                 com->lc_new_scanned = 0;
1670         }
1671 }
1672
1673 static struct lfsck_thread_args *
1674 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1675                        struct lfsck_component *com,
1676                        struct lfsck_start_param *lsp)
1677 {
1678         struct lfsck_thread_args *lta;
1679         int                       rc;
1680
1681         OBD_ALLOC_PTR(lta);
1682         if (lta == NULL)
1683                 return ERR_PTR(-ENOMEM);
1684
1685         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1686         if (rc != 0) {
1687                 OBD_FREE_PTR(lta);
1688                 return ERR_PTR(rc);
1689         }
1690
1691         lta->lta_lfsck = lfsck_instance_get(lfsck);
1692         if (com != NULL)
1693                 lta->lta_com = lfsck_component_get(com);
1694
1695         lta->lta_lsp = lsp;
1696
1697         return lta;
1698 }
1699
1700 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1701 {
1702         if (lta->lta_com != NULL)
1703                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1704         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1705         lu_env_fini(&lta->lta_env);
1706         OBD_FREE_PTR(lta);
1707 }
1708
1709 struct lfsck_assistant_data *
1710 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1711                           const char *name)
1712 {
1713         struct lfsck_assistant_data *lad;
1714
1715         OBD_ALLOC_PTR(lad);
1716         if (lad != NULL) {
1717                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1718                 if (lad->lad_bitmap == NULL) {
1719                         OBD_FREE_PTR(lad);
1720                         return NULL;
1721                 }
1722
1723                 INIT_LIST_HEAD(&lad->lad_req_list);
1724                 spin_lock_init(&lad->lad_lock);
1725                 INIT_LIST_HEAD(&lad->lad_ost_list);
1726                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1727                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1728                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1729                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1730                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1731                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1732                 lad->lad_ops = lao;
1733                 lad->lad_name = name;
1734         }
1735
1736         return lad;
1737 }
1738
1739 /**
1740  * Generic LFSCK asynchronous communication interpretor function.
1741  * The LFSCK RPC reply for both the event notification and status
1742  * querying will be handled here.
1743  *
1744  * \param[in] env       pointer to the thread context
1745  * \param[in] req       pointer to the LFSCK request
1746  * \param[in] args      pointer to the lfsck_async_interpret_args
1747  * \param[in] rc        the result for handling the LFSCK request
1748  *
1749  * \retval              0 for success
1750  * \retval              negative error number on failure
1751  */
1752 int lfsck_async_interpret_common(const struct lu_env *env,
1753                                  struct ptlrpc_request *req,
1754                                  void *args, int rc)
1755 {
1756         struct lfsck_async_interpret_args *laia = args;
1757         struct lfsck_component            *com  = laia->laia_com;
1758         struct lfsck_assistant_data       *lad  = com->lc_data;
1759         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1760         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1761         struct lfsck_request              *lr   = laia->laia_lr;
1762
1763         LASSERT(com->lc_lfsck->li_master);
1764
1765         switch (lr->lr_event) {
1766         case LE_START:
1767                 if (rc != 0) {
1768                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1769                                "start: rc = %d\n",
1770                                lfsck_lfsck2name(com->lc_lfsck),
1771                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1772                                ltd->ltd_index, lad->lad_name, rc);
1773
1774                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1775                                 struct lfsck_layout *lo = com->lc_file_ram;
1776
1777                                 if (lr->lr_flags & LEF_TO_OST)
1778                                         lfsck_lad_set_bitmap(env, com,
1779                                                              ltd->ltd_index);
1780                                 else
1781                                         lo->ll_flags |= LF_INCOMPLETE;
1782                         } else {
1783                                 struct lfsck_namespace *ns = com->lc_file_ram;
1784
1785                                 /* If some MDT does not join the namespace
1786                                  * LFSCK, then we cannot know whether there
1787                                  * is some name entry on such MDT that with
1788                                  * the referenced MDT-object on this MDT or
1789                                  * not. So the namespace LFSCK on this MDT
1790                                  * cannot handle orphan MDT-objects properly.
1791                                  * So we mark the LFSCK as LF_INCOMPLETE and
1792                                  * skip orphan MDT-objects handling. */
1793                                 ns->ln_flags |= LF_INCOMPLETE;
1794                         }
1795                         break;
1796                 }
1797
1798                 spin_lock(&ltds->ltd_lock);
1799                 if (ltd->ltd_dead) {
1800                         spin_unlock(&ltds->ltd_lock);
1801                         break;
1802                 }
1803
1804                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1805                         struct list_head *list;
1806                         struct list_head *phase_list;
1807
1808                         if (ltd->ltd_layout_done) {
1809                                 spin_unlock(&ltds->ltd_lock);
1810                                 break;
1811                         }
1812
1813                         if (lr->lr_flags & LEF_TO_OST) {
1814                                 list = &lad->lad_ost_list;
1815                                 phase_list = &lad->lad_ost_phase1_list;
1816                         } else {
1817                                 list = &lad->lad_mdt_list;
1818                                 phase_list = &lad->lad_mdt_phase1_list;
1819                         }
1820
1821                         if (list_empty(&ltd->ltd_layout_list))
1822                                 list_add_tail(&ltd->ltd_layout_list, list);
1823                         if (list_empty(&ltd->ltd_layout_phase_list))
1824                                 list_add_tail(&ltd->ltd_layout_phase_list,
1825                                               phase_list);
1826                 } else {
1827                         if (ltd->ltd_namespace_done) {
1828                                 spin_unlock(&ltds->ltd_lock);
1829                                 break;
1830                         }
1831
1832                         if (list_empty(&ltd->ltd_namespace_list))
1833                                 list_add_tail(&ltd->ltd_namespace_list,
1834                                               &lad->lad_mdt_list);
1835                         if (list_empty(&ltd->ltd_namespace_phase_list))
1836                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1837                                               &lad->lad_mdt_phase1_list);
1838                 }
1839                 spin_unlock(&ltds->ltd_lock);
1840                 break;
1841         case LE_STOP:
1842         case LE_PHASE1_DONE:
1843         case LE_PHASE2_DONE:
1844         case LE_PEER_EXIT:
1845                 if (rc != 0 && rc != -EALREADY)
1846                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1847                               "event = %d, rc = %d\n",
1848                               lfsck_lfsck2name(com->lc_lfsck),
1849                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1850                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1851                 break;
1852         case LE_QUERY: {
1853                 struct lfsck_reply *reply;
1854                 struct list_head *list;
1855                 struct list_head *phase_list;
1856
1857                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1858                         list = &ltd->ltd_layout_list;
1859                         phase_list = &ltd->ltd_layout_phase_list;
1860                 } else {
1861                         list = &ltd->ltd_namespace_list;
1862                         phase_list = &ltd->ltd_namespace_phase_list;
1863                 }
1864
1865                 if (rc != 0) {
1866                         spin_lock(&ltds->ltd_lock);
1867                         list_del_init(phase_list);
1868                         list_del_init(list);
1869                         spin_unlock(&ltds->ltd_lock);
1870                         break;
1871                 }
1872
1873                 reply = req_capsule_server_get(&req->rq_pill,
1874                                                &RMF_LFSCK_REPLY);
1875                 if (reply == NULL) {
1876                         rc = -EPROTO;
1877                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1878                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1879                                lad->lad_name, rc);
1880                         spin_lock(&ltds->ltd_lock);
1881                         list_del_init(phase_list);
1882                         list_del_init(list);
1883                         spin_unlock(&ltds->ltd_lock);
1884                         break;
1885                 }
1886
1887                 switch (reply->lr_status) {
1888                 case LS_SCANNING_PHASE1:
1889                         break;
1890                 case LS_SCANNING_PHASE2:
1891                         spin_lock(&ltds->ltd_lock);
1892                         list_del_init(phase_list);
1893                         if (ltd->ltd_dead) {
1894                                 spin_unlock(&ltds->ltd_lock);
1895                                 break;
1896                         }
1897
1898                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1899                                 if (ltd->ltd_layout_done) {
1900                                         spin_unlock(&ltds->ltd_lock);
1901                                         break;
1902                                 }
1903
1904                                 if (lr->lr_flags & LEF_TO_OST)
1905                                         list_add_tail(phase_list,
1906                                                 &lad->lad_ost_phase2_list);
1907                                 else
1908                                         list_add_tail(phase_list,
1909                                                 &lad->lad_mdt_phase2_list);
1910                         } else {
1911                                 if (ltd->ltd_namespace_done) {
1912                                         spin_unlock(&ltds->ltd_lock);
1913                                         break;
1914                                 }
1915
1916                                 list_add_tail(phase_list,
1917                                               &lad->lad_mdt_phase2_list);
1918                         }
1919                         spin_unlock(&ltds->ltd_lock);
1920                         break;
1921                 default:
1922                         spin_lock(&ltds->ltd_lock);
1923                         list_del_init(phase_list);
1924                         list_del_init(list);
1925                         spin_unlock(&ltds->ltd_lock);
1926                         break;
1927                 }
1928                 break;
1929         }
1930         default:
1931                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
1932                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
1933                 break;
1934         }
1935
1936         if (!laia->laia_shared) {
1937                 lfsck_tgt_put(ltd);
1938                 lfsck_component_put(env, com);
1939         }
1940
1941         return 0;
1942 }
1943
1944 static void lfsck_interpret(const struct lu_env *env,
1945                             struct lfsck_instance *lfsck,
1946                             struct ptlrpc_request *req, void *args, int result)
1947 {
1948         struct lfsck_async_interpret_args *laia = args;
1949         struct lfsck_component            *com;
1950
1951         LASSERT(laia->laia_com == NULL);
1952         LASSERT(laia->laia_shared);
1953
1954         spin_lock(&lfsck->li_lock);
1955         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1956                 laia->laia_com = com;
1957                 lfsck_async_interpret_common(env, req, laia, result);
1958         }
1959
1960         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1961                 laia->laia_com = com;
1962                 lfsck_async_interpret_common(env, req, laia, result);
1963         }
1964         spin_unlock(&lfsck->li_lock);
1965 }
1966
1967 static int lfsck_stop_notify(const struct lu_env *env,
1968                              struct lfsck_instance *lfsck,
1969                              struct lfsck_tgt_descs *ltds,
1970                              struct lfsck_tgt_desc *ltd, __u16 type)
1971 {
1972         struct lfsck_component *com;
1973         int                     rc = 0;
1974         ENTRY;
1975
1976         LASSERT(lfsck->li_master);
1977
1978         spin_lock(&lfsck->li_lock);
1979         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1980         if (com == NULL)
1981                 com = __lfsck_component_find(lfsck, type,
1982                                              &lfsck->li_list_double_scan);
1983         if (com != NULL)
1984                 lfsck_component_get(com);
1985         spin_unlock(&lfsck->li_lock);
1986
1987         if (com != NULL) {
1988                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
1989                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
1990                 struct lfsck_request              *lr    = &info->lti_lr;
1991                 struct lfsck_assistant_data       *lad   = com->lc_data;
1992                 struct list_head                  *list;
1993                 struct list_head                  *phase_list;
1994                 struct ptlrpc_request_set         *set;
1995
1996                 set = ptlrpc_prep_set();
1997                 if (set == NULL) {
1998                         lfsck_component_put(env, com);
1999
2000                         RETURN(-ENOMEM);
2001                 }
2002
2003                 if (type == LFSCK_TYPE_LAYOUT) {
2004                         list = &ltd->ltd_layout_list;
2005                         phase_list = &ltd->ltd_layout_phase_list;
2006                 } else {
2007                         list = &ltd->ltd_namespace_list;
2008                         phase_list = &ltd->ltd_namespace_phase_list;
2009                 }
2010
2011                 spin_lock(&ltds->ltd_lock);
2012                 if (list_empty(list)) {
2013                         LASSERT(list_empty(phase_list));
2014                         spin_unlock(&ltds->ltd_lock);
2015                         ptlrpc_set_destroy(set);
2016
2017                         RETURN(0);
2018                 }
2019
2020                 list_del_init(phase_list);
2021                 list_del_init(list);
2022                 spin_unlock(&ltds->ltd_lock);
2023
2024                 memset(lr, 0, sizeof(*lr));
2025                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2026                 lr->lr_event = LE_PEER_EXIT;
2027                 lr->lr_active = type;
2028                 lr->lr_status = LS_CO_PAUSED;
2029                 if (ltds == &lfsck->li_ost_descs)
2030                         lr->lr_flags = LEF_TO_OST;
2031
2032                 laia->laia_com = com;
2033                 laia->laia_ltds = ltds;
2034                 atomic_inc(&ltd->ltd_ref);
2035                 laia->laia_ltd = ltd;
2036                 laia->laia_lr = lr;
2037                 laia->laia_shared = 0;
2038
2039                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2040                                          lfsck_async_interpret_common,
2041                                          laia, LFSCK_NOTIFY);
2042                 if (rc != 0) {
2043                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2044                                "co-stop for %s: rc = %d\n",
2045                                lfsck_lfsck2name(lfsck),
2046                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2047                                ltd->ltd_index, lad->lad_name, rc);
2048                         lfsck_tgt_put(ltd);
2049                 } else {
2050                         rc = ptlrpc_set_wait(set);
2051                 }
2052
2053                 ptlrpc_set_destroy(set);
2054                 lfsck_component_put(env, com);
2055         }
2056
2057         RETURN(rc);
2058 }
2059
2060 static int lfsck_async_interpret(const struct lu_env *env,
2061                                  struct ptlrpc_request *req,
2062                                  void *args, int rc)
2063 {
2064         struct lfsck_async_interpret_args *laia = args;
2065         struct lfsck_instance             *lfsck;
2066
2067         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2068                               li_mdt_descs);
2069         lfsck_interpret(env, lfsck, req, laia, rc);
2070         lfsck_tgt_put(laia->laia_ltd);
2071         if (rc != 0 && laia->laia_result != -EALREADY)
2072                 laia->laia_result = rc;
2073
2074         return 0;
2075 }
2076
2077 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2078                         struct lfsck_request *lr,
2079                         struct ptlrpc_request_set *set,
2080                         ptlrpc_interpterer_t interpreter,
2081                         void *args, int request)
2082 {
2083         struct lfsck_async_interpret_args *laia;
2084         struct ptlrpc_request             *req;
2085         struct lfsck_request              *tmp;
2086         struct req_format                 *format;
2087         int                                rc;
2088
2089         switch (request) {
2090         case LFSCK_NOTIFY:
2091                 format = &RQF_LFSCK_NOTIFY;
2092                 break;
2093         case LFSCK_QUERY:
2094                 format = &RQF_LFSCK_QUERY;
2095                 break;
2096         default:
2097                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2098                        exp->exp_obd->obd_name, request, -EINVAL);
2099                 return -EINVAL;
2100         }
2101
2102         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2103         if (req == NULL)
2104                 return -ENOMEM;
2105
2106         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2107         if (rc != 0) {
2108                 ptlrpc_request_free(req);
2109
2110                 return rc;
2111         }
2112
2113         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2114         *tmp = *lr;
2115         ptlrpc_request_set_replen(req);
2116
2117         laia = ptlrpc_req_async_args(req);
2118         *laia = *(struct lfsck_async_interpret_args *)args;
2119         if (laia->laia_com != NULL)
2120                 lfsck_component_get(laia->laia_com);
2121         req->rq_interpret_reply = interpreter;
2122         ptlrpc_set_add_req(set, req);
2123
2124         return 0;
2125 }
2126
2127 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2128                           struct lfsck_start_param *lsp)
2129 {
2130         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2131         struct lfsck_assistant_data     *lad     = com->lc_data;
2132         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2133         struct ptlrpc_thread            *athread = &lad->lad_thread;
2134         struct lfsck_thread_args        *lta;
2135         struct task_struct              *task;
2136         int                              rc;
2137         ENTRY;
2138
2139         lad->lad_assistant_status = 0;
2140         lad->lad_post_result = 0;
2141         lad->lad_to_post = 0;
2142         lad->lad_to_double_scan = 0;
2143         lad->lad_in_double_scan = 0;
2144         lad->lad_exit = 0;
2145         thread_set_flags(athread, 0);
2146
2147         lta = lfsck_thread_args_init(lfsck, com, lsp);
2148         if (IS_ERR(lta))
2149                 RETURN(PTR_ERR(lta));
2150
2151         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2152         if (IS_ERR(task)) {
2153                 rc = PTR_ERR(task);
2154                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2155                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2156                 lfsck_thread_args_fini(lta);
2157         } else {
2158                 struct l_wait_info lwi = { 0 };
2159
2160                 l_wait_event(mthread->t_ctl_waitq,
2161                              thread_is_running(athread) ||
2162                              thread_is_stopped(athread),
2163                              &lwi);
2164                 if (unlikely(!thread_is_running(athread)))
2165                         rc = lad->lad_assistant_status;
2166                 else
2167                         rc = 0;
2168         }
2169
2170         RETURN(rc);
2171 }
2172
2173 int lfsck_checkpoint_generic(const struct lu_env *env,
2174                              struct lfsck_component *com)
2175 {
2176         struct lfsck_assistant_data     *lad     = com->lc_data;
2177         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2178         struct ptlrpc_thread            *athread = &lad->lad_thread;
2179         struct l_wait_info               lwi     = { 0 };
2180
2181         if (com->lc_new_checked == 0)
2182                 return LFSCK_CHECKPOINT_SKIP;
2183
2184         l_wait_event(mthread->t_ctl_waitq,
2185                      list_empty(&lad->lad_req_list) ||
2186                      !thread_is_running(mthread) ||
2187                      thread_is_stopped(athread),
2188                      &lwi);
2189
2190         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2191                 return LFSCK_CHECKPOINT_SKIP;
2192
2193         return 0;
2194 }
2195
2196 void lfsck_post_generic(const struct lu_env *env,
2197                         struct lfsck_component *com, int *result)
2198 {
2199         struct lfsck_assistant_data     *lad     = com->lc_data;
2200         struct ptlrpc_thread            *athread = &lad->lad_thread;
2201         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2202         struct l_wait_info               lwi     = { 0 };
2203
2204         lad->lad_post_result = *result;
2205         if (*result <= 0)
2206                 lad->lad_exit = 1;
2207         lad->lad_to_post = 1;
2208
2209         wake_up_all(&athread->t_ctl_waitq);
2210         l_wait_event(mthread->t_ctl_waitq,
2211                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2212                      thread_is_stopped(athread),
2213                      &lwi);
2214
2215         if (lad->lad_assistant_status < 0)
2216                 *result = lad->lad_assistant_status;
2217 }
2218
2219 int lfsck_double_scan_generic(const struct lu_env *env,
2220                               struct lfsck_component *com, int status)
2221 {
2222         struct lfsck_assistant_data     *lad     = com->lc_data;
2223         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2224         struct ptlrpc_thread            *athread = &lad->lad_thread;
2225         struct l_wait_info               lwi     = { 0 };
2226
2227         if (status != LS_SCANNING_PHASE2)
2228                 lad->lad_exit = 1;
2229         else
2230                 lad->lad_to_double_scan = 1;
2231
2232         wake_up_all(&athread->t_ctl_waitq);
2233         l_wait_event(mthread->t_ctl_waitq,
2234                      lad->lad_in_double_scan ||
2235                      thread_is_stopped(athread),
2236                      &lwi);
2237
2238         if (lad->lad_assistant_status < 0)
2239                 return lad->lad_assistant_status;
2240
2241         return 0;
2242 }
2243
2244 void lfsck_quit_generic(const struct lu_env *env,
2245                         struct lfsck_component *com)
2246 {
2247         struct lfsck_assistant_data     *lad     = com->lc_data;
2248         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2249         struct ptlrpc_thread            *athread = &lad->lad_thread;
2250         struct l_wait_info               lwi     = { 0 };
2251
2252         lad->lad_exit = 1;
2253         wake_up_all(&athread->t_ctl_waitq);
2254         l_wait_event(mthread->t_ctl_waitq,
2255                      thread_is_init(athread) ||
2256                      thread_is_stopped(athread),
2257                      &lwi);
2258 }
2259
2260 /* external interfaces */
2261
2262 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2263 {
2264         struct lu_env           env;
2265         struct lfsck_instance  *lfsck;
2266         int                     rc;
2267         ENTRY;
2268
2269         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2270         if (rc != 0)
2271                 RETURN(rc);
2272
2273         lfsck = lfsck_instance_find(key, true, false);
2274         if (likely(lfsck != NULL)) {
2275                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2276                 lfsck_instance_put(&env, lfsck);
2277         } else {
2278                 rc = -ENXIO;
2279         }
2280
2281         lu_env_fini(&env);
2282
2283         RETURN(rc);
2284 }
2285 EXPORT_SYMBOL(lfsck_get_speed);
2286
2287 int lfsck_set_speed(struct dt_device *key, int val)
2288 {
2289         struct lu_env           env;
2290         struct lfsck_instance  *lfsck;
2291         int                     rc;
2292         ENTRY;
2293
2294         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2295         if (rc != 0)
2296                 RETURN(rc);
2297
2298         lfsck = lfsck_instance_find(key, true, false);
2299         if (likely(lfsck != NULL)) {
2300                 mutex_lock(&lfsck->li_mutex);
2301                 if (__lfsck_set_speed(lfsck, val))
2302                         rc = lfsck_bookmark_store(&env, lfsck);
2303                 mutex_unlock(&lfsck->li_mutex);
2304                 lfsck_instance_put(&env, lfsck);
2305         } else {
2306                 rc = -ENXIO;
2307         }
2308
2309         lu_env_fini(&env);
2310
2311         RETURN(rc);
2312 }
2313 EXPORT_SYMBOL(lfsck_set_speed);
2314
2315 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2316 {
2317         struct lu_env           env;
2318         struct lfsck_instance  *lfsck;
2319         int                     rc;
2320         ENTRY;
2321
2322         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2323         if (rc != 0)
2324                 RETURN(rc);
2325
2326         lfsck = lfsck_instance_find(key, true, false);
2327         if (likely(lfsck != NULL)) {
2328                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2329                 lfsck_instance_put(&env, lfsck);
2330         } else {
2331                 rc = -ENXIO;
2332         }
2333
2334         lu_env_fini(&env);
2335
2336         RETURN(rc);
2337 }
2338 EXPORT_SYMBOL(lfsck_get_windows);
2339
2340 int lfsck_set_windows(struct dt_device *key, int val)
2341 {
2342         struct lu_env           env;
2343         struct lfsck_instance  *lfsck;
2344         int                     rc;
2345         ENTRY;
2346
2347         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2348         if (rc != 0)
2349                 RETURN(rc);
2350
2351         lfsck = lfsck_instance_find(key, true, false);
2352         if (likely(lfsck != NULL)) {
2353                 if (val > LFSCK_ASYNC_WIN_MAX) {
2354                         CWARN("%s: Too large async window size, which "
2355                               "may cause memory issues. The valid range "
2356                               "is [0 - %u]. If you do not want to restrict "
2357                               "the window size for async requests pipeline, "
2358                               "just set it as 0.\n",
2359                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2360                         rc = -EINVAL;
2361                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2362                         mutex_lock(&lfsck->li_mutex);
2363                         lfsck->li_bookmark_ram.lb_async_windows = val;
2364                         rc = lfsck_bookmark_store(&env, lfsck);
2365                         mutex_unlock(&lfsck->li_mutex);
2366                 }
2367                 lfsck_instance_put(&env, lfsck);
2368         } else {
2369                 rc = -ENXIO;
2370         }
2371
2372         lu_env_fini(&env);
2373
2374         RETURN(rc);
2375 }
2376 EXPORT_SYMBOL(lfsck_set_windows);
2377
2378 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2379 {
2380         struct lu_env           env;
2381         struct lfsck_instance  *lfsck;
2382         struct lfsck_component *com;
2383         int                     rc;
2384         ENTRY;
2385
2386         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2387         if (rc != 0)
2388                 RETURN(rc);
2389
2390         lfsck = lfsck_instance_find(key, true, false);
2391         if (likely(lfsck != NULL)) {
2392                 com = lfsck_component_find(lfsck, type);
2393                 if (likely(com != NULL)) {
2394                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2395                         lfsck_component_put(&env, com);
2396                 } else {
2397                         rc = -ENOTSUPP;
2398                 }
2399
2400                 lfsck_instance_put(&env, lfsck);
2401         } else {
2402                 rc = -ENXIO;
2403         }
2404
2405         lu_env_fini(&env);
2406
2407         RETURN(rc);
2408 }
2409 EXPORT_SYMBOL(lfsck_dump);
2410
2411 static int lfsck_stop_all(const struct lu_env *env,
2412                           struct lfsck_instance *lfsck,
2413                           struct lfsck_stop *stop)
2414 {
2415         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2416         struct lfsck_request              *lr     = &info->lti_lr;
2417         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2418         struct ptlrpc_request_set         *set;
2419         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2420         struct lfsck_tgt_desc             *ltd;
2421         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2422         __u32                              idx;
2423         int                                rc     = 0;
2424         int                                rc1    = 0;
2425         ENTRY;
2426
2427         LASSERT(stop->ls_flags & LPF_BROADCAST);
2428
2429         set = ptlrpc_prep_set();
2430         if (unlikely(set == NULL))
2431                 RETURN(-ENOMEM);
2432
2433         memset(lr, 0, sizeof(*lr));
2434         lr->lr_event = LE_STOP;
2435         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2436         lr->lr_status = stop->ls_status;
2437         lr->lr_version = bk->lb_version;
2438         lr->lr_active = LFSCK_TYPES_ALL;
2439         lr->lr_param = stop->ls_flags;
2440
2441         laia->laia_com = NULL;
2442         laia->laia_ltds = ltds;
2443         laia->laia_lr = lr;
2444         laia->laia_result = 0;
2445         laia->laia_shared = 1;
2446
2447         down_read(&ltds->ltd_rw_sem);
2448         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2449                 ltd = lfsck_tgt_get(ltds, idx);
2450                 LASSERT(ltd != NULL);
2451
2452                 laia->laia_ltd = ltd;
2453                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2454                                          lfsck_async_interpret, laia,
2455                                          LFSCK_NOTIFY);
2456                 if (rc != 0) {
2457                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2458                         lfsck_tgt_put(ltd);
2459                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2460                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2461                         rc1 = rc;
2462                 }
2463         }
2464         up_read(&ltds->ltd_rw_sem);
2465
2466         rc = ptlrpc_set_wait(set);
2467         ptlrpc_set_destroy(set);
2468
2469         if (rc == 0)
2470                 rc = laia->laia_result;
2471
2472         if (rc == -EALREADY)
2473                 rc = 0;
2474
2475         if (rc != 0)
2476                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2477                        lfsck_lfsck2name(lfsck), rc);
2478
2479         RETURN(rc != 0 ? rc : rc1);
2480 }
2481
2482 static int lfsck_start_all(const struct lu_env *env,
2483                            struct lfsck_instance *lfsck,
2484                            struct lfsck_start *start)
2485 {
2486         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2487         struct lfsck_request              *lr     = &info->lti_lr;
2488         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2489         struct ptlrpc_request_set         *set;
2490         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2491         struct lfsck_tgt_desc             *ltd;
2492         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2493         __u32                              idx;
2494         int                                rc     = 0;
2495         ENTRY;
2496
2497         LASSERT(start->ls_flags & LPF_BROADCAST);
2498
2499         set = ptlrpc_prep_set();
2500         if (unlikely(set == NULL))
2501                 RETURN(-ENOMEM);
2502
2503         memset(lr, 0, sizeof(*lr));
2504         lr->lr_event = LE_START;
2505         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2506         lr->lr_speed = bk->lb_speed_limit;
2507         lr->lr_version = bk->lb_version;
2508         lr->lr_active = start->ls_active;
2509         lr->lr_param = start->ls_flags;
2510         lr->lr_async_windows = bk->lb_async_windows;
2511         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2512                        LSV_ASYNC_WINDOWS;
2513
2514         laia->laia_com = NULL;
2515         laia->laia_ltds = ltds;
2516         laia->laia_lr = lr;
2517         laia->laia_result = 0;
2518         laia->laia_shared = 1;
2519
2520         down_read(&ltds->ltd_rw_sem);
2521         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2522                 ltd = lfsck_tgt_get(ltds, idx);
2523                 LASSERT(ltd != NULL);
2524
2525                 laia->laia_ltd = ltd;
2526                 ltd->ltd_layout_done = 0;
2527                 ltd->ltd_namespace_done = 0;
2528                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2529                                          lfsck_async_interpret, laia,
2530                                          LFSCK_NOTIFY);
2531                 if (rc != 0) {
2532                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2533                         lfsck_tgt_put(ltd);
2534                         CERROR("%s: cannot notify MDT %x for LFSCK "
2535                                "start, failout: rc = %d\n",
2536                                lfsck_lfsck2name(lfsck), idx, rc);
2537                         break;
2538                 }
2539         }
2540         up_read(&ltds->ltd_rw_sem);
2541
2542         if (rc != 0) {
2543                 ptlrpc_set_destroy(set);
2544
2545                 RETURN(rc);
2546         }
2547
2548         rc = ptlrpc_set_wait(set);
2549         ptlrpc_set_destroy(set);
2550
2551         if (rc == 0)
2552                 rc = laia->laia_result;
2553
2554         if (rc != 0) {
2555                 struct lfsck_stop *stop = &info->lti_stop;
2556
2557                 CERROR("%s: cannot start LFSCK on some MDTs, "
2558                        "stop all: rc = %d\n",
2559                        lfsck_lfsck2name(lfsck), rc);
2560                 if (rc != -EALREADY) {
2561                         stop->ls_status = LS_FAILED;
2562                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2563                         lfsck_stop_all(env, lfsck, stop);
2564                 }
2565         }
2566
2567         RETURN(rc);
2568 }
2569
/*
 * Start the LFSCK for the device \a key, or join an LFSCK whose engine
 * thread is already active.
 *
 * lsp->lsp_start == NULL means "auto trigger a paused LFSCK": nothing is
 * done in that case if the scan list is empty or OBD_FAIL_LFSCK_NO_AUTO
 * is set.
 *
 * With lfsck::li_mutex held, the function either
 *  - finds the engine thread neither in init nor in stopped state, calls
 *    the lfsck_join() method (when provided) of each requested component
 *    and returns -EALREADY or the join error, or
 *  - selects the components to scan, stores the parameters, spawns the
 *    master engine with kthread_run() and waits until that thread is
 *    running or stopped.  With LPF_BROADCAST set, lfsck::li_mutex is then
 *    released and lfsck_start_all() is called; if that fails, the local
 *    engine is stopped again with LS_FAILED status.
 *
 * Returns 0 on success or a negative errno.  A positive rc is folded into
 * 0 by the final return statement.
 */
2570 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2571                 struct lfsck_start_param *lsp)
2572 {
2573         struct lfsck_start              *start  = lsp->lsp_start;
2574         struct lfsck_instance           *lfsck;
2575         struct lfsck_bookmark           *bk;
2576         struct ptlrpc_thread            *thread;
2577         struct lfsck_component          *com;
2578         struct l_wait_info               lwi    = { 0 };
2579         struct lfsck_thread_args        *lta;
2580         struct task_struct              *task;
2581         int                              rc     = 0;
2582         __u16                            valid  = 0;
2583         __u16                            flags  = 0;
             /* Single-bit cursor used to walk the bits of start->ls_active. */
2584         __u16                            type   = 1;
2585         ENTRY;
2586
2587         lfsck = lfsck_instance_find(key, true, false);
2588         if (unlikely(lfsck == NULL))
2589                 RETURN(-ENXIO);
2590
2591         /* System is not ready, try again later. */
2592         if (unlikely(lfsck->li_namespace == NULL))
2593                 GOTO(put, rc = -EAGAIN);
2594
2595         /* start == NULL means auto trigger paused LFSCK. */
2596         if ((start == NULL) &&
2597             (list_empty(&lfsck->li_list_scan) ||
2598              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2599                 GOTO(put, rc = 0);
2600
2601         bk = &lfsck->li_bookmark_ram;
2602         thread = &lfsck->li_thread;
2603         mutex_lock(&lfsck->li_mutex);
2604         spin_lock(&lfsck->li_lock);
             /* The engine thread is neither in init nor in stopped state:
              * do not start a second one, only try to join the requested
              * components.  The default result of this path is -EALREADY. */
2605         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2606                 rc = -EALREADY;
2607                 if (unlikely(start == NULL)) {
2608                         spin_unlock(&lfsck->li_lock);
2609                         GOTO(out, rc);
2610                 }
2611
                     /* Visit every type bit set in ls_active; a bit is
                      * cleared once its component has been handled. */
2612                 while (start->ls_active != 0) {
2613                         if (!(type & start->ls_active)) {
2614                                 type <<= 1;
2615                                 continue;
2616                         }
2617
2618                         com = __lfsck_component_find(lfsck, type,
2619                                                      &lfsck->li_list_scan);
2620                         if (com == NULL)
2621                                 com = __lfsck_component_find(lfsck, type,
2622                                                 &lfsck->li_list_double_scan);
2623                         if (com == NULL) {
2624                                 rc = -EOPNOTSUPP;
2625                                 break;
2626                         }
2627
2628                         if (com->lc_ops->lfsck_join != NULL) {
2629                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2630                                 if (rc != 0 && rc != -EALREADY)
2631                                         break;
2632                         }
2633                         start->ls_active &= ~type;
2634                         type <<= 1;
2635                 }
2636                 spin_unlock(&lfsck->li_lock);
2637                 GOTO(out, rc);
2638         }
2639         spin_unlock(&lfsck->li_lock);
2640
             /* Reset the per-run state of the instance. */
2641         lfsck->li_status = 0;
2642         lfsck->li_oit_over = 0;
2643         lfsck->li_start_unplug = 0;
2644         lfsck->li_drop_dryrun = 0;
2645         lfsck->li_new_scanned = 0;
2646
2647         /* For auto trigger. */
2648         if (start == NULL)
2649                 goto trigger;
2650
2651         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2652                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2653                        lfsck_lfsck2name(lfsck));
2654
2655                 GOTO(out, rc = -EPERM);
2656         }
2657
2658         start->ls_version = bk->lb_version;
2659
2660         if (start->ls_active != 0) {
2661                 struct lfsck_component *next;
2662
2663                 if (start->ls_active == LFSCK_TYPES_ALL)
2664                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2665
                     /* On unsupported types, leave only the unsupported
                      * bits in ls_active before failing. */
2666                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2667                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2668                         GOTO(out, rc = -ENOTSUPP);
2669                 }
2670
                     /* Call lfsck_post() on each component in the scan list
                      * that was not requested this time.
                      * NOTE(review): presumably lfsck_post() moves the
                      * component off the scan list, hence the _safe
                      * iterator - confirm against the component ops. */
2671                 list_for_each_entry_safe(com, next,
2672                                          &lfsck->li_list_scan, lc_link) {
2673                         if (!(com->lc_type & start->ls_active)) {
2674                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2675                                                              false);
2676                                 if (rc != 0)
2677                                         GOTO(out, rc);
2678                         }
2679                 }
2680
                     /* Move each requested component found in the idle list
                      * to the tail of the scan list, consuming ls_active
                      * bit by bit. */
2681                 while (start->ls_active != 0) {
2682                         if (type & start->ls_active) {
2683                                 com = __lfsck_component_find(lfsck, type,
2684                                                         &lfsck->li_list_idle);
2685                                 if (com != NULL)
2686                                         /* The component status will be updated
2687                                          * when its prep() is called later by
2688                                          * the LFSCK main engine. */
2689                                         list_move_tail(&com->lc_link,
2690                                                        &lfsck->li_list_scan);
2691                                 start->ls_active &= ~type;
2692                         }
2693                         type <<= 1;
2694                 }
2695         }
2696
2697         if (list_empty(&lfsck->li_list_scan)) {
2698                 /* The speed limit will be used to control both the LFSCK and
2699                  * low layer scrub (if applied), need to be handled firstly. */
2700                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2701                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2702                                 rc = lfsck_bookmark_store(env, lfsck);
2703                                 if (rc != 0)
2704                                         GOTO(out, rc);
2705                         }
2706                 }
2707
2708                 goto trigger;
2709         }
2710
2711         if (start->ls_flags & LPF_RESET)
2712                 flags |= DOIF_RESET;
2713
2714         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2715         if (rc != 0)
2716                 GOTO(out, rc);
2717
             /* Rebuild ls_active from the components that will be scanned,
              * resetting each of them when LPF_RESET was requested. */
2718         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2719                 start->ls_active |= com->lc_type;
2720                 if (flags & DOIF_RESET) {
2721                         rc = com->lc_ops->lfsck_reset(env, com, false);
2722                         if (rc != 0)
2723                                 GOTO(out, rc);
2724                 }
2725         }
2726
2727 trigger:
             /* Compose the directory and object-table iteration arguments
              * from the bookmark and the (optional) start request. */
2728         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2729         if (bk->lb_param & LPF_DRYRUN)
2730                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2731
2732         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2733                 valid |= DOIV_ERROR_HANDLE;
2734                 if (start->ls_flags & LPF_FAILOUT)
2735                         flags |= DOIF_FAILOUT;
2736         }
2737
2738         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2739                 valid |= DOIV_DRYRUN;
2740                 if (start->ls_flags & LPF_DRYRUN)
2741                         flags |= DOIF_DRYRUN;
2742         }
2743
2744         if (!list_empty(&lfsck->li_list_scan))
2745                 flags |= DOIF_OUTUSED;
2746
2747         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2748         thread_set_flags(thread, 0);
2749         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2750         if (IS_ERR(lta))
2751                 GOTO(out, rc = PTR_ERR(lta));
2752
2753         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2754         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2755         if (IS_ERR(task)) {
2756                 rc = PTR_ERR(task);
2757                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2758                        lfsck_lfsck2name(lfsck), rc);
                     /* The engine never ran, so release its arguments
                      * here. */
2759                 lfsck_thread_args_fini(lta);
2760
2761                 GOTO(out, rc);
2762         }
2763
             /* Wait until the engine thread is either running or stopped. */
2764         l_wait_event(thread->t_ctl_waitq,
2765                      thread_is_running(thread) ||
2766                      thread_is_stopped(thread),
2767                      &lwi);
             /* Local-only start: set li_start_unplug and wake up the
              * waiters on the control waitq right away. */
2768         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2769                 lfsck->li_start_unplug = 1;
2770                 wake_up_all(&thread->t_ctl_waitq);
2771
2772                 GOTO(out, rc = 0);
2773         }
2774
2775         /* release lfsck::li_mutex to avoid deadlock. */
2776         mutex_unlock(&lfsck->li_mutex);
2777         rc = lfsck_start_all(env, lfsck, start);
2778         if (rc != 0) {
                     /* The broadcast failed: unless the local engine has
                      * already stopped, mark it failed, ask it to stop and
                      * wait for it. */
2779                 spin_lock(&lfsck->li_lock);
2780                 if (thread_is_stopped(thread)) {
2781                         spin_unlock(&lfsck->li_lock);
2782                 } else {
2783                         lfsck->li_status = LS_FAILED;
2784                         lfsck->li_flags = 0;
2785                         thread_set_flags(thread, SVC_STOPPING);
2786                         spin_unlock(&lfsck->li_lock);
2787
2788                         lfsck->li_start_unplug = 1;
2789                         wake_up_all(&thread->t_ctl_waitq);
2790                         l_wait_event(thread->t_ctl_waitq,
2791                                      thread_is_stopped(thread),
2792                                      &lwi);
2793                 }
2794         } else {
2795                 lfsck->li_start_unplug = 1;
2796                 wake_up_all(&thread->t_ctl_waitq);
2797         }
2798
             /* li_mutex was already released above, so skip the "out"
              * label. */
2799         GOTO(put, rc);
2800
2801 out:
2802         mutex_unlock(&lfsck->li_mutex);
2803
2804 put:
2805         lfsck_instance_put(env, lfsck);
2806
             /* Only negative values are reported as errors. */
2807         return rc < 0 ? rc : 0;
2808 }
2809 EXPORT_SYMBOL(lfsck_start);
2810
2811 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2812                struct lfsck_stop *stop)
2813 {
2814         struct lfsck_instance   *lfsck;
2815         struct ptlrpc_thread    *thread;
2816         struct l_wait_info       lwi    = { 0 };
2817         int                      rc     = 0;
2818         int                      rc1    = 0;
2819         ENTRY;
2820
2821         lfsck = lfsck_instance_find(key, true, false);
2822         if (unlikely(lfsck == NULL))
2823                 RETURN(-ENXIO);
2824
2825         thread = &lfsck->li_thread;
2826         /* release lfsck::li_mutex to avoid deadlock. */
2827         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2828                 if (!lfsck->li_master) {
2829                         CERROR("%s: only allow to specify '-A' via MDS\n",
2830                                lfsck_lfsck2name(lfsck));
2831
2832                         GOTO(out, rc = -EPERM);
2833                 }
2834
2835                 rc1 = lfsck_stop_all(env, lfsck, stop);
2836         }
2837
2838         mutex_lock(&lfsck->li_mutex);
2839         spin_lock(&lfsck->li_lock);
2840         /* no error if LFSCK is already stopped, or was never started */
2841         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2842                 spin_unlock(&lfsck->li_lock);
2843                 GOTO(out, rc = 0);
2844         }
2845
2846         if (stop != NULL) {
2847                 lfsck->li_status = stop->ls_status;
2848                 lfsck->li_flags = stop->ls_flags;
2849         } else {
2850                 lfsck->li_status = LS_STOPPED;
2851                 lfsck->li_flags = 0;
2852         }
2853
2854         thread_set_flags(thread, SVC_STOPPING);
2855         spin_unlock(&lfsck->li_lock);
2856
2857         wake_up_all(&thread->t_ctl_waitq);
2858         l_wait_event(thread->t_ctl_waitq,
2859                      thread_is_stopped(thread),
2860                      &lwi);
2861
2862         GOTO(out, rc = 0);
2863
2864 out:
2865         mutex_unlock(&lfsck->li_mutex);
2866         lfsck_instance_put(env, lfsck);
2867
2868         return rc != 0 ? rc : rc1;
2869 }
2870 EXPORT_SYMBOL(lfsck_stop);
2871
2872 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2873                     struct lfsck_request *lr, struct thandle *th)
2874 {
2875         int rc = -EOPNOTSUPP;
2876         ENTRY;
2877
2878         switch (lr->lr_event) {
2879         case LE_START: {
2880                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2881                 struct lfsck_start_param  lsp;
2882
2883                 memset(start, 0, sizeof(*start));
2884                 start->ls_valid = lr->lr_valid;
2885                 start->ls_speed_limit = lr->lr_speed;
2886                 start->ls_version = lr->lr_version;
2887                 start->ls_active = lr->lr_active;
2888                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2889                 start->ls_async_windows = lr->lr_async_windows;
2890
2891                 lsp.lsp_start = start;
2892                 lsp.lsp_index = lr->lr_index;
2893                 lsp.lsp_index_valid = 1;
2894                 rc = lfsck_start(env, key, &lsp);
2895                 break;
2896         }
2897         case LE_STOP: {
2898                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2899
2900                 memset(stop, 0, sizeof(*stop));
2901                 stop->ls_status = lr->lr_status;
2902                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2903                 rc = lfsck_stop(env, key, stop);
2904                 break;
2905         }
2906         case LE_PHASE1_DONE:
2907         case LE_PHASE2_DONE:
2908         case LE_FID_ACCESSED:
2909         case LE_PEER_EXIT:
2910         case LE_CONDITIONAL_DESTROY:
2911         case LE_CREATE_ORPHAN:
2912         case LE_SKIP_NLINK_DECLARE:
2913         case LE_SKIP_NLINK:
2914         case LE_PAIRS_VERIFY: {
2915                 struct lfsck_instance  *lfsck;
2916                 struct lfsck_component *com;
2917
2918                 lfsck = lfsck_instance_find(key, true, false);
2919                 if (unlikely(lfsck == NULL))
2920                         RETURN(-ENXIO);
2921
2922                 com = lfsck_component_find(lfsck, lr->lr_active);
2923                 if (likely(com != NULL)) {
2924                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
2925                         lfsck_component_put(env, com);
2926                 }
2927
2928                 lfsck_instance_put(env, lfsck);
2929                 break;
2930         }
2931         default:
2932                 break;
2933         }
2934
2935         RETURN(rc);
2936 }
2937 EXPORT_SYMBOL(lfsck_in_notify);
2938
2939 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2940                 struct lfsck_request *lr)
2941 {
2942         struct lfsck_instance  *lfsck;
2943         struct lfsck_component *com;
2944         int                     rc;
2945         ENTRY;
2946
2947         lfsck = lfsck_instance_find(key, true, false);
2948         if (unlikely(lfsck == NULL))
2949                 RETURN(-ENXIO);
2950
2951         com = lfsck_component_find(lfsck, lr->lr_active);
2952         if (likely(com != NULL)) {
2953                 rc = com->lc_ops->lfsck_query(env, com);
2954                 lfsck_component_put(env, com);
2955         } else {
2956                 rc = -ENOTSUPP;
2957         }
2958
2959         lfsck_instance_put(env, lfsck);
2960
2961         RETURN(rc);
2962 }
2963 EXPORT_SYMBOL(lfsck_query);
2964
2965 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2966                              struct ldlm_namespace *ns)
2967 {
2968         struct lfsck_instance  *lfsck;
2969         int                     rc      = -ENXIO;
2970
2971         lfsck = lfsck_instance_find(key, true, false);
2972         if (likely(lfsck != NULL)) {
2973                 lfsck->li_namespace = ns;
2974                 lfsck_instance_put(env, lfsck);
2975                 rc = 0;
2976         }
2977
2978         return rc;
2979 }
2980 EXPORT_SYMBOL(lfsck_register_namespace);
2981
/*
 * Allocate, initialize and register the LFSCK instance for device \a key.
 *
 * env:         execution environment; its lfsck_thread_info supplies the
 *              scratch FIDs used below
 * key:         the device the instance is bound to (stored as li_bottom)
 * next:        stored as li_next
 * obd:         stored as li_obd
 * notify:      callback stored as li_out_notify
 * notify_data: argument stored as li_out_notify_data
 * master:      when true the instance is marked li_master, and the FID
 *              and namespace set-up steps are performed as well
 *
 * Returns 0 on success, -EEXIST if an instance already exists for \a key,
 * -ENOMEM on allocation failure, -ENOTDIR if the root object cannot be
 * used as a directory, or the error of the failing set-up step.  On any
 * failure after the allocation the instance is released through
 * lfsck_instance_cleanup().
 */
2982 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2983                    struct dt_device *next, struct obd_device *obd,
2984                    lfsck_out_notify notify, void *notify_data, bool master)
2985 {
2986         struct lfsck_instance   *lfsck;
2987         struct dt_object        *root  = NULL;
2988         struct dt_object        *obj   = NULL;
             /* Scratch FID, reused for several lookups below. */
2989         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2990         int                      rc;
2991         ENTRY;
2992
2993         lfsck = lfsck_instance_find(key, false, false);
2994         if (unlikely(lfsck != NULL))
2995                 RETURN(-EEXIST);
2996
2997         OBD_ALLOC_PTR(lfsck);
2998         if (lfsck == NULL)
2999                 RETURN(-ENOMEM);
3000
3001         mutex_init(&lfsck->li_mutex);
3002         spin_lock_init(&lfsck->li_lock);
3003         INIT_LIST_HEAD(&lfsck->li_link);
3004         INIT_LIST_HEAD(&lfsck->li_list_scan);
3005         INIT_LIST_HEAD(&lfsck->li_list_dir);
3006         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3007         INIT_LIST_HEAD(&lfsck->li_list_idle);
             /* The instance starts with a reference count of one. */
3008         atomic_set(&lfsck->li_ref, 1);
3009         atomic_set(&lfsck->li_double_scan_count, 0);
3010         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3011         lfsck->li_out_notify = notify;
3012         lfsck->li_out_notify_data = notify_data;
3013         lfsck->li_next = next;
3014         lfsck->li_bottom = key;
3015         lfsck->li_obd = obd;
3016
3017         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3018         if (rc != 0)
3019                 GOTO(out, rc);
3020
3021         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3022         if (rc != 0)
3023                 GOTO(out, rc);
3024
3025         fid->f_seq = FID_SEQ_LOCAL_NAME;
3026         fid->f_oid = 1;
3027         fid->f_ver = 0;
3028         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3029         if (rc != 0)
3030                 GOTO(out, rc);
3031
             /* From here on "fid" holds the FID of the device's root. */
3032         rc = dt_root_get(env, key, fid);
3033         if (rc != 0)
3034                 GOTO(out, rc);
3035
3036         root = dt_locate(env, key, fid);
3037         if (IS_ERR(root))
3038                 GOTO(out, rc = PTR_ERR(root));
3039
3040         if (unlikely(!dt_try_as_dir(env, root)))
3041                 GOTO(out, rc = -ENOTDIR);
3042
3043         lfsck->li_local_root_fid = *fid;
3044         if (master) {
3045                 lfsck->li_master = 1;
                     /* Only the master with device index 0 resolves the
                      * global "ROOT" FID and verifies the linkEA of the
                      * dotlustre and lostfound entries. */
3046                 if (lfsck_dev_idx(key) == 0) {
3047                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3048                         const struct lu_name *cname;
3049
3050                         rc = dt_lookup(env, root,
3051                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3052                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3053                         if (rc != 0)
3054                                 GOTO(out, rc);
3055
3056                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3057                         if (IS_ERR(obj))
3058                                 GOTO(out, rc = PTR_ERR(obj));
3059
3060                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3061                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3062                         if (rc != 0)
3063                                 GOTO(out, rc);
3064
                             /* Done with the global root object; "obj"
                              * becomes the dotlustre object. */
3065                         lu_object_put(env, &obj->do_lu);
3066                         obj = dt_locate(env, key, fid);
3067                         if (IS_ERR(obj))
3068                                 GOTO(out, rc = PTR_ERR(obj));
3069
3070                         cname = lfsck_name_get_const(env, dotlustre,
3071                                                      strlen(dotlustre));
3072                         rc = lfsck_verify_linkea(env, key, obj, cname,
3073                                                  &lfsck->li_global_root_fid);
3074                         if (rc != 0)
3075                                 GOTO(out, rc);
3076
                             /* Keep the dotlustre FID as the parent FID for
                              * the lostfound check; "fid" is overwritten by
                              * the next lookup. */
3077                         *pfid = *fid;
3078                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3079                                        (const struct dt_key *)lostfound,
3080                                        BYPASS_CAPA);
3081                         if (rc != 0)
3082                                 GOTO(out, rc);
3083
3084                         lu_object_put(env, &obj->do_lu);
3085                         obj = dt_locate(env, key, fid);
3086                         if (IS_ERR(obj))
3087                                 GOTO(out, rc = PTR_ERR(obj));
3088
3089                         cname = lfsck_name_get_const(env, lostfound,
3090                                                      strlen(lostfound));
3091                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3092                         if (rc != 0)
3093                                 GOTO(out, rc);
3094
                             /* Clear "obj" so the "out" path does not put
                              * it a second time. */
3095                         lu_object_put(env, &obj->do_lu);
3096                         obj = NULL;
3097                 }
3098         }
3099
             /* Locate the object-table iteration object. */
3100         fid->f_seq = FID_SEQ_LOCAL_FILE;
3101         fid->f_oid = OTABLE_IT_OID;
3102         fid->f_ver = 0;
3103         obj = dt_locate(env, key, fid);
3104         if (IS_ERR(obj))
3105                 GOTO(out, rc = PTR_ERR(obj));
3106
             /* Take an extra reference for li_obj_oit; the local "obj"
              * reference is dropped under the "out" label. */
3107         lu_object_get(&obj->do_lu);
3108         lfsck->li_obj_oit = obj;
3109         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3110         if (rc != 0)
3111                 GOTO(out, rc);
3112
3113         rc = lfsck_bookmark_setup(env, lfsck);
3114         if (rc != 0)
3115                 GOTO(out, rc);
3116
3117         if (master) {
3118                 rc = lfsck_fid_init(lfsck);
3119                 if (rc < 0)
3120                         GOTO(out, rc);
3121
3122                 rc = lfsck_namespace_setup(env, lfsck);
3123                 if (rc < 0)
3124                         GOTO(out, rc);
3125         }
3126
3127         rc = lfsck_layout_setup(env, lfsck);
3128         if (rc < 0)
3129                 GOTO(out, rc);
3130
3131         /* XXX: more LFSCK components initialization to be added here. */
3132
3133         rc = lfsck_instance_add(lfsck);
3134         if (rc == 0)
3135                 rc = lfsck_add_target_from_orphan(env, lfsck);
3136 out:
             /* Common exit for success and failure: drop the local object
              * references, and release the instance on failure. */
3137         if (obj != NULL && !IS_ERR(obj))
3138                 lu_object_put(env, &obj->do_lu);
3139         if (root != NULL && !IS_ERR(root))
3140                 lu_object_put(env, &root->do_lu);
3141         if (rc != 0)
3142                 lfsck_instance_cleanup(env, lfsck);
3143         return rc;
3144 }
3145 EXPORT_SYMBOL(lfsck_register);
3146
3147 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3148 {
3149         struct lfsck_instance *lfsck;
3150
3151         lfsck = lfsck_instance_find(key, false, true);
3152         if (lfsck != NULL)
3153                 lfsck_instance_put(env, lfsck);
3154 }
3155 EXPORT_SYMBOL(lfsck_degister);
3156
3157 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3158                      struct dt_device *tgt, struct obd_export *exp,
3159                      __u32 index, bool for_ost)
3160 {
3161         struct lfsck_instance   *lfsck;
3162         struct lfsck_tgt_desc   *ltd;
3163         int                      rc;
3164         ENTRY;
3165
3166         OBD_ALLOC_PTR(ltd);
3167         if (ltd == NULL)
3168                 RETURN(-ENOMEM);
3169
3170         ltd->ltd_tgt = tgt;
3171         ltd->ltd_key = key;
3172         ltd->ltd_exp = exp;
3173         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3174         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3175         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3176         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3177         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3178         atomic_set(&ltd->ltd_ref, 1);
3179         ltd->ltd_index = index;
3180
3181         spin_lock(&lfsck_instance_lock);
3182         lfsck = __lfsck_instance_find(key, true, false);
3183         if (lfsck == NULL) {
3184                 if (for_ost)
3185                         list_add_tail(&ltd->ltd_orphan_list,
3186                                       &lfsck_ost_orphan_list);
3187                 else
3188                         list_add_tail(&ltd->ltd_orphan_list,
3189                                       &lfsck_mdt_orphan_list);
3190                 spin_unlock(&lfsck_instance_lock);
3191
3192                 RETURN(0);
3193         }
3194         spin_unlock(&lfsck_instance_lock);
3195
3196         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3197         if (rc != 0)
3198                 lfsck_tgt_put(ltd);
3199
3200         lfsck_instance_put(env, lfsck);
3201
3202         RETURN(rc);
3203 }
3204 EXPORT_SYMBOL(lfsck_add_target);
3205
3206 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3207                       struct dt_device *tgt, __u32 index, bool for_ost)
3208 {
3209         struct lfsck_instance   *lfsck;
3210         struct lfsck_tgt_descs  *ltds;
3211         struct lfsck_tgt_desc   *ltd;
3212         struct list_head        *head;
3213
3214         if (for_ost)
3215                 head = &lfsck_ost_orphan_list;
3216         else
3217                 head = &lfsck_mdt_orphan_list;
3218
3219         spin_lock(&lfsck_instance_lock);
3220         list_for_each_entry(ltd, head, ltd_orphan_list) {
3221                 if (ltd->ltd_tgt == tgt) {
3222                         list_del_init(&ltd->ltd_orphan_list);
3223                         spin_unlock(&lfsck_instance_lock);
3224                         lfsck_tgt_put(ltd);
3225
3226                         return;
3227                 }
3228         }
3229
3230         ltd = NULL;
3231         lfsck = __lfsck_instance_find(key, true, false);
3232         spin_unlock(&lfsck_instance_lock);
3233         if (unlikely(lfsck == NULL))
3234                 return;
3235
3236         if (for_ost)
3237                 ltds = &lfsck->li_ost_descs;
3238         else
3239                 ltds = &lfsck->li_mdt_descs;
3240
3241         down_write(&ltds->ltd_rw_sem);
3242         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3243
3244         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3245                 goto unlock;
3246
3247         ltd = LTD_TGT(ltds, index);
3248         if (unlikely(ltd == NULL))
3249                 goto unlock;
3250
3251         LASSERT(ltds->ltd_tgtnr > 0);
3252
3253         ltds->ltd_tgtnr--;
3254         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3255         LTD_TGT(ltds, index) = NULL;
3256
3257 unlock:
3258         if (ltd == NULL) {
3259                 if (for_ost)
3260                         head = &lfsck->li_ost_descs.ltd_orphan;
3261                 else
3262                         head = &lfsck->li_mdt_descs.ltd_orphan;
3263
3264                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3265                         if (ltd->ltd_tgt == tgt) {
3266                                 list_del_init(&ltd->ltd_orphan_list);
3267                                 break;
3268                         }
3269                 }
3270         }
3271
3272         up_write(&ltds->ltd_rw_sem);
3273         if (ltd != NULL) {
3274                 spin_lock(&ltds->ltd_lock);
3275                 ltd->ltd_dead = 1;
3276                 spin_unlock(&ltds->ltd_lock);
3277                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3278                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3279                 lfsck_tgt_put(ltd);
3280         }
3281
3282         lfsck_instance_put(env, lfsck);
3283 }
3284 EXPORT_SYMBOL(lfsck_del_target);
3285
3286 static int __init lfsck_init(void)
3287 {
3288         int rc;
3289
3290         INIT_LIST_HEAD(&lfsck_instance_list);
3291         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3292         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3293         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3294         rc = lu_context_key_register(&lfsck_thread_key);
3295         if (rc == 0) {
3296                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3297                 tgt_register_lfsck_query(lfsck_query);
3298         }
3299
3300         return rc;
3301 }
3302
3303 static void __exit lfsck_exit(void)
3304 {
3305         struct lfsck_tgt_desc *ltd;
3306         struct lfsck_tgt_desc *next;
3307
3308         LASSERT(list_empty(&lfsck_instance_list));
3309
3310         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3311                                  ltd_orphan_list) {
3312                 list_del_init(&ltd->ltd_orphan_list);
3313                 lfsck_tgt_put(ltd);
3314         }
3315
3316         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3317                                  ltd_orphan_list) {
3318                 list_del_init(&ltd->ltd_orphan_list);
3319                 lfsck_tgt_put(ltd);
3320         }
3321
3322         lu_context_key_degister(&lfsck_thread_key);
3323 }
3324
3325 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3326 MODULE_DESCRIPTION("LFSCK");
3327 MODULE_LICENSE("GPL");
3328
3329 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);