Whamcloud - gitweb
325f7e7dc7e6ea2030ea2a9ec4a06af2f104eea1
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* Define the lfsck thread key: LU_KEY_INIT() is instantiated here for
 * struct lfsck_thread_info.
 * NOTE(review): the macro presumably generates the key's init callback that
 * allocates the per-thread info released by lfsck_key_fini(); confirm against
 * the macro definition in lu_object.h. */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
/* Define the "lfsck" context key with the LCT_MD_THREAD and LCT_DT_THREAD
 * tags, and instantiate the generic key helpers for it.
 * NOTE(review): the helpers generated by LU_KEY_INIT_GENERIC() are presumably
 * consumed by module setup/cleanup code that is not visible in this part of
 * the file; confirm. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);
60
/* Global LFSCK lists. The two orphan lists hold target descriptors that
 * lfsck_add_target_from_orphan() matches to an instance by ltd_key and moves
 * onto that instance's own orphan list.
 * NOTE(review): the list heads have no static initializer here, so they are
 * presumably initialized during module setup; confirm. */
static struct list_head lfsck_instance_list;
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while walking the global orphan lists above. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable names of the LFSCK status values, indexed by the enum
 * lfsck_status constant. Looked up through lfsck_status2names(). */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N; confirm against the
 * code that consumes this table. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* Table of printable LFSCK parameter names. Both the first and the last slot
 * are NULL.
 * NOTE(review): presumably the table is indexed by parameter bit position and
 * the leading NULL marks a bit that has no printable name; confirm against
 * the code that consumes this table. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
100
101 const char *lfsck_status2names(enum lfsck_status status)
102 {
103         if (unlikely(status < 0 || status >= LS_MAX))
104                 return "unknown";
105
106         return lfsck_status_names[status];
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
/**
 * Tear down a target descriptor table.
 *
 * Under the write side of ltd_rw_sem: drops every descriptor still on the
 * orphan list, drops every descriptor registered in the bitmap, frees the
 * index blocks and finally frees the bitmap itself.
 *
 * \param[in] ltds      pointer to the target descriptor table
 */
static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
{
        struct lfsck_tgt_desc   *ltd;
        struct lfsck_tgt_desc   *next;
        int                      idx;

        down_write(&ltds->ltd_rw_sem);

        /* Unlink each orphan descriptor and drop one reference on it. */
        list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
                                 ltd_orphan_list) {
                list_del_init(&ltd->ltd_orphan_list);
                lfsck_tgt_put(ltd);
        }

        /* No bitmap means no registered targets to release: either
         * lfsck_tgt_descs_init() failed to allocate it, or this function
         * already ran and reset the pointer to NULL. */
        if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
                up_write(&ltds->ltd_rw_sem);

                return;
        }

        /* For every registered index: clear the bit, empty the slot and
         * drop one reference on the descriptor. */
        cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
                ltd = LTD_TGT(ltds, idx);
                if (likely(ltd != NULL)) {
                        /* The descriptor must no longer be linked into any
                         * layout list at this point. */
                        LASSERT(list_empty(&ltd->ltd_layout_list));
                        LASSERT(list_empty(&ltd->ltd_layout_phase_list));

                        ltds->ltd_tgtnr--;
                        cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
                        LTD_TGT(ltds, idx) = NULL;
                        lfsck_tgt_put(ltd);
                }
        }

        LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
                 ltds->ltd_tgtnr);

        /* Free the index blocks that were allocated on demand. */
        for (idx = 0; idx < TGT_PTRS; idx++) {
                if (ltds->ltd_tgts_idx[idx] != NULL) {
                        OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
                        ltds->ltd_tgts_idx[idx] = NULL;
                }
        }

        CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
        ltds->ltd_tgts_bitmap = NULL;
        up_write(&ltds->ltd_rw_sem);
}
168
169 static int __lfsck_add_target(const struct lu_env *env,
170                               struct lfsck_instance *lfsck,
171                               struct lfsck_tgt_desc *ltd,
172                               bool for_ost, bool locked)
173 {
174         struct lfsck_tgt_descs *ltds;
175         __u32                   index = ltd->ltd_index;
176         int                     rc    = 0;
177         ENTRY;
178
179         if (for_ost)
180                 ltds = &lfsck->li_ost_descs;
181         else
182                 ltds = &lfsck->li_mdt_descs;
183
184         if (!locked)
185                 down_write(&ltds->ltd_rw_sem);
186
187         LASSERT(ltds->ltd_tgts_bitmap != NULL);
188
189         if (index >= ltds->ltd_tgts_bitmap->size) {
190                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
191                                     (__u32)BITS_PER_LONG);
192                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
193                 cfs_bitmap_t *new_bitmap;
194
195                 while (newsize < index + 1)
196                         newsize <<= 1;
197
198                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
199                 if (new_bitmap == NULL)
200                         GOTO(unlock, rc = -ENOMEM);
201
202                 if (ltds->ltd_tgtnr > 0)
203                         cfs_bitmap_copy(new_bitmap, old_bitmap);
204                 ltds->ltd_tgts_bitmap = new_bitmap;
205                 CFS_FREE_BITMAP(old_bitmap);
206         }
207
208         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
209                 CERROR("%s: the device %s (%u) is registered already\n",
210                        lfsck_lfsck2name(lfsck),
211                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
212                 GOTO(unlock, rc = -EEXIST);
213         }
214
215         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
216                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
217                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
218                         GOTO(unlock, rc = -ENOMEM);
219         }
220
221         LTD_TGT(ltds, index) = ltd;
222         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
223         ltds->ltd_tgtnr++;
224
225         GOTO(unlock, rc = 0);
226
227 unlock:
228         if (!locked)
229                 up_write(&ltds->ltd_rw_sem);
230
231         return rc;
232 }
233
234 static int lfsck_add_target_from_orphan(const struct lu_env *env,
235                                         struct lfsck_instance *lfsck)
236 {
237         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
238         struct lfsck_tgt_desc   *ltd;
239         struct lfsck_tgt_desc   *next;
240         struct list_head        *head    = &lfsck_ost_orphan_list;
241         int                      rc;
242         bool                     for_ost = true;
243
244 again:
245         spin_lock(&lfsck_instance_lock);
246         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
247                 if (ltd->ltd_key == lfsck->li_bottom)
248                         list_move_tail(&ltd->ltd_orphan_list,
249                                        &ltds->ltd_orphan);
250         }
251         spin_unlock(&lfsck_instance_lock);
252
253         down_write(&ltds->ltd_rw_sem);
254         while (!list_empty(&ltds->ltd_orphan)) {
255                 ltd = list_entry(ltds->ltd_orphan.next,
256                                  struct lfsck_tgt_desc,
257                                  ltd_orphan_list);
258                 list_del_init(&ltd->ltd_orphan_list);
259                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
260                 /* Do not hold the semaphore for too long time. */
261                 up_write(&ltds->ltd_rw_sem);
262                 if (rc != 0)
263                         return rc;
264
265                 down_write(&ltds->ltd_rw_sem);
266         }
267         up_write(&ltds->ltd_rw_sem);
268
269         if (for_ost) {
270                 ltds = &lfsck->li_mdt_descs;
271                 head = &lfsck_mdt_orphan_list;
272                 for_ost = false;
273                 goto again;
274         }
275
276         return 0;
277 }
278
279 static inline struct lfsck_component *
280 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
281                        struct list_head *list)
282 {
283         struct lfsck_component *com;
284
285         list_for_each_entry(com, list, lc_link) {
286                 if (com->lc_type == type)
287                         return com;
288         }
289         return NULL;
290 }
291
292 struct lfsck_component *
293 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
294 {
295         struct lfsck_component *com;
296
297         spin_lock(&lfsck->li_lock);
298         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
299         if (com != NULL)
300                 goto unlock;
301
302         com = __lfsck_component_find(lfsck, type,
303                                      &lfsck->li_list_double_scan);
304         if (com != NULL)
305                 goto unlock;
306
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
308
309 unlock:
310         if (com != NULL)
311                 lfsck_component_get(com);
312         spin_unlock(&lfsck->li_lock);
313         return com;
314 }
315
316 void lfsck_component_cleanup(const struct lu_env *env,
317                              struct lfsck_component *com)
318 {
319         if (!list_empty(&com->lc_link))
320                 list_del_init(&com->lc_link);
321         if (!list_empty(&com->lc_link_dir))
322                 list_del_init(&com->lc_link_dir);
323
324         lfsck_component_put(env, com);
325 }
326
327 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
328                     struct lu_fid *fid, bool locked)
329 {
330         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
331         int                      rc = 0;
332         ENTRY;
333
334         if (!locked)
335                 mutex_lock(&lfsck->li_mutex);
336
337         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
338         if (rc >= 0) {
339                 bk->lb_last_fid = *fid;
340                 /* We do not care about whether the subsequent sub-operations
341                  * failed or not. The worst case is that one FID is lost that
342                  * is not a big issue for the LFSCK since it is relative rare
343                  * for LFSCK create. */
344                 rc = lfsck_bookmark_store(env, lfsck);
345         }
346
347         if (!locked)
348                 mutex_unlock(&lfsck->li_mutex);
349
350         RETURN(rc);
351 }
352
353 /**
354  * Request the specified ibits lock for the given object.
355  *
356  * Before the LFSCK modifying on the namespace visible object,
357  * it needs to acquire related ibits ldlm lock.
358  *
359  * \param[in] env       pointer to the thread context
360  * \param[in] lfsck     pointer to the lfsck instance
361  * \param[in] obj       pointer to the dt_object to be locked
362  * \param[out] lh       pointer to the lock handle
363  * \param[in] ibits     the bits for the ldlm lock to be acquired
364  * \param[in] mode      the mode for the ldlm lock to be acquired
365  *
366  * \retval              0 for success
367  * \retval              negative error number on failure
368  */
369 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
370                      struct dt_object *obj, struct lustre_handle *lh,
371                      __u64 bits, ldlm_mode_t mode)
372 {
373         struct lfsck_thread_info        *info   = lfsck_env_info(env);
374         ldlm_policy_data_t              *policy = &info->lti_policy;
375         struct ldlm_res_id              *resid  = &info->lti_resid;
376         __u64                            flags  = LDLM_FL_ATOMIC_CB;
377         int                              rc;
378
379         LASSERT(lfsck->li_namespace != NULL);
380
381         memset(policy, 0, sizeof(*policy));
382         policy->l_inodebits.bits = bits;
383         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
384         rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
385                                     policy, mode, &flags, ldlm_blocking_ast,
386                                     ldlm_completion_ast, NULL, NULL, 0,
387                                     LVB_T_NONE, NULL, lh);
388         if (rc == ELDLM_OK) {
389                 rc = 0;
390         } else {
391                 memset(lh, 0, sizeof(*lh));
392                 rc = -EIO;
393         }
394
395         return rc;
396 }
397
398 /**
399  * Release the the specified ibits lock.
400  *
401  * If the lock has been acquired before, release it
402  * and cleanup the handle. Otherwise, do nothing.
403  *
404  * \param[in] lh        pointer to the lock handle
405  * \param[in] mode      the mode for the ldlm lock to be released
406  */
407 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
408 {
409         if (lustre_handle_is_used(lh)) {
410                 ldlm_lock_decref(lh, mode);
411                 memset(lh, 0, sizeof(*lh));
412         }
413 }
414
/* Directory entry names. "dot" and "dotdot" are the keys inserted into a
 * newly created lost+found child directory by the lfsck_create_lpf_*()
 * helpers.
 * NOTE(review): "dotlustre" and "lostfound" are presumably used by code that
 * is not visible in this part of the file; confirm. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
419
420 static int lfsck_create_lpf_local(const struct lu_env *env,
421                                   struct lfsck_instance *lfsck,
422                                   struct dt_object *parent,
423                                   struct dt_object *child,
424                                   struct lu_attr *la,
425                                   struct dt_object_format *dof,
426                                   const char *name)
427 {
428         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
429         struct dt_device        *dev    = lfsck->li_bottom;
430         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
431         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
432         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
433         struct thandle          *th     = NULL;
434         struct linkea_data       ldata  = { 0 };
435         struct lu_buf            linkea_buf;
436         const struct lu_name    *cname;
437         loff_t                   pos    = 0;
438         int                      len    = sizeof(struct lfsck_bookmark);
439         int                      rc;
440         ENTRY;
441
442         rc = linkea_data_new(&ldata,
443                              &lfsck_env_info(env)->lti_linkea_buf);
444         if (rc != 0)
445                 RETURN(rc);
446
447         cname = lfsck_name_get_const(env, name, strlen(name));
448         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
449         if (rc != 0)
450                 RETURN(rc);
451
452         th = dt_trans_create(env, dev);
453         if (IS_ERR(th))
454                 RETURN(PTR_ERR(th));
455
456         /* 1a. create child */
457         rc = dt_declare_create(env, child, la, NULL, dof, th);
458         if (rc != 0)
459                 GOTO(stop, rc);
460
461         /* 2a. increase child nlink */
462         rc = dt_declare_ref_add(env, child, th);
463         if (rc != 0)
464                 GOTO(stop, rc);
465
466         /* 3a. insert linkEA for child */
467         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
468                        ldata.ld_leh->leh_len);
469         rc = dt_declare_xattr_set(env, child, &linkea_buf,
470                                   XATTR_NAME_LINK, 0, th);
471         if (rc != 0)
472                 GOTO(stop, rc);
473
474         /* 4a. insert name into parent dir */
475         rec->rec_type = S_IFDIR;
476         rec->rec_fid = cfid;
477         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
478                                (const struct dt_key *)name, th);
479         if (rc != 0)
480                 GOTO(stop, rc);
481
482         /* 5a. increase parent nlink */
483         rc = dt_declare_ref_add(env, parent, th);
484         if (rc != 0)
485                 GOTO(stop, rc);
486
487         /* 6a. update bookmark */
488         rc = dt_declare_record_write(env, bk_obj,
489                                      lfsck_buf_get(env, bk, len), 0, th);
490         if (rc != 0)
491                 GOTO(stop, rc);
492
493         rc = dt_trans_start_local(env, dev, th);
494         if (rc != 0)
495                 GOTO(stop, rc);
496
497         dt_write_lock(env, child, 0);
498         /* 1b.1. create child */
499         rc = dt_create(env, child, la, NULL, dof, th);
500         if (rc != 0)
501                 GOTO(unlock, rc);
502
503         if (unlikely(!dt_try_as_dir(env, child)))
504                 GOTO(unlock, rc = -ENOTDIR);
505
506         /* 1b.2. insert dot into child dir */
507         rec->rec_fid = cfid;
508         rc = dt_insert(env, child, (const struct dt_rec *)rec,
509                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
510         if (rc != 0)
511                 GOTO(unlock, rc);
512
513         /* 1b.3. insert dotdot into child dir */
514         rec->rec_fid = &LU_LPF_FID;
515         rc = dt_insert(env, child, (const struct dt_rec *)rec,
516                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
517         if (rc != 0)
518                 GOTO(unlock, rc);
519
520         /* 2b. increase child nlink */
521         rc = dt_ref_add(env, child, th);
522         if (rc != 0)
523                 GOTO(unlock, rc);
524
525         /* 3b. insert linkEA for child. */
526         rc = dt_xattr_set(env, child, &linkea_buf,
527                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
528         dt_write_unlock(env, child);
529         if (rc != 0)
530                 GOTO(stop, rc);
531
532         /* 4b. insert name into parent dir */
533         rec->rec_fid = cfid;
534         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
535                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
536         if (rc != 0)
537                 GOTO(stop, rc);
538
539         dt_write_lock(env, parent, 0);
540         /* 5b. increase parent nlink */
541         rc = dt_ref_add(env, parent, th);
542         dt_write_unlock(env, parent);
543         if (rc != 0)
544                 GOTO(stop, rc);
545
546         bk->lb_lpf_fid = *cfid;
547         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
548
549         /* 6b. update bookmark */
550         rc = dt_record_write(env, bk_obj,
551                              lfsck_buf_get(env, bk, len), &pos, th);
552
553         GOTO(stop, rc);
554
555 unlock:
556         dt_write_unlock(env, child);
557
558 stop:
559         dt_trans_stop(env, dev, th);
560
561         return rc;
562 }
563
564 static int lfsck_create_lpf_remote(const struct lu_env *env,
565                                    struct lfsck_instance *lfsck,
566                                    struct dt_object *parent,
567                                    struct dt_object *child,
568                                    struct lu_attr *la,
569                                    struct dt_object_format *dof,
570                                    const char *name)
571 {
572         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
573         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
574         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
575         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
576         struct thandle          *th     = NULL;
577         struct linkea_data       ldata  = { 0 };
578         struct lu_buf            linkea_buf;
579         const struct lu_name    *cname;
580         struct dt_device        *dev;
581         loff_t                   pos    = 0;
582         int                      len    = sizeof(struct lfsck_bookmark);
583         int                      rc;
584         ENTRY;
585
586         rc = linkea_data_new(&ldata,
587                              &lfsck_env_info(env)->lti_linkea_buf);
588         if (rc != 0)
589                 RETURN(rc);
590
591         cname = lfsck_name_get_const(env, name, strlen(name));
592         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
593         if (rc != 0)
594                 RETURN(rc);
595
596         /* Create .lustre/lost+found/MDTxxxx. */
597
598         /* XXX: Currently, cross-MDT create operation needs to create the child
599          *      object firstly, then insert name into the parent directory. For
600          *      this case, the child object resides on current MDT (local), but
601          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
602          *      easy to contain all the sub-modifications orderly within single
603          *      transaction.
604          *
605          *      To avoid more inconsistency, we split the create operation into
606          *      two transactions:
607          *
608          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
609          *         locally.
610          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
611          *         remotely.
612          *
613          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
614          *      repair such inconsistency when LFSCK run next time. */
615
616         /* Transaction I: locally */
617
618         dev = lfsck->li_bottom;
619         th = dt_trans_create(env, dev);
620         if (IS_ERR(th))
621                 RETURN(PTR_ERR(th));
622
623         /* 1a. create child */
624         rc = dt_declare_create(env, child, la, NULL, dof, th);
625         if (rc != 0)
626                 GOTO(stop, rc);
627
628         /* 2a. increase child nlink */
629         rc = dt_declare_ref_add(env, child, th);
630         if (rc != 0)
631                 GOTO(stop, rc);
632
633         /* 3a. insert linkEA for child */
634         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
635                        ldata.ld_leh->leh_len);
636         rc = dt_declare_xattr_set(env, child, &linkea_buf,
637                                   XATTR_NAME_LINK, 0, th);
638         if (rc != 0)
639                 GOTO(stop, rc);
640
641         /* 4a. update bookmark */
642         rc = dt_declare_record_write(env, bk_obj,
643                                      lfsck_buf_get(env, bk, len), 0, th);
644         if (rc != 0)
645                 GOTO(stop, rc);
646
647         rc = dt_trans_start_local(env, dev, th);
648         if (rc != 0)
649                 GOTO(stop, rc);
650
651         dt_write_lock(env, child, 0);
652         /* 1b.1. create child */
653         rc = dt_create(env, child, la, NULL, dof, th);
654         if (rc != 0)
655                 GOTO(unlock, rc);
656
657         if (unlikely(!dt_try_as_dir(env, child)))
658                 GOTO(unlock, rc = -ENOTDIR);
659
660         /* 1b.2. insert dot into child dir */
661         rec->rec_type = S_IFDIR;
662         rec->rec_fid = cfid;
663         rc = dt_insert(env, child, (const struct dt_rec *)rec,
664                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
665         if (rc != 0)
666                 GOTO(unlock, rc);
667
668         /* 1b.3. insert dotdot into child dir */
669         rec->rec_fid = &LU_LPF_FID;
670         rc = dt_insert(env, child, (const struct dt_rec *)rec,
671                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
672         if (rc != 0)
673                 GOTO(unlock, rc);
674
675         /* 2b. increase child nlink */
676         rc = dt_ref_add(env, child, th);
677         if (rc != 0)
678                 GOTO(unlock, rc);
679
680         /* 3b. insert linkEA for child */
681         rc = dt_xattr_set(env, child, &linkea_buf,
682                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
683         if (rc != 0)
684                 GOTO(unlock, rc);
685
686         bk->lb_lpf_fid = *cfid;
687         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
688
689         /* 4b. update bookmark */
690         rc = dt_record_write(env, bk_obj,
691                              lfsck_buf_get(env, bk, len), &pos, th);
692
693         dt_write_unlock(env, child);
694         dt_trans_stop(env, dev, th);
695         if (rc != 0)
696                 RETURN(rc);
697
698         /* Transaction II: remotely */
699
700         dev = lfsck->li_next;
701         th = dt_trans_create(env, dev);
702         if (IS_ERR(th))
703                 RETURN(PTR_ERR(th));
704
705         /* 5a. insert name into parent dir */
706         rec->rec_fid = cfid;
707         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
708                                (const struct dt_key *)name, th);
709         if (rc != 0)
710                 GOTO(stop, rc);
711
712         /* 6a. increase parent nlink */
713         rc = dt_declare_ref_add(env, parent, th);
714         if (rc != 0)
715                 GOTO(stop, rc);
716
717         rc = dt_trans_start(env, dev, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         /* 5b. insert name into parent dir */
722         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
723                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
724         if (rc != 0)
725                 GOTO(stop, rc);
726
727         dt_write_lock(env, parent, 0);
728         /* 6b. increase parent nlink */
729         rc = dt_ref_add(env, parent, th);
730         dt_write_unlock(env, parent);
731
732         GOTO(stop, rc);
733
734 unlock:
735         dt_write_unlock(env, child);
736 stop:
737         dt_trans_stop(env, dev, th);
738
739         if (rc != 0 && dev == lfsck->li_next)
740                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
741                        "for orphans, but failed to insert the name %s "
742                        "to the .lustre/lost+found/. Such inconsistency "
743                        "will be repaired when LFSCK run next time: rc = %d\n",
744                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
745
746         return rc;
747 }
748
749 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
750  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
751  * only when it is required, such as orphan OST-objects repairing. */
752 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
753 {
754         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
755         struct lfsck_thread_info *info  = lfsck_env_info(env);
756         struct lu_fid            *cfid  = &info->lti_fid2;
757         struct lu_attr           *la    = &info->lti_la;
758         struct dt_object_format  *dof   = &info->lti_dof;
759         struct dt_object         *parent = NULL;
760         struct dt_object         *child = NULL;
761         struct lustre_handle      lh    = { 0 };
762         char                      name[8];
763         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
764         int                       rc    = 0;
765         ENTRY;
766
767         LASSERT(lfsck->li_master);
768
769         sprintf(name, "MDT%04x", node);
770         if (node == 0) {
771                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
772                                                   &LU_LPF_FID);
773         } else {
774                 struct lfsck_tgt_desc *ltd;
775
776                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
777                 if (unlikely(ltd == NULL))
778                         RETURN(-ENXIO);
779
780                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
781                                                   &LU_LPF_FID);
782                 lfsck_tgt_put(ltd);
783         }
784         if (IS_ERR(parent))
785                 RETURN(PTR_ERR(parent));
786
787         if (lfsck->li_lpf_obj != NULL)
788                 GOTO(out, rc = 0);
789
790         if (unlikely(!dt_try_as_dir(env, parent)))
791                 GOTO(out, rc = -ENOTDIR);
792
793         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
794                               MDS_INODELOCK_UPDATE, LCK_EX);
795         if (rc != 0)
796                 GOTO(out, rc);
797
798         mutex_lock(&lfsck->li_mutex);
799         if (lfsck->li_lpf_obj != NULL)
800                 GOTO(unlock, rc = 0);
801
802         if (fid_is_zero(&bk->lb_lpf_fid)) {
803                 /* There is corner case that: in former LFSCK scanning we have
804                  * created the .lustre/lost+found/MDTxxxx but failed to update
805                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
806                  * it from MDT0 firstly. */
807                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
808                                (const struct dt_key *)name, BYPASS_CAPA);
809                 if (rc != 0 && rc != -ENOENT)
810                         GOTO(unlock, rc);
811
812                 if (rc == 0) {
813                         bk->lb_lpf_fid = *cfid;
814                         rc = lfsck_bookmark_store(env, lfsck);
815                 } else {
816                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
817                 }
818                 if (rc != 0)
819                         GOTO(unlock, rc);
820         } else {
821                 *cfid = bk->lb_lpf_fid;
822         }
823
824         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
825         if (IS_ERR(child))
826                 GOTO(unlock, rc = PTR_ERR(child));
827
828         if (dt_object_exists(child) != 0) {
829                 if (unlikely(!dt_try_as_dir(env, child)))
830                         rc = -ENOTDIR;
831                 else
832                         lfsck->li_lpf_obj = child;
833
834                 GOTO(unlock, rc);
835         }
836
837         memset(la, 0, sizeof(*la));
838         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
839         la->la_mode = S_IFDIR | S_IRWXU;
840         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
841                        LA_UID | LA_GID;
842         memset(dof, 0, sizeof(*dof));
843         dof->dof_type = dt_mode_to_dft(S_IFDIR);
844
845         if (node == 0)
846                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
847                                             dof, name);
848         else
849                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
850                                              dof, name);
851         if (rc == 0)
852                 lfsck->li_lpf_obj = child;
853
854         GOTO(unlock, rc);
855
856 unlock:
857         mutex_unlock(&lfsck->li_mutex);
858         lfsck_ibits_unlock(&lh, LCK_EX);
859         if (rc != 0 && child != NULL && !IS_ERR(child))
860                 lu_object_put(env, &child->do_lu);
861 out:
862         if (parent != NULL && !IS_ERR(parent))
863                 lu_object_put(env, &parent->do_lu);
864
865         return rc;
866 }
867
868 static int lfsck_fid_init(struct lfsck_instance *lfsck)
869 {
870         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
871         struct seq_server_site  *ss;
872         char                    *prefix;
873         int                      rc     = 0;
874         ENTRY;
875
876         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
877         if (unlikely(ss == NULL))
878                 RETURN(-ENXIO);
879
880         OBD_ALLOC_PTR(lfsck->li_seq);
881         if (lfsck->li_seq == NULL)
882                 RETURN(-ENOMEM);
883
884         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
885         if (prefix == NULL)
886                 GOTO(out, rc = -ENOMEM);
887
888         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
889         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
890                              ss->ss_server_seq);
891         OBD_FREE(prefix, MAX_OBD_NAME + 7);
892         if (rc != 0)
893                 GOTO(out, rc);
894
895         if (fid_is_sane(&bk->lb_last_fid))
896                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
897
898         RETURN(0);
899
900 out:
901         OBD_FREE_PTR(lfsck->li_seq);
902         lfsck->li_seq = NULL;
903
904         return rc;
905 }
906
907 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
908 {
909         if (lfsck->li_seq != NULL) {
910                 seq_client_fini(lfsck->li_seq);
911                 OBD_FREE_PTR(lfsck->li_seq);
912                 lfsck->li_seq = NULL;
913         }
914 }
915
916 void lfsck_instance_cleanup(const struct lu_env *env,
917                             struct lfsck_instance *lfsck)
918 {
919         struct ptlrpc_thread    *thread = &lfsck->li_thread;
920         struct lfsck_component  *com;
921         struct lfsck_component  *next;
922         ENTRY;
923
924         LASSERT(list_empty(&lfsck->li_link));
925         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
926
927         if (lfsck->li_obj_oit != NULL) {
928                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
929                 lfsck->li_obj_oit = NULL;
930         }
931
932         LASSERT(lfsck->li_obj_dir == NULL);
933
934         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
935                 lfsck_component_cleanup(env, com);
936         }
937
938         LASSERT(list_empty(&lfsck->li_list_dir));
939
940         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
941                                  lc_link) {
942                 lfsck_component_cleanup(env, com);
943         }
944
945         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
946                 lfsck_component_cleanup(env, com);
947         }
948
949         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
950         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
951
952         if (lfsck->li_bookmark_obj != NULL) {
953                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
954                 lfsck->li_bookmark_obj = NULL;
955         }
956
957         if (lfsck->li_lpf_obj != NULL) {
958                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
959                 lfsck->li_lpf_obj = NULL;
960         }
961
962         if (lfsck->li_los != NULL) {
963                 local_oid_storage_fini(env, lfsck->li_los);
964                 lfsck->li_los = NULL;
965         }
966
967         lfsck_fid_fini(lfsck);
968
969         OBD_FREE_PTR(lfsck);
970 }
971
972 static inline struct lfsck_instance *
973 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
974 {
975         struct lfsck_instance *lfsck;
976
977         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
978                 if (lfsck->li_bottom == key) {
979                         if (ref)
980                                 lfsck_instance_get(lfsck);
981                         if (unlink)
982                                 list_del_init(&lfsck->li_link);
983
984                         return lfsck;
985                 }
986         }
987
988         return NULL;
989 }
990
991 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
992                                            bool unlink)
993 {
994         struct lfsck_instance *lfsck;
995
996         spin_lock(&lfsck_instance_lock);
997         lfsck = __lfsck_instance_find(key, ref, unlink);
998         spin_unlock(&lfsck_instance_lock);
999
1000         return lfsck;
1001 }
1002
1003 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1004 {
1005         struct lfsck_instance *tmp;
1006
1007         spin_lock(&lfsck_instance_lock);
1008         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1009                 if (lfsck->li_bottom == tmp->li_bottom) {
1010                         spin_unlock(&lfsck_instance_lock);
1011                         return -EEXIST;
1012                 }
1013         }
1014
1015         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1016         spin_unlock(&lfsck_instance_lock);
1017         return 0;
1018 }
1019
1020 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1021                     const char *prefix)
1022 {
1023         int flag;
1024         int i;
1025         bool newline = (bits != 0 ? false : true);
1026
1027         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1028
1029         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1030                 if (flag & bits) {
1031                         bits &= ~flag;
1032                         if (names[i] != NULL) {
1033                                 if (bits == 0)
1034                                         newline = true;
1035
1036                                 seq_printf(m, "%s%c", names[i],
1037                                            newline ? '\n' : ',');
1038                         }
1039                 }
1040         }
1041
1042         if (!newline)
1043                 seq_printf(m, "\n");
1044         return 0;
1045 }
1046
1047 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1048 {
1049         if (time != 0)
1050                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1051                           cfs_time_current_sec() - time);
1052         else
1053                 seq_printf(m, "%s: N/A\n", prefix);
1054         return 0;
1055 }
1056
1057 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1058                    const char *prefix)
1059 {
1060         if (fid_is_zero(&pos->lp_dir_parent)) {
1061                 if (pos->lp_oit_cookie == 0)
1062                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1063                                    prefix);
1064                 else
1065                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1066                                    prefix, pos->lp_oit_cookie);
1067         } else {
1068                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1069                            prefix, pos->lp_oit_cookie,
1070                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1071         }
1072         return 0;
1073 }
1074
1075 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1076                     struct lfsck_position *pos, bool init)
1077 {
1078         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1079
1080         if (unlikely(lfsck->li_di_oit == NULL)) {
1081                 memset(pos, 0, sizeof(*pos));
1082                 return;
1083         }
1084
1085         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1086         if (!lfsck->li_current_oit_processed && !init)
1087                 pos->lp_oit_cookie--;
1088
1089         LASSERT(pos->lp_oit_cookie > 0);
1090
1091         if (lfsck->li_di_dir != NULL) {
1092                 struct dt_object *dto = lfsck->li_obj_dir;
1093
1094                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1095                                                         lfsck->li_di_dir);
1096
1097                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1098                         fid_zero(&pos->lp_dir_parent);
1099                         pos->lp_dir_cookie = 0;
1100                 } else {
1101                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1102                 }
1103         } else {
1104                 fid_zero(&pos->lp_dir_parent);
1105                 pos->lp_dir_cookie = 0;
1106         }
1107 }
1108
1109 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1110 {
1111         bool dirty = false;
1112
1113         if (limit != LFSCK_SPEED_NO_LIMIT) {
1114                 if (limit > HZ) {
1115                         lfsck->li_sleep_rate = limit / HZ;
1116                         lfsck->li_sleep_jif = 1;
1117                 } else {
1118                         lfsck->li_sleep_rate = 1;
1119                         lfsck->li_sleep_jif = HZ / limit;
1120                 }
1121         } else {
1122                 lfsck->li_sleep_jif = 0;
1123                 lfsck->li_sleep_rate = 0;
1124         }
1125
1126         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1127                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1128                 dirty = true;
1129         }
1130
1131         return dirty;
1132 }
1133
1134 void lfsck_control_speed(struct lfsck_instance *lfsck)
1135 {
1136         struct ptlrpc_thread *thread = &lfsck->li_thread;
1137         struct l_wait_info    lwi;
1138
1139         if (lfsck->li_sleep_jif > 0 &&
1140             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1141                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1142                                        LWI_ON_SIGNAL_NOOP, NULL);
1143
1144                 l_wait_event(thread->t_ctl_waitq,
1145                              !thread_is_running(thread),
1146                              &lwi);
1147                 lfsck->li_new_scanned = 0;
1148         }
1149 }
1150
1151 void lfsck_control_speed_by_self(struct lfsck_component *com)
1152 {
1153         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1154         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1155         struct l_wait_info       lwi;
1156
1157         if (lfsck->li_sleep_jif > 0 &&
1158             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1159                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1160                                        LWI_ON_SIGNAL_NOOP, NULL);
1161
1162                 l_wait_event(thread->t_ctl_waitq,
1163                              !thread_is_running(thread),
1164                              &lwi);
1165                 com->lc_new_scanned = 0;
1166         }
1167 }
1168
1169 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1170                                                  struct lfsck_component *com,
1171                                                  struct lfsck_start_param *lsp)
1172 {
1173         struct lfsck_thread_args *lta;
1174         int                       rc;
1175
1176         OBD_ALLOC_PTR(lta);
1177         if (lta == NULL)
1178                 return ERR_PTR(-ENOMEM);
1179
1180         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1181         if (rc != 0) {
1182                 OBD_FREE_PTR(lta);
1183                 return ERR_PTR(rc);
1184         }
1185
1186         lta->lta_lfsck = lfsck_instance_get(lfsck);
1187         if (com != NULL)
1188                 lta->lta_com = lfsck_component_get(com);
1189
1190         lta->lta_lsp = lsp;
1191
1192         return lta;
1193 }
1194
1195 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1196 {
1197         if (lta->lta_com != NULL)
1198                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1199         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1200         lu_env_fini(&lta->lta_env);
1201         OBD_FREE_PTR(lta);
1202 }
1203
1204 static void lfsck_interpret(const struct lu_env *env,
1205                             struct lfsck_instance *lfsck,
1206                             struct ptlrpc_request *req, void *args, int result)
1207 {
1208         struct lfsck_async_interpret_args *laia = args;
1209         struct lfsck_component            *com;
1210
1211         LASSERT(laia->laia_com == NULL);
1212         LASSERT(laia->laia_shared);
1213
1214         spin_lock(&lfsck->li_lock);
1215         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1216                 if (com->lc_ops->lfsck_interpret != NULL) {
1217                         laia->laia_com = com;
1218                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1219                 }
1220         }
1221
1222         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1223                 if (com->lc_ops->lfsck_interpret != NULL) {
1224                         laia->laia_com = com;
1225                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1226                 }
1227         }
1228         spin_unlock(&lfsck->li_lock);
1229 }
1230
1231 static int lfsck_stop_notify(const struct lu_env *env,
1232                              struct lfsck_instance *lfsck,
1233                              struct lfsck_tgt_descs *ltds,
1234                              struct lfsck_tgt_desc *ltd, __u16 type)
1235 {
1236         struct ptlrpc_request_set *set;
1237         struct lfsck_component    *com;
1238         int                        rc  = 0;
1239         ENTRY;
1240
1241         spin_lock(&lfsck->li_lock);
1242         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1243         if (com == NULL)
1244                 com = __lfsck_component_find(lfsck, type,
1245                                              &lfsck->li_list_double_scan);
1246         if (com != NULL)
1247                 lfsck_component_get(com);
1248         spin_unlock(&lfsck->li_lock);
1249
1250         if (com != NULL) {
1251                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1252                         set = ptlrpc_prep_set();
1253                         if (set == NULL) {
1254                                 lfsck_component_put(env, com);
1255
1256                                 RETURN(-ENOMEM);
1257                         }
1258
1259                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1260                                                             ltd, set);
1261                         if (rc == 0)
1262                                 rc = ptlrpc_set_wait(set);
1263
1264                         ptlrpc_set_destroy(set);
1265                 }
1266
1267                 lfsck_component_put(env, com);
1268         }
1269
1270         RETURN(rc);
1271 }
1272
1273 static int lfsck_async_interpret(const struct lu_env *env,
1274                                  struct ptlrpc_request *req,
1275                                  void *args, int rc)
1276 {
1277         struct lfsck_async_interpret_args *laia = args;
1278         struct lfsck_instance             *lfsck;
1279
1280         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1281                               li_mdt_descs);
1282         lfsck_interpret(env, lfsck, req, laia, rc);
1283         lfsck_tgt_put(laia->laia_ltd);
1284         if (rc != 0 && laia->laia_result != -EALREADY)
1285                 laia->laia_result = rc;
1286
1287         return 0;
1288 }
1289
1290 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1291                         struct lfsck_request *lr,
1292                         struct ptlrpc_request_set *set,
1293                         ptlrpc_interpterer_t interpreter,
1294                         void *args, int request)
1295 {
1296         struct lfsck_async_interpret_args *laia;
1297         struct ptlrpc_request             *req;
1298         struct lfsck_request              *tmp;
1299         struct req_format                 *format;
1300         int                                rc;
1301
1302         switch (request) {
1303         case LFSCK_NOTIFY:
1304                 format = &RQF_LFSCK_NOTIFY;
1305                 break;
1306         case LFSCK_QUERY:
1307                 format = &RQF_LFSCK_QUERY;
1308                 break;
1309         default:
1310                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
1311                        exp->exp_obd->obd_name, request, -EINVAL);
1312                 return -EINVAL;
1313         }
1314
1315         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1316         if (req == NULL)
1317                 return -ENOMEM;
1318
1319         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1320         if (rc != 0) {
1321                 ptlrpc_request_free(req);
1322
1323                 return rc;
1324         }
1325
1326         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1327         *tmp = *lr;
1328         ptlrpc_request_set_replen(req);
1329
1330         laia = ptlrpc_req_async_args(req);
1331         *laia = *(struct lfsck_async_interpret_args *)args;
1332         if (laia->laia_com != NULL)
1333                 lfsck_component_get(laia->laia_com);
1334         req->rq_interpret_reply = interpreter;
1335         ptlrpc_set_add_req(set, req);
1336
1337         return 0;
1338 }
1339
1340 /* external interfaces */
1341
1342 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
1343 {
1344         struct lu_env           env;
1345         struct lfsck_instance  *lfsck;
1346         int                     rc;
1347         ENTRY;
1348
1349         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1350         if (rc != 0)
1351                 RETURN(rc);
1352
1353         lfsck = lfsck_instance_find(key, true, false);
1354         if (likely(lfsck != NULL)) {
1355                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
1356                 lfsck_instance_put(&env, lfsck);
1357         } else {
1358                 rc = -ENXIO;
1359         }
1360
1361         lu_env_fini(&env);
1362
1363         RETURN(rc);
1364 }
1365 EXPORT_SYMBOL(lfsck_get_speed);
1366
1367 int lfsck_set_speed(struct dt_device *key, int val)
1368 {
1369         struct lu_env           env;
1370         struct lfsck_instance  *lfsck;
1371         int                     rc;
1372         ENTRY;
1373
1374         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1375         if (rc != 0)
1376                 RETURN(rc);
1377
1378         lfsck = lfsck_instance_find(key, true, false);
1379         if (likely(lfsck != NULL)) {
1380                 mutex_lock(&lfsck->li_mutex);
1381                 if (__lfsck_set_speed(lfsck, val))
1382                         rc = lfsck_bookmark_store(&env, lfsck);
1383                 mutex_unlock(&lfsck->li_mutex);
1384                 lfsck_instance_put(&env, lfsck);
1385         } else {
1386                 rc = -ENXIO;
1387         }
1388
1389         lu_env_fini(&env);
1390
1391         RETURN(rc);
1392 }
1393 EXPORT_SYMBOL(lfsck_set_speed);
1394
1395 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
1396 {
1397         struct lu_env           env;
1398         struct lfsck_instance  *lfsck;
1399         int                     rc;
1400         ENTRY;
1401
1402         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1403         if (rc != 0)
1404                 RETURN(rc);
1405
1406         lfsck = lfsck_instance_find(key, true, false);
1407         if (likely(lfsck != NULL)) {
1408                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
1409                 lfsck_instance_put(&env, lfsck);
1410         } else {
1411                 rc = -ENXIO;
1412         }
1413
1414         lu_env_fini(&env);
1415
1416         RETURN(rc);
1417 }
1418 EXPORT_SYMBOL(lfsck_get_windows);
1419
1420 int lfsck_set_windows(struct dt_device *key, int val)
1421 {
1422         struct lu_env           env;
1423         struct lfsck_instance  *lfsck;
1424         int                     rc;
1425         ENTRY;
1426
1427         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1428         if (rc != 0)
1429                 RETURN(rc);
1430
1431         lfsck = lfsck_instance_find(key, true, false);
1432         if (likely(lfsck != NULL)) {
1433                 if (val > LFSCK_ASYNC_WIN_MAX) {
1434                         CWARN("%s: Too large async window size, which "
1435                               "may cause memory issues. The valid range "
1436                               "is [0 - %u]. If you do not want to restrict "
1437                               "the window size for async requests pipeline, "
1438                               "just set it as 0.\n",
1439                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1440                         rc = -EINVAL;
1441                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1442                         mutex_lock(&lfsck->li_mutex);
1443                         lfsck->li_bookmark_ram.lb_async_windows = val;
1444                         rc = lfsck_bookmark_store(&env, lfsck);
1445                         mutex_unlock(&lfsck->li_mutex);
1446                 }
1447                 lfsck_instance_put(&env, lfsck);
1448         } else {
1449                 rc = -ENXIO;
1450         }
1451
1452         lu_env_fini(&env);
1453
1454         RETURN(rc);
1455 }
1456 EXPORT_SYMBOL(lfsck_set_windows);
1457
1458 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
1459 {
1460         struct lu_env           env;
1461         struct lfsck_instance  *lfsck;
1462         struct lfsck_component *com;
1463         int                     rc;
1464         ENTRY;
1465
1466         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1467         if (rc != 0)
1468                 RETURN(rc);
1469
1470         lfsck = lfsck_instance_find(key, true, false);
1471         if (likely(lfsck != NULL)) {
1472                 com = lfsck_component_find(lfsck, type);
1473                 if (likely(com != NULL)) {
1474                         rc = com->lc_ops->lfsck_dump(&env, com, m);
1475                         lfsck_component_put(&env, com);
1476                 } else {
1477                         rc = -ENOTSUPP;
1478                 }
1479
1480                 lfsck_instance_put(&env, lfsck);
1481         } else {
1482                 rc = -ENXIO;
1483         }
1484
1485         lu_env_fini(&env);
1486
1487         RETURN(rc);
1488 }
1489 EXPORT_SYMBOL(lfsck_dump);
1490
1491 static int lfsck_stop_all(const struct lu_env *env,
1492                           struct lfsck_instance *lfsck,
1493                           struct lfsck_stop *stop)
1494 {
1495         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1496         struct lfsck_request              *lr     = &info->lti_lr;
1497         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1498         struct ptlrpc_request_set         *set;
1499         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1500         struct lfsck_tgt_desc             *ltd;
1501         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1502         __u32                              idx;
1503         int                                rc     = 0;
1504         int                                rc1    = 0;
1505         ENTRY;
1506
1507         LASSERT(stop->ls_flags & LPF_BROADCAST);
1508
1509         set = ptlrpc_prep_set();
1510         if (unlikely(set == NULL))
1511                 RETURN(-ENOMEM);
1512
1513         memset(lr, 0, sizeof(*lr));
1514         lr->lr_event = LE_STOP;
1515         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1516         lr->lr_status = stop->ls_status;
1517         lr->lr_version = bk->lb_version;
1518         lr->lr_active = LFSCK_TYPES_ALL;
1519         lr->lr_param = stop->ls_flags;
1520
1521         laia->laia_com = NULL;
1522         laia->laia_ltds = ltds;
1523         laia->laia_lr = lr;
1524         laia->laia_result = 0;
1525         laia->laia_shared = 1;
1526
1527         down_read(&ltds->ltd_rw_sem);
1528         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1529                 ltd = lfsck_tgt_get(ltds, idx);
1530                 LASSERT(ltd != NULL);
1531
1532                 laia->laia_ltd = ltd;
1533                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1534                                          lfsck_async_interpret, laia,
1535                                          LFSCK_NOTIFY);
1536                 if (rc != 0) {
1537                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1538                         lfsck_tgt_put(ltd);
1539                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
1540                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1541                         rc1 = rc;
1542                 }
1543         }
1544         up_read(&ltds->ltd_rw_sem);
1545
1546         rc = ptlrpc_set_wait(set);
1547         ptlrpc_set_destroy(set);
1548
1549         if (rc == 0)
1550                 rc = laia->laia_result;
1551
1552         if (rc == -EALREADY)
1553                 rc = 0;
1554
1555         if (rc != 0)
1556                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1557                        lfsck_lfsck2name(lfsck), rc);
1558
1559         RETURN(rc != 0 ? rc : rc1);
1560 }
1561
1562 static int lfsck_start_all(const struct lu_env *env,
1563                            struct lfsck_instance *lfsck,
1564                            struct lfsck_start *start)
1565 {
1566         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1567         struct lfsck_request              *lr     = &info->lti_lr;
1568         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1569         struct ptlrpc_request_set         *set;
1570         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1571         struct lfsck_tgt_desc             *ltd;
1572         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1573         __u32                              idx;
1574         int                                rc     = 0;
1575         ENTRY;
1576
1577         LASSERT(start->ls_flags & LPF_BROADCAST);
1578
1579         set = ptlrpc_prep_set();
1580         if (unlikely(set == NULL))
1581                 RETURN(-ENOMEM);
1582
1583         memset(lr, 0, sizeof(*lr));
1584         lr->lr_event = LE_START;
1585         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1586         lr->lr_speed = bk->lb_speed_limit;
1587         lr->lr_version = bk->lb_version;
1588         lr->lr_active = start->ls_active;
1589         lr->lr_param = start->ls_flags;
1590         lr->lr_async_windows = bk->lb_async_windows;
1591         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1592                        LSV_ASYNC_WINDOWS;
1593
1594         laia->laia_com = NULL;
1595         laia->laia_ltds = ltds;
1596         laia->laia_lr = lr;
1597         laia->laia_result = 0;
1598         laia->laia_shared = 1;
1599
1600         down_read(&ltds->ltd_rw_sem);
1601         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1602                 ltd = lfsck_tgt_get(ltds, idx);
1603                 LASSERT(ltd != NULL);
1604
1605                 laia->laia_ltd = ltd;
1606                 ltd->ltd_layout_done = 0;
1607                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1608                                          lfsck_async_interpret, laia,
1609                                          LFSCK_NOTIFY);
1610                 if (rc != 0) {
1611                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1612                         lfsck_tgt_put(ltd);
1613                         CERROR("%s: cannot notify MDT %x for LFSCK "
1614                                "start, failout: rc = %d\n",
1615                                lfsck_lfsck2name(lfsck), idx, rc);
1616                         break;
1617                 }
1618         }
1619         up_read(&ltds->ltd_rw_sem);
1620
1621         if (rc != 0) {
1622                 ptlrpc_set_destroy(set);
1623
1624                 RETURN(rc);
1625         }
1626
1627         rc = ptlrpc_set_wait(set);
1628         ptlrpc_set_destroy(set);
1629
1630         if (rc == 0)
1631                 rc = laia->laia_result;
1632
1633         if (rc != 0) {
1634                 struct lfsck_stop *stop = &info->lti_stop;
1635
1636                 CERROR("%s: cannot start LFSCK on some MDTs, "
1637                        "stop all: rc = %d\n",
1638                        lfsck_lfsck2name(lfsck), rc);
1639                 if (rc != -EALREADY) {
1640                         stop->ls_status = LS_FAILED;
1641                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1642                         lfsck_stop_all(env, lfsck, stop);
1643                 }
1644         }
1645
1646         RETURN(rc);
1647 }
1648
/**
 * Start the LFSCK engine of the instance bound to \a key, or join the
 * components of an engine that is already active.
 *
 * \a lsp->lsp_start == NULL means "auto trigger a paused LFSCK": nothing is
 * done (and 0 is returned) if the scan list is empty or the
 * OBD_FAIL_LFSCK_NO_AUTO fail point is set.
 *
 * If the engine thread is neither in the init state nor stopped, no new
 * thread is created. rc starts as -EALREADY and, for every type bit set in
 * start->ls_active, the matching component is looked up on the scan list and
 * then on the double scan list; its lfsck_join() method, if any, is called.
 *
 * Otherwise the per-run state is reset, the requested components are moved
 * from the idle list to the scan list, the parameters are applied, and a
 * kernel thread named "lfsck" running lfsck_master_engine() is created.
 * With LPF_BROADCAST the start request is then forwarded via
 * lfsck_start_all() after lfsck::li_mutex has been released.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lsp	start parameters; lsp->lsp_start may be NULL
 *
 * \retval		0 on success (positive rc values are folded to 0)
 * \retval		-ENXIO if no instance is registered for \a key
 * \retval		-EAGAIN if no namespace has been registered yet
 * \retval		-EALREADY/-EOPNOTSUPP/-EPERM/-ENOTSUPP or another
 *			negative error code, see the body
 */
1649 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1650                 struct lfsck_start_param *lsp)
1651 {
1652         struct lfsck_start              *start  = lsp->lsp_start;
1653         struct lfsck_instance           *lfsck;
1654         struct lfsck_bookmark           *bk;
1655         struct ptlrpc_thread            *thread;
1656         struct lfsck_component          *com;
1657         struct l_wait_info               lwi    = { 0 };
1658         struct lfsck_thread_args        *lta;
1659         struct task_struct              *task;
1660         int                              rc     = 0;
1661         __u16                            valid  = 0;
1662         __u16                            flags  = 0;
             /* Single-bit cursor used to walk the bits of ls_active. */
1663         __u16                            type   = 1;
1664         ENTRY;
1665
1666         lfsck = lfsck_instance_find(key, true, false);
1667         if (unlikely(lfsck == NULL))
1668                 RETURN(-ENXIO);
1669
1670         /* System is not ready, try again later. */
1671         if (unlikely(lfsck->li_namespace == NULL))
1672                 GOTO(put, rc = -EAGAIN);
1673
1674         /* start == NULL means auto trigger paused LFSCK. */
1675         if ((start == NULL) &&
1676             (list_empty(&lfsck->li_list_scan) ||
1677              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1678                 GOTO(put, rc = 0);
1679
1680         bk = &lfsck->li_bookmark_ram;
1681         thread = &lfsck->li_thread;
1682         mutex_lock(&lfsck->li_mutex);
1683         spin_lock(&lfsck->li_lock);
             /* The engine is already active: do not start a second thread,
              * only let the requested components join the current run. */
1684         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1685                 rc = -EALREADY;
1686                 if (unlikely(start == NULL)) {
1687                         spin_unlock(&lfsck->li_lock);
1688                         GOTO(out, rc);
1689                 }
1690
                     /* Handled type bits are cleared from ls_active, so the
                      * loop ends once every requested type was visited or
                      * an error breaks out. Runs under lfsck::li_lock. */
1691                 while (start->ls_active != 0) {
1692                         if (!(type & start->ls_active)) {
1693                                 type <<= 1;
1694                                 continue;
1695                         }
1696
1697                         com = __lfsck_component_find(lfsck, type,
1698                                                      &lfsck->li_list_scan);
1699                         if (com == NULL)
1700                                 com = __lfsck_component_find(lfsck, type,
1701                                                 &lfsck->li_list_double_scan);
1702                         if (com == NULL) {
1703                                 rc = -EOPNOTSUPP;
1704                                 break;
1705                         }
1706
1707                         if (com->lc_ops->lfsck_join != NULL) {
1708                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
1709                                 if (rc != 0 && rc != -EALREADY)
1710                                         break;
1711                         }
1712                         start->ls_active &= ~type;
1713                         type <<= 1;
1714                 }
1715                 spin_unlock(&lfsck->li_lock);
1716                 GOTO(out, rc);
1717         }
1718         spin_unlock(&lfsck->li_lock);
1719
             /* Reset the per-run state before a new scan. */
1720         lfsck->li_status = 0;
1721         lfsck->li_oit_over = 0;
1722         lfsck->li_start_unplug = 0;
1723         lfsck->li_drop_dryrun = 0;
1724         lfsck->li_new_scanned = 0;
1725
1726         /* For auto trigger. */
1727         if (start == NULL)
1728                 goto trigger;
1729
1730         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
1731                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
1732                        lfsck_lfsck2name(lfsck));
1733
1734                 GOTO(out, rc = -EPERM);
1735         }
1736
1737         start->ls_version = bk->lb_version;
1738
1739         if (start->ls_active != 0) {
1740                 struct lfsck_component *next;
1741
1742                 if (start->ls_active == LFSCK_TYPES_ALL)
1743                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1744
                     /* On failure ls_active is left holding only the
                      * unsupported type bits. */
1745                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1746                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1747                         GOTO(out, rc = -ENOTSUPP);
1748                 }
1749
                     /* Components on the scan list that were not requested
                      * get their lfsck_post() method called.
                      * NOTE(review): the _safe iterator suggests post() may
                      * unlink the entry from the scan list - confirm. */
1750                 list_for_each_entry_safe(com, next,
1751                                          &lfsck->li_list_scan, lc_link) {
1752                         if (!(com->lc_type & start->ls_active)) {
1753                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1754                                                              false);
1755                                 if (rc != 0)
1756                                         GOTO(out, rc);
1757                         }
1758                 }
1759
                     /* Move every requested component that is currently idle
                      * to the tail of the scan list; ls_active is consumed
                      * bit by bit and is rebuilt from the scan list below. */
1760                 while (start->ls_active != 0) {
1761                         if (type & start->ls_active) {
1762                                 com = __lfsck_component_find(lfsck, type,
1763                                                         &lfsck->li_list_idle);
1764                                 if (com != NULL)
1765                                         /* The component status will be updated
1766                                          * when its prep() is called later by
1767                                          * the LFSCK main engine. */
1768                                         list_move_tail(&com->lc_link,
1769                                                        &lfsck->li_list_scan);
1770                                 start->ls_active &= ~type;
1771                         }
1772                         type <<= 1;
1773                 }
1774         }
1775
1776         if (list_empty(&lfsck->li_list_scan)) {
1777                 /* The speed limit will be used to control both the LFSCK and
1778                  * low layer scrub (if applied), need to be handled firstly. */
1779                 if (start->ls_valid & LSV_SPEED_LIMIT) {
1780                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
1781                                 rc = lfsck_bookmark_store(env, lfsck);
1782                                 if (rc != 0)
1783                                         GOTO(out, rc);
1784                         }
1785                 }
1786
1787                 goto trigger;
1788         }
1789
1790         if (start->ls_flags & LPF_RESET)
1791                 flags |= DOIF_RESET;
1792
1793         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
1794         if (rc != 0)
1795                 GOTO(out, rc);
1796
             /* Report the types that will actually be scanned back through
              * ls_active, resetting each component if LPF_RESET was given. */
1797         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1798                 start->ls_active |= com->lc_type;
1799                 if (flags & DOIF_RESET) {
1800                         rc = com->lc_ops->lfsck_reset(env, com, false);
1801                         if (rc != 0)
1802                                 GOTO(out, rc);
1803                 }
1804         }
1805
1806 trigger:
1807         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1808         if (bk->lb_param & LPF_DRYRUN)
1809                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1810
1811         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
1812                 valid |= DOIV_ERROR_HANDLE;
1813                 if (start->ls_flags & LPF_FAILOUT)
1814                         flags |= DOIF_FAILOUT;
1815         }
1816
1817         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
1818                 valid |= DOIV_DRYRUN;
1819                 if (start->ls_flags & LPF_DRYRUN)
1820                         flags |= DOIF_DRYRUN;
1821         }
1822
1823         if (!list_empty(&lfsck->li_list_scan))
1824                 flags |= DOIF_OUTUSED;
1825
             /* Pack the DOIF_* flags above the DOIV_* valid bits. */
1826         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1827         thread_set_flags(thread, 0);
1828         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
1829         if (IS_ERR(lta))
1830                 GOTO(out, rc = PTR_ERR(lta));
1831
1832         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
1833         task = kthread_run(lfsck_master_engine, lta, "lfsck");
1834         if (IS_ERR(task)) {
1835                 rc = PTR_ERR(task);
1836                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
1837                        lfsck_lfsck2name(lfsck), rc);
                     /* The thread was not created, so the arguments are
                      * released here. */
1838                 lfsck_thread_args_fini(lta);
1839
1840                 GOTO(out, rc);
1841         }
1842
             /* Wait until the new thread reports running or stopped. */
1843         l_wait_event(thread->t_ctl_waitq,
1844                      thread_is_running(thread) ||
1845                      thread_is_stopped(thread),
1846                      &lwi);
             /* Local-only start: set li_start_unplug and wake the waiters
              * on the control wait queue, then finish. */
1847         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
1848                 lfsck->li_start_unplug = 1;
1849                 wake_up_all(&thread->t_ctl_waitq);
1850
1851                 GOTO(out, rc = 0);
1852         }
1853
1854         /* release lfsck::li_mutex to avoid deadlock. */
1855         mutex_unlock(&lfsck->li_mutex);
1856         rc = lfsck_start_all(env, lfsck, start);
1857         if (rc != 0) {
                     /* The broadcast failed: unless the local thread has
                      * already stopped, mark the run as failed, ask the
                      * thread to stop and wait for it. */
1858                 spin_lock(&lfsck->li_lock);
1859                 if (thread_is_stopped(thread)) {
1860                         spin_unlock(&lfsck->li_lock);
1861                 } else {
1862                         lfsck->li_status = LS_FAILED;
1863                         lfsck->li_flags = 0;
1864                         thread_set_flags(thread, SVC_STOPPING);
1865                         spin_unlock(&lfsck->li_lock);
1866
1867                         lfsck->li_start_unplug = 1;
1868                         wake_up_all(&thread->t_ctl_waitq);
1869                         l_wait_event(thread->t_ctl_waitq,
1870                                      thread_is_stopped(thread),
1871                                      &lwi);
1872                 }
1873         } else {
1874                 lfsck->li_start_unplug = 1;
1875                 wake_up_all(&thread->t_ctl_waitq);
1876         }
1877
             /* li_mutex was already dropped above: skip the unlock at out. */
1878         GOTO(put, rc);
1879
1880 out:
1881         mutex_unlock(&lfsck->li_mutex);
1882
1883 put:
1884         lfsck_instance_put(env, lfsck);
1885
1886         return rc < 0 ? rc : 0;
1887 }
1888 EXPORT_SYMBOL(lfsck_start);
1889
1890 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
1891                struct lfsck_stop *stop)
1892 {
1893         struct lfsck_instance   *lfsck;
1894         struct ptlrpc_thread    *thread;
1895         struct l_wait_info       lwi    = { 0 };
1896         int                      rc     = 0;
1897         int                      rc1    = 0;
1898         ENTRY;
1899
1900         lfsck = lfsck_instance_find(key, true, false);
1901         if (unlikely(lfsck == NULL))
1902                 RETURN(-ENXIO);
1903
1904         thread = &lfsck->li_thread;
1905         /* release lfsck::li_mutex to avoid deadlock. */
1906         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
1907                 if (!lfsck->li_master) {
1908                         CERROR("%s: only allow to specify '-A' via MDS\n",
1909                                lfsck_lfsck2name(lfsck));
1910
1911                         GOTO(out, rc = -EPERM);
1912                 }
1913
1914                 rc1 = lfsck_stop_all(env, lfsck, stop);
1915         }
1916
1917         mutex_lock(&lfsck->li_mutex);
1918         spin_lock(&lfsck->li_lock);
1919         /* no error if LFSCK is already stopped, or was never started */
1920         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1921                 spin_unlock(&lfsck->li_lock);
1922                 GOTO(out, rc = 0);
1923         }
1924
1925         if (stop != NULL) {
1926                 lfsck->li_status = stop->ls_status;
1927                 lfsck->li_flags = stop->ls_flags;
1928         } else {
1929                 lfsck->li_status = LS_STOPPED;
1930                 lfsck->li_flags = 0;
1931         }
1932
1933         thread_set_flags(thread, SVC_STOPPING);
1934         spin_unlock(&lfsck->li_lock);
1935
1936         wake_up_all(&thread->t_ctl_waitq);
1937         l_wait_event(thread->t_ctl_waitq,
1938                      thread_is_stopped(thread),
1939                      &lwi);
1940
1941         GOTO(out, rc = 0);
1942
1943 out:
1944         mutex_unlock(&lfsck->li_mutex);
1945         lfsck_instance_put(env, lfsck);
1946
1947         return rc != 0 ? rc : rc1;
1948 }
1949 EXPORT_SYMBOL(lfsck_stop);
1950
1951 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
1952                     struct lfsck_request *lr)
1953 {
1954         int rc = -EOPNOTSUPP;
1955         ENTRY;
1956
1957         switch (lr->lr_event) {
1958         case LE_START: {
1959                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
1960                 struct lfsck_start_param  lsp;
1961
1962                 memset(start, 0, sizeof(*start));
1963                 start->ls_valid = lr->lr_valid;
1964                 start->ls_speed_limit = lr->lr_speed;
1965                 start->ls_version = lr->lr_version;
1966                 start->ls_active = lr->lr_active;
1967                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1968                 start->ls_async_windows = lr->lr_async_windows;
1969
1970                 lsp.lsp_start = start;
1971                 lsp.lsp_index = lr->lr_index;
1972                 lsp.lsp_index_valid = 1;
1973                 rc = lfsck_start(env, key, &lsp);
1974                 break;
1975         }
1976         case LE_STOP: {
1977                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1978
1979                 memset(stop, 0, sizeof(*stop));
1980                 stop->ls_status = lr->lr_status;
1981                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1982                 rc = lfsck_stop(env, key, stop);
1983                 break;
1984         }
1985         case LE_PHASE1_DONE:
1986         case LE_PHASE2_DONE:
1987         case LE_FID_ACCESSED:
1988         case LE_PEER_EXIT:
1989         case LE_CONDITIONAL_DESTROY:
1990         case LE_PAIRS_VERIFY: {
1991                 struct lfsck_instance  *lfsck;
1992                 struct lfsck_component *com;
1993
1994                 lfsck = lfsck_instance_find(key, true, false);
1995                 if (unlikely(lfsck == NULL))
1996                         RETURN(-ENXIO);
1997
1998                 com = lfsck_component_find(lfsck, lr->lr_active);
1999                 if (likely(com != NULL)) {
2000                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
2001                         lfsck_component_put(env, com);
2002                 }
2003
2004                 lfsck_instance_put(env, lfsck);
2005                 break;
2006         }
2007         default:
2008                 break;
2009         }
2010
2011         RETURN(rc);
2012 }
2013 EXPORT_SYMBOL(lfsck_in_notify);
2014
2015 int lfsck_query(const struct lu_env *env, struct dt_device *key,
2016                 struct lfsck_request *lr)
2017 {
2018         struct lfsck_instance  *lfsck;
2019         struct lfsck_component *com;
2020         int                     rc;
2021         ENTRY;
2022
2023         lfsck = lfsck_instance_find(key, true, false);
2024         if (unlikely(lfsck == NULL))
2025                 RETURN(-ENXIO);
2026
2027         com = lfsck_component_find(lfsck, lr->lr_active);
2028         if (likely(com != NULL)) {
2029                 rc = com->lc_ops->lfsck_query(env, com);
2030                 lfsck_component_put(env, com);
2031         } else {
2032                 rc = -ENOTSUPP;
2033         }
2034
2035         lfsck_instance_put(env, lfsck);
2036
2037         RETURN(rc);
2038 }
2039 EXPORT_SYMBOL(lfsck_query);
2040
2041 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
2042                              struct ldlm_namespace *ns)
2043 {
2044         struct lfsck_instance  *lfsck;
2045         int                     rc      = -ENXIO;
2046
2047         lfsck = lfsck_instance_find(key, true, false);
2048         if (likely(lfsck != NULL)) {
2049                 lfsck->li_namespace = ns;
2050                 lfsck_instance_put(env, lfsck);
2051                 rc = 0;
2052         }
2053
2054         return rc;
2055 }
2056 EXPORT_SYMBOL(lfsck_register_namespace);
2057
/**
 * Create and register an LFSCK instance for the device \a key.
 *
 * The instance is allocated and initialized, the local object storage, the
 * local root, the object table iterator object, the bookmark and the LFSCK
 * components are set up, and finally the instance is added to the instance
 * list. On any failure lfsck_instance_cleanup() is called on the instance.
 *
 * \param[in] env		execution environment
 * \param[in] key		bottom device; also the lookup key
 * \param[in] next		stored in lfsck_instance::li_next
 * \param[in] obd		stored in lfsck_instance::li_obd
 * \param[in] notify		stored in lfsck_instance::li_out_notify
 * \param[in] notify_data	stored in lfsck_instance::li_out_notify_data
 * \param[in] master		true to mark the instance as master, which
 *				also sets up FIDs and the namespace component
 *
 * \retval			0 on success
 * \retval			-EEXIST if \a key already has an instance
 * \retval			-ENOMEM if the instance cannot be allocated
 * \retval			-ENOTDIR if the local root is not a directory
 * \retval			other negative error code from the setup steps
 */
2058 int lfsck_register(const struct lu_env *env, struct dt_device *key,
2059                    struct dt_device *next, struct obd_device *obd,
2060                    lfsck_out_notify notify, void *notify_data, bool master)
2061 {
2062         struct lfsck_instance   *lfsck;
2063         struct dt_object        *root  = NULL;
2064         struct dt_object        *obj   = NULL;
2065         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
2066         int                      rc;
2067         ENTRY;
2068
             /* Lookup only: nothing is released on the -EEXIST path. */
2069         lfsck = lfsck_instance_find(key, false, false);
2070         if (unlikely(lfsck != NULL))
2071                 RETURN(-EEXIST);
2072
2073         OBD_ALLOC_PTR(lfsck);
2074         if (lfsck == NULL)
2075                 RETURN(-ENOMEM);
2076
2077         mutex_init(&lfsck->li_mutex);
2078         spin_lock_init(&lfsck->li_lock);
2079         INIT_LIST_HEAD(&lfsck->li_link);
2080         INIT_LIST_HEAD(&lfsck->li_list_scan);
2081         INIT_LIST_HEAD(&lfsck->li_list_dir);
2082         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2083         INIT_LIST_HEAD(&lfsck->li_list_idle);
             /* The instance starts with a reference count of 1. */
2084         atomic_set(&lfsck->li_ref, 1);
2085         atomic_set(&lfsck->li_double_scan_count, 0);
2086         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2087         lfsck->li_out_notify = notify;
2088         lfsck->li_out_notify_data = notify_data;
2089         lfsck->li_next = next;
2090         lfsck->li_bottom = key;
2091         lfsck->li_obd = obd;
2092
2093         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2094         if (rc != 0)
2095                 GOTO(out, rc);
2096
2097         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2098         if (rc != 0)
2099                 GOTO(out, rc);
2100
2101         fid->f_seq = FID_SEQ_LOCAL_NAME;
2102         fid->f_oid = 1;
2103         fid->f_ver = 0;
2104         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2105         if (rc != 0)
2106                 GOTO(out, rc);
2107
             /* From here on fid holds the FID of the local root. */
2108         rc = dt_root_get(env, key, fid);
2109         if (rc != 0)
2110                 GOTO(out, rc);
2111
2112         root = dt_locate(env, key, fid);
2113         if (IS_ERR(root))
2114                 GOTO(out, rc = PTR_ERR(root));
2115
2116         if (unlikely(!dt_try_as_dir(env, root)))
2117                 GOTO(out, rc = -ENOTDIR);
2118
2119         lfsck->li_local_root_fid = *fid;
2120         if (master) {
2121                 lfsck->li_master = 1;
                     /* Only the master with device index 0 resolves the
                      * global root and checks the linkEA of the dotlustre
                      * and lostfound entries below it. */
2122                 if (lfsck_dev_idx(key) == 0) {
2123                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
2124                         const struct lu_name *cname;
2125
2126                         rc = dt_lookup(env, root,
2127                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2128                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2129                         if (rc != 0)
2130                                 GOTO(out, rc);
2131
2132                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
2133                         if (IS_ERR(obj))
2134                                 GOTO(out, rc = PTR_ERR(obj));
2135
2136                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2137                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
2138                         if (rc != 0)
2139                                 GOTO(out, rc);
2140
                             /* Switch obj from the global root to the
                              * dotlustre entry just looked up. */
2141                         lu_object_put(env, &obj->do_lu);
2142                         obj = dt_locate(env, key, fid);
2143                         if (IS_ERR(obj))
2144                                 GOTO(out, rc = PTR_ERR(obj));
2145
2146                         cname = lfsck_name_get_const(env, dotlustre,
2147                                                      strlen(dotlustre));
2148                         rc = lfsck_verify_linkea(env, key, obj, cname,
2149                                                  &lfsck->li_global_root_fid);
2150                         if (rc != 0)
2151                                 GOTO(out, rc);
2152
                             /* Save the dotlustre FID as the parent FID
                              * before fid is overwritten by the next
                              * lookup. */
2153                         *pfid = *fid;
2154                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2155                                        (const struct dt_key *)lostfound,
2156                                        BYPASS_CAPA);
2157                         if (rc != 0)
2158                                 GOTO(out, rc);
2159
2160                         lu_object_put(env, &obj->do_lu);
2161                         obj = dt_locate(env, key, fid);
2162                         if (IS_ERR(obj))
2163                                 GOTO(out, rc = PTR_ERR(obj));
2164
2165                         cname = lfsck_name_get_const(env, lostfound,
2166                                                      strlen(lostfound));
2167                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
2168                         if (rc != 0)
2169                                 GOTO(out, rc);
2170
                             /* Clear obj so that the out label does not
                              * put it a second time. */
2171                         lu_object_put(env, &obj->do_lu);
2172                         obj = NULL;
2173                 }
2174         }
2175
2176         fid->f_seq = FID_SEQ_LOCAL_FILE;
2177         fid->f_oid = OTABLE_IT_OID;
2178         fid->f_ver = 0;
2179         obj = dt_locate(env, key, fid);
2180         if (IS_ERR(obj))
2181                 GOTO(out, rc = PTR_ERR(obj));
2182
             /* Take an extra reference for li_obj_oit; the reference from
              * dt_locate() is dropped at the out label. */
2183         lu_object_get(&obj->do_lu);
2184         lfsck->li_obj_oit = obj;
2185         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2186         if (rc != 0)
2187                 GOTO(out, rc);
2188
2189         rc = lfsck_bookmark_setup(env, lfsck);
2190         if (rc != 0)
2191                 GOTO(out, rc);
2192
2193         if (master) {
2194                 rc = lfsck_fid_init(lfsck);
2195                 if (rc < 0)
2196                         GOTO(out, rc);
2197
2198                 rc = lfsck_namespace_setup(env, lfsck);
2199                 if (rc < 0)
2200                         GOTO(out, rc);
2201         }
2202
2203         rc = lfsck_layout_setup(env, lfsck);
2204         if (rc < 0)
2205                 GOTO(out, rc);
2206
2207         /* XXX: more LFSCK components initialization to be added here. */
2208
2209         rc = lfsck_instance_add(lfsck);
2210         if (rc == 0)
2211                 rc = lfsck_add_target_from_orphan(env, lfsck);
2212 out:
             /* Common exit: release the temporary object references and
              * tear the instance down on failure. */
2213         if (obj != NULL && !IS_ERR(obj))
2214                 lu_object_put(env, &obj->do_lu);
2215         if (root != NULL && !IS_ERR(root))
2216                 lu_object_put(env, &root->do_lu);
2217         if (rc != 0)
2218                 lfsck_instance_cleanup(env, lfsck);
2219         return rc;
2220 }
2221 EXPORT_SYMBOL(lfsck_register);
2222
2223 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2224 {
2225         struct lfsck_instance *lfsck;
2226
2227         lfsck = lfsck_instance_find(key, false, true);
2228         if (lfsck != NULL)
2229                 lfsck_instance_put(env, lfsck);
2230 }
2231 EXPORT_SYMBOL(lfsck_degister);
2232
2233 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2234                      struct dt_device *tgt, struct obd_export *exp,
2235                      __u32 index, bool for_ost)
2236 {
2237         struct lfsck_instance   *lfsck;
2238         struct lfsck_tgt_desc   *ltd;
2239         int                      rc;
2240         ENTRY;
2241
2242         OBD_ALLOC_PTR(ltd);
2243         if (ltd == NULL)
2244                 RETURN(-ENOMEM);
2245
2246         ltd->ltd_tgt = tgt;
2247         ltd->ltd_key = key;
2248         ltd->ltd_exp = exp;
2249         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2250         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2251         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2252         atomic_set(&ltd->ltd_ref, 1);
2253         ltd->ltd_index = index;
2254
2255         spin_lock(&lfsck_instance_lock);
2256         lfsck = __lfsck_instance_find(key, true, false);
2257         if (lfsck == NULL) {
2258                 if (for_ost)
2259                         list_add_tail(&ltd->ltd_orphan_list,
2260                                       &lfsck_ost_orphan_list);
2261                 else
2262                         list_add_tail(&ltd->ltd_orphan_list,
2263                                       &lfsck_mdt_orphan_list);
2264                 spin_unlock(&lfsck_instance_lock);
2265
2266                 RETURN(0);
2267         }
2268         spin_unlock(&lfsck_instance_lock);
2269
2270         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2271         if (rc != 0)
2272                 lfsck_tgt_put(ltd);
2273
2274         lfsck_instance_put(env, lfsck);
2275
2276         RETURN(rc);
2277 }
2278 EXPORT_SYMBOL(lfsck_add_target);
2279
2280 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2281                       struct dt_device *tgt, __u32 index, bool for_ost)
2282 {
2283         struct lfsck_instance   *lfsck;
2284         struct lfsck_tgt_descs  *ltds;
2285         struct lfsck_tgt_desc   *ltd;
2286         struct list_head        *head;
2287
2288         if (for_ost)
2289                 head = &lfsck_ost_orphan_list;
2290         else
2291                 head = &lfsck_mdt_orphan_list;
2292
2293         spin_lock(&lfsck_instance_lock);
2294         list_for_each_entry(ltd, head, ltd_orphan_list) {
2295                 if (ltd->ltd_tgt == tgt) {
2296                         list_del_init(&ltd->ltd_orphan_list);
2297                         spin_unlock(&lfsck_instance_lock);
2298                         lfsck_tgt_put(ltd);
2299
2300                         return;
2301                 }
2302         }
2303
2304         ltd = NULL;
2305         lfsck = __lfsck_instance_find(key, true, false);
2306         spin_unlock(&lfsck_instance_lock);
2307         if (unlikely(lfsck == NULL))
2308                 return;
2309
2310         if (for_ost)
2311                 ltds = &lfsck->li_ost_descs;
2312         else
2313                 ltds = &lfsck->li_mdt_descs;
2314
2315         down_write(&ltds->ltd_rw_sem);
2316         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2317
2318         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2319                 goto unlock;
2320
2321         ltd = LTD_TGT(ltds, index);
2322         if (unlikely(ltd == NULL))
2323                 goto unlock;
2324
2325         LASSERT(ltds->ltd_tgtnr > 0);
2326
2327         ltds->ltd_tgtnr--;
2328         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2329         LTD_TGT(ltds, index) = NULL;
2330
2331 unlock:
2332         if (ltd == NULL) {
2333                 if (for_ost)
2334                         head = &lfsck->li_ost_descs.ltd_orphan;
2335                 else
2336                         head = &lfsck->li_mdt_descs.ltd_orphan;
2337
2338                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2339                         if (ltd->ltd_tgt == tgt) {
2340                                 list_del_init(&ltd->ltd_orphan_list);
2341                                 break;
2342                         }
2343                 }
2344         }
2345
2346         up_write(&ltds->ltd_rw_sem);
2347         if (ltd != NULL) {
2348                 spin_lock(&ltds->ltd_lock);
2349                 ltd->ltd_dead = 1;
2350                 spin_unlock(&ltds->ltd_lock);
2351                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
2352                 lfsck_tgt_put(ltd);
2353         }
2354
2355         lfsck_instance_put(env, lfsck);
2356 }
2357 EXPORT_SYMBOL(lfsck_del_target);
2358
2359 static int __init lfsck_init(void)
2360 {
2361         int rc;
2362
2363         INIT_LIST_HEAD(&lfsck_instance_list);
2364         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2365         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2366         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2367         rc = lu_context_key_register(&lfsck_thread_key);
2368         if (rc == 0) {
2369                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2370                 tgt_register_lfsck_query(lfsck_query);
2371         }
2372
2373         return rc;
2374 }
2375
2376 static void __exit lfsck_exit(void)
2377 {
2378         struct lfsck_tgt_desc *ltd;
2379         struct lfsck_tgt_desc *next;
2380
2381         LASSERT(list_empty(&lfsck_instance_list));
2382
2383         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2384                                  ltd_orphan_list) {
2385                 list_del_init(&ltd->ltd_orphan_list);
2386                 lfsck_tgt_put(ltd);
2387         }
2388
2389         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2390                                  ltd_orphan_list) {
2391                 list_del_init(&ltd->ltd_orphan_list);
2392                 lfsck_tgt_put(ltd);
2393         }
2394
2395         lu_context_key_degister(&lfsck_thread_key);
2396 }
2397
/* Module metadata.
 * NOTE(review): cfs_module() presumably declares lfsck_init()/lfsck_exit()
 * as the module entry and exit points - confirm against the libcfs macro. */
2398 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2399 MODULE_DESCRIPTION("LFSCK");
2400 MODULE_LICENSE("GPL");
2401
2402 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);