Whamcloud - gitweb
LU-4788 lfsck: verify .lustre/lost+found at the LFSCK start
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/*
 * Define the lfsck thread key: LU_KEY_INIT() is handed the key name "lfsck"
 * and the per-thread payload type, struct lfsck_thread_info.
 * NOTE(review): presumably this expands to the key's init callback which
 * allocates the payload that lfsck_key_fini() releases -- confirm against
 * the macro definition.
 */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
/*
 * Declare the lfsck context key, tagged for both MD and DT threads
 * (LCT_MD_THREAD | LCT_DT_THREAD), then instantiate the generic key helpers.
 * NOTE(review): the symbols these macros generate are not visible in this
 * file -- see the LU_CONTEXT_KEY_DEFINE()/LU_KEY_INIT_GENERIC() definitions.
 */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
60
/* NOTE(review): presumably the list of registered LFSCK instances. */
static struct list_head lfsck_instance_list;
/* Global lists of OST/MDT target descriptors that are not attached to an
 * instance yet; lfsck_add_target_from_orphan() claims the entries whose
 * ltd_key matches the instance's bottom device. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while walking the global orphan lists above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable names, indexed by the LS_* status values; looked up through
 * lfsck_status2names(). */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of printable flag names.
 * NOTE(review): presumably entry N names flag bit N, so the order must
 * match the flag bit definitions -- confirm against the users. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* NULL-terminated table of printable parameter names; the first slot is
 * deliberately NULL.
 * NOTE(review): presumably entry N names parameter bit N and bit 0 has no
 * printable name -- confirm against the users. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
100
/* Selector for how the lost+found (lpf) object is verified.
 * NOTE(review): presumably BY_BOOKMARK means "the FID came from the
 * bookmark" and BY_NAMEENTRY means "the FID came from a name lookup";
 * the users are outside this chunk -- confirm. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK        = 0,
        LVLT_BY_NAMEENTRY       = 1,
};
105
106 const char *lfsck_status2names(enum lfsck_status status)
107 {
108         if (unlikely(status < 0 || status >= LS_MAX))
109                 return "unknown";
110
111         return lfsck_status_names[status];
112 }
113
114 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
115 {
116         spin_lock_init(&ltds->ltd_lock);
117         init_rwsem(&ltds->ltd_rw_sem);
118         INIT_LIST_HEAD(&ltds->ltd_orphan);
119         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
120         if (ltds->ltd_tgts_bitmap == NULL)
121                 return -ENOMEM;
122
123         return 0;
124 }
125
126 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
127 {
128         struct lfsck_tgt_desc   *ltd;
129         struct lfsck_tgt_desc   *next;
130         int                      idx;
131
132         down_write(&ltds->ltd_rw_sem);
133
134         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
135                                  ltd_orphan_list) {
136                 list_del_init(&ltd->ltd_orphan_list);
137                 lfsck_tgt_put(ltd);
138         }
139
140         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
141                 up_write(&ltds->ltd_rw_sem);
142
143                 return;
144         }
145
146         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
147                 ltd = LTD_TGT(ltds, idx);
148                 if (likely(ltd != NULL)) {
149                         LASSERT(list_empty(&ltd->ltd_layout_list));
150                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
151
152                         ltds->ltd_tgtnr--;
153                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
154                         LTD_TGT(ltds, idx) = NULL;
155                         lfsck_tgt_put(ltd);
156                 }
157         }
158
159         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
160                  ltds->ltd_tgtnr);
161
162         for (idx = 0; idx < TGT_PTRS; idx++) {
163                 if (ltds->ltd_tgts_idx[idx] != NULL) {
164                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
165                         ltds->ltd_tgts_idx[idx] = NULL;
166                 }
167         }
168
169         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
170         ltds->ltd_tgts_bitmap = NULL;
171         up_write(&ltds->ltd_rw_sem);
172 }
173
174 static int __lfsck_add_target(const struct lu_env *env,
175                               struct lfsck_instance *lfsck,
176                               struct lfsck_tgt_desc *ltd,
177                               bool for_ost, bool locked)
178 {
179         struct lfsck_tgt_descs *ltds;
180         __u32                   index = ltd->ltd_index;
181         int                     rc    = 0;
182         ENTRY;
183
184         if (for_ost)
185                 ltds = &lfsck->li_ost_descs;
186         else
187                 ltds = &lfsck->li_mdt_descs;
188
189         if (!locked)
190                 down_write(&ltds->ltd_rw_sem);
191
192         LASSERT(ltds->ltd_tgts_bitmap != NULL);
193
194         if (index >= ltds->ltd_tgts_bitmap->size) {
195                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
196                                     (__u32)BITS_PER_LONG);
197                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
198                 cfs_bitmap_t *new_bitmap;
199
200                 while (newsize < index + 1)
201                         newsize <<= 1;
202
203                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
204                 if (new_bitmap == NULL)
205                         GOTO(unlock, rc = -ENOMEM);
206
207                 if (ltds->ltd_tgtnr > 0)
208                         cfs_bitmap_copy(new_bitmap, old_bitmap);
209                 ltds->ltd_tgts_bitmap = new_bitmap;
210                 CFS_FREE_BITMAP(old_bitmap);
211         }
212
213         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
214                 CERROR("%s: the device %s (%u) is registered already\n",
215                        lfsck_lfsck2name(lfsck),
216                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
217                 GOTO(unlock, rc = -EEXIST);
218         }
219
220         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
221                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
222                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
223                         GOTO(unlock, rc = -ENOMEM);
224         }
225
226         LTD_TGT(ltds, index) = ltd;
227         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
228         ltds->ltd_tgtnr++;
229
230         GOTO(unlock, rc = 0);
231
232 unlock:
233         if (!locked)
234                 up_write(&ltds->ltd_rw_sem);
235
236         return rc;
237 }
238
/*
 * Attach to \a lfsck every orphan target descriptor that belongs to it.
 *
 * Two passes are made, first for OSTs and then (via "goto again") for MDTs.
 * Each pass moves the entries of the global orphan list whose ltd_key
 * equals lfsck->li_bottom onto the instance's own orphan list under
 * lfsck_instance_lock, then registers them one by one with
 * __lfsck_add_target().
 *
 * Returns 0 on success, or the first error from __lfsck_add_target(); in
 * that case the remaining entries stay on the instance's orphan list.
 *
 * NOTE(review): on failure the descriptor has already been unlinked from
 * the orphan list and is not put here -- looks like a possible reference
 * leak; confirm who owns the reference on this path.
 */
static int lfsck_add_target_from_orphan(const struct lu_env *env,
                                        struct lfsck_instance *lfsck)
{
        struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
        struct lfsck_tgt_desc   *ltd;
        struct lfsck_tgt_desc   *next;
        struct list_head        *head    = &lfsck_ost_orphan_list;
        int                      rc;
        bool                     for_ost = true;

again:
        /* Claim the matching entries from the global orphan list. */
        spin_lock(&lfsck_instance_lock);
        list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
                if (ltd->ltd_key == lfsck->li_bottom)
                        list_move_tail(&ltd->ltd_orphan_list,
                                       &ltds->ltd_orphan);
        }
        spin_unlock(&lfsck_instance_lock);

        down_write(&ltds->ltd_rw_sem);
        while (!list_empty(&ltds->ltd_orphan)) {
                ltd = list_entry(ltds->ltd_orphan.next,
                                 struct lfsck_tgt_desc,
                                 ltd_orphan_list);
                list_del_init(&ltd->ltd_orphan_list);
                /* The semaphore is held here, hence locked == true. */
                rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
                /* Do not hold the semaphore for too long time. */
                up_write(&ltds->ltd_rw_sem);
                if (rc != 0)
                        return rc;

                down_write(&ltds->ltd_rw_sem);
        }
        up_write(&ltds->ltd_rw_sem);

        /* Second pass: repeat the same procedure for the MDT targets. */
        if (for_ost) {
                ltds = &lfsck->li_mdt_descs;
                head = &lfsck_mdt_orphan_list;
                for_ost = false;
                goto again;
        }

        return 0;
}
283
284 static inline struct lfsck_component *
285 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
286                        struct list_head *list)
287 {
288         struct lfsck_component *com;
289
290         list_for_each_entry(com, list, lc_link) {
291                 if (com->lc_type == type)
292                         return com;
293         }
294         return NULL;
295 }
296
297 struct lfsck_component *
298 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
299 {
300         struct lfsck_component *com;
301
302         spin_lock(&lfsck->li_lock);
303         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
304         if (com != NULL)
305                 goto unlock;
306
307         com = __lfsck_component_find(lfsck, type,
308                                      &lfsck->li_list_double_scan);
309         if (com != NULL)
310                 goto unlock;
311
312         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
313
314 unlock:
315         if (com != NULL)
316                 lfsck_component_get(com);
317         spin_unlock(&lfsck->li_lock);
318         return com;
319 }
320
321 void lfsck_component_cleanup(const struct lu_env *env,
322                              struct lfsck_component *com)
323 {
324         if (!list_empty(&com->lc_link))
325                 list_del_init(&com->lc_link);
326         if (!list_empty(&com->lc_link_dir))
327                 list_del_init(&com->lc_link_dir);
328
329         lfsck_component_put(env, com);
330 }
331
332 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
333                     struct lu_fid *fid, bool locked)
334 {
335         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
336         int                      rc = 0;
337         ENTRY;
338
339         if (!locked)
340                 mutex_lock(&lfsck->li_mutex);
341
342         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
343         if (rc >= 0) {
344                 bk->lb_last_fid = *fid;
345                 /* We do not care about whether the subsequent sub-operations
346                  * failed or not. The worst case is that one FID is lost that
347                  * is not a big issue for the LFSCK since it is relative rare
348                  * for LFSCK create. */
349                 rc = lfsck_bookmark_store(env, lfsck);
350         }
351
352         if (!locked)
353                 mutex_unlock(&lfsck->li_mutex);
354
355         RETURN(rc);
356 }
357
358 /**
359  * Request the specified ibits lock for the given object.
360  *
361  * Before the LFSCK modifying on the namespace visible object,
362  * it needs to acquire related ibits ldlm lock.
363  *
364  * \param[in] env       pointer to the thread context
365  * \param[in] lfsck     pointer to the lfsck instance
366  * \param[in] obj       pointer to the dt_object to be locked
367  * \param[out] lh       pointer to the lock handle
368  * \param[in] ibits     the bits for the ldlm lock to be acquired
369  * \param[in] mode      the mode for the ldlm lock to be acquired
370  *
371  * \retval              0 for success
372  * \retval              negative error number on failure
373  */
374 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
375                      struct dt_object *obj, struct lustre_handle *lh,
376                      __u64 bits, ldlm_mode_t mode)
377 {
378         struct lfsck_thread_info        *info   = lfsck_env_info(env);
379         ldlm_policy_data_t              *policy = &info->lti_policy;
380         struct ldlm_res_id              *resid  = &info->lti_resid;
381         __u64                            flags  = LDLM_FL_ATOMIC_CB;
382         int                              rc;
383
384         LASSERT(lfsck->li_namespace != NULL);
385
386         memset(policy, 0, sizeof(*policy));
387         policy->l_inodebits.bits = bits;
388         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
389         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
390                                     policy, mode, &flags, ldlm_blocking_ast,
391                                     ldlm_completion_ast, NULL, NULL, 0,
392                                     LVB_T_NONE, NULL, lh);
393         if (rc == ELDLM_OK) {
394                 rc = 0;
395         } else {
396                 memset(lh, 0, sizeof(*lh));
397                 rc = -EIO;
398         }
399
400         return rc;
401 }
402
403 /**
404  * Release the the specified ibits lock.
405  *
406  * If the lock has been acquired before, release it
407  * and cleanup the handle. Otherwise, do nothing.
408  *
409  * \param[in] lh        pointer to the lock handle
410  * \param[in] mode      the mode for the ldlm lock to be released
411  */
412 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
413 {
414         if (lustre_handle_is_used(lh)) {
415                 ldlm_lock_decref(lh, mode);
416                 memset(lh, 0, sizeof(*lh));
417         }
418 }
419
/* Well-known directory entry names. "." and ".." are used as the insert
 * keys when populating a newly created lost+found child directory.
 * NOTE(review): dotlustre/lostfound are not referenced in the visible part
 * of the file; presumably used by the lost+found verification code. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
424
/*
 * Create the directory \a child, named \a name, under \a parent within a
 * single local transaction on lfsck->li_bottom.
 *
 * The transaction creates the child, inserts its "." entry (its own FID)
 * and ".." entry (LU_LPF_FID), adds a link to the child, stores the linkEA
 * that points back to (\a parent, \a name), inserts \a name into \a parent,
 * adds a link to the parent, and finally records the child FID as
 * lb_lpf_fid in the bookmark and writes the bookmark out.
 *
 * Each step is declared first ("Na" comments) and executed after the
 * transaction has started ("Nb" comments).
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    the directory to hold the new name entry
 * \param[in] child     the object to be created
 * \param[in] la        attributes for the new object
 * \param[in] dof       object format for the new object
 * \param[in] name      the name of the new entry
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_create_lpf_local(const struct lu_env *env,
                                  struct lfsck_instance *lfsck,
                                  struct dt_object *parent,
                                  struct dt_object *child,
                                  struct lu_attr *la,
                                  struct dt_object_format *dof,
                                  const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct dt_device        *dev    = lfsck->li_bottom;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { 0 };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Prepare the linkEA (parent FID + name) before the transaction
         * is created, so the early failures need no cleanup. */
        rc = linkea_data_new(&ldata,
                             &lfsck_env_info(env)->lti_linkea_buf);
        if (rc != 0)
                RETURN(rc);

        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert linkEA for child */
        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. insert name into parent dir */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The child stays write-locked for steps 1b-3b; failures in that
         * range leave through "unlock". */
        dt_write_lock(env, child, 0);
        /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);

        /* 1b.2. insert dot into child dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 1b.3. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert linkEA for child. */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        /* The child lock is dropped here, so from now on failures leave
         * through "stop" rather than "unlock". */
        dt_write_unlock(env, child);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4b. insert name into parent dir */
        rec->rec_fid = cfid;
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 5b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);
        if (rc != 0)
                GOTO(stop, rc);

        /* Remember the new directory in the in-RAM bookmark and refresh
         * the on-disk format copy before writing it out. */
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 6b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        GOTO(stop, rc);

unlock:
        dt_write_unlock(env, child);

stop:
        dt_trans_stop(env, dev, th);

        return rc;
}
568
/*
 * Create the directory \a child locally and link it as \a name into
 * \a parent using two separate transactions.
 *
 * Transaction I runs on lfsck->li_bottom: it creates the child with its
 * "." and ".." entries, adds a link, stores the linkEA and records the
 * child FID as lb_lpf_fid in the bookmark. Transaction II runs on
 * lfsck->li_next: it inserts \a name into \a parent and adds a link to the
 * parent. If only transaction II fails, the child is left without a name
 * entry and a debug message is logged.
 *
 * \param[in] env       pointer to the thread context
 * \param[in] lfsck     pointer to the lfsck instance
 * \param[in] parent    the directory to hold the new name entry
 * \param[in] child     the object to be created
 * \param[in] la        attributes for the new object
 * \param[in] dof       object format for the new object
 * \param[in] name      the name of the new entry
 *
 * \retval              0 for success
 * \retval              negative error number on failure
 */
static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct lfsck_instance *lfsck,
                                   struct dt_object *parent,
                                   struct dt_object *child,
                                   struct lu_attr *la,
                                   struct dt_object_format *dof,
                                   const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { 0 };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        struct dt_device        *dev;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Prepare the linkEA (parent FID + name) before any transaction
         * is created, so the early failures need no cleanup. */
        rc = linkea_data_new(&ldata,
                             &lfsck_env_info(env)->lti_linkea_buf);
        if (rc != 0)
                RETURN(rc);

        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        /* Create .lustre/lost+found/MDTxxxx. */

        /* XXX: Currently, cross-MDT create operation needs to create the child
         *      object firstly, then insert name into the parent directory. For
         *      this case, the child object resides on current MDT (local), but
         *      the parent ".lustre/lost+found" may be on remote MDT. It is not
         *      easy to contain all the sub-modifications orderly within single
         *      transaction.
         *
         *      To avoid more inconsistency, we split the create operation into
         *      two transactions:
         *
         *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
         *         locally.
         *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
         *         remotely.
         *
         *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
         *      repair such inconsistency when LFSCK run next time. */

        /* Transaction I: locally */

        dev = lfsck->li_bottom;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert linkEA for child */
        lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
                       ldata.ld_leh->leh_len);
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The child stays write-locked until the end of transaction I;
         * failures in that range leave through "unlock". */
        dt_write_lock(env, child, 0);
        /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);

        /* 1b.2. insert dot into child dir */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 1b.3. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert linkEA for child */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        if (rc != 0)
                GOTO(unlock, rc);

        /* Remember the new directory in the in-RAM bookmark and refresh
         * the on-disk format copy before writing it out. */
        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 4b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        /* End of transaction I, regardless of the bookmark write result. */
        dt_write_unlock(env, child);
        dt_trans_stop(env, dev, th);
        if (rc != 0)
                RETURN(rc);

        /* Transaction II: remotely */

        dev = lfsck->li_next;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 5a. insert name into parent dir */
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 6b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);

        GOTO(stop, rc);

unlock:
        /* Only reached from transaction I, while the child is locked. */
        dt_write_unlock(env, child);
stop:
        dt_trans_stop(env, dev, th);

        /* dev tells which transaction failed: only a failure in
         * transaction II leaves a created child without its name entry. */
        if (rc != 0 && dev == lfsck->li_next)
                CDEBUG(D_LFSCK, "%s: partially created the object "DFID
                       "for orphans, but failed to insert the name %s "
                       "to the .lustre/lost+found/. Such inconsistency "
                       "will be repaired when LFSCK run next time: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);

        return rc;
}
753
754 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
755  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
756  * only when it is required, such as orphan OST-objects repairing. */
757 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
758 {
759         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
760         struct lfsck_thread_info *info  = lfsck_env_info(env);
761         struct lu_fid            *cfid  = &info->lti_fid2;
762         struct lu_attr           *la    = &info->lti_la;
763         struct dt_object_format  *dof   = &info->lti_dof;
764         struct dt_object         *parent = NULL;
765         struct dt_object         *child = NULL;
766         struct lustre_handle      lh    = { 0 };
767         char                      name[8];
768         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
769         int                       rc    = 0;
770         ENTRY;
771
772         LASSERT(lfsck->li_master);
773
774         sprintf(name, "MDT%04x", node);
775         if (node == 0) {
776                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
777                                                   &LU_LPF_FID);
778         } else {
779                 struct lfsck_tgt_desc *ltd;
780
781                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
782                 if (unlikely(ltd == NULL))
783                         RETURN(-ENXIO);
784
785                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
786                                                   &LU_LPF_FID);
787                 lfsck_tgt_put(ltd);
788         }
789         if (IS_ERR(parent))
790                 RETURN(PTR_ERR(parent));
791
792         if (lfsck->li_lpf_obj != NULL)
793                 GOTO(out, rc = 0);
794
795         if (unlikely(!dt_try_as_dir(env, parent)))
796                 GOTO(out, rc = -ENOTDIR);
797
798         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
799                               MDS_INODELOCK_UPDATE, LCK_EX);
800         if (rc != 0)
801                 GOTO(out, rc);
802
803         mutex_lock(&lfsck->li_mutex);
804         if (lfsck->li_lpf_obj != NULL)
805                 GOTO(unlock, rc = 0);
806
807         if (fid_is_zero(&bk->lb_lpf_fid)) {
808                 /* There is corner case that: in former LFSCK scanning we have
809                  * created the .lustre/lost+found/MDTxxxx but failed to update
810                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
811                  * it from MDT0 firstly. */
812                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
813                                (const struct dt_key *)name, BYPASS_CAPA);
814                 if (rc != 0 && rc != -ENOENT)
815                         GOTO(unlock, rc);
816
817                 if (rc == 0) {
818                         bk->lb_lpf_fid = *cfid;
819                         rc = lfsck_bookmark_store(env, lfsck);
820                 } else {
821                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
822                 }
823                 if (rc != 0)
824                         GOTO(unlock, rc);
825         } else {
826                 *cfid = bk->lb_lpf_fid;
827         }
828
829         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
830         if (IS_ERR(child))
831                 GOTO(unlock, rc = PTR_ERR(child));
832
833         if (dt_object_exists(child) != 0) {
834                 if (unlikely(!dt_try_as_dir(env, child)))
835                         rc = -ENOTDIR;
836                 else
837                         lfsck->li_lpf_obj = child;
838
839                 GOTO(unlock, rc);
840         }
841
842         memset(la, 0, sizeof(*la));
843         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
844         la->la_mode = S_IFDIR | S_IRWXU;
845         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
846                        LA_UID | LA_GID;
847         memset(dof, 0, sizeof(*dof));
848         dof->dof_type = dt_mode_to_dft(S_IFDIR);
849
850         if (node == 0)
851                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
852                                             dof, name);
853         else
854                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
855                                              dof, name);
856         if (rc == 0)
857                 lfsck->li_lpf_obj = child;
858
859         GOTO(unlock, rc);
860
861 unlock:
862         mutex_unlock(&lfsck->li_mutex);
863         lfsck_ibits_unlock(&lh, LCK_EX);
864         if (rc != 0 && child != NULL && !IS_ERR(child))
865                 lu_object_put(env, &child->do_lu);
866 out:
867         if (parent != NULL && !IS_ERR(parent))
868                 lu_object_put(env, &parent->do_lu);
869
870         return rc;
871 }
872
873 /**
874  * Scan .lustre/lost+found for bad name entries and remove them.
875  *
876  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
877  * index in the system. Any other formatted name is invalid and should be
878  * removed.
879  *
880  * \param[in] env       pointer to the thread context
881  * \param[in] lfsck     pointer to the lfsck instance
882  * \param[in] parent    pointer to the lost+found object
883  *
884  * \retval              0 for success
885  * \retval              negative error number on failure
886  */
887 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
888                                       struct lfsck_instance *lfsck,
889                                       struct dt_object *parent)
890 {
891         struct lu_dirent        *ent    =
892                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
893         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
894         struct dt_it            *it;
895         int                      rc;
896         ENTRY;
897
898         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
899         if (IS_ERR(it))
900                 RETURN(PTR_ERR(it));
901
902         rc = iops->load(env, it, 0);
903         if (rc == 0)
904                 rc = iops->next(env, it);
905         else if (rc > 0)
906                 rc = 0;
907
908         while (rc == 0) {
909                 int off = 3;
910
911                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
912                 if (rc != 0)
913                         break;
914
915                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
916                 if (ent->lde_name[0] == '.') {
917                         if (ent->lde_namelen == 1)
918                                 goto next;
919
920                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
921                                 goto next;
922                 }
923
924                 /* name length must be strlen("MDTxxxx") */
925                 if (ent->lde_namelen != 7)
926                         goto remove;
927
928                 if (memcmp(ent->lde_name, "MDT", off) != 0)
929                         goto remove;
930
931                 while (off < 7 && isxdigit(ent->lde_name[off]))
932                         off++;
933
934                 if (off != 7) {
935
936 remove:
937                         rc = lfsck_remove_name_entry(env, lfsck, parent,
938                                                      ent->lde_name, S_IFDIR);
939                         if (rc != 0)
940                                 break;
941                 }
942
943 next:
944                 rc = iops->next(env, it);
945         }
946
947         iops->put(env, it);
948         iops->fini(env, it);
949
950         RETURN(rc > 0 ? 0 : rc);
951 }
952
953 static int lfsck_update_lpf_entry(const struct lu_env *env,
954                                   struct lfsck_instance *lfsck,
955                                   struct dt_object *parent,
956                                   struct dt_object *child,
957                                   const char *name,
958                                   enum lfsck_verify_lpf_types type)
959 {
960         int rc;
961
962         if (type == LVLT_BY_BOOKMARK) {
963                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
964                                              lfsck_dto2fid(child), S_IFDIR);
965         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
966                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
967                 rc = lfsck_bookmark_store(env, lfsck);
968
969                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
970                        " in the bookmark file: rc = %d\n",
971                        lfsck_lfsck2name(lfsck),
972                        PFID(lfsck_dto2fid(child)), rc);
973         }
974
975         return rc;
976 }
977
978 /**
979  * Check whether the @child back references the @parent.
980  *
981  * Two cases:
982  * 1) The child's FID is stored in the bookmark file. If the child back
983  *    references the parent (LU_LPF_FID object) via its ".." entry, then
984  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
985  *    the child back references another parent2, then:
986  * 1.1) If the parent2 recognizes the child, then reset the LPF FID that is
987  *      recorded in the bookmark file;
988  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
989  *      references the child. So keep them there. As the LFSCK processing,
990  *      the parent3 may be found, then when the LFSCK run next time, the
991  *      inconsistency can be repaired.
992  *
993  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
994  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
995  *    via its ".." entry, then update the bookmark file, otherwise, if the child
996  *    back references another parent2, then:
997  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
998  *      from .lustre/lost+found/;
999  * 2.2) Otherwise, if the parent2 does not recognize the child, trust the
1000  *      sub-directory name entry and update the child;
1001  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1002  *      or not, then keep them there.
1003  *
1004  * In both cases, if the child's first linkEA entry names LU_LPF_FID with the
1005  * expected @name, the linkEA is trusted and the child's ".." entry is
1006  * rewritten to point at LU_LPF_FID.
1007  *
1008  * \param[in] env       pointer to the thread context
1009  * \param[in] lfsck     pointer to the lfsck instance
1010  * \param[in] parent    pointer to the lost+found object
1011  * \param[in] child     pointer to the lost+found sub-directory object
1012  * \param[in] name      the name for lost+found sub-directory object
1013  * \param[out] fid      pointer to the buffer to hold the FID of the object
1014  *                      (called it as parent2) that is referenced via the
1015  *                      child's dotdot entry; it also can be the FID that
1016  *                      is referenced by the name entry under the parent2.
1017  * \param[in] type      to indicate where the child's FID is stored in
1018  *
1019  * \retval              positive number for uncertain inconsistency
1020  * \retval              0 for success
1021  * \retval              negative error number on failure
1017  */
1018 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1019                                   struct lfsck_instance *lfsck,
1020                                   struct dt_object *parent,
1021                                   struct dt_object *child, const char *name,
1022                                   struct lu_fid *fid,
1023                                   enum lfsck_verify_lpf_types type)
1024 {
1025         struct lfsck_thread_info *info    = lfsck_env_info(env);
1026         char                     *name2   = info->lti_key;
1027         struct lu_fid            *fid2    = &info->lti_fid3;
1028         struct dt_object         *parent2 = NULL;
1029         struct lustre_handle      lh      = { 0 };
1030         int                       rc;
1031         ENTRY;
1032
             /* Read the child's ".." entry into @fid. A failed or insane
              * lookup is not fatal: control falls to the linkEA based check
              * with parent2 still NULL. */
1033         fid_zero(fid);
1034         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1035                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1036         if (rc != 0)
1037                 GOTO(linkea, rc);
1038
1039         if (!fid_is_sane(fid))
1040                 GOTO(linkea, rc = -EINVAL);
1041
             /* ".." already references .lustre/lost+found: remember the child
              * as the LPF object if none is cached yet, then check its linkEA
              * and, on success, fix the bookmark or the name entry as
              * selected by @type. */
1042         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1043                 const struct lu_name *cname;
1044
1045                 if (lfsck->li_lpf_obj == NULL) {
1046                         lu_object_get(&child->do_lu);
1047                         lfsck->li_lpf_obj = child;
1048                 }
1049
1050                 cname = lfsck_name_get_const(env, name, strlen(name));
1051                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1052                                          &LU_LPF_FID);
1053                 if (rc == 0)
1054                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1055                                                     name, type);
1056
1057                 GOTO(out_done, rc);
1058         }
1059
             /* ".." references some other object (parent2); try to load it
              * as an existing directory. On any failure parent2 is left as an
              * ERR_PTR, which the checks below treat the same as NULL. */
1060         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1061         if (IS_ERR(parent2))
1062                 GOTO(linkea, parent2);
1063
1064         if (!dt_object_exists(parent2)) {
1065                 lu_object_put(env, &parent2->do_lu);
1066
1067                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1068         }
1069
1070         if (!dt_try_as_dir(env, parent2)) {
1071                 lu_object_put(env, &parent2->do_lu);
1072
1073                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1074         }
1075
1076 linkea:
1077         /* To prevent rename/unlink race */
1078         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1079                               MDS_INODELOCK_UPDATE, LCK_PR);
1080         if (rc != 0)
1081                 GOTO(out_put, rc);
1082
1083         dt_read_lock(env, child, 0);
             /* name2/fid2 are filled by lfsck_links_get_first() - presumably
              * with the first linkEA entry of the child; confirm against that
              * helper. If it fails nothing can be decided, so the result is
              * reported as uncertain (rc = 1). */
1084         rc = lfsck_links_get_first(env, child, name2, fid2);
1085         if (rc != 0) {
1086                 dt_read_unlock(env, child);
1087                 lfsck_ibits_unlock(&lh, LCK_PR);
1088
1089                 GOTO(out_put, rc = 1);
1090         }
1091
1092         /* It is almost impossible that the bookmark file (or the name entry)
1093          * and the linkEA hit the same data corruption. Trust the linkEA. */
1094         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1095                 dt_read_unlock(env, child);
1096                 lfsck_ibits_unlock(&lh, LCK_PR);
1097
1098                 *fid = *fid2;
1099                 if (lfsck->li_lpf_obj == NULL) {
1100                         lu_object_get(&child->do_lu);
1101                         lfsck->li_lpf_obj = child;
1102                 }
1103
1104                 /* Update the child's dotdot entry */
1105                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1106                                              &LU_LPF_FID, S_IFDIR);
1107                 if (rc == 0)
1108                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1109                                                     name, type);
1110
1111                 GOTO(out_put, rc);
1112         }
1113
             /* The linkEA does not point back at lost+found and there is no
              * usable parent2 to consult: uncertain. No parent2 reference is
              * held on this path, hence the jump past out_put. */
1114         if (parent2 == NULL || IS_ERR(parent2)) {
1115                 dt_read_unlock(env, child);
1116                 lfsck_ibits_unlock(&lh, LCK_PR);
1117
1118                 GOTO(out_done, rc = 1);
1119         }
1120
             /* Look up @name2 under parent2; @fid is overwritten with the FID
              * that this name entry references. */
1121         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1122                        (const struct dt_key *)name2, BYPASS_CAPA);
1123         dt_read_unlock(env, child);
1124         lfsck_ibits_unlock(&lh, LCK_PR);
1125         if (rc != 0 && rc != -ENOENT)
1126                 GOTO(out_put, rc);
1127
             /* parent2 has no such name entry, or the entry references a
              * different object: parent2 does not recognize the child. */
1128         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1129                 if (type == LVLT_BY_BOOKMARK)
1130                         GOTO(out_put, rc = 1);
1131
1132                 /* Trust the name entry, update the child's dotdot entry. */
1133                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1134                                              &LU_LPF_FID, S_IFDIR);
1135
1136                 GOTO(out_put, rc);
1137         }
1138
             /* parent2 does reference the child, so the record that led here
              * (bookmark FID or lost+found name entry) is the wrong one. */
1139         if (type == LVLT_BY_BOOKMARK) {
1140                 /* Invalid FID record in the bookmark file, reset it. */
1141                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1142                 rc = lfsck_bookmark_store(env, lfsck);
1143
1144                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1145                        " in the bookmark file: rc = %d\n",
1146                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1147         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1148                 /* The name entry is wrong, remove it. */
1149                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1150         }
1151
1152         GOTO(out_put, rc);
1153
1154 out_put:
             /* Drop the parent2 reference only if one is actually held. */
1155         if (parent2 != NULL && !IS_ERR(parent2))
1156                 lu_object_put(env, &parent2->do_lu);
1157
1158 out_done:
1159         return rc;
1160 }
1161
1162 /**
1163  * Verify the /ROOT/.lustre/lost+found/ directory.
1164  *
1165  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1166  * the LFSCK does not exactly know how to handle, such as orphans. So before
1167  * the LFSCK scanning the system, the consistency of such directory needs to
1168  * be verified firstly to allow the users to use it during the LFSCK.
1169  *
1170  * \param[in] env       pointer to the thread context
1171  * \param[in] lfsck     pointer to the lfsck instance
1172  *
1173  * \retval              positive number for uncertain inconsistency
1174  * \retval              0 for success
1175  * \retval              negative error number on failure
1176  */
1177 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1178 {
1179         struct lfsck_thread_info *info   = lfsck_env_info(env);
1180         struct lu_fid            *pfid   = &info->lti_fid;
1181         struct lu_fid            *cfid   = &info->lti_fid2;
1182         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1183         struct dt_object         *parent = NULL;
1184         /* child1's FID is in the bookmark file. */
1185         struct dt_object         *child1 = NULL;
1186         /* child2's FID is in the name entry MDTxxxx. */
1187         struct dt_object         *child2 = NULL;
1188         struct dt_device         *dev    = lfsck->li_bottom;
1189         const struct lu_name     *cname;
1190         char                      name[8];
1191         int                       node   = lfsck_dev_idx(dev);
1192         int                       rc     = 0;
1193         ENTRY;
1194
1195         LASSERT(lfsck->li_master);
1196
1197         if (node == 0) {
1198                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1199         } else {
1200                 struct lfsck_tgt_desc *ltd;
1201
1202                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1203                 if (unlikely(ltd == NULL))
1204                         RETURN(-ENXIO);
1205
1206                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1207                                                   &LU_LPF_FID);
1208                 lfsck_tgt_put(ltd);
1209         }
1210
1211         if (IS_ERR(parent))
1212                 RETURN(PTR_ERR(parent));
1213
1214         LASSERT(dt_object_exists(parent));
1215
1216         if (unlikely(!dt_try_as_dir(env, parent)))
1217                 GOTO(put, rc = -ENOTDIR);
1218
1219         if (node == 0) {
1220                 rc = lfsck_scan_lpf_bad_entries(env, lfsck, parent);
1221                 if (rc != 0)
1222                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1223                                "for bad sub-directories: rc = %d\n",
1224                                lfsck_lfsck2name(lfsck), rc);
1225         }
1226
1227         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1228                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1229                         struct lu_fid tfid = bk->lb_lpf_fid;
1230
1231                         /* Invalid FID record in the bookmark file, reset it. */
1232                         fid_zero(&bk->lb_lpf_fid);
1233                         rc = lfsck_bookmark_store(env, lfsck);
1234
1235                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1236                                " in the bookmark file: rc = %d\n",
1237                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1238
1239                         if (rc != 0)
1240                                 GOTO(put, rc);
1241                 } else {
1242                         child1 = lfsck_object_find_by_dev(env, dev,
1243                                                           &bk->lb_lpf_fid);
1244                         if (IS_ERR(child1))
1245                                 GOTO(put, rc = PTR_ERR(child1));
1246
1247                         if (unlikely(!dt_object_exists(child1) ||
1248                                      dt_object_remote(child1)) ||
1249                                      !S_ISDIR(lfsck_object_type(child1))) {
1250                                 /* Invalid FID record in the bookmark file,
1251                                  * reset it. */
1252                                 fid_zero(&bk->lb_lpf_fid);
1253                                 rc = lfsck_bookmark_store(env, lfsck);
1254
1255                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1256                                        " in the bookmark file: rc = %d\n",
1257                                        lfsck_lfsck2name(lfsck),
1258                                        PFID(lfsck_dto2fid(child1)), rc);
1259
1260                                 if (rc != 0)
1261                                         GOTO(put, rc);
1262
1263                                 lu_object_put(env, &child1->do_lu);
1264                                 child1 = NULL;
1265                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1266                                 GOTO(put, rc = -ENOTDIR);
1267                         }
1268                 }
1269         }
1270
1271         snprintf(name, 8, "MDT%04x", node);
1272         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1273                        (const struct dt_key *)name, BYPASS_CAPA);
1274         if (rc == -ENOENT)
1275                 goto check_child1;
1276
1277         if (rc != 0)
1278                 GOTO(put, rc);
1279
1280         /* Invalid FID in the name entry, remove the name entry. */
1281         if (!fid_is_norm(cfid)) {
1282                 rc = lfsck_remove_name_entry(env, lfsck, parent, name, S_IFDIR);
1283                 if (rc != 0)
1284                         GOTO(put, rc);
1285
1286                 goto check_child1;
1287         }
1288
1289         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1290         if (IS_ERR(child2))
1291                 GOTO(put, rc = PTR_ERR(child2));
1292
1293         if (unlikely(!dt_object_exists(child2) ||
1294                      dt_object_remote(child2)) ||
1295                      !S_ISDIR(lfsck_object_type(child2))) {
1296                 rc = lfsck_remove_name_entry(env, lfsck, parent, name,
1297                                              S_IFDIR);
1298                 if (rc != 0)
1299                         GOTO(put, rc);
1300
1301                 goto check_child1;
1302         }
1303
1304         if (unlikely(!dt_try_as_dir(env, child2)))
1305                 GOTO(put, rc = -ENOTDIR);
1306
1307         if (child1 == NULL) {
1308                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2, name,
1309                                             pfid, LVLT_BY_NAMEENTRY);
1310         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1311                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1312                                             pfid, LVLT_BY_BOOKMARK);
1313                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1314                         rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child2,
1315                                                     name, pfid,
1316                                                     LVLT_BY_NAMEENTRY);
1317         } else {
1318                 if (lfsck->li_lpf_obj == NULL) {
1319                         lu_object_get(&child2->do_lu);
1320                         lfsck->li_lpf_obj = child2;
1321                 }
1322
1323                 cname = lfsck_name_get_const(env, name, strlen(name));
1324                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1325         }
1326
1327         GOTO(put, rc);
1328
1329 check_child1:
1330         if (child1 != NULL)
1331                 rc = lfsck_verify_lpf_pairs(env, lfsck, parent, child1, name,
1332                                             pfid, LVLT_BY_BOOKMARK);
1333
1334         GOTO(put, rc);
1335
1336 put:
1337         if (lfsck->li_lpf_obj != NULL &&
1338             unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj)))
1339                 rc = -ENOTDIR;
1340
1341         if (child2 != NULL && !IS_ERR(child2))
1342                 lu_object_put(env, &child2->do_lu);
1343         if (child1 != NULL && !IS_ERR(child1))
1344                 lu_object_put(env, &child1->do_lu);
1345         if (parent != NULL && !IS_ERR(parent))
1346                 lu_object_put(env, &parent->do_lu);
1347
1348         return rc;
1349 }
1350
1351 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1352 {
1353         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1354         struct seq_server_site  *ss;
1355         char                    *prefix;
1356         int                      rc     = 0;
1357         ENTRY;
1358
1359         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1360         if (unlikely(ss == NULL))
1361                 RETURN(-ENXIO);
1362
1363         OBD_ALLOC_PTR(lfsck->li_seq);
1364         if (lfsck->li_seq == NULL)
1365                 RETURN(-ENOMEM);
1366
1367         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1368         if (prefix == NULL)
1369                 GOTO(out, rc = -ENOMEM);
1370
1371         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1372         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1373                              ss->ss_server_seq);
1374         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1375         if (rc != 0)
1376                 GOTO(out, rc);
1377
1378         if (fid_is_sane(&bk->lb_last_fid))
1379                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1380
1381         RETURN(0);
1382
1383 out:
1384         OBD_FREE_PTR(lfsck->li_seq);
1385         lfsck->li_seq = NULL;
1386
1387         return rc;
1388 }
1389
1390 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1391 {
1392         if (lfsck->li_seq != NULL) {
1393                 seq_client_fini(lfsck->li_seq);
1394                 OBD_FREE_PTR(lfsck->li_seq);
1395                 lfsck->li_seq = NULL;
1396         }
1397 }
1398
1399 void lfsck_instance_cleanup(const struct lu_env *env,
1400                             struct lfsck_instance *lfsck)
1401 {
1402         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1403         struct lfsck_component  *com;
1404         struct lfsck_component  *next;
1405         ENTRY;
1406
1407         LASSERT(list_empty(&lfsck->li_link));
1408         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1409
1410         if (lfsck->li_obj_oit != NULL) {
1411                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1412                 lfsck->li_obj_oit = NULL;
1413         }
1414
1415         LASSERT(lfsck->li_obj_dir == NULL);
1416
1417         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1418                 lfsck_component_cleanup(env, com);
1419         }
1420
1421         LASSERT(list_empty(&lfsck->li_list_dir));
1422
1423         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1424                                  lc_link) {
1425                 lfsck_component_cleanup(env, com);
1426         }
1427
1428         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1429                 lfsck_component_cleanup(env, com);
1430         }
1431
1432         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1433         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1434
1435         if (lfsck->li_bookmark_obj != NULL) {
1436                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1437                 lfsck->li_bookmark_obj = NULL;
1438         }
1439
1440         if (lfsck->li_lpf_obj != NULL) {
1441                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1442                 lfsck->li_lpf_obj = NULL;
1443         }
1444
1445         if (lfsck->li_los != NULL) {
1446                 local_oid_storage_fini(env, lfsck->li_los);
1447                 lfsck->li_los = NULL;
1448         }
1449
1450         lfsck_fid_fini(lfsck);
1451
1452         OBD_FREE_PTR(lfsck);
1453 }
1454
1455 static inline struct lfsck_instance *
1456 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1457 {
1458         struct lfsck_instance *lfsck;
1459
1460         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1461                 if (lfsck->li_bottom == key) {
1462                         if (ref)
1463                                 lfsck_instance_get(lfsck);
1464                         if (unlink)
1465                                 list_del_init(&lfsck->li_link);
1466
1467                         return lfsck;
1468                 }
1469         }
1470
1471         return NULL;
1472 }
1473
1474 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1475                                            bool unlink)
1476 {
1477         struct lfsck_instance *lfsck;
1478
1479         spin_lock(&lfsck_instance_lock);
1480         lfsck = __lfsck_instance_find(key, ref, unlink);
1481         spin_unlock(&lfsck_instance_lock);
1482
1483         return lfsck;
1484 }
1485
1486 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1487 {
1488         struct lfsck_instance *tmp;
1489
1490         spin_lock(&lfsck_instance_lock);
1491         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1492                 if (lfsck->li_bottom == tmp->li_bottom) {
1493                         spin_unlock(&lfsck_instance_lock);
1494                         return -EEXIST;
1495                 }
1496         }
1497
1498         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1499         spin_unlock(&lfsck_instance_lock);
1500         return 0;
1501 }
1502
/*
 * Print "<prefix>: name,name,...\n" for the bits set in @bits.
 *
 * @names is indexed by bit position; set bits whose name is NULL are
 * skipped. With no bit set the output is "<prefix>:\n".
 *
 * The separator is written before every name except the first one, so no
 * dangling ',' is left when the highest set bits have no name. The bit walk
 * uses unsigned arithmetic so that testing bit 31 does not shift into the
 * sign bit of an int.
 *
 * NOTE(review): @names must have an entry for every bit that can be set in
 * @bits; its length is not known here and is not checked.
 *
 * Always returns 0.
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int    left  = bits;
        bool            first = true;
        int             i;

        seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
        if (bits == 0)
                return 0;

        for (i = 0; left != 0; i++) {
                unsigned int flag = 1U << i;

                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                seq_printf(m, "%s%s", first ? "" : ",", names[i]);
                first = false;
        }

        seq_printf(m, "\n");
        return 0;
}
1529
1530 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1531 {
1532         if (time != 0)
1533                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1534                           cfs_time_current_sec() - time);
1535         else
1536                 seq_printf(m, "%s: N/A\n", prefix);
1537         return 0;
1538 }
1539
1540 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1541                    const char *prefix)
1542 {
1543         if (fid_is_zero(&pos->lp_dir_parent)) {
1544                 if (pos->lp_oit_cookie == 0)
1545                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1546                                    prefix);
1547                 else
1548                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1549                                    prefix, pos->lp_oit_cookie);
1550         } else {
1551                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1552                            prefix, pos->lp_oit_cookie,
1553                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1554         }
1555         return 0;
1556 }
1557
1558 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1559                     struct lfsck_position *pos, bool init)
1560 {
1561         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1562
1563         if (unlikely(lfsck->li_di_oit == NULL)) {
1564                 memset(pos, 0, sizeof(*pos));
1565                 return;
1566         }
1567
1568         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1569         if (!lfsck->li_current_oit_processed && !init)
1570                 pos->lp_oit_cookie--;
1571
1572         LASSERT(pos->lp_oit_cookie > 0);
1573
1574         if (lfsck->li_di_dir != NULL) {
1575                 struct dt_object *dto = lfsck->li_obj_dir;
1576
1577                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1578                                                         lfsck->li_di_dir);
1579
1580                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1581                         fid_zero(&pos->lp_dir_parent);
1582                         pos->lp_dir_cookie = 0;
1583                 } else {
1584                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1585                 }
1586         } else {
1587                 fid_zero(&pos->lp_dir_parent);
1588                 pos->lp_dir_cookie = 0;
1589         }
1590 }
1591
1592 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1593 {
1594         bool dirty = false;
1595
1596         if (limit != LFSCK_SPEED_NO_LIMIT) {
1597                 if (limit > HZ) {
1598                         lfsck->li_sleep_rate = limit / HZ;
1599                         lfsck->li_sleep_jif = 1;
1600                 } else {
1601                         lfsck->li_sleep_rate = 1;
1602                         lfsck->li_sleep_jif = HZ / limit;
1603                 }
1604         } else {
1605                 lfsck->li_sleep_jif = 0;
1606                 lfsck->li_sleep_rate = 0;
1607         }
1608
1609         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1610                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1611                 dirty = true;
1612         }
1613
1614         return dirty;
1615 }
1616
1617 void lfsck_control_speed(struct lfsck_instance *lfsck)
1618 {
1619         struct ptlrpc_thread *thread = &lfsck->li_thread;
1620         struct l_wait_info    lwi;
1621
1622         if (lfsck->li_sleep_jif > 0 &&
1623             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1624                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1625                                        LWI_ON_SIGNAL_NOOP, NULL);
1626
1627                 l_wait_event(thread->t_ctl_waitq,
1628                              !thread_is_running(thread),
1629                              &lwi);
1630                 lfsck->li_new_scanned = 0;
1631         }
1632 }
1633
1634 void lfsck_control_speed_by_self(struct lfsck_component *com)
1635 {
1636         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1637         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1638         struct l_wait_info       lwi;
1639
1640         if (lfsck->li_sleep_jif > 0 &&
1641             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1642                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1643                                        LWI_ON_SIGNAL_NOOP, NULL);
1644
1645                 l_wait_event(thread->t_ctl_waitq,
1646                              !thread_is_running(thread),
1647                              &lwi);
1648                 com->lc_new_scanned = 0;
1649         }
1650 }
1651
1652 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1653                                                  struct lfsck_component *com,
1654                                                  struct lfsck_start_param *lsp)
1655 {
1656         struct lfsck_thread_args *lta;
1657         int                       rc;
1658
1659         OBD_ALLOC_PTR(lta);
1660         if (lta == NULL)
1661                 return ERR_PTR(-ENOMEM);
1662
1663         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1664         if (rc != 0) {
1665                 OBD_FREE_PTR(lta);
1666                 return ERR_PTR(rc);
1667         }
1668
1669         lta->lta_lfsck = lfsck_instance_get(lfsck);
1670         if (com != NULL)
1671                 lta->lta_com = lfsck_component_get(com);
1672
1673         lta->lta_lsp = lsp;
1674
1675         return lta;
1676 }
1677
1678 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1679 {
1680         if (lta->lta_com != NULL)
1681                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1682         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1683         lu_env_fini(&lta->lta_env);
1684         OBD_FREE_PTR(lta);
1685 }
1686
1687 static void lfsck_interpret(const struct lu_env *env,
1688                             struct lfsck_instance *lfsck,
1689                             struct ptlrpc_request *req, void *args, int result)
1690 {
1691         struct lfsck_async_interpret_args *laia = args;
1692         struct lfsck_component            *com;
1693
1694         LASSERT(laia->laia_com == NULL);
1695         LASSERT(laia->laia_shared);
1696
1697         spin_lock(&lfsck->li_lock);
1698         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1699                 if (com->lc_ops->lfsck_interpret != NULL) {
1700                         laia->laia_com = com;
1701                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1702                 }
1703         }
1704
1705         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1706                 if (com->lc_ops->lfsck_interpret != NULL) {
1707                         laia->laia_com = com;
1708                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1709                 }
1710         }
1711         spin_unlock(&lfsck->li_lock);
1712 }
1713
1714 static int lfsck_stop_notify(const struct lu_env *env,
1715                              struct lfsck_instance *lfsck,
1716                              struct lfsck_tgt_descs *ltds,
1717                              struct lfsck_tgt_desc *ltd, __u16 type)
1718 {
1719         struct ptlrpc_request_set *set;
1720         struct lfsck_component    *com;
1721         int                        rc  = 0;
1722         ENTRY;
1723
1724         spin_lock(&lfsck->li_lock);
1725         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1726         if (com == NULL)
1727                 com = __lfsck_component_find(lfsck, type,
1728                                              &lfsck->li_list_double_scan);
1729         if (com != NULL)
1730                 lfsck_component_get(com);
1731         spin_unlock(&lfsck->li_lock);
1732
1733         if (com != NULL) {
1734                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1735                         set = ptlrpc_prep_set();
1736                         if (set == NULL) {
1737                                 lfsck_component_put(env, com);
1738
1739                                 RETURN(-ENOMEM);
1740                         }
1741
1742                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1743                                                             ltd, set);
1744                         if (rc == 0)
1745                                 rc = ptlrpc_set_wait(set);
1746
1747                         ptlrpc_set_destroy(set);
1748                 }
1749
1750                 lfsck_component_put(env, com);
1751         }
1752
1753         RETURN(rc);
1754 }
1755
1756 static int lfsck_async_interpret(const struct lu_env *env,
1757                                  struct ptlrpc_request *req,
1758                                  void *args, int rc)
1759 {
1760         struct lfsck_async_interpret_args *laia = args;
1761         struct lfsck_instance             *lfsck;
1762
1763         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1764                               li_mdt_descs);
1765         lfsck_interpret(env, lfsck, req, laia, rc);
1766         lfsck_tgt_put(laia->laia_ltd);
1767         if (rc != 0 && laia->laia_result != -EALREADY)
1768                 laia->laia_result = rc;
1769
1770         return 0;
1771 }
1772
1773 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1774                         struct lfsck_request *lr,
1775                         struct ptlrpc_request_set *set,
1776                         ptlrpc_interpterer_t interpreter,
1777                         void *args, int request)
1778 {
1779         struct lfsck_async_interpret_args *laia;
1780         struct ptlrpc_request             *req;
1781         struct lfsck_request              *tmp;
1782         struct req_format                 *format;
1783         int                                rc;
1784
1785         switch (request) {
1786         case LFSCK_NOTIFY:
1787                 format = &RQF_LFSCK_NOTIFY;
1788                 break;
1789         case LFSCK_QUERY:
1790                 format = &RQF_LFSCK_QUERY;
1791                 break;
1792         default:
1793                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
1794                        exp->exp_obd->obd_name, request, -EINVAL);
1795                 return -EINVAL;
1796         }
1797
1798         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1799         if (req == NULL)
1800                 return -ENOMEM;
1801
1802         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1803         if (rc != 0) {
1804                 ptlrpc_request_free(req);
1805
1806                 return rc;
1807         }
1808
1809         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1810         *tmp = *lr;
1811         ptlrpc_request_set_replen(req);
1812
1813         laia = ptlrpc_req_async_args(req);
1814         *laia = *(struct lfsck_async_interpret_args *)args;
1815         if (laia->laia_com != NULL)
1816                 lfsck_component_get(laia->laia_com);
1817         req->rq_interpret_reply = interpreter;
1818         ptlrpc_set_add_req(set, req);
1819
1820         return 0;
1821 }
1822
1823 /* external interfaces */
1824
1825 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
1826 {
1827         struct lu_env           env;
1828         struct lfsck_instance  *lfsck;
1829         int                     rc;
1830         ENTRY;
1831
1832         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1833         if (rc != 0)
1834                 RETURN(rc);
1835
1836         lfsck = lfsck_instance_find(key, true, false);
1837         if (likely(lfsck != NULL)) {
1838                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
1839                 lfsck_instance_put(&env, lfsck);
1840         } else {
1841                 rc = -ENXIO;
1842         }
1843
1844         lu_env_fini(&env);
1845
1846         RETURN(rc);
1847 }
1848 EXPORT_SYMBOL(lfsck_get_speed);
1849
1850 int lfsck_set_speed(struct dt_device *key, int val)
1851 {
1852         struct lu_env           env;
1853         struct lfsck_instance  *lfsck;
1854         int                     rc;
1855         ENTRY;
1856
1857         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1858         if (rc != 0)
1859                 RETURN(rc);
1860
1861         lfsck = lfsck_instance_find(key, true, false);
1862         if (likely(lfsck != NULL)) {
1863                 mutex_lock(&lfsck->li_mutex);
1864                 if (__lfsck_set_speed(lfsck, val))
1865                         rc = lfsck_bookmark_store(&env, lfsck);
1866                 mutex_unlock(&lfsck->li_mutex);
1867                 lfsck_instance_put(&env, lfsck);
1868         } else {
1869                 rc = -ENXIO;
1870         }
1871
1872         lu_env_fini(&env);
1873
1874         RETURN(rc);
1875 }
1876 EXPORT_SYMBOL(lfsck_set_speed);
1877
1878 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
1879 {
1880         struct lu_env           env;
1881         struct lfsck_instance  *lfsck;
1882         int                     rc;
1883         ENTRY;
1884
1885         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1886         if (rc != 0)
1887                 RETURN(rc);
1888
1889         lfsck = lfsck_instance_find(key, true, false);
1890         if (likely(lfsck != NULL)) {
1891                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
1892                 lfsck_instance_put(&env, lfsck);
1893         } else {
1894                 rc = -ENXIO;
1895         }
1896
1897         lu_env_fini(&env);
1898
1899         RETURN(rc);
1900 }
1901 EXPORT_SYMBOL(lfsck_get_windows);
1902
1903 int lfsck_set_windows(struct dt_device *key, int val)
1904 {
1905         struct lu_env           env;
1906         struct lfsck_instance  *lfsck;
1907         int                     rc;
1908         ENTRY;
1909
1910         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1911         if (rc != 0)
1912                 RETURN(rc);
1913
1914         lfsck = lfsck_instance_find(key, true, false);
1915         if (likely(lfsck != NULL)) {
1916                 if (val > LFSCK_ASYNC_WIN_MAX) {
1917                         CWARN("%s: Too large async window size, which "
1918                               "may cause memory issues. The valid range "
1919                               "is [0 - %u]. If you do not want to restrict "
1920                               "the window size for async requests pipeline, "
1921                               "just set it as 0.\n",
1922                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1923                         rc = -EINVAL;
1924                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1925                         mutex_lock(&lfsck->li_mutex);
1926                         lfsck->li_bookmark_ram.lb_async_windows = val;
1927                         rc = lfsck_bookmark_store(&env, lfsck);
1928                         mutex_unlock(&lfsck->li_mutex);
1929                 }
1930                 lfsck_instance_put(&env, lfsck);
1931         } else {
1932                 rc = -ENXIO;
1933         }
1934
1935         lu_env_fini(&env);
1936
1937         RETURN(rc);
1938 }
1939 EXPORT_SYMBOL(lfsck_set_windows);
1940
1941 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
1942 {
1943         struct lu_env           env;
1944         struct lfsck_instance  *lfsck;
1945         struct lfsck_component *com;
1946         int                     rc;
1947         ENTRY;
1948
1949         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1950         if (rc != 0)
1951                 RETURN(rc);
1952
1953         lfsck = lfsck_instance_find(key, true, false);
1954         if (likely(lfsck != NULL)) {
1955                 com = lfsck_component_find(lfsck, type);
1956                 if (likely(com != NULL)) {
1957                         rc = com->lc_ops->lfsck_dump(&env, com, m);
1958                         lfsck_component_put(&env, com);
1959                 } else {
1960                         rc = -ENOTSUPP;
1961                 }
1962
1963                 lfsck_instance_put(&env, lfsck);
1964         } else {
1965                 rc = -ENXIO;
1966         }
1967
1968         lu_env_fini(&env);
1969
1970         RETURN(rc);
1971 }
1972 EXPORT_SYMBOL(lfsck_dump);
1973
1974 static int lfsck_stop_all(const struct lu_env *env,
1975                           struct lfsck_instance *lfsck,
1976                           struct lfsck_stop *stop)
1977 {
1978         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1979         struct lfsck_request              *lr     = &info->lti_lr;
1980         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1981         struct ptlrpc_request_set         *set;
1982         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1983         struct lfsck_tgt_desc             *ltd;
1984         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1985         __u32                              idx;
1986         int                                rc     = 0;
1987         int                                rc1    = 0;
1988         ENTRY;
1989
1990         LASSERT(stop->ls_flags & LPF_BROADCAST);
1991
1992         set = ptlrpc_prep_set();
1993         if (unlikely(set == NULL))
1994                 RETURN(-ENOMEM);
1995
1996         memset(lr, 0, sizeof(*lr));
1997         lr->lr_event = LE_STOP;
1998         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1999         lr->lr_status = stop->ls_status;
2000         lr->lr_version = bk->lb_version;
2001         lr->lr_active = LFSCK_TYPES_ALL;
2002         lr->lr_param = stop->ls_flags;
2003
2004         laia->laia_com = NULL;
2005         laia->laia_ltds = ltds;
2006         laia->laia_lr = lr;
2007         laia->laia_result = 0;
2008         laia->laia_shared = 1;
2009
2010         down_read(&ltds->ltd_rw_sem);
2011         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2012                 ltd = lfsck_tgt_get(ltds, idx);
2013                 LASSERT(ltd != NULL);
2014
2015                 laia->laia_ltd = ltd;
2016                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2017                                          lfsck_async_interpret, laia,
2018                                          LFSCK_NOTIFY);
2019                 if (rc != 0) {
2020                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2021                         lfsck_tgt_put(ltd);
2022                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2023                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2024                         rc1 = rc;
2025                 }
2026         }
2027         up_read(&ltds->ltd_rw_sem);
2028
2029         rc = ptlrpc_set_wait(set);
2030         ptlrpc_set_destroy(set);
2031
2032         if (rc == 0)
2033                 rc = laia->laia_result;
2034
2035         if (rc == -EALREADY)
2036                 rc = 0;
2037
2038         if (rc != 0)
2039                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2040                        lfsck_lfsck2name(lfsck), rc);
2041
2042         RETURN(rc != 0 ? rc : rc1);
2043 }
2044
2045 static int lfsck_start_all(const struct lu_env *env,
2046                            struct lfsck_instance *lfsck,
2047                            struct lfsck_start *start)
2048 {
2049         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2050         struct lfsck_request              *lr     = &info->lti_lr;
2051         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2052         struct ptlrpc_request_set         *set;
2053         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2054         struct lfsck_tgt_desc             *ltd;
2055         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2056         __u32                              idx;
2057         int                                rc     = 0;
2058         ENTRY;
2059
2060         LASSERT(start->ls_flags & LPF_BROADCAST);
2061
2062         set = ptlrpc_prep_set();
2063         if (unlikely(set == NULL))
2064                 RETURN(-ENOMEM);
2065
2066         memset(lr, 0, sizeof(*lr));
2067         lr->lr_event = LE_START;
2068         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2069         lr->lr_speed = bk->lb_speed_limit;
2070         lr->lr_version = bk->lb_version;
2071         lr->lr_active = start->ls_active;
2072         lr->lr_param = start->ls_flags;
2073         lr->lr_async_windows = bk->lb_async_windows;
2074         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2075                        LSV_ASYNC_WINDOWS;
2076
2077         laia->laia_com = NULL;
2078         laia->laia_ltds = ltds;
2079         laia->laia_lr = lr;
2080         laia->laia_result = 0;
2081         laia->laia_shared = 1;
2082
2083         down_read(&ltds->ltd_rw_sem);
2084         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2085                 ltd = lfsck_tgt_get(ltds, idx);
2086                 LASSERT(ltd != NULL);
2087
2088                 laia->laia_ltd = ltd;
2089                 ltd->ltd_layout_done = 0;
2090                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2091                                          lfsck_async_interpret, laia,
2092                                          LFSCK_NOTIFY);
2093                 if (rc != 0) {
2094                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2095                         lfsck_tgt_put(ltd);
2096                         CERROR("%s: cannot notify MDT %x for LFSCK "
2097                                "start, failout: rc = %d\n",
2098                                lfsck_lfsck2name(lfsck), idx, rc);
2099                         break;
2100                 }
2101         }
2102         up_read(&ltds->ltd_rw_sem);
2103
2104         if (rc != 0) {
2105                 ptlrpc_set_destroy(set);
2106
2107                 RETURN(rc);
2108         }
2109
2110         rc = ptlrpc_set_wait(set);
2111         ptlrpc_set_destroy(set);
2112
2113         if (rc == 0)
2114                 rc = laia->laia_result;
2115
2116         if (rc != 0) {
2117                 struct lfsck_stop *stop = &info->lti_stop;
2118
2119                 CERROR("%s: cannot start LFSCK on some MDTs, "
2120                        "stop all: rc = %d\n",
2121                        lfsck_lfsck2name(lfsck), rc);
2122                 if (rc != -EALREADY) {
2123                         stop->ls_status = LS_FAILED;
2124                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2125                         lfsck_stop_all(env, lfsck, stop);
2126                 }
2127         }
2128
2129         RETURN(rc);
2130 }
2131
/**
 * Start the LFSCK on the device \a key.
 *
 * If the LFSCK thread is already active, the components named in
 * start->ls_active are asked to join the current run through their
 * lfsck_join() method. Otherwise the per-run state is reset, the requested
 * components are moved from the idle list onto the scan list, the master
 * engine thread is created and, for LPF_BROADCAST, the start is forwarded
 * to the other targets via lfsck_start_all().
 *
 * \param[in] env       execution environment
 * \param[in] key       the device used to look up the LFSCK instance
 * \param[in] lsp       the start parameters; lsp->lsp_start may be NULL,
 *                      which means an automatic trigger of a paused LFSCK
 *
 * \retval              0 for success; positive internal results are
 *                      reported as 0
 * \retval              -ENXIO if no LFSCK instance is found for \a key
 * \retval              -EAGAIN if the instance has no namespace yet
 * \retval              -EALREADY if the LFSCK thread is already active
 * \retval              -EPERM if LPF_BROADCAST is requested on a
 *                      non-master instance
 * \retval              -ENOTSUPP / -EOPNOTSUPP for unsupported or unknown
 *                      component types
 * \retval              other negative values from the called helpers
 */
int lfsck_start(const struct lu_env *env, struct dt_device *key,
                struct lfsck_start_param *lsp)
{
        struct lfsck_start              *start  = lsp->lsp_start;
        struct lfsck_instance           *lfsck;
        struct lfsck_bookmark           *bk;
        struct ptlrpc_thread            *thread;
        struct lfsck_component          *com;
        struct l_wait_info               lwi    = { 0 };
        struct lfsck_thread_args        *lta;
        struct task_struct              *task;
        int                              rc     = 0;
        __u16                            valid  = 0;
        __u16                            flags  = 0;
        /* The component type bit being examined; starts from bit 0. */
        __u16                            type   = 1;
        ENTRY;

        lfsck = lfsck_instance_find(key, true, false);
        if (unlikely(lfsck == NULL))
                RETURN(-ENXIO);

        /* System is not ready, try again later. */
        if (unlikely(lfsck->li_namespace == NULL))
                GOTO(put, rc = -EAGAIN);

        /* start == NULL means auto trigger paused LFSCK. */
        if ((start == NULL) &&
            (list_empty(&lfsck->li_list_scan) ||
             OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
                GOTO(put, rc = 0);

        bk = &lfsck->li_bookmark_ram;
        thread = &lfsck->li_thread;
        /* From here on li_mutex is held; leave through "out" unless the
         * mutex has been released explicitly. */
        mutex_lock(&lfsck->li_mutex);
        spin_lock(&lfsck->li_lock);
        /* The LFSCK thread is neither freshly initialized nor stopped,
         * i.e. a run is in progress: do not start a new one. */
        if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
                rc = -EALREADY;
                if (unlikely(start == NULL)) {
                        spin_unlock(&lfsck->li_lock);
                        GOTO(out, rc);
                }

                /* Walk the requested type bits from low to high and let
                 * each matching component join the current run. Handled
                 * bits are cleared from ls_active. */
                while (start->ls_active != 0) {
                        if (!(type & start->ls_active)) {
                                type <<= 1;
                                continue;
                        }

                        com = __lfsck_component_find(lfsck, type,
                                                     &lfsck->li_list_scan);
                        if (com == NULL)
                                com = __lfsck_component_find(lfsck, type,
                                                &lfsck->li_list_double_scan);
                        if (com == NULL) {
                                rc = -EOPNOTSUPP;
                                break;
                        }

                        if (com->lc_ops->lfsck_join != NULL) {
                                rc = com->lc_ops->lfsck_join( env, com, lsp);
                                if (rc != 0 && rc != -EALREADY)
                                        break;
                        }
                        start->ls_active &= ~type;
                        type <<= 1;
                }
                spin_unlock(&lfsck->li_lock);
                GOTO(out, rc);
        }
        spin_unlock(&lfsck->li_lock);

        /* Reset the per-run state before a new run. */
        lfsck->li_status = 0;
        lfsck->li_oit_over = 0;
        lfsck->li_start_unplug = 0;
        lfsck->li_drop_dryrun = 0;
        lfsck->li_new_scanned = 0;

        /* For auto trigger. */
        if (start == NULL)
                goto trigger;

        if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
                CERROR("%s: only allow to specify '-A | -o' via MDS\n",
                       lfsck_lfsck2name(lfsck));

                GOTO(out, rc = -EPERM);
        }

        start->ls_version = bk->lb_version;

        if (start->ls_active != 0) {
                struct lfsck_component *next;

                if (start->ls_active == LFSCK_TYPES_ALL)
                        start->ls_active = LFSCK_TYPES_SUPPORTED;

                /* On unsupported types, ls_active is narrowed to the
                 * bits outside of LFSCK_TYPES_SUPPORTED before failing. */
                if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
                        start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
                        GOTO(out, rc = -ENOTSUPP);
                }

                /* Call lfsck_post() on the components which are on the
                 * scan list but have not been requested this time. */
                list_for_each_entry_safe(com, next,
                                         &lfsck->li_list_scan, lc_link) {
                        if (!(com->lc_type & start->ls_active)) {
                                rc = com->lc_ops->lfsck_post(env, com, 0,
                                                             false);
                                if (rc != 0)
                                        GOTO(out, rc);
                        }
                }

                /* Move the requested components which are idle onto the
                 * scan list; ls_active is consumed bit by bit. */
                while (start->ls_active != 0) {
                        if (type & start->ls_active) {
                                com = __lfsck_component_find(lfsck, type,
                                                        &lfsck->li_list_idle);
                                if (com != NULL)
                                        /* The component status will be updated
                                         * when its prep() is called later by
                                         * the LFSCK main engine. */
                                        list_move_tail(&com->lc_link,
                                                       &lfsck->li_list_scan);
                                start->ls_active &= ~type;
                        }
                        type <<= 1;
                }
        }

        if (list_empty(&lfsck->li_list_scan)) {
                /* The speed limit will be used to control both the LFSCK and
                 * low layer scrub (if applied), need to be handled firstly. */
                if (start->ls_valid & LSV_SPEED_LIMIT) {
                        if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
                                rc = lfsck_bookmark_store(env, lfsck);
                                if (rc != 0)
                                        GOTO(out, rc);
                        }
                }

                goto trigger;
        }

        if (start->ls_flags & LPF_RESET)
                flags |= DOIF_RESET;

        rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
        if (rc != 0)
                GOTO(out, rc);

        /* Rebuild ls_active from the components that will be scanned and
         * reset each of them if LPF_RESET has been requested. */
        list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
                start->ls_active |= com->lc_type;
                if (flags & DOIF_RESET) {
                        rc = com->lc_ops->lfsck_reset(env, com, false);
                        if (rc != 0)
                                GOTO(out, rc);
                }
        }

trigger:
        /* Compose the directory and object table iteration arguments that
         * are stored in the instance for the engine. */
        lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
        if (bk->lb_param & LPF_DRYRUN)
                lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;

        if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
                valid |= DOIV_ERROR_HANDLE;
                if (start->ls_flags & LPF_FAILOUT)
                        flags |= DOIF_FAILOUT;
        }

        if (start != NULL && start->ls_valid & LSV_DRYRUN) {
                valid |= DOIV_DRYRUN;
                if (start->ls_flags & LPF_DRYRUN)
                        flags |= DOIF_DRYRUN;
        }

        if (!list_empty(&lfsck->li_list_scan))
                flags |= DOIF_OUTUSED;

        lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
        thread_set_flags(thread, 0);
        /* The thread arguments hold their own env and a reference on the
         * instance; they must be released here if the thread cannot be
         * created. */
        lta = lfsck_thread_args_init(lfsck, NULL, lsp);
        if (IS_ERR(lta))
                GOTO(out, rc = PTR_ERR(lta));

        __lfsck_set_speed(lfsck, bk->lb_speed_limit);
        task = kthread_run(lfsck_master_engine, lta, "lfsck");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start LFSCK thread: rc = %d\n",
                       lfsck_lfsck2name(lfsck), rc);
                lfsck_thread_args_fini(lta);

                GOTO(out, rc);
        }

        /* Wait until the new thread reports running or stopped. */
        l_wait_event(thread->t_ctl_waitq,
                     thread_is_running(thread) ||
                     thread_is_stopped(thread),
                     &lwi);
        /* Local start only: set li_start_unplug and wake up the waiters
         * right away. NOTE(review): presumably the master engine waits
         * for li_start_unplug before scanning - confirm in
         * lfsck_master_engine(). */
        if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
                lfsck->li_start_unplug = 1;
                wake_up_all(&thread->t_ctl_waitq);

                GOTO(out, rc = 0);
        }

        /* release lfsck::li_mutex to avoid deadlock. */
        mutex_unlock(&lfsck->li_mutex);
        rc = lfsck_start_all(env, lfsck, start);
        if (rc != 0) {
                /* The broadcast failed: stop the local thread as failed,
                 * unless it has stopped by itself in the meantime. */
                spin_lock(&lfsck->li_lock);
                if (thread_is_stopped(thread)) {
                        spin_unlock(&lfsck->li_lock);
                } else {
                        lfsck->li_status = LS_FAILED;
                        lfsck->li_flags = 0;
                        thread_set_flags(thread, SVC_STOPPING);
                        spin_unlock(&lfsck->li_lock);

                        lfsck->li_start_unplug = 1;
                        wake_up_all(&thread->t_ctl_waitq);
                        l_wait_event(thread->t_ctl_waitq,
                                     thread_is_stopped(thread),
                                     &lwi);
                }
        } else {
                lfsck->li_start_unplug = 1;
                wake_up_all(&thread->t_ctl_waitq);
        }

        /* li_mutex has already been released above, so skip "out". */
        GOTO(put, rc);

out:
        mutex_unlock(&lfsck->li_mutex);

put:
        lfsck_instance_put(env, lfsck);

        /* Only failures are reported to the caller. */
        return rc < 0 ? rc : 0;
}
EXPORT_SYMBOL(lfsck_start);
2372
2373 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2374                struct lfsck_stop *stop)
2375 {
2376         struct lfsck_instance   *lfsck;
2377         struct ptlrpc_thread    *thread;
2378         struct l_wait_info       lwi    = { 0 };
2379         int                      rc     = 0;
2380         int                      rc1    = 0;
2381         ENTRY;
2382
2383         lfsck = lfsck_instance_find(key, true, false);
2384         if (unlikely(lfsck == NULL))
2385                 RETURN(-ENXIO);
2386
2387         thread = &lfsck->li_thread;
2388         /* release lfsck::li_mutex to avoid deadlock. */
2389         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2390                 if (!lfsck->li_master) {
2391                         CERROR("%s: only allow to specify '-A' via MDS\n",
2392                                lfsck_lfsck2name(lfsck));
2393
2394                         GOTO(out, rc = -EPERM);
2395                 }
2396
2397                 rc1 = lfsck_stop_all(env, lfsck, stop);
2398         }
2399
2400         mutex_lock(&lfsck->li_mutex);
2401         spin_lock(&lfsck->li_lock);
2402         /* no error if LFSCK is already stopped, or was never started */
2403         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2404                 spin_unlock(&lfsck->li_lock);
2405                 GOTO(out, rc = 0);
2406         }
2407
2408         if (stop != NULL) {
2409                 lfsck->li_status = stop->ls_status;
2410                 lfsck->li_flags = stop->ls_flags;
2411         } else {
2412                 lfsck->li_status = LS_STOPPED;
2413                 lfsck->li_flags = 0;
2414         }
2415
2416         thread_set_flags(thread, SVC_STOPPING);
2417         spin_unlock(&lfsck->li_lock);
2418
2419         wake_up_all(&thread->t_ctl_waitq);
2420         l_wait_event(thread->t_ctl_waitq,
2421                      thread_is_stopped(thread),
2422                      &lwi);
2423
2424         GOTO(out, rc = 0);
2425
2426 out:
2427         mutex_unlock(&lfsck->li_mutex);
2428         lfsck_instance_put(env, lfsck);
2429
2430         return rc != 0 ? rc : rc1;
2431 }
2432 EXPORT_SYMBOL(lfsck_stop);
2433
2434 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2435                     struct lfsck_request *lr)
2436 {
2437         int rc = -EOPNOTSUPP;
2438         ENTRY;
2439
2440         switch (lr->lr_event) {
2441         case LE_START: {
2442                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2443                 struct lfsck_start_param  lsp;
2444
2445                 memset(start, 0, sizeof(*start));
2446                 start->ls_valid = lr->lr_valid;
2447                 start->ls_speed_limit = lr->lr_speed;
2448                 start->ls_version = lr->lr_version;
2449                 start->ls_active = lr->lr_active;
2450                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2451                 start->ls_async_windows = lr->lr_async_windows;
2452
2453                 lsp.lsp_start = start;
2454                 lsp.lsp_index = lr->lr_index;
2455                 lsp.lsp_index_valid = 1;
2456                 rc = lfsck_start(env, key, &lsp);
2457                 break;
2458         }
2459         case LE_STOP: {
2460                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
2461
2462                 memset(stop, 0, sizeof(*stop));
2463                 stop->ls_status = lr->lr_status;
2464                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2465                 rc = lfsck_stop(env, key, stop);
2466                 break;
2467         }
2468         case LE_PHASE1_DONE:
2469         case LE_PHASE2_DONE:
2470         case LE_FID_ACCESSED:
2471         case LE_PEER_EXIT:
2472         case LE_CONDITIONAL_DESTROY:
2473         case LE_PAIRS_VERIFY: {
2474                 struct lfsck_instance  *lfsck;
2475                 struct lfsck_component *com;
2476
2477                 lfsck = lfsck_instance_find(key, true, false);
2478                 if (unlikely(lfsck == NULL))
2479                         RETURN(-ENXIO);
2480
2481                 com = lfsck_component_find(lfsck, lr->lr_active);
2482                 if (likely(com != NULL)) {
2483                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2484                         lfsck_component_put(env, com);
2485                 }
2486
2487                 lfsck_instance_put(env, lfsck);
2488                 break;
2489         }
2490         default:
2491                 break;
2492         }
2493
2494         RETURN(rc);
2495 }
2496 EXPORT_SYMBOL(lfsck_in_notify);
2497
2498 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2499                 struct lfsck_request *lr)
2500 {
2501         struct lfsck_instance  *lfsck;
2502         struct lfsck_component *com;
2503         int                     rc;
2504         ENTRY;
2505
2506         lfsck = lfsck_instance_find(key, true, false);
2507         if (unlikely(lfsck == NULL))
2508                 RETURN(-ENXIO);
2509
2510         com = lfsck_component_find(lfsck, lr->lr_active);
2511         if (likely(com != NULL)) {
2512                 rc = com->lc_ops->lfsck_query(env, com);
2513                 lfsck_component_put(env, com);
2514         } else {
2515                 rc = -ENOTSUPP;
2516         }
2517
2518         lfsck_instance_put(env, lfsck);
2519
2520         RETURN(rc);
2521 }
2522 EXPORT_SYMBOL(lfsck_query);
2523
2524 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2525                              struct ldlm_namespace *ns)
2526 {
2527         struct lfsck_instance  *lfsck;
2528         int                     rc      = -ENXIO;
2529
2530         lfsck = lfsck_instance_find(key, true, false);
2531         if (likely(lfsck != NULL)) {
2532                 lfsck->li_namespace = ns;
2533                 lfsck_instance_put(env, lfsck);
2534                 rc = 0;
2535         }
2536
2537         return rc;
2538 }
2539 EXPORT_SYMBOL(lfsck_register_namespace);
2540
2541 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2542                    struct dt_device *next, struct obd_device *obd,
2543                    lfsck_out_notify notify, void *notify_data, bool master)
2544 {
2545         struct lfsck_instance   *lfsck;
2546         struct dt_object        *root  = NULL;
2547         struct dt_object        *obj   = NULL;
2548         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2549         int                      rc;
2550         ENTRY;
2551
2552         lfsck = lfsck_instance_find(key, false, false);
2553         if (unlikely(lfsck != NULL))
2554                 RETURN(-EEXIST);
2555
2556         OBD_ALLOC_PTR(lfsck);
2557         if (lfsck == NULL)
2558                 RETURN(-ENOMEM);
2559
2560         mutex_init(&lfsck->li_mutex);
2561         spin_lock_init(&lfsck->li_lock);
2562         INIT_LIST_HEAD(&lfsck->li_link);
2563         INIT_LIST_HEAD(&lfsck->li_list_scan);
2564         INIT_LIST_HEAD(&lfsck->li_list_dir);
2565         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2566         INIT_LIST_HEAD(&lfsck->li_list_idle);
2567         atomic_set(&lfsck->li_ref, 1);
2568         atomic_set(&lfsck->li_double_scan_count, 0);
2569         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2570         lfsck->li_out_notify = notify;
2571         lfsck->li_out_notify_data = notify_data;
2572         lfsck->li_next = next;
2573         lfsck->li_bottom = key;
2574         lfsck->li_obd = obd;
2575
2576         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2577         if (rc != 0)
2578                 GOTO(out, rc);
2579
2580         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2581         if (rc != 0)
2582                 GOTO(out, rc);
2583
2584         fid->f_seq = FID_SEQ_LOCAL_NAME;
2585         fid->f_oid = 1;
2586         fid->f_ver = 0;
2587         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2588         if (rc != 0)
2589                 GOTO(out, rc);
2590
2591         rc = dt_root_get(env, key, fid);
2592         if (rc != 0)
2593                 GOTO(out, rc);
2594
2595         root = dt_locate(env, key, fid);
2596         if (IS_ERR(root))
2597                 GOTO(out, rc = PTR_ERR(root));
2598
2599         if (unlikely(!dt_try_as_dir(env, root)))
2600                 GOTO(out, rc = -ENOTDIR);
2601
2602         lfsck->li_local_root_fid = *fid;
2603         if (master) {
2604                 lfsck->li_master = 1;
2605                 if (lfsck_dev_idx(key) == 0) {
2606                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
2607                         const struct lu_name *cname;
2608
2609                         rc = dt_lookup(env, root,
2610                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2611                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2612                         if (rc != 0)
2613                                 GOTO(out, rc);
2614
2615                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
2616                         if (IS_ERR(obj))
2617                                 GOTO(out, rc = PTR_ERR(obj));
2618
2619                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2620                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
2621                         if (rc != 0)
2622                                 GOTO(out, rc);
2623
2624                         lu_object_put(env, &obj->do_lu);
2625                         obj = dt_locate(env, key, fid);
2626                         if (IS_ERR(obj))
2627                                 GOTO(out, rc = PTR_ERR(obj));
2628
2629                         cname = lfsck_name_get_const(env, dotlustre,
2630                                                      strlen(dotlustre));
2631                         rc = lfsck_verify_linkea(env, key, obj, cname,
2632                                                  &lfsck->li_global_root_fid);
2633                         if (rc != 0)
2634                                 GOTO(out, rc);
2635
2636                         *pfid = *fid;
2637                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2638                                        (const struct dt_key *)lostfound,
2639                                        BYPASS_CAPA);
2640                         if (rc != 0)
2641                                 GOTO(out, rc);
2642
2643                         lu_object_put(env, &obj->do_lu);
2644                         obj = dt_locate(env, key, fid);
2645                         if (IS_ERR(obj))
2646                                 GOTO(out, rc = PTR_ERR(obj));
2647
2648                         cname = lfsck_name_get_const(env, lostfound,
2649                                                      strlen(lostfound));
2650                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
2651                         if (rc != 0)
2652                                 GOTO(out, rc);
2653
2654                         lu_object_put(env, &obj->do_lu);
2655                         obj = NULL;
2656                 }
2657         }
2658
2659         fid->f_seq = FID_SEQ_LOCAL_FILE;
2660         fid->f_oid = OTABLE_IT_OID;
2661         fid->f_ver = 0;
2662         obj = dt_locate(env, key, fid);
2663         if (IS_ERR(obj))
2664                 GOTO(out, rc = PTR_ERR(obj));
2665
2666         lu_object_get(&obj->do_lu);
2667         lfsck->li_obj_oit = obj;
2668         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2669         if (rc != 0)
2670                 GOTO(out, rc);
2671
2672         rc = lfsck_bookmark_setup(env, lfsck);
2673         if (rc != 0)
2674                 GOTO(out, rc);
2675
2676         if (master) {
2677                 rc = lfsck_fid_init(lfsck);
2678                 if (rc < 0)
2679                         GOTO(out, rc);
2680
2681                 rc = lfsck_namespace_setup(env, lfsck);
2682                 if (rc < 0)
2683                         GOTO(out, rc);
2684         }
2685
2686         rc = lfsck_layout_setup(env, lfsck);
2687         if (rc < 0)
2688                 GOTO(out, rc);
2689
2690         /* XXX: more LFSCK components initialization to be added here. */
2691
2692         rc = lfsck_instance_add(lfsck);
2693         if (rc == 0)
2694                 rc = lfsck_add_target_from_orphan(env, lfsck);
2695 out:
2696         if (obj != NULL && !IS_ERR(obj))
2697                 lu_object_put(env, &obj->do_lu);
2698         if (root != NULL && !IS_ERR(root))
2699                 lu_object_put(env, &root->do_lu);
2700         if (rc != 0)
2701                 lfsck_instance_cleanup(env, lfsck);
2702         return rc;
2703 }
2704 EXPORT_SYMBOL(lfsck_register);
2705
2706 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2707 {
2708         struct lfsck_instance *lfsck;
2709
2710         lfsck = lfsck_instance_find(key, false, true);
2711         if (lfsck != NULL)
2712                 lfsck_instance_put(env, lfsck);
2713 }
2714 EXPORT_SYMBOL(lfsck_degister);
2715
2716 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2717                      struct dt_device *tgt, struct obd_export *exp,
2718                      __u32 index, bool for_ost)
2719 {
2720         struct lfsck_instance   *lfsck;
2721         struct lfsck_tgt_desc   *ltd;
2722         int                      rc;
2723         ENTRY;
2724
2725         OBD_ALLOC_PTR(ltd);
2726         if (ltd == NULL)
2727                 RETURN(-ENOMEM);
2728
2729         ltd->ltd_tgt = tgt;
2730         ltd->ltd_key = key;
2731         ltd->ltd_exp = exp;
2732         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2733         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2734         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2735         atomic_set(&ltd->ltd_ref, 1);
2736         ltd->ltd_index = index;
2737
2738         spin_lock(&lfsck_instance_lock);
2739         lfsck = __lfsck_instance_find(key, true, false);
2740         if (lfsck == NULL) {
2741                 if (for_ost)
2742                         list_add_tail(&ltd->ltd_orphan_list,
2743                                       &lfsck_ost_orphan_list);
2744                 else
2745                         list_add_tail(&ltd->ltd_orphan_list,
2746                                       &lfsck_mdt_orphan_list);
2747                 spin_unlock(&lfsck_instance_lock);
2748
2749                 RETURN(0);
2750         }
2751         spin_unlock(&lfsck_instance_lock);
2752
2753         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2754         if (rc != 0)
2755                 lfsck_tgt_put(ltd);
2756
2757         lfsck_instance_put(env, lfsck);
2758
2759         RETURN(rc);
2760 }
2761 EXPORT_SYMBOL(lfsck_add_target);
2762
2763 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2764                       struct dt_device *tgt, __u32 index, bool for_ost)
2765 {
2766         struct lfsck_instance   *lfsck;
2767         struct lfsck_tgt_descs  *ltds;
2768         struct lfsck_tgt_desc   *ltd;
2769         struct list_head        *head;
2770
2771         if (for_ost)
2772                 head = &lfsck_ost_orphan_list;
2773         else
2774                 head = &lfsck_mdt_orphan_list;
2775
2776         spin_lock(&lfsck_instance_lock);
2777         list_for_each_entry(ltd, head, ltd_orphan_list) {
2778                 if (ltd->ltd_tgt == tgt) {
2779                         list_del_init(&ltd->ltd_orphan_list);
2780                         spin_unlock(&lfsck_instance_lock);
2781                         lfsck_tgt_put(ltd);
2782
2783                         return;
2784                 }
2785         }
2786
2787         ltd = NULL;
2788         lfsck = __lfsck_instance_find(key, true, false);
2789         spin_unlock(&lfsck_instance_lock);
2790         if (unlikely(lfsck == NULL))
2791                 return;
2792
2793         if (for_ost)
2794                 ltds = &lfsck->li_ost_descs;
2795         else
2796                 ltds = &lfsck->li_mdt_descs;
2797
2798         down_write(&ltds->ltd_rw_sem);
2799         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2800
2801         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2802                 goto unlock;
2803
2804         ltd = LTD_TGT(ltds, index);
2805         if (unlikely(ltd == NULL))
2806                 goto unlock;
2807
2808         LASSERT(ltds->ltd_tgtnr > 0);
2809
2810         ltds->ltd_tgtnr--;
2811         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2812         LTD_TGT(ltds, index) = NULL;
2813
2814 unlock:
2815         if (ltd == NULL) {
2816                 if (for_ost)
2817                         head = &lfsck->li_ost_descs.ltd_orphan;
2818                 else
2819                         head = &lfsck->li_mdt_descs.ltd_orphan;
2820
2821                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2822                         if (ltd->ltd_tgt == tgt) {
2823                                 list_del_init(&ltd->ltd_orphan_list);
2824                                 break;
2825                         }
2826                 }
2827         }
2828
2829         up_write(&ltds->ltd_rw_sem);
2830         if (ltd != NULL) {
2831                 spin_lock(&ltds->ltd_lock);
2832                 ltd->ltd_dead = 1;
2833                 spin_unlock(&ltds->ltd_lock);
2834                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
2835                 lfsck_tgt_put(ltd);
2836         }
2837
2838         lfsck_instance_put(env, lfsck);
2839 }
2840 EXPORT_SYMBOL(lfsck_del_target);
2841
2842 static int __init lfsck_init(void)
2843 {
2844         int rc;
2845
2846         INIT_LIST_HEAD(&lfsck_instance_list);
2847         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2848         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2849         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2850         rc = lu_context_key_register(&lfsck_thread_key);
2851         if (rc == 0) {
2852                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2853                 tgt_register_lfsck_query(lfsck_query);
2854         }
2855
2856         return rc;
2857 }
2858
2859 static void __exit lfsck_exit(void)
2860 {
2861         struct lfsck_tgt_desc *ltd;
2862         struct lfsck_tgt_desc *next;
2863
2864         LASSERT(list_empty(&lfsck_instance_list));
2865
2866         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2867                                  ltd_orphan_list) {
2868                 list_del_init(&ltd->ltd_orphan_list);
2869                 lfsck_tgt_put(ltd);
2870         }
2871
2872         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2873                                  ltd_orphan_list) {
2874                 list_del_init(&ltd->ltd_orphan_list);
2875                 lfsck_tgt_put(ltd);
2876         }
2877
2878         lu_context_key_degister(&lfsck_thread_key);
2879 }
2880
2881 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2882 MODULE_DESCRIPTION("LFSCK");
2883 MODULE_LICENSE("GPL");
2884
2885 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);