Whamcloud - gitweb
1a45e332432f0d3d002095e81739ed761247f6b0
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <libcfs/list.h>
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <md_object.h>
39 #include <lustre_fld.h>
40 #include <lustre_lib.h>
41 #include <lustre_net.h>
42 #include <lustre_lfsck.h>
43 #include <lustre/lustre_lfsck_user.h>
44
45 #include "lfsck_internal.h"
46
/* NOTE(review): not referenced in the visible part of this file; presumably a
 * positive "skip this checkpoint" return value -- confirm against the
 * checkpoint code. */
#define LFSCK_CHECKPOINT_SKIP   1

/* define lfsck thread key: the per-thread payload attached to the key is a
 * struct lfsck_thread_info (released by lfsck_key_fini() below). */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
51
52 static void lfsck_key_fini(const struct lu_context *ctx,
53                            struct lu_context_key *key, void *data)
54 {
55         struct lfsck_thread_info *info = data;
56
57         lu_buf_free(&info->lti_linkea_buf);
58         lu_buf_free(&info->lti_linkea_buf2);
59         lu_buf_free(&info->lti_big_buf);
60         OBD_FREE_PTR(info);
61 }
62
/* Define the lfsck context key, tagged for both MD and DT threads. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);

/* NOTE(review): presumably the list of all registered LFSCK instances;
 * it is not used in the visible part of this file -- confirm. */
static struct list_head lfsck_instance_list;
/* Target descriptors (OST / MDT) that are not attached to an instance yet.
 * lfsck_add_target_from_orphan() moves the entries whose ltd_key matches
 * the instance's li_bottom device into that instance. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Taken while walking the orphan lists above; presumably also guards
 * lfsck_instance_list -- confirm. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
70
71 static const char *lfsck_status_names[] = {
72         [LS_INIT]               = "init",
73         [LS_SCANNING_PHASE1]    = "scanning-phase1",
74         [LS_SCANNING_PHASE2]    = "scanning-phase2",
75         [LS_COMPLETED]          = "completed",
76         [LS_FAILED]             = "failed",
77         [LS_STOPPED]            = "stopped",
78         [LS_PAUSED]             = "paused",
79         [LS_CRASHED]            = "crashed",
80         [LS_PARTIAL]            = "partial",
81         [LS_CO_FAILED]          = "co-failed",
82         [LS_CO_STOPPED]         = "co-stopped",
83         [LS_CO_PAUSED]          = "co-paused"
84 };
85
86 const char *lfsck_flags_names[] = {
87         "scanned-once",
88         "inconsistent",
89         "upgrade",
90         "incomplete",
91         "crashed_lastid",
92         NULL
93 };
94
95 const char *lfsck_param_names[] = {
96         NULL,
97         "failout",
98         "dryrun",
99         "all_targets",
100         "broadcast",
101         "orphan",
102         "create_ostobj",
103         "create_mdtobj",
104         NULL
105 };
106
/* NOTE(review): judging by the names, selects how the lost+found directory
 * is located when it is verified (via the bookmark or via the name entry);
 * the users are outside the visible part of the file -- confirm. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK = 0,
        LVLT_BY_NAMEENTRY,      /* == 1 */
};
111
112 const char *lfsck_status2names(enum lfsck_status status)
113 {
114         if (unlikely(status < 0 || status >= LS_MAX))
115                 return "unknown";
116
117         return lfsck_status_names[status];
118 }
119
120 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
121 {
122         spin_lock_init(&ltds->ltd_lock);
123         init_rwsem(&ltds->ltd_rw_sem);
124         INIT_LIST_HEAD(&ltds->ltd_orphan);
125         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
126         if (ltds->ltd_tgts_bitmap == NULL)
127                 return -ENOMEM;
128
129         return 0;
130 }
131
132 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
133 {
134         struct lfsck_tgt_desc   *ltd;
135         struct lfsck_tgt_desc   *next;
136         int                      idx;
137
138         down_write(&ltds->ltd_rw_sem);
139
140         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
141                                  ltd_orphan_list) {
142                 list_del_init(&ltd->ltd_orphan_list);
143                 lfsck_tgt_put(ltd);
144         }
145
146         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
147                 up_write(&ltds->ltd_rw_sem);
148
149                 return;
150         }
151
152         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
153                 ltd = lfsck_ltd2tgt(ltds, idx);
154                 if (likely(ltd != NULL)) {
155                         LASSERT(list_empty(&ltd->ltd_layout_list));
156                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
157                         LASSERT(list_empty(&ltd->ltd_namespace_list));
158                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
159
160                         ltds->ltd_tgtnr--;
161                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
162                         lfsck_assign_tgt(ltds, NULL, idx);
163                         lfsck_tgt_put(ltd);
164                 }
165         }
166
167         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
168                  ltds->ltd_tgtnr);
169
170         for (idx = 0; idx < TGT_PTRS; idx++) {
171                 if (ltds->ltd_tgts_idx[idx] != NULL) {
172                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
173                         ltds->ltd_tgts_idx[idx] = NULL;
174                 }
175         }
176
177         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
178         ltds->ltd_tgts_bitmap = NULL;
179         up_write(&ltds->ltd_rw_sem);
180 }
181
182 static int __lfsck_add_target(const struct lu_env *env,
183                               struct lfsck_instance *lfsck,
184                               struct lfsck_tgt_desc *ltd,
185                               bool for_ost, bool locked)
186 {
187         struct lfsck_tgt_descs *ltds;
188         __u32                   index = ltd->ltd_index;
189         int                     rc    = 0;
190         ENTRY;
191
192         if (for_ost)
193                 ltds = &lfsck->li_ost_descs;
194         else
195                 ltds = &lfsck->li_mdt_descs;
196
197         if (!locked)
198                 down_write(&ltds->ltd_rw_sem);
199
200         LASSERT(ltds->ltd_tgts_bitmap != NULL);
201
202         if (index >= ltds->ltd_tgts_bitmap->size) {
203                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
204                                     (__u32)BITS_PER_LONG);
205                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
206                 cfs_bitmap_t *new_bitmap;
207
208                 while (newsize < index + 1)
209                         newsize <<= 1;
210
211                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
212                 if (new_bitmap == NULL)
213                         GOTO(unlock, rc = -ENOMEM);
214
215                 if (ltds->ltd_tgtnr > 0)
216                         cfs_bitmap_copy(new_bitmap, old_bitmap);
217                 ltds->ltd_tgts_bitmap = new_bitmap;
218                 CFS_FREE_BITMAP(old_bitmap);
219         }
220
221         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
222                 CERROR("%s: the device %s (%u) is registered already\n",
223                        lfsck_lfsck2name(lfsck),
224                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
225                 GOTO(unlock, rc = -EEXIST);
226         }
227
228         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
229                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
230                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
231                         GOTO(unlock, rc = -ENOMEM);
232         }
233
234         lfsck_assign_tgt(ltds, ltd, index);
235         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
236         ltds->ltd_tgtnr++;
237
238         GOTO(unlock, rc = 0);
239
240 unlock:
241         if (!locked)
242                 up_write(&ltds->ltd_rw_sem);
243
244         return rc;
245 }
246
247 static int lfsck_add_target_from_orphan(const struct lu_env *env,
248                                         struct lfsck_instance *lfsck)
249 {
250         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
251         struct lfsck_tgt_desc   *ltd;
252         struct lfsck_tgt_desc   *next;
253         struct list_head        *head    = &lfsck_ost_orphan_list;
254         int                      rc;
255         bool                     for_ost = true;
256
257 again:
258         spin_lock(&lfsck_instance_lock);
259         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
260                 if (ltd->ltd_key == lfsck->li_bottom)
261                         list_move_tail(&ltd->ltd_orphan_list,
262                                        &ltds->ltd_orphan);
263         }
264         spin_unlock(&lfsck_instance_lock);
265
266         down_write(&ltds->ltd_rw_sem);
267         while (!list_empty(&ltds->ltd_orphan)) {
268                 ltd = list_entry(ltds->ltd_orphan.next,
269                                  struct lfsck_tgt_desc,
270                                  ltd_orphan_list);
271                 list_del_init(&ltd->ltd_orphan_list);
272                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
273                 /* Do not hold the semaphore for too long time. */
274                 up_write(&ltds->ltd_rw_sem);
275                 if (rc != 0)
276                         return rc;
277
278                 down_write(&ltds->ltd_rw_sem);
279         }
280         up_write(&ltds->ltd_rw_sem);
281
282         if (for_ost) {
283                 ltds = &lfsck->li_mdt_descs;
284                 head = &lfsck_mdt_orphan_list;
285                 for_ost = false;
286                 goto again;
287         }
288
289         return 0;
290 }
291
292 static inline struct lfsck_component *
293 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
294                        struct list_head *list)
295 {
296         struct lfsck_component *com;
297
298         list_for_each_entry(com, list, lc_link) {
299                 if (com->lc_type == type)
300                         return com;
301         }
302         return NULL;
303 }
304
305 struct lfsck_component *
306 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
307 {
308         struct lfsck_component *com;
309
310         spin_lock(&lfsck->li_lock);
311         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
312         if (com != NULL)
313                 goto unlock;
314
315         com = __lfsck_component_find(lfsck, type,
316                                      &lfsck->li_list_double_scan);
317         if (com != NULL)
318                 goto unlock;
319
320         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
321
322 unlock:
323         if (com != NULL)
324                 lfsck_component_get(com);
325         spin_unlock(&lfsck->li_lock);
326         return com;
327 }
328
329 void lfsck_component_cleanup(const struct lu_env *env,
330                              struct lfsck_component *com)
331 {
332         if (!list_empty(&com->lc_link))
333                 list_del_init(&com->lc_link);
334         if (!list_empty(&com->lc_link_dir))
335                 list_del_init(&com->lc_link_dir);
336
337         lfsck_component_put(env, com);
338 }
339
340 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
341                     struct lu_fid *fid, bool locked)
342 {
343         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
344         int                      rc = 0;
345         ENTRY;
346
347         if (!locked)
348                 mutex_lock(&lfsck->li_mutex);
349
350         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
351         if (rc >= 0) {
352                 bk->lb_last_fid = *fid;
353                 /* We do not care about whether the subsequent sub-operations
354                  * failed or not. The worst case is that one FID is lost that
355                  * is not a big issue for the LFSCK since it is relative rare
356                  * for LFSCK create. */
357                 rc = lfsck_bookmark_store(env, lfsck);
358         }
359
360         if (!locked)
361                 mutex_unlock(&lfsck->li_mutex);
362
363         RETURN(rc);
364 }
365
366 static int __lfsck_ibits_lock(const struct lu_env *env,
367                               struct lfsck_instance *lfsck,
368                               struct dt_object *obj, struct ldlm_res_id *resid,
369                               struct lustre_handle *lh, __u64 bits,
370                               enum ldlm_mode mode)
371 {
372         struct lfsck_thread_info        *info   = lfsck_env_info(env);
373         union ldlm_policy_data          *policy = &info->lti_policy;
374         __u64                            flags  = LDLM_FL_ATOMIC_CB;
375         int                              rc;
376
377         LASSERT(lfsck->li_namespace != NULL);
378
379         memset(policy, 0, sizeof(*policy));
380         policy->l_inodebits.bits = bits;
381         if (dt_object_remote(obj)) {
382                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
383
384                 memset(einfo, 0, sizeof(*einfo));
385                 einfo->ei_type = LDLM_IBITS;
386                 einfo->ei_mode = mode;
387                 einfo->ei_cb_bl = ldlm_blocking_ast;
388                 einfo->ei_cb_cp = ldlm_completion_ast;
389                 einfo->ei_res_id = resid;
390
391                 rc = dt_object_lock(env, obj, lh, einfo, policy);
392         } else {
393                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
394                                             LDLM_IBITS, policy, mode,
395                                             &flags, ldlm_blocking_ast,
396                                             ldlm_completion_ast, NULL, NULL,
397                                             0, LVB_T_NONE, NULL, lh);
398         }
399
400         if (rc == ELDLM_OK) {
401                 rc = 0;
402         } else {
403                 memset(lh, 0, sizeof(*lh));
404                 rc = -EIO;
405         }
406
407         return rc;
408 }
409
410 /**
411  * Request the specified ibits lock for the given object.
412  *
413  * Before the LFSCK modifying on the namespace visible object,
414  * it needs to acquire related ibits ldlm lock.
415  *
416  * \param[in] env       pointer to the thread context
417  * \param[in] lfsck     pointer to the lfsck instance
418  * \param[in] obj       pointer to the dt_object to be locked
419  * \param[out] lh       pointer to the lock handle
420  * \param[in] bits      the bits for the ldlm lock to be acquired
421  * \param[in] mode      the mode for the ldlm lock to be acquired
422  *
423  * \retval              0 for success
424  * \retval              negative error number on failure
425  */
426 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
427                      struct dt_object *obj, struct lustre_handle *lh,
428                      __u64 bits, enum ldlm_mode mode)
429 {
430         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
431
432         LASSERT(!lustre_handle_is_used(lh));
433
434         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
435         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
436 }
437
438 /**
439  * Release the the specified ibits lock.
440  *
441  * If the lock has been acquired before, release it
442  * and cleanup the handle. Otherwise, do nothing.
443  *
444  * \param[in] lh        pointer to the lock handle
445  * \param[in] mode      the mode for the ldlm lock to be released
446  */
447 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
448 {
449         if (lustre_handle_is_used(lh)) {
450                 ldlm_lock_decref(lh, mode);
451                 memset(lh, 0, sizeof(*lh));
452         }
453 }
454
455 /**
456  * Request compound ibits locks for the given <obj, name> pairs.
457  *
458  * Before the LFSCK modifying on the namespace visible object, it needs to
459  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
460  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
461  * modificationis (such as insert name entry to the directory) may be too
462  * coarse-grained and not efficient.
463  *
464  * The lfsck_lock() will request compound ibits locks on the specified
465  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
466  * lock on the directory object, and the regular ibits lock on the name hash.
467  *
468  * \param[in] env       pointer to the thread context
469  * \param[in] lfsck     pointer to the lfsck instance
470  * \param[in] obj       pointer to the dt_object to be locked
471  * \param[in] name      used for building the PDO lock resource
472  * \param[out] llh      pointer to the lfsck_lock_handle
473  * \param[in] bits      the bits for the ldlm lock to be acquired
474  * \param[in] mode      the mode for the ldlm lock to be acquired
475  *
476  * \retval              0 for success
477  * \retval              negative error number on failure
478  */
479 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
480                struct dt_object *obj, const char *name,
481                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
482 {
483         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
484         int                 rc;
485
486         LASSERT(S_ISDIR(lfsck_object_type(obj)));
487         LASSERT(name != NULL);
488         LASSERT(name[0] != 0);
489         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
490         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
491
492         switch (mode) {
493         case LCK_EX:
494                 llh->llh_pdo_mode = LCK_EX;
495                 break;
496         case LCK_PW:
497                 llh->llh_pdo_mode = LCK_CW;
498                 break;
499         case LCK_PR:
500                 llh->llh_pdo_mode = LCK_CR;
501                 break;
502         default:
503                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
504                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
505                        PFID(lfsck_dto2fid(obj)));
506                 LBUG();
507         }
508
509         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
510         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
511                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
512         if (rc != 0)
513                 return rc;
514
515         llh->llh_reg_mode = mode;
516         resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
517         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
518         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
519                                 bits, llh->llh_reg_mode);
520         if (rc != 0)
521                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
522
523         return rc;
524 }
525
526 /**
527  * Release the the compound ibits locks.
528  *
529  * \param[in] llh       pointer to the lfsck_lock_handle to be released
530  */
531 void lfsck_unlock(struct lfsck_lock_handle *llh)
532 {
533         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
534         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
535 }
536
537 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
538                               struct lfsck_instance *lfsck,
539                               const struct lu_fid *fid)
540 {
541         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
542         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
543         int                      rc;
544
545         fld_range_set_mdt(range);
546         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
547         if (rc == 0)
548                 rc = range->lsr_index;
549
550         return rc;
551 }
552
/* Directory entry names used as dt_key when "." and ".." are inserted into
 * a newly created directory (non-static: visible outside this file). */
const char dot[] = ".";
const char dotdot[] = "..";
/* Name components of the ".lustre/lost+found" directory. */
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
557
558 /**
559  * Remove the name entry from the .lustre/lost+found directory.
560  *
561  * No need to care about the object referenced by the name entry,
562  * either the name entry is invalid or redundant, or the referenced
563  * object has been processed or will be handled by others.
564  *
565  * \param[in] env       pointer to the thread context
566  * \param[in] lfsck     pointer to the lfsck instance
567  * \param[in] name      the name for the name entry to be removed
568  *
569  * \retval              0 for success
570  * \retval              negative error number on failure
571  */
572 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
573                                        struct lfsck_instance *lfsck,
574                                        const char *name)
575 {
576         struct dt_object        *parent = lfsck->li_lpf_root_obj;
577         struct dt_device        *dev    = lfsck_obj2dev(parent);
578         struct thandle          *th;
579         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
580         int                      rc;
581         ENTRY;
582
583         rc = lfsck_lock(env, lfsck, parent, name, llh,
584                         MDS_INODELOCK_UPDATE, LCK_PW);
585         if (rc != 0)
586                 RETURN(rc);
587
588         th = dt_trans_create(env, dev);
589         if (IS_ERR(th))
590                 GOTO(unlock, rc = PTR_ERR(th));
591
592         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
593         if (rc != 0)
594                 GOTO(stop, rc);
595
596         rc = dt_declare_ref_del(env, parent, th);
597         if (rc != 0)
598                 GOTO(stop, rc);
599
600         rc = dt_trans_start_local(env, dev, th);
601         if (rc != 0)
602                 GOTO(stop, rc);
603
604         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
605         if (rc != 0)
606                 GOTO(stop, rc);
607
608         dt_write_lock(env, parent, 0);
609         rc = dt_ref_del(env, parent, th);
610         dt_write_unlock(env, parent);
611
612         GOTO(stop, rc);
613
614 stop:
615         dt_trans_stop(env, dev, th);
616
617 unlock:
618         lfsck_unlock(llh);
619
620         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
621                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
622
623         return rc;
624 }
625
626 static int lfsck_create_lpf_local(const struct lu_env *env,
627                                   struct lfsck_instance *lfsck,
628                                   struct dt_object *child,
629                                   struct lu_attr *la,
630                                   struct dt_object_format *dof,
631                                   const char *name)
632 {
633         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
634         struct dt_object        *parent = lfsck->li_lpf_root_obj;
635         struct dt_device        *dev    = lfsck_obj2dev(child);
636         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
637         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
638         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
639         struct thandle          *th     = NULL;
640         struct linkea_data       ldata  = { NULL };
641         struct lu_buf            linkea_buf;
642         const struct lu_name    *cname;
643         loff_t                   pos    = 0;
644         int                      len    = sizeof(struct lfsck_bookmark);
645         int                      rc;
646         ENTRY;
647
648         rc = linkea_data_new(&ldata,
649                              &lfsck_env_info(env)->lti_linkea_buf2);
650         if (rc != 0)
651                 RETURN(rc);
652
653         cname = lfsck_name_get_const(env, name, strlen(name));
654         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
655         if (rc != 0)
656                 RETURN(rc);
657
658         th = dt_trans_create(env, dev);
659         if (IS_ERR(th))
660                 RETURN(PTR_ERR(th));
661
662         /* 1a. create child */
663         rc = dt_declare_create(env, child, la, NULL, dof, th);
664         if (rc != 0)
665                 GOTO(stop, rc);
666
667         if (!dt_try_as_dir(env, child))
668                 GOTO(stop, rc = -ENOTDIR);
669
670         /* 2a. increase child nlink */
671         rc = dt_declare_ref_add(env, child, th);
672         if (rc != 0)
673                 GOTO(stop, rc);
674
675         /* 3a. insert dot into child dir */
676         rec->rec_type = S_IFDIR;
677         rec->rec_fid = cfid;
678         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
679                                (const struct dt_key *)dot, th);
680         if (rc != 0)
681                 GOTO(stop, rc);
682
683         /* 4a. insert dotdot into child dir */
684         rec->rec_fid = &LU_LPF_FID;
685         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
686                                (const struct dt_key *)dotdot, th);
687         if (rc != 0)
688                 GOTO(stop, rc);
689
690         /* 5a. insert linkEA for child */
691         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
692                        ldata.ld_leh->leh_len);
693         rc = dt_declare_xattr_set(env, child, &linkea_buf,
694                                   XATTR_NAME_LINK, 0, th);
695         if (rc != 0)
696                 GOTO(stop, rc);
697
698         /* 6a. insert name into parent dir */
699         rec->rec_type = S_IFDIR;
700         rec->rec_fid = cfid;
701         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
702                                (const struct dt_key *)name, th);
703         if (rc != 0)
704                 GOTO(stop, rc);
705
706         /* 7a. increase parent nlink */
707         rc = dt_declare_ref_add(env, parent, th);
708         if (rc != 0)
709                 GOTO(stop, rc);
710
711         /* 8a. update bookmark */
712         rc = dt_declare_record_write(env, bk_obj,
713                                      lfsck_buf_get(env, bk, len), 0, th);
714         if (rc != 0)
715                 GOTO(stop, rc);
716
717         rc = dt_trans_start_local(env, dev, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         dt_write_lock(env, child, 0);
722         /* 1b. create child */
723         rc = dt_create(env, child, la, NULL, dof, th);
724         if (rc != 0)
725                 GOTO(unlock, rc);
726
727         /* 2b. increase child nlink */
728         rc = dt_ref_add(env, child, th);
729         if (rc != 0)
730                 GOTO(unlock, rc);
731
732         /* 3b. insert dot into child dir */
733         rec->rec_fid = cfid;
734         rc = dt_insert(env, child, (const struct dt_rec *)rec,
735                        (const struct dt_key *)dot, th, 1);
736         if (rc != 0)
737                 GOTO(unlock, rc);
738
739         /* 4b. insert dotdot into child dir */
740         rec->rec_fid = &LU_LPF_FID;
741         rc = dt_insert(env, child, (const struct dt_rec *)rec,
742                        (const struct dt_key *)dotdot, th, 1);
743         if (rc != 0)
744                 GOTO(unlock, rc);
745
746         /* 5b. insert linkEA for child. */
747         rc = dt_xattr_set(env, child, &linkea_buf,
748                           XATTR_NAME_LINK, 0, th);
749         dt_write_unlock(env, child);
750         if (rc != 0)
751                 GOTO(stop, rc);
752
753         /* 6b. insert name into parent dir */
754         rec->rec_fid = cfid;
755         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
756                        (const struct dt_key *)name, th, 1);
757         if (rc != 0)
758                 GOTO(stop, rc);
759
760         dt_write_lock(env, parent, 0);
761         /* 7b. increase parent nlink */
762         rc = dt_ref_add(env, parent, th);
763         dt_write_unlock(env, parent);
764         if (rc != 0)
765                 GOTO(stop, rc);
766
767         bk->lb_lpf_fid = *cfid;
768         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
769
770         /* 8b. update bookmark */
771         rc = dt_record_write(env, bk_obj,
772                              lfsck_buf_get(env, bk, len), &pos, th);
773
774         GOTO(stop, rc);
775
776 unlock:
777         dt_write_unlock(env, child);
778
779 stop:
780         dt_trans_stop(env, dev, th);
781
782         return rc;
783 }
784
785 static int lfsck_create_lpf_remote(const struct lu_env *env,
786                                    struct lfsck_instance *lfsck,
787                                    struct dt_object *child,
788                                    struct lu_attr *la,
789                                    struct dt_object_format *dof,
790                                    const char *name)
791 {
792         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
793         struct dt_object        *parent = lfsck->li_lpf_root_obj;
794         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
795         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
796         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
797         struct thandle          *th     = NULL;
798         struct linkea_data       ldata  = { NULL };
799         struct lu_buf            linkea_buf;
800         const struct lu_name    *cname;
801         struct dt_device        *dev;
802         loff_t                   pos    = 0;
803         int                      len    = sizeof(struct lfsck_bookmark);
804         int                      rc;
805         ENTRY;
806
807         rc = linkea_data_new(&ldata,
808                              &lfsck_env_info(env)->lti_linkea_buf2);
809         if (rc != 0)
810                 RETURN(rc);
811
812         cname = lfsck_name_get_const(env, name, strlen(name));
813         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
814         if (rc != 0)
815                 RETURN(rc);
816
817         /* Create .lustre/lost+found/MDTxxxx. */
818
819         /* XXX: Currently, cross-MDT create operation needs to create the child
820          *      object firstly, then insert name into the parent directory. For
821          *      this case, the child object resides on current MDT (local), but
822          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
823          *      easy to contain all the sub-modifications orderly within single
824          *      transaction.
825          *
826          *      To avoid more inconsistency, we split the create operation into
827          *      two transactions:
828          *
829          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
830          *         locally.
831          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
832          *         remotely.
833          *
834          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
835          *      repair such inconsistency when LFSCK run next time. */
836
837         /* Transaction I: locally */
838
839         dev = lfsck_obj2dev(child);
840         th = dt_trans_create(env, dev);
841         if (IS_ERR(th))
842                 RETURN(PTR_ERR(th));
843
844         /* 1a. create child */
845         rc = dt_declare_create(env, child, la, NULL, dof, th);
846         if (rc != 0)
847                 GOTO(stop, rc);
848
849         if (!dt_try_as_dir(env, child))
850                 GOTO(stop, rc = -ENOTDIR);
851
852         /* 2a. increase child nlink */
853         rc = dt_declare_ref_add(env, child, th);
854         if (rc != 0)
855                 GOTO(stop, rc);
856
857         /* 3a. insert dot into child dir */
858         rec->rec_type = S_IFDIR;
859         rec->rec_fid = cfid;
860         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
861                                (const struct dt_key *)dot, th);
862         if (rc != 0)
863                 GOTO(stop, rc);
864
865         /* 4a. insert dotdot into child dir */
866         rec->rec_fid = &LU_LPF_FID;
867         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
868                                (const struct dt_key *)dotdot, th);
869         if (rc != 0)
870                 GOTO(stop, rc);
871
872         /* 5a. insert linkEA for child */
873         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
874                        ldata.ld_leh->leh_len);
875         rc = dt_declare_xattr_set(env, child, &linkea_buf,
876                                   XATTR_NAME_LINK, 0, th);
877         if (rc != 0)
878                 GOTO(stop, rc);
879
880         /* 6a. update bookmark */
881         rc = dt_declare_record_write(env, bk_obj,
882                                      lfsck_buf_get(env, bk, len), 0, th);
883         if (rc != 0)
884                 GOTO(stop, rc);
885
886         rc = dt_trans_start_local(env, dev, th);
887         if (rc != 0)
888                 GOTO(stop, rc);
889
890         dt_write_lock(env, child, 0);
891         /* 1b. create child */
892         rc = dt_create(env, child, la, NULL, dof, th);
893         if (rc != 0)
894                 GOTO(unlock, rc);
895
896         /* 2b. increase child nlink */
897         rc = dt_ref_add(env, child, th);
898         if (rc != 0)
899                 GOTO(unlock, rc);
900
901         /* 3b. insert dot into child dir */
902         rec->rec_type = S_IFDIR;
903         rec->rec_fid = cfid;
904         rc = dt_insert(env, child, (const struct dt_rec *)rec,
905                        (const struct dt_key *)dot, th, 1);
906         if (rc != 0)
907                 GOTO(unlock, rc);
908
909         /* 4b. insert dotdot into child dir */
910         rec->rec_fid = &LU_LPF_FID;
911         rc = dt_insert(env, child, (const struct dt_rec *)rec,
912                        (const struct dt_key *)dotdot, th, 1);
913         if (rc != 0)
914                 GOTO(unlock, rc);
915
916         /* 5b. insert linkEA for child */
917         rc = dt_xattr_set(env, child, &linkea_buf,
918                           XATTR_NAME_LINK, 0, th);
919         if (rc != 0)
920                 GOTO(unlock, rc);
921
922         bk->lb_lpf_fid = *cfid;
923         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
924
925         /* 6b. update bookmark */
926         rc = dt_record_write(env, bk_obj,
927                              lfsck_buf_get(env, bk, len), &pos, th);
928
929         dt_write_unlock(env, child);
930         dt_trans_stop(env, dev, th);
931         if (rc != 0)
932                 RETURN(rc);
933
934         /* Transaction II: remotely */
935
936         dev = lfsck_obj2dev(parent);
937         th = dt_trans_create(env, dev);
938         if (IS_ERR(th))
939                 RETURN(PTR_ERR(th));
940
941         th->th_sync = 1;
942         /* 5a. insert name into parent dir */
943         rec->rec_fid = cfid;
944         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
945                                (const struct dt_key *)name, th);
946         if (rc != 0)
947                 GOTO(stop, rc);
948
949         /* 6a. increase parent nlink */
950         rc = dt_declare_ref_add(env, parent, th);
951         if (rc != 0)
952                 GOTO(stop, rc);
953
954         rc = dt_trans_start_local(env, dev, th);
955         if (rc != 0)
956                 GOTO(stop, rc);
957
958         /* 5b. insert name into parent dir */
959         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
960                        (const struct dt_key *)name, th, 1);
961         if (rc != 0)
962                 GOTO(stop, rc);
963
964         dt_write_lock(env, parent, 0);
965         /* 6b. increase parent nlink */
966         rc = dt_ref_add(env, parent, th);
967         dt_write_unlock(env, parent);
968
969         GOTO(stop, rc);
970
971 unlock:
972         dt_write_unlock(env, child);
973 stop:
974         dt_trans_stop(env, dev, th);
975
976         if (rc != 0 && dev == lfsck_obj2dev(parent))
977                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
978                        "for orphans, but failed to insert the name %s "
979                        "to the .lustre/lost+found/. Such inconsistency "
980                        "will be repaired when LFSCK run next time: rc = %d\n",
981                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
982
983         return rc;
984 }
985
986 /**
987  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
988  *
989  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
990  * orphans and other uncertain inconsistent objects found during the
991  * LFSCK. Such directory will be created by the LFSCK engine on the
992  * local MDT before the LFSCK scanning.
993  *
994  * \param[in] env       pointer to the thread context
995  * \param[in] lfsck     pointer to the lfsck instance
996  *
997  * \retval              0 for success
998  * \retval              negative error number on failure
999  */
1000 static int lfsck_create_lpf(const struct lu_env *env,
1001                             struct lfsck_instance *lfsck)
1002 {
1003         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
1004         struct lfsck_thread_info *info  = lfsck_env_info(env);
1005         struct lu_fid            *cfid  = &info->lti_fid2;
1006         struct lu_attr           *la    = &info->lti_la;
1007         struct dt_object_format  *dof   = &info->lti_dof;
1008         struct dt_object         *parent = lfsck->li_lpf_root_obj;
1009         struct dt_object         *child = NULL;
1010         struct lfsck_lock_handle *llh   = &info->lti_llh;
1011         char                      name[8];
1012         int                       node  = lfsck_dev_idx(lfsck);
1013         int                       rc    = 0;
1014         ENTRY;
1015
1016         LASSERT(lfsck->li_master);
1017         LASSERT(parent != NULL);
1018         LASSERT(lfsck->li_lpf_obj == NULL);
1019
1020         snprintf(name, 8, "MDT%04x", node);
1021         rc = lfsck_lock(env, lfsck, parent, name, llh,
1022                         MDS_INODELOCK_UPDATE, LCK_PW);
1023         if (rc != 0)
1024                 RETURN(rc);
1025
1026         if (fid_is_zero(&bk->lb_lpf_fid)) {
1027                 /* There is corner case that: in former LFSCK scanning we have
1028                  * created the .lustre/lost+found/MDTxxxx but failed to update
1029                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1030                  * it from MDT0 firstly. */
1031                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1032                                (const struct dt_key *)name);
1033                 if (rc != 0 && rc != -ENOENT)
1034                         GOTO(unlock, rc);
1035
1036                 if (rc == 0) {
1037                         bk->lb_lpf_fid = *cfid;
1038                         rc = lfsck_bookmark_store(env, lfsck);
1039                 } else {
1040                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1041                 }
1042                 if (rc != 0)
1043                         GOTO(unlock, rc);
1044         } else {
1045                 *cfid = bk->lb_lpf_fid;
1046         }
1047
1048         child = lfsck_object_find_bottom(env, lfsck, cfid);
1049         if (IS_ERR(child))
1050                 GOTO(unlock, rc = PTR_ERR(child));
1051
1052         if (dt_object_exists(child) != 0) {
1053                 if (unlikely(!dt_try_as_dir(env, child)))
1054                         rc = -ENOTDIR;
1055                 else
1056                         lfsck->li_lpf_obj = child;
1057
1058                 GOTO(unlock, rc);
1059         }
1060
1061         memset(la, 0, sizeof(*la));
1062         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
1063         la->la_mode = S_IFDIR | S_IRWXU;
1064         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1065                        LA_UID | LA_GID;
1066         memset(dof, 0, sizeof(*dof));
1067         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1068
1069         if (node == 0)
1070                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1071         else
1072                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1073         if (rc == 0)
1074                 lfsck->li_lpf_obj = child;
1075
1076         GOTO(unlock, rc);
1077
1078 unlock:
1079         lfsck_unlock(llh);
1080         if (rc != 0 && child != NULL && !IS_ERR(child))
1081                 lfsck_object_put(env, child);
1082
1083         return rc;
1084 }
1085
1086 /**
1087  * Scan .lustre/lost+found for bad name entries and remove them.
1088  *
1089  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1090  * index in the system. Any other formatted name is invalid and should be
1091  * removed.
1092  *
1093  * \param[in] env       pointer to the thread context
1094  * \param[in] lfsck     pointer to the lfsck instance
1095  *
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1100                                       struct lfsck_instance *lfsck)
1101 {
1102         struct dt_object        *parent = lfsck->li_lpf_root_obj;
1103         struct lu_dirent        *ent    =
1104                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1105         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1106         struct dt_it            *it;
1107         int                      rc;
1108         ENTRY;
1109
1110         it = iops->init(env, parent, LUDA_64BITHASH);
1111         if (IS_ERR(it))
1112                 RETURN(PTR_ERR(it));
1113
1114         rc = iops->load(env, it, 0);
1115         if (rc == 0)
1116                 rc = iops->next(env, it);
1117         else if (rc > 0)
1118                 rc = 0;
1119
1120         while (rc == 0) {
1121                 int off = 3;
1122
1123                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1124                 if (rc != 0)
1125                         break;
1126
1127                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1128                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1129                         goto next;
1130
1131                 /* name length must be strlen("MDTxxxx") */
1132                 if (ent->lde_namelen != 7)
1133                         goto remove;
1134
1135                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1136                         goto remove;
1137
1138                 while (off < 7 && isxdigit(ent->lde_name[off]))
1139                         off++;
1140
1141                 if (off != 7) {
1142
1143 remove:
1144                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1145                                                          ent->lde_name);
1146                         if (rc != 0)
1147                                 break;
1148                 }
1149
1150 next:
1151                 rc = iops->next(env, it);
1152         }
1153
1154         iops->put(env, it);
1155         iops->fini(env, it);
1156
1157         RETURN(rc > 0 ? 0 : rc);
1158 }
1159
1160 static int lfsck_update_lpf_entry(const struct lu_env *env,
1161                                   struct lfsck_instance *lfsck,
1162                                   struct dt_object *parent,
1163                                   struct dt_object *child,
1164                                   const char *name,
1165                                   enum lfsck_verify_lpf_types type)
1166 {
1167         int rc;
1168
1169         if (type == LVLT_BY_BOOKMARK) {
1170                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1171                                              lfsck_dto2fid(child), S_IFDIR);
1172         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1173                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1174                 rc = lfsck_bookmark_store(env, lfsck);
1175
1176                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1177                        " in the bookmark file: rc = %d\n",
1178                        lfsck_lfsck2name(lfsck),
1179                        PFID(lfsck_dto2fid(child)), rc);
1180         }
1181
1182         return rc;
1183 }
1184
1185 /**
1186  * Check whether the @child back references the @parent.
1187  *
1188  * Two cases:
1189  * 1) The child's FID is stored in the bookmark file. If the child back
1190  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1191  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1192  *    the child back references another parent2, then:
1193  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1194  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1195  *      references the child. So keep them there. As the LFSCK processing,
1196  *      the parent3 may be found, then when the LFSCK run next time, the
1197  *      inconsistency can be repaired.
1198  *
1199  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1200  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1201  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1202  *    back references another parent2, then:
1203  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1204  *      from .lustre/lost+found/;
1205  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1206  *      sub-directory name entry and update the child;
1207  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1208  *      or not, then keep them there.
1209  *
1210  * \param[in] env       pointer to the thread context
1211  * \param[in] lfsck     pointer to the lfsck instance
1212  * \param[in] child     pointer to the lost+found sub-directory object
1213  * \param[in] name      the name for lost+found sub-directory object
1214  * \param[out] fid      pointer to the buffer to hold the FID of the object
1215  *                      (called it as parent2) that is referenced via the
1216  *                      child's dotdot entry; it also can be the FID that
1217  *                      is referenced by the name entry under the parent2.
1218  * \param[in] type      to indicate where the child's FID is stored in
1219  *
1220  * \retval              positive number for uncertain inconsistency
1221  * \retval              0 for success
1222  * \retval              negative error number on failure
1223  */
1224 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1225                                   struct lfsck_instance *lfsck,
1226                                   struct dt_object *child, const char *name,
1227                                   struct lu_fid *fid,
1228                                   enum lfsck_verify_lpf_types type)
1229 {
1230         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1231         struct lfsck_thread_info *info    = lfsck_env_info(env);
1232         char                     *name2   = info->lti_key;
1233         struct lu_fid            *fid2    = &info->lti_fid3;
1234         struct dt_object         *parent2 = NULL;
1235         struct lustre_handle      lh      = { 0 };
1236         int                       rc;
1237         ENTRY;
1238
1239         fid_zero(fid);
1240         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1241                        (const struct dt_key *)dotdot);
1242         if (rc != 0)
1243                 GOTO(linkea, rc);
1244
1245         if (!fid_is_sane(fid))
1246                 GOTO(linkea, rc = -EINVAL);
1247
1248         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1249                 const struct lu_name *cname;
1250
1251                 if (lfsck->li_lpf_obj == NULL) {
1252                         lu_object_get(&child->do_lu);
1253                         lfsck->li_lpf_obj = child;
1254                 }
1255
1256                 cname = lfsck_name_get_const(env, name, strlen(name));
1257                 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1258                 if (rc == 0)
1259                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1260                                                     name, type);
1261
1262                 GOTO(out_done, rc);
1263         }
1264
1265         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1266         if (IS_ERR(parent2))
1267                 GOTO(linkea, parent2);
1268
1269         if (!dt_object_exists(parent2)) {
1270                 lfsck_object_put(env, parent2);
1271
1272                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1273         }
1274
1275         if (!dt_try_as_dir(env, parent2)) {
1276                 lfsck_object_put(env, parent2);
1277
1278                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1279         }
1280
1281 linkea:
1282         /* To prevent rename/unlink race */
1283         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1284                               MDS_INODELOCK_UPDATE, LCK_PR);
1285         if (rc != 0)
1286                 GOTO(out_put, rc);
1287
1288         dt_read_lock(env, child, 0);
1289         rc = lfsck_links_get_first(env, child, name2, fid2);
1290         if (rc != 0) {
1291                 dt_read_unlock(env, child);
1292                 lfsck_ibits_unlock(&lh, LCK_PR);
1293
1294                 GOTO(out_put, rc = 1);
1295         }
1296
1297         /* It is almost impossible that the bookmark file (or the name entry)
1298          * and the linkEA hit the same data corruption. Trust the linkEA. */
1299         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1300                 dt_read_unlock(env, child);
1301                 lfsck_ibits_unlock(&lh, LCK_PR);
1302
1303                 *fid = *fid2;
1304                 if (lfsck->li_lpf_obj == NULL) {
1305                         lu_object_get(&child->do_lu);
1306                         lfsck->li_lpf_obj = child;
1307                 }
1308
1309                 /* Update the child's dotdot entry */
1310                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1311                                              &LU_LPF_FID, S_IFDIR);
1312                 if (rc == 0)
1313                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1314                                                     name, type);
1315
1316                 GOTO(out_put, rc);
1317         }
1318
1319         if (parent2 == NULL || IS_ERR(parent2)) {
1320                 dt_read_unlock(env, child);
1321                 lfsck_ibits_unlock(&lh, LCK_PR);
1322
1323                 GOTO(out_done, rc = 1);
1324         }
1325
1326         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1327                        (const struct dt_key *)name2);
1328         dt_read_unlock(env, child);
1329         lfsck_ibits_unlock(&lh, LCK_PR);
1330         if (rc != 0 && rc != -ENOENT)
1331                 GOTO(out_put, rc);
1332
1333         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1334                 if (type == LVLT_BY_BOOKMARK)
1335                         GOTO(out_put, rc = 1);
1336
1337                 /* Trust the name entry, update the child's dotdot entry. */
1338                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1339                                              &LU_LPF_FID, S_IFDIR);
1340
1341                 GOTO(out_put, rc);
1342         }
1343
1344         if (type == LVLT_BY_BOOKMARK) {
1345                 /* Invalid FID record in the bookmark file, reset it. */
1346                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1347                 rc = lfsck_bookmark_store(env, lfsck);
1348
1349                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1350                        " in the bookmark file: rc = %d\n",
1351                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1352         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1353                 /* The name entry is wrong, remove it. */
1354                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1355         }
1356
1357         GOTO(out_put, rc);
1358
1359 out_put:
1360         if (parent2 != NULL && !IS_ERR(parent2))
1361                 lfsck_object_put(env, parent2);
1362
1363 out_done:
1364         return rc;
1365 }
1366
1367 /**
1368  * Verify the /ROOT/.lustre/lost+found/ directory.
1369  *
1370  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1371  * the LFSCK does not exactly know how to handle, such as orphans. So before
1372  * the LFSCK scanning the system, the consistency of such directory needs to
1373  * be verified firstly to allow the users to use it during the LFSCK.
1374  *
1375  * \param[in] env       pointer to the thread context
1376  * \param[in] lfsck     pointer to the lfsck instance
1377  *
1378  * \retval              positive number for uncertain inconsistency
1379  * \retval              0 for success
1380  * \retval              negative error number on failure
1381  */
1382 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1383 {
1384         struct lfsck_thread_info *info   = lfsck_env_info(env);
1385         struct lu_fid            *pfid   = &info->lti_fid;
1386         struct lu_fid            *cfid   = &info->lti_fid2;
1387         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1388         struct dt_object         *parent;
1389         /* child1's FID is in the bookmark file. */
1390         struct dt_object         *child1 = NULL;
1391         /* child2's FID is in the name entry MDTxxxx. */
1392         struct dt_object         *child2 = NULL;
1393         const struct lu_name     *cname;
1394         char                      name[8];
1395         int                       node   = lfsck_dev_idx(lfsck);
1396         int                       rc     = 0;
1397         ENTRY;
1398
1399         LASSERT(lfsck->li_master);
1400
1401         if (lfsck->li_lpf_root_obj != NULL)
1402                 RETURN(0);
1403
1404         if (node == 0) {
1405                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1406                                                   &LU_LPF_FID);
1407         } else {
1408                 struct lfsck_tgt_desc *ltd;
1409
1410                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1411                 if (unlikely(ltd == NULL))
1412                         RETURN(-ENXIO);
1413
1414                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1415                                                   &LU_LPF_FID);
1416                 lfsck_tgt_put(ltd);
1417         }
1418
1419         if (IS_ERR(parent))
1420                 RETURN(PTR_ERR(parent));
1421
1422         LASSERT(dt_object_exists(parent));
1423
1424         if (unlikely(!dt_try_as_dir(env, parent))) {
1425                 lfsck_object_put(env, parent);
1426
1427                 GOTO(put, rc = -ENOTDIR);
1428         }
1429
1430         lfsck->li_lpf_root_obj = parent;
1431         if (node == 0) {
1432                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1433                 if (rc != 0)
1434                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1435                                "for bad sub-directories: rc = %d\n",
1436                                lfsck_lfsck2name(lfsck), rc);
1437         }
1438
1439         /* child2 */
1440         snprintf(name, 8, "MDT%04x", node);
1441         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1442                        (const struct dt_key *)name);
1443         if (rc == -ENOENT) {
1444                 rc = 0;
1445                 goto find_child1;
1446         }
1447
1448         if (rc != 0)
1449                 GOTO(put, rc);
1450
1451         /* Invalid FID in the name entry, remove the name entry. */
1452         if (!fid_is_norm(cfid)) {
1453                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1454                 if (rc != 0)
1455                         GOTO(put, rc);
1456
1457                 goto find_child1;
1458         }
1459
1460         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1461         if (IS_ERR(child2))
1462                 GOTO(put, rc = PTR_ERR(child2));
1463
1464         if (unlikely(!dt_object_exists(child2) ||
1465                      dt_object_remote(child2)) ||
1466                      !S_ISDIR(lfsck_object_type(child2))) {
1467                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1468                 if (rc != 0)
1469                         GOTO(put, rc);
1470
1471                 goto find_child1;
1472         }
1473
1474         if (unlikely(!dt_try_as_dir(env, child2))) {
1475                 lfsck_object_put(env, child2);
1476                 child2 = NULL;
1477                 rc = -ENOTDIR;
1478         }
1479
1480 find_child1:
1481         if (fid_is_zero(&bk->lb_lpf_fid))
1482                 goto check_child2;
1483
1484         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1485                 if (lfsck->li_lpf_obj == NULL) {
1486                         lu_object_get(&child2->do_lu);
1487                         lfsck->li_lpf_obj = child2;
1488                 }
1489
1490                 cname = lfsck_name_get_const(env, name, strlen(name));
1491                 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1492
1493                 GOTO(put, rc);
1494         }
1495
1496         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1497                 struct lu_fid tfid = bk->lb_lpf_fid;
1498
1499                 /* Invalid FID record in the bookmark file, reset it. */
1500                 fid_zero(&bk->lb_lpf_fid);
1501                 rc = lfsck_bookmark_store(env, lfsck);
1502
1503                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1504                        " in the bookmark file: rc = %d\n",
1505                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1506
1507                 if (rc != 0)
1508                         GOTO(put, rc);
1509
1510                 goto check_child2;
1511         }
1512
1513         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1514         if (IS_ERR(child1)) {
1515                 child1 = NULL;
1516                 goto check_child2;
1517         }
1518
1519         if (unlikely(!dt_object_exists(child1) ||
1520                      dt_object_remote(child1)) ||
1521                      !S_ISDIR(lfsck_object_type(child1))) {
1522                 /* Invalid FID record in the bookmark file, reset it. */
1523                 fid_zero(&bk->lb_lpf_fid);
1524                 rc = lfsck_bookmark_store(env, lfsck);
1525
1526                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1527                        " in the bookmark file: rc = %d\n",
1528                        lfsck_lfsck2name(lfsck),
1529                        PFID(lfsck_dto2fid(child1)), rc);
1530
1531                 if (rc != 0)
1532                         GOTO(put, rc);
1533
1534                 lfsck_object_put(env, child1);
1535                 child1 = NULL;
1536                 goto check_child2;
1537         }
1538
1539         if (unlikely(!dt_try_as_dir(env, child1))) {
1540                 lfsck_object_put(env, child1);
1541                 child1 = NULL;
1542                 rc = -ENOTDIR;
1543                 goto check_child2;
1544         }
1545
1546         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1547                                     LVLT_BY_BOOKMARK);
1548         if (lu_fid_eq(pfid, &LU_LPF_FID))
1549                 GOTO(put, rc);
1550
1551 check_child2:
1552         if (child2 != NULL)
1553                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1554                                             pfid, LVLT_BY_NAMEENTRY);
1555
1556         GOTO(put, rc);
1557
1558 put:
1559         if (lfsck->li_lpf_obj != NULL) {
1560                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1561                         lfsck_object_put(env, lfsck->li_lpf_obj);
1562                         lfsck->li_lpf_obj = NULL;
1563                         rc = -ENOTDIR;
1564                 }
1565         } else if (rc == 0) {
1566                 rc = lfsck_create_lpf(env, lfsck);
1567         }
1568
1569         if (child2 != NULL && !IS_ERR(child2))
1570                 lfsck_object_put(env, child2);
1571         if (child1 != NULL && !IS_ERR(child1))
1572                 lfsck_object_put(env, child1);
1573
1574         return rc;
1575 }
1576
1577 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1578 {
1579         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1580         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1581         char                    *prefix;
1582         int                      rc     = 0;
1583         ENTRY;
1584
1585         if (unlikely(ss == NULL))
1586                 RETURN(-ENXIO);
1587
1588         OBD_ALLOC_PTR(lfsck->li_seq);
1589         if (lfsck->li_seq == NULL)
1590                 RETURN(-ENOMEM);
1591
1592         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1593         if (prefix == NULL)
1594                 GOTO(out, rc = -ENOMEM);
1595
1596         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1597         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1598                              ss->ss_server_seq);
1599         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1600         if (rc != 0)
1601                 GOTO(out, rc);
1602
1603         if (fid_is_sane(&bk->lb_last_fid))
1604                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1605
1606         RETURN(0);
1607
1608 out:
1609         OBD_FREE_PTR(lfsck->li_seq);
1610         lfsck->li_seq = NULL;
1611
1612         return rc;
1613 }
1614
1615 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1616 {
1617         if (lfsck->li_seq != NULL) {
1618                 seq_client_fini(lfsck->li_seq);
1619                 OBD_FREE_PTR(lfsck->li_seq);
1620                 lfsck->li_seq = NULL;
1621         }
1622 }
1623
1624 void lfsck_instance_cleanup(const struct lu_env *env,
1625                             struct lfsck_instance *lfsck)
1626 {
1627         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1628         struct lfsck_component  *com;
1629         struct lfsck_component  *next;
1630         struct lfsck_lmv_unit   *llu;
1631         struct lfsck_lmv_unit   *llu_next;
1632         struct lfsck_lmv        *llmv;
1633         ENTRY;
1634
1635         LASSERT(list_empty(&lfsck->li_link));
1636         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1637
1638         if (lfsck->li_obj_oit != NULL) {
1639                 lfsck_object_put(env, lfsck->li_obj_oit);
1640                 lfsck->li_obj_oit = NULL;
1641         }
1642
1643         LASSERT(lfsck->li_obj_dir == NULL);
1644         LASSERT(lfsck->li_lmv == NULL);
1645
1646         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1647                 llmv = &llu->llu_lmv;
1648
1649                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1650                          "still in using: %u\n",
1651                          atomic_read(&llmv->ll_ref));
1652
1653                 lfsck_lmv_put(env, llmv);
1654         }
1655
1656         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1657                 lfsck_component_cleanup(env, com);
1658         }
1659
1660         LASSERT(list_empty(&lfsck->li_list_dir));
1661
1662         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1663                                  lc_link) {
1664                 lfsck_component_cleanup(env, com);
1665         }
1666
1667         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1668                 lfsck_component_cleanup(env, com);
1669         }
1670
1671         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1672         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1673
1674         if (lfsck->li_lfsck_dir != NULL) {
1675                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1676                 lfsck->li_lfsck_dir = NULL;
1677         }
1678
1679         if (lfsck->li_bookmark_obj != NULL) {
1680                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1681                 lfsck->li_bookmark_obj = NULL;
1682         }
1683
1684         if (lfsck->li_lpf_obj != NULL) {
1685                 lfsck_object_put(env, lfsck->li_lpf_obj);
1686                 lfsck->li_lpf_obj = NULL;
1687         }
1688
1689         if (lfsck->li_lpf_root_obj != NULL) {
1690                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1691                 lfsck->li_lpf_root_obj = NULL;
1692         }
1693
1694         if (lfsck->li_los != NULL) {
1695                 local_oid_storage_fini(env, lfsck->li_los);
1696                 lfsck->li_los = NULL;
1697         }
1698
1699         lfsck_fid_fini(lfsck);
1700
1701         OBD_FREE_PTR(lfsck);
1702 }
1703
1704 static inline struct lfsck_instance *
1705 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1706 {
1707         struct lfsck_instance *lfsck;
1708
1709         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1710                 if (lfsck->li_bottom == key) {
1711                         if (ref)
1712                                 lfsck_instance_get(lfsck);
1713                         if (unlink)
1714                                 list_del_init(&lfsck->li_link);
1715
1716                         return lfsck;
1717                 }
1718         }
1719
1720         return NULL;
1721 }
1722
1723 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1724                                            bool unlink)
1725 {
1726         struct lfsck_instance *lfsck;
1727
1728         spin_lock(&lfsck_instance_lock);
1729         lfsck = __lfsck_instance_find(key, ref, unlink);
1730         spin_unlock(&lfsck_instance_lock);
1731
1732         return lfsck;
1733 }
1734
1735 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1736 {
1737         struct lfsck_instance *tmp;
1738
1739         spin_lock(&lfsck_instance_lock);
1740         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1741                 if (lfsck->li_bottom == tmp->li_bottom) {
1742                         spin_unlock(&lfsck_instance_lock);
1743                         return -EEXIST;
1744                 }
1745         }
1746
1747         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1748         spin_unlock(&lfsck_instance_lock);
1749         return 0;
1750 }
1751
1752 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1753                     const char *prefix)
1754 {
1755         int flag;
1756         int i;
1757         bool newline = (bits != 0 ? false : true);
1758         int rc;
1759
1760         rc = seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1761         if (rc < 0)
1762                 return rc;
1763
1764         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1765                 if (flag & bits) {
1766                         bits &= ~flag;
1767                         if (names[i] != NULL) {
1768                                 if (bits == 0)
1769                                         newline = true;
1770
1771                                 rc = seq_printf(m, "%s%c", names[i],
1772                                                 newline ? '\n' : ',');
1773                                 if (rc < 0)
1774                                         return rc;
1775                         }
1776                 }
1777         }
1778
1779         if (!newline)
1780                 rc = seq_printf(m, "\n");
1781
1782         return rc;
1783 }
1784
1785 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
1786 {
1787         int rc;
1788
1789         if (time == 0) {
1790                 rc = seq_printf(m, "%s_time: N/A\n", name);
1791                 if (rc == 0)
1792                         rc = seq_printf(m, "time_since_%s: N/A\n", name);
1793
1794                 return rc;
1795         }
1796
1797         rc = seq_printf(m, "%s_time: "LPU64"\n", name, time);
1798         if (rc == 0)
1799                 rc = seq_printf(m, "time_since_%s: "LPU64" seconds\n",
1800                                 name, cfs_time_current_sec() - time);
1801
1802         return rc;
1803 }
1804
1805 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1806                    const char *prefix)
1807 {
1808         if (fid_is_zero(&pos->lp_dir_parent)) {
1809                 if (pos->lp_oit_cookie == 0)
1810                         return seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1811
1812                 return seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1813                                   prefix, pos->lp_oit_cookie);
1814         }
1815
1816         return seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1817                           prefix, pos->lp_oit_cookie,
1818                           PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1819 }
1820
1821 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1822                     struct lfsck_position *pos, bool init)
1823 {
1824         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1825
1826         if (unlikely(lfsck->li_di_oit == NULL)) {
1827                 memset(pos, 0, sizeof(*pos));
1828                 return;
1829         }
1830
1831         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1832         if (!lfsck->li_current_oit_processed && !init)
1833                 pos->lp_oit_cookie--;
1834
1835         LASSERT(pos->lp_oit_cookie > 0);
1836
1837         if (lfsck->li_di_dir != NULL) {
1838                 struct dt_object *dto = lfsck->li_obj_dir;
1839
1840                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1841                                                         lfsck->li_di_dir);
1842
1843                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1844                         fid_zero(&pos->lp_dir_parent);
1845                         pos->lp_dir_cookie = 0;
1846                 } else {
1847                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1848                 }
1849         } else {
1850                 fid_zero(&pos->lp_dir_parent);
1851                 pos->lp_dir_cookie = 0;
1852         }
1853 }
1854
1855 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1856 {
1857         bool dirty = false;
1858
1859         if (limit != LFSCK_SPEED_NO_LIMIT) {
1860                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1861                         lfsck->li_sleep_rate = limit /
1862                                                msecs_to_jiffies(MSEC_PER_SEC);
1863                         lfsck->li_sleep_jif = 1;
1864                 } else {
1865                         lfsck->li_sleep_rate = 1;
1866                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1867                                               limit;
1868                 }
1869         } else {
1870                 lfsck->li_sleep_jif = 0;
1871                 lfsck->li_sleep_rate = 0;
1872         }
1873
1874         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1875                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1876                 dirty = true;
1877         }
1878
1879         return dirty;
1880 }
1881
1882 void lfsck_control_speed(struct lfsck_instance *lfsck)
1883 {
1884         struct ptlrpc_thread *thread = &lfsck->li_thread;
1885         struct l_wait_info    lwi;
1886
1887         if (lfsck->li_sleep_jif > 0 &&
1888             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1889                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1890                                        LWI_ON_SIGNAL_NOOP, NULL);
1891
1892                 l_wait_event(thread->t_ctl_waitq,
1893                              !thread_is_running(thread),
1894                              &lwi);
1895                 lfsck->li_new_scanned = 0;
1896         }
1897 }
1898
1899 void lfsck_control_speed_by_self(struct lfsck_component *com)
1900 {
1901         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1902         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1903         struct l_wait_info       lwi;
1904
1905         if (lfsck->li_sleep_jif > 0 &&
1906             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1907                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1908                                        LWI_ON_SIGNAL_NOOP, NULL);
1909
1910                 l_wait_event(thread->t_ctl_waitq,
1911                              !thread_is_running(thread),
1912                              &lwi);
1913                 com->lc_new_scanned = 0;
1914         }
1915 }
1916
1917 static struct lfsck_thread_args *
1918 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1919                        struct lfsck_component *com,
1920                        struct lfsck_start_param *lsp)
1921 {
1922         struct lfsck_thread_args *lta;
1923         int                       rc;
1924
1925         OBD_ALLOC_PTR(lta);
1926         if (lta == NULL)
1927                 return ERR_PTR(-ENOMEM);
1928
1929         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1930         if (rc != 0) {
1931                 OBD_FREE_PTR(lta);
1932                 return ERR_PTR(rc);
1933         }
1934
1935         lta->lta_lfsck = lfsck_instance_get(lfsck);
1936         if (com != NULL)
1937                 lta->lta_com = lfsck_component_get(com);
1938
1939         lta->lta_lsp = lsp;
1940
1941         return lta;
1942 }
1943
1944 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1945 {
1946         if (lta->lta_com != NULL)
1947                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1948         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1949         lu_env_fini(&lta->lta_env);
1950         OBD_FREE_PTR(lta);
1951 }
1952
1953 struct lfsck_assistant_data *
1954 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1955                           const char *name)
1956 {
1957         struct lfsck_assistant_data *lad;
1958
1959         OBD_ALLOC_PTR(lad);
1960         if (lad != NULL) {
1961                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1962                 if (lad->lad_bitmap == NULL) {
1963                         OBD_FREE_PTR(lad);
1964                         return NULL;
1965                 }
1966
1967                 INIT_LIST_HEAD(&lad->lad_req_list);
1968                 spin_lock_init(&lad->lad_lock);
1969                 INIT_LIST_HEAD(&lad->lad_ost_list);
1970                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1971                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1972                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1973                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1974                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1975                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1976                 lad->lad_ops = lao;
1977                 lad->lad_name = name;
1978         }
1979
1980         return lad;
1981 }
1982
1983 struct lfsck_assistant_object *
1984 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1985                             const struct lu_attr *attr, __u64 cookie,
1986                             bool is_dir)
1987 {
1988         struct lfsck_assistant_object   *lso;
1989
1990         OBD_ALLOC_PTR(lso);
1991         if (lso == NULL)
1992                 return ERR_PTR(-ENOMEM);
1993
1994         lso->lso_fid = *fid;
1995         if (attr != NULL)
1996                 lso->lso_attr = *attr;
1997
1998         atomic_set(&lso->lso_ref, 1);
1999         lso->lso_oit_cookie = cookie;
2000         if (is_dir)
2001                 lso->lso_is_dir = 1;
2002
2003         return lso;
2004 }
2005
2006 struct dt_object *
2007 lfsck_assistant_object_load(const struct lu_env *env,
2008                             struct lfsck_instance *lfsck,
2009                             struct lfsck_assistant_object *lso)
2010 {
2011         struct dt_object *obj;
2012
2013         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2014         if (IS_ERR(obj))
2015                 return obj;
2016
2017         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2018                 lso->lso_dead = 1;
2019                 lfsck_object_put(env, obj);
2020
2021                 return ERR_PTR(-ENOENT);
2022         }
2023
2024         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
2025                 lfsck_object_put(env, obj);
2026
2027                 return ERR_PTR(-ENOTDIR);
2028         }
2029
2030         return obj;
2031 }
2032
2033 /**
2034  * Generic LFSCK asynchronous communication interpretor function.
2035  * The LFSCK RPC reply for both the event notification and status
2036  * querying will be handled here.
2037  *
2038  * \param[in] env       pointer to the thread context
2039  * \param[in] req       pointer to the LFSCK request
2040  * \param[in] args      pointer to the lfsck_async_interpret_args
2041  * \param[in] rc        the result for handling the LFSCK request
2042  *
2043  * \retval              0 for success
2044  * \retval              negative error number on failure
2045  */
2046 int lfsck_async_interpret_common(const struct lu_env *env,
2047                                  struct ptlrpc_request *req,
2048                                  void *args, int rc)
2049 {
2050         struct lfsck_async_interpret_args *laia = args;
2051         struct lfsck_component            *com  = laia->laia_com;
2052         struct lfsck_assistant_data       *lad  = com->lc_data;
2053         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
2054         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
2055         struct lfsck_request              *lr   = laia->laia_lr;
2056
2057         LASSERT(com->lc_lfsck->li_master);
2058
2059         switch (lr->lr_event) {
2060         case LE_START:
2061                 if (rc != 0) {
2062                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2063                                "start: rc = %d\n",
2064                                lfsck_lfsck2name(com->lc_lfsck),
2065                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2066                                ltd->ltd_index, lad->lad_name, rc);
2067
2068                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2069                                 struct lfsck_layout *lo = com->lc_file_ram;
2070
2071                                 if (lr->lr_flags & LEF_TO_OST)
2072                                         lfsck_lad_set_bitmap(env, com,
2073                                                              ltd->ltd_index);
2074                                 else
2075                                         lo->ll_flags |= LF_INCOMPLETE;
2076                         } else {
2077                                 struct lfsck_namespace *ns = com->lc_file_ram;
2078
2079                                 /* If some MDT does not join the namespace
2080                                  * LFSCK, then we cannot know whether there
2081                                  * is some name entry on such MDT that with
2082                                  * the referenced MDT-object on this MDT or
2083                                  * not. So the namespace LFSCK on this MDT
2084                                  * cannot handle orphan MDT-objects properly.
2085                                  * So we mark the LFSCK as LF_INCOMPLETE and
2086                                  * skip orphan MDT-objects handling. */
2087                                 ns->ln_flags |= LF_INCOMPLETE;
2088                         }
2089                         break;
2090                 }
2091
2092                 spin_lock(&ltds->ltd_lock);
2093                 if (ltd->ltd_dead) {
2094                         spin_unlock(&ltds->ltd_lock);
2095                         break;
2096                 }
2097
2098                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2099                         struct list_head *list;
2100                         struct list_head *phase_list;
2101
2102                         if (ltd->ltd_layout_done) {
2103                                 spin_unlock(&ltds->ltd_lock);
2104                                 break;
2105                         }
2106
2107                         if (lr->lr_flags & LEF_TO_OST) {
2108                                 list = &lad->lad_ost_list;
2109                                 phase_list = &lad->lad_ost_phase1_list;
2110                         } else {
2111                                 list = &lad->lad_mdt_list;
2112                                 phase_list = &lad->lad_mdt_phase1_list;
2113                         }
2114
2115                         if (list_empty(&ltd->ltd_layout_list))
2116                                 list_add_tail(&ltd->ltd_layout_list, list);
2117                         if (list_empty(&ltd->ltd_layout_phase_list))
2118                                 list_add_tail(&ltd->ltd_layout_phase_list,
2119                                               phase_list);
2120                 } else {
2121                         if (ltd->ltd_namespace_done) {
2122                                 spin_unlock(&ltds->ltd_lock);
2123                                 break;
2124                         }
2125
2126                         if (list_empty(&ltd->ltd_namespace_list))
2127                                 list_add_tail(&ltd->ltd_namespace_list,
2128                                               &lad->lad_mdt_list);
2129                         if (list_empty(&ltd->ltd_namespace_phase_list))
2130                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2131                                               &lad->lad_mdt_phase1_list);
2132                 }
2133                 spin_unlock(&ltds->ltd_lock);
2134                 break;
2135         case LE_STOP:
2136         case LE_PHASE1_DONE:
2137         case LE_PHASE2_DONE:
2138         case LE_PEER_EXIT:
2139                 if (rc != 0 && rc != -EALREADY)
2140                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2141                               "event = %d, rc = %d\n",
2142                               lfsck_lfsck2name(com->lc_lfsck),
2143                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2144                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2145                 break;
2146         case LE_QUERY: {
2147                 struct lfsck_reply *reply;
2148                 struct list_head *list;
2149                 struct list_head *phase_list;
2150
2151                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2152                         list = &ltd->ltd_layout_list;
2153                         phase_list = &ltd->ltd_layout_phase_list;
2154                 } else {
2155                         list = &ltd->ltd_namespace_list;
2156                         phase_list = &ltd->ltd_namespace_phase_list;
2157                 }
2158
2159                 if (rc != 0) {
2160                         spin_lock(&ltds->ltd_lock);
2161                         list_del_init(phase_list);
2162                         list_del_init(list);
2163                         spin_unlock(&ltds->ltd_lock);
2164                         break;
2165                 }
2166
2167                 reply = req_capsule_server_get(&req->rq_pill,
2168                                                &RMF_LFSCK_REPLY);
2169                 if (reply == NULL) {
2170                         rc = -EPROTO;
2171                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2172                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2173                                lad->lad_name, rc);
2174                         spin_lock(&ltds->ltd_lock);
2175                         list_del_init(phase_list);
2176                         list_del_init(list);
2177                         spin_unlock(&ltds->ltd_lock);
2178                         break;
2179                 }
2180
2181                 switch (reply->lr_status) {
2182                 case LS_SCANNING_PHASE1:
2183                         break;
2184                 case LS_SCANNING_PHASE2:
2185                         spin_lock(&ltds->ltd_lock);
2186                         list_del_init(phase_list);
2187                         if (ltd->ltd_dead) {
2188                                 spin_unlock(&ltds->ltd_lock);
2189                                 break;
2190                         }
2191
2192                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2193                                 if (ltd->ltd_layout_done) {
2194                                         spin_unlock(&ltds->ltd_lock);
2195                                         break;
2196                                 }
2197
2198                                 if (lr->lr_flags & LEF_TO_OST)
2199                                         list_add_tail(phase_list,
2200                                                 &lad->lad_ost_phase2_list);
2201                                 else
2202                                         list_add_tail(phase_list,
2203                                                 &lad->lad_mdt_phase2_list);
2204                         } else {
2205                                 if (ltd->ltd_namespace_done) {
2206                                         spin_unlock(&ltds->ltd_lock);
2207                                         break;
2208                                 }
2209
2210                                 list_add_tail(phase_list,
2211                                               &lad->lad_mdt_phase2_list);
2212                         }
2213                         spin_unlock(&ltds->ltd_lock);
2214                         break;
2215                 default:
2216                         spin_lock(&ltds->ltd_lock);
2217                         list_del_init(phase_list);
2218                         list_del_init(list);
2219                         spin_unlock(&ltds->ltd_lock);
2220                         break;
2221                 }
2222                 break;
2223         }
2224         default:
2225                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2226                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2227                 break;
2228         }
2229
2230         if (!laia->laia_shared) {
2231                 lfsck_tgt_put(ltd);
2232                 lfsck_component_put(env, com);
2233         }
2234
2235         return 0;
2236 }
2237
2238 static void lfsck_interpret(const struct lu_env *env,
2239                             struct lfsck_instance *lfsck,
2240                             struct ptlrpc_request *req, void *args, int result)
2241 {
2242         struct lfsck_async_interpret_args *laia = args;
2243         struct lfsck_component            *com;
2244
2245         LASSERT(laia->laia_com == NULL);
2246         LASSERT(laia->laia_shared);
2247
2248         spin_lock(&lfsck->li_lock);
2249         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2250                 laia->laia_com = com;
2251                 lfsck_async_interpret_common(env, req, laia, result);
2252         }
2253
2254         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2255                 laia->laia_com = com;
2256                 lfsck_async_interpret_common(env, req, laia, result);
2257         }
2258         spin_unlock(&lfsck->li_lock);
2259 }
2260
2261 static int lfsck_stop_notify(const struct lu_env *env,
2262                              struct lfsck_instance *lfsck,
2263                              struct lfsck_tgt_descs *ltds,
2264                              struct lfsck_tgt_desc *ltd, __u16 type)
2265 {
2266         struct lfsck_component *com;
2267         int                     rc = 0;
2268         ENTRY;
2269
2270         LASSERT(lfsck->li_master);
2271
2272         spin_lock(&lfsck->li_lock);
2273         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2274         if (com == NULL)
2275                 com = __lfsck_component_find(lfsck, type,
2276                                              &lfsck->li_list_double_scan);
2277         if (com != NULL)
2278                 lfsck_component_get(com);
2279         spin_unlock(&lfsck->li_lock);
2280
2281         if (com != NULL) {
2282                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2283                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2284                 struct lfsck_request              *lr    = &info->lti_lr;
2285                 struct lfsck_assistant_data       *lad   = com->lc_data;
2286                 struct list_head                  *list;
2287                 struct list_head                  *phase_list;
2288                 struct ptlrpc_request_set         *set;
2289
2290                 set = ptlrpc_prep_set();
2291                 if (set == NULL) {
2292                         lfsck_component_put(env, com);
2293
2294                         RETURN(-ENOMEM);
2295                 }
2296
2297                 if (type == LFSCK_TYPE_LAYOUT) {
2298                         list = &ltd->ltd_layout_list;
2299                         phase_list = &ltd->ltd_layout_phase_list;
2300                 } else {
2301                         list = &ltd->ltd_namespace_list;
2302                         phase_list = &ltd->ltd_namespace_phase_list;
2303                 }
2304
2305                 spin_lock(&ltds->ltd_lock);
2306                 if (list_empty(list)) {
2307                         LASSERT(list_empty(phase_list));
2308                         spin_unlock(&ltds->ltd_lock);
2309                         ptlrpc_set_destroy(set);
2310
2311                         RETURN(0);
2312                 }
2313
2314                 list_del_init(phase_list);
2315                 list_del_init(list);
2316                 spin_unlock(&ltds->ltd_lock);
2317
2318                 memset(lr, 0, sizeof(*lr));
2319                 lr->lr_index = lfsck_dev_idx(lfsck);
2320                 lr->lr_event = LE_PEER_EXIT;
2321                 lr->lr_active = type;
2322                 lr->lr_status = LS_CO_PAUSED;
2323                 if (ltds == &lfsck->li_ost_descs)
2324                         lr->lr_flags = LEF_TO_OST;
2325
2326                 laia->laia_com = com;
2327                 laia->laia_ltds = ltds;
2328                 atomic_inc(&ltd->ltd_ref);
2329                 laia->laia_ltd = ltd;
2330                 laia->laia_lr = lr;
2331                 laia->laia_shared = 0;
2332
2333                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2334                                          lfsck_async_interpret_common,
2335                                          laia, LFSCK_NOTIFY);
2336                 if (rc != 0) {
2337                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2338                                "co-stop for %s: rc = %d\n",
2339                                lfsck_lfsck2name(lfsck),
2340                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2341                                ltd->ltd_index, lad->lad_name, rc);
2342                         lfsck_tgt_put(ltd);
2343                 } else {
2344                         rc = ptlrpc_set_wait(set);
2345                 }
2346
2347                 ptlrpc_set_destroy(set);
2348                 lfsck_component_put(env, com);
2349         }
2350
2351         RETURN(rc);
2352 }
2353
2354 static int lfsck_async_interpret(const struct lu_env *env,
2355                                  struct ptlrpc_request *req,
2356                                  void *args, int rc)
2357 {
2358         struct lfsck_async_interpret_args *laia = args;
2359         struct lfsck_instance             *lfsck;
2360
2361         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2362                               li_mdt_descs);
2363         lfsck_interpret(env, lfsck, req, laia, rc);
2364         lfsck_tgt_put(laia->laia_ltd);
2365         if (rc != 0 && laia->laia_result != -EALREADY)
2366                 laia->laia_result = rc;
2367
2368         return 0;
2369 }
2370
2371 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2372                         struct lfsck_request *lr,
2373                         struct ptlrpc_request_set *set,
2374                         ptlrpc_interpterer_t interpreter,
2375                         void *args, int request)
2376 {
2377         struct lfsck_async_interpret_args *laia;
2378         struct ptlrpc_request             *req;
2379         struct lfsck_request              *tmp;
2380         struct req_format                 *format;
2381         int                                rc;
2382
2383         switch (request) {
2384         case LFSCK_NOTIFY:
2385                 format = &RQF_LFSCK_NOTIFY;
2386                 break;
2387         case LFSCK_QUERY:
2388                 format = &RQF_LFSCK_QUERY;
2389                 break;
2390         default:
2391                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2392                        exp->exp_obd->obd_name, request, -EINVAL);
2393                 return -EINVAL;
2394         }
2395
2396         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2397         if (req == NULL)
2398                 return -ENOMEM;
2399
2400         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2401         if (rc != 0) {
2402                 ptlrpc_request_free(req);
2403
2404                 return rc;
2405         }
2406
2407         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2408         *tmp = *lr;
2409         ptlrpc_request_set_replen(req);
2410
2411         laia = ptlrpc_req_async_args(req);
2412         *laia = *(struct lfsck_async_interpret_args *)args;
2413         if (laia->laia_com != NULL)
2414                 lfsck_component_get(laia->laia_com);
2415         req->rq_interpret_reply = interpreter;
2416         ptlrpc_set_add_req(set, req);
2417
2418         return 0;
2419 }
2420
2421 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2422                           struct lfsck_start_param *lsp)
2423 {
2424         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2425         struct lfsck_assistant_data     *lad     = com->lc_data;
2426         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2427         struct ptlrpc_thread            *athread = &lad->lad_thread;
2428         struct lfsck_thread_args        *lta;
2429         struct task_struct              *task;
2430         int                              rc;
2431         ENTRY;
2432
2433         lad->lad_assistant_status = 0;
2434         lad->lad_post_result = 0;
2435         lad->lad_to_post = 0;
2436         lad->lad_to_double_scan = 0;
2437         lad->lad_in_double_scan = 0;
2438         lad->lad_exit = 0;
2439         lad->lad_advance_lock = false;
2440         thread_set_flags(athread, 0);
2441
2442         lta = lfsck_thread_args_init(lfsck, com, lsp);
2443         if (IS_ERR(lta))
2444                 RETURN(PTR_ERR(lta));
2445
2446         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2447         if (IS_ERR(task)) {
2448                 rc = PTR_ERR(task);
2449                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2450                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2451                 lfsck_thread_args_fini(lta);
2452         } else {
2453                 struct l_wait_info lwi = { 0 };
2454
2455                 l_wait_event(mthread->t_ctl_waitq,
2456                              thread_is_running(athread) ||
2457                              thread_is_stopped(athread),
2458                              &lwi);
2459                 if (unlikely(!thread_is_running(athread)))
2460                         rc = lad->lad_assistant_status;
2461                 else
2462                         rc = 0;
2463         }
2464
2465         RETURN(rc);
2466 }
2467
2468 int lfsck_checkpoint_generic(const struct lu_env *env,
2469                              struct lfsck_component *com)
2470 {
2471         struct lfsck_assistant_data     *lad     = com->lc_data;
2472         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2473         struct ptlrpc_thread            *athread = &lad->lad_thread;
2474         struct l_wait_info               lwi     = { 0 };
2475
2476         l_wait_event(mthread->t_ctl_waitq,
2477                      list_empty(&lad->lad_req_list) ||
2478                      !thread_is_running(mthread) ||
2479                      thread_is_stopped(athread),
2480                      &lwi);
2481
2482         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2483                 return LFSCK_CHECKPOINT_SKIP;
2484
2485         return 0;
2486 }
2487
2488 void lfsck_post_generic(const struct lu_env *env,
2489                         struct lfsck_component *com, int *result)
2490 {
2491         struct lfsck_assistant_data     *lad     = com->lc_data;
2492         struct ptlrpc_thread            *athread = &lad->lad_thread;
2493         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2494         struct l_wait_info               lwi     = { 0 };
2495
2496         lad->lad_post_result = *result;
2497         if (*result <= 0)
2498                 lad->lad_exit = 1;
2499         lad->lad_to_post = 1;
2500
2501         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2502                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2503
2504         wake_up_all(&athread->t_ctl_waitq);
2505         l_wait_event(mthread->t_ctl_waitq,
2506                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2507                      thread_is_stopped(athread),
2508                      &lwi);
2509
2510         if (lad->lad_assistant_status < 0)
2511                 *result = lad->lad_assistant_status;
2512
2513         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2514                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2515 }
2516
2517 int lfsck_double_scan_generic(const struct lu_env *env,
2518                               struct lfsck_component *com, int status)
2519 {
2520         struct lfsck_assistant_data     *lad     = com->lc_data;
2521         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2522         struct ptlrpc_thread            *athread = &lad->lad_thread;
2523         struct l_wait_info               lwi     = { 0 };
2524
2525         if (status != LS_SCANNING_PHASE2)
2526                 lad->lad_exit = 1;
2527         else
2528                 lad->lad_to_double_scan = 1;
2529
2530         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2531                "status %d\n",
2532                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2533
2534         wake_up_all(&athread->t_ctl_waitq);
2535         l_wait_event(mthread->t_ctl_waitq,
2536                      lad->lad_in_double_scan ||
2537                      thread_is_stopped(athread),
2538                      &lwi);
2539
2540         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2541                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2542                lad->lad_assistant_status);
2543
2544         if (lad->lad_assistant_status < 0)
2545                 return lad->lad_assistant_status;
2546
2547         return 0;
2548 }
2549
2550 void lfsck_quit_generic(const struct lu_env *env,
2551                         struct lfsck_component *com)
2552 {
2553         struct lfsck_assistant_data     *lad     = com->lc_data;
2554         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2555         struct ptlrpc_thread            *athread = &lad->lad_thread;
2556         struct l_wait_info               lwi     = { 0 };
2557
2558         lad->lad_exit = 1;
2559         wake_up_all(&athread->t_ctl_waitq);
2560         l_wait_event(mthread->t_ctl_waitq,
2561                      thread_is_init(athread) ||
2562                      thread_is_stopped(athread),
2563                      &lwi);
2564 }
2565
2566 /* external interfaces */
2567
2568 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2569 {
2570         struct lu_env           env;
2571         struct lfsck_instance  *lfsck;
2572         int                     rc;
2573         ENTRY;
2574
2575         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2576         if (rc != 0)
2577                 RETURN(rc);
2578
2579         lfsck = lfsck_instance_find(key, true, false);
2580         if (likely(lfsck != NULL)) {
2581                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2582                 lfsck_instance_put(&env, lfsck);
2583         } else {
2584                 rc = -ENXIO;
2585         }
2586
2587         lu_env_fini(&env);
2588
2589         RETURN(rc);
2590 }
2591 EXPORT_SYMBOL(lfsck_get_speed);
2592
2593 int lfsck_set_speed(struct dt_device *key, int val)
2594 {
2595         struct lu_env           env;
2596         struct lfsck_instance  *lfsck;
2597         int                     rc;
2598         ENTRY;
2599
2600         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2601         if (rc != 0)
2602                 RETURN(rc);
2603
2604         lfsck = lfsck_instance_find(key, true, false);
2605         if (likely(lfsck != NULL)) {
2606                 mutex_lock(&lfsck->li_mutex);
2607                 if (__lfsck_set_speed(lfsck, val))
2608                         rc = lfsck_bookmark_store(&env, lfsck);
2609                 mutex_unlock(&lfsck->li_mutex);
2610                 lfsck_instance_put(&env, lfsck);
2611         } else {
2612                 rc = -ENXIO;
2613         }
2614
2615         lu_env_fini(&env);
2616
2617         RETURN(rc);
2618 }
2619 EXPORT_SYMBOL(lfsck_set_speed);
2620
2621 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2622 {
2623         struct lu_env           env;
2624         struct lfsck_instance  *lfsck;
2625         int                     rc;
2626         ENTRY;
2627
2628         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2629         if (rc != 0)
2630                 RETURN(rc);
2631
2632         lfsck = lfsck_instance_find(key, true, false);
2633         if (likely(lfsck != NULL)) {
2634                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2635                 lfsck_instance_put(&env, lfsck);
2636         } else {
2637                 rc = -ENXIO;
2638         }
2639
2640         lu_env_fini(&env);
2641
2642         RETURN(rc);
2643 }
2644 EXPORT_SYMBOL(lfsck_get_windows);
2645
2646 int lfsck_set_windows(struct dt_device *key, int val)
2647 {
2648         struct lu_env           env;
2649         struct lfsck_instance  *lfsck;
2650         int                     rc;
2651         ENTRY;
2652
2653         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2654         if (rc != 0)
2655                 RETURN(rc);
2656
2657         lfsck = lfsck_instance_find(key, true, false);
2658         if (likely(lfsck != NULL)) {
2659                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2660                         CWARN("%s: invalid async windows size that may "
2661                               "cause memory issues. The valid range is "
2662                               "[1 - %u].\n",
2663                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2664                         rc = -EINVAL;
2665                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2666                         mutex_lock(&lfsck->li_mutex);
2667                         lfsck->li_bookmark_ram.lb_async_windows = val;
2668                         rc = lfsck_bookmark_store(&env, lfsck);
2669                         mutex_unlock(&lfsck->li_mutex);
2670                 }
2671                 lfsck_instance_put(&env, lfsck);
2672         } else {
2673                 rc = -ENXIO;
2674         }
2675
2676         lu_env_fini(&env);
2677
2678         RETURN(rc);
2679 }
2680 EXPORT_SYMBOL(lfsck_set_windows);
2681
2682 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2683 {
2684         struct lu_env           env;
2685         struct lfsck_instance  *lfsck;
2686         struct lfsck_component *com;
2687         int                     rc;
2688         ENTRY;
2689
2690         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2691         if (rc != 0)
2692                 RETURN(rc);
2693
2694         lfsck = lfsck_instance_find(key, true, false);
2695         if (likely(lfsck != NULL)) {
2696                 com = lfsck_component_find(lfsck, type);
2697                 if (likely(com != NULL)) {
2698                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2699                         lfsck_component_put(&env, com);
2700                 } else {
2701                         rc = -ENOTSUPP;
2702                 }
2703
2704                 lfsck_instance_put(&env, lfsck);
2705         } else {
2706                 rc = -ENXIO;
2707         }
2708
2709         lu_env_fini(&env);
2710
2711         RETURN(rc);
2712 }
2713 EXPORT_SYMBOL(lfsck_dump);
2714
2715 static int lfsck_stop_all(const struct lu_env *env,
2716                           struct lfsck_instance *lfsck,
2717                           struct lfsck_stop *stop)
2718 {
2719         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2720         struct lfsck_request              *lr     = &info->lti_lr;
2721         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2722         struct ptlrpc_request_set         *set;
2723         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2724         struct lfsck_tgt_desc             *ltd;
2725         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2726         __u32                              idx;
2727         int                                rc     = 0;
2728         int                                rc1    = 0;
2729         ENTRY;
2730
2731         LASSERT(stop->ls_flags & LPF_BROADCAST);
2732
2733         set = ptlrpc_prep_set();
2734         if (unlikely(set == NULL))
2735                 RETURN(-ENOMEM);
2736
2737         memset(lr, 0, sizeof(*lr));
2738         lr->lr_event = LE_STOP;
2739         lr->lr_index = lfsck_dev_idx(lfsck);
2740         lr->lr_status = stop->ls_status;
2741         lr->lr_version = bk->lb_version;
2742         lr->lr_active = LFSCK_TYPES_ALL;
2743         lr->lr_param = stop->ls_flags;
2744
2745         laia->laia_com = NULL;
2746         laia->laia_ltds = ltds;
2747         laia->laia_lr = lr;
2748         laia->laia_result = 0;
2749         laia->laia_shared = 1;
2750
2751         down_read(&ltds->ltd_rw_sem);
2752         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2753                 ltd = lfsck_tgt_get(ltds, idx);
2754                 LASSERT(ltd != NULL);
2755
2756                 laia->laia_ltd = ltd;
2757                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2758                                          lfsck_async_interpret, laia,
2759                                          LFSCK_NOTIFY);
2760                 if (rc != 0) {
2761                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2762                         lfsck_tgt_put(ltd);
2763                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2764                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2765                         rc1 = rc;
2766                 }
2767         }
2768         up_read(&ltds->ltd_rw_sem);
2769
2770         rc = ptlrpc_set_wait(set);
2771         ptlrpc_set_destroy(set);
2772
2773         if (rc == 0)
2774                 rc = laia->laia_result;
2775
2776         if (rc == -EALREADY)
2777                 rc = 0;
2778
2779         if (rc != 0)
2780                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2781                        lfsck_lfsck2name(lfsck), rc);
2782
2783         RETURN(rc != 0 ? rc : rc1);
2784 }
2785
2786 static int lfsck_start_all(const struct lu_env *env,
2787                            struct lfsck_instance *lfsck,
2788                            struct lfsck_start *start)
2789 {
2790         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2791         struct lfsck_request              *lr     = &info->lti_lr;
2792         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2793         struct ptlrpc_request_set         *set;
2794         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2795         struct lfsck_tgt_desc             *ltd;
2796         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2797         __u32                              idx;
2798         int                                rc     = 0;
2799         ENTRY;
2800
2801         LASSERT(start->ls_flags & LPF_BROADCAST);
2802
2803         set = ptlrpc_prep_set();
2804         if (unlikely(set == NULL))
2805                 RETURN(-ENOMEM);
2806
2807         memset(lr, 0, sizeof(*lr));
2808         lr->lr_event = LE_START;
2809         lr->lr_index = lfsck_dev_idx(lfsck);
2810         lr->lr_speed = bk->lb_speed_limit;
2811         lr->lr_version = bk->lb_version;
2812         lr->lr_active = start->ls_active;
2813         lr->lr_param = start->ls_flags;
2814         lr->lr_async_windows = bk->lb_async_windows;
2815         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2816                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2817                        LSV_CREATE_MDTOBJ;
2818
2819         laia->laia_com = NULL;
2820         laia->laia_ltds = ltds;
2821         laia->laia_lr = lr;
2822         laia->laia_result = 0;
2823         laia->laia_shared = 1;
2824
2825         down_read(&ltds->ltd_rw_sem);
2826         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2827                 ltd = lfsck_tgt_get(ltds, idx);
2828                 LASSERT(ltd != NULL);
2829
2830                 laia->laia_ltd = ltd;
2831                 ltd->ltd_layout_done = 0;
2832                 ltd->ltd_namespace_done = 0;
2833                 ltd->ltd_synced_failures = 0;
2834                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2835                                          lfsck_async_interpret, laia,
2836                                          LFSCK_NOTIFY);
2837                 if (rc != 0) {
2838                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2839                         lfsck_tgt_put(ltd);
2840                         CERROR("%s: cannot notify MDT %x for LFSCK "
2841                                "start, failout: rc = %d\n",
2842                                lfsck_lfsck2name(lfsck), idx, rc);
2843                         break;
2844                 }
2845         }
2846         up_read(&ltds->ltd_rw_sem);
2847
2848         if (rc != 0) {
2849                 ptlrpc_set_destroy(set);
2850
2851                 RETURN(rc);
2852         }
2853
2854         rc = ptlrpc_set_wait(set);
2855         ptlrpc_set_destroy(set);
2856
2857         if (rc == 0)
2858                 rc = laia->laia_result;
2859
2860         if (rc != 0) {
2861                 struct lfsck_stop *stop = &info->lti_stop;
2862
2863                 CERROR("%s: cannot start LFSCK on some MDTs, "
2864                        "stop all: rc = %d\n",
2865                        lfsck_lfsck2name(lfsck), rc);
2866                 if (rc != -EALREADY) {
2867                         stop->ls_status = LS_FAILED;
2868                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2869                         lfsck_stop_all(env, lfsck, stop);
2870                 }
2871         }
2872
2873         RETURN(rc);
2874 }
2875
2876 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2877                 struct lfsck_start_param *lsp)
2878 {
2879         struct lfsck_start              *start  = lsp->lsp_start;
2880         struct lfsck_instance           *lfsck;
2881         struct lfsck_bookmark           *bk;
2882         struct ptlrpc_thread            *thread;
2883         struct lfsck_component          *com;
2884         struct l_wait_info               lwi    = { 0 };
2885         struct lfsck_thread_args        *lta;
2886         struct task_struct              *task;
2887         int                              rc     = 0;
2888         __u16                            valid  = 0;
2889         __u16                            flags  = 0;
2890         __u16                            type   = 1;
2891         ENTRY;
2892
2893         lfsck = lfsck_instance_find(key, true, false);
2894         if (unlikely(lfsck == NULL))
2895                 RETURN(-ENXIO);
2896
2897         /* System is not ready, try again later. */
2898         if (unlikely(lfsck->li_namespace == NULL))
2899                 GOTO(put, rc = -EAGAIN);
2900
2901         /* start == NULL means auto trigger paused LFSCK. */
2902         if ((start == NULL) &&
2903             (list_empty(&lfsck->li_list_scan) ||
2904              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2905                 GOTO(put, rc = 0);
2906
2907         bk = &lfsck->li_bookmark_ram;
2908         thread = &lfsck->li_thread;
2909         mutex_lock(&lfsck->li_mutex);
2910         spin_lock(&lfsck->li_lock);
2911         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2912                 rc = -EALREADY;
2913                 if (unlikely(start == NULL)) {
2914                         spin_unlock(&lfsck->li_lock);
2915                         GOTO(out, rc);
2916                 }
2917
2918                 while (start->ls_active != 0) {
2919                         if (!(type & start->ls_active)) {
2920                                 type <<= 1;
2921                                 continue;
2922                         }
2923
2924                         com = __lfsck_component_find(lfsck, type,
2925                                                      &lfsck->li_list_scan);
2926                         if (com == NULL)
2927                                 com = __lfsck_component_find(lfsck, type,
2928                                                 &lfsck->li_list_double_scan);
2929                         if (com == NULL) {
2930                                 rc = -EOPNOTSUPP;
2931                                 break;
2932                         }
2933
2934                         if (com->lc_ops->lfsck_join != NULL) {
2935                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2936                                 if (rc != 0 && rc != -EALREADY)
2937                                         break;
2938                         }
2939                         start->ls_active &= ~type;
2940                         type <<= 1;
2941                 }
2942                 spin_unlock(&lfsck->li_lock);
2943                 GOTO(out, rc);
2944         }
2945         spin_unlock(&lfsck->li_lock);
2946
2947         lfsck->li_status = 0;
2948         lfsck->li_oit_over = 0;
2949         lfsck->li_start_unplug = 0;
2950         lfsck->li_drop_dryrun = 0;
2951         lfsck->li_new_scanned = 0;
2952
2953         /* For auto trigger. */
2954         if (start == NULL)
2955                 goto trigger;
2956
2957         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2958                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2959                        lfsck_lfsck2name(lfsck));
2960
2961                 GOTO(out, rc = -EPERM);
2962         }
2963
2964         start->ls_version = bk->lb_version;
2965
2966         if (start->ls_active != 0) {
2967                 struct lfsck_component *next;
2968
2969                 if (start->ls_active == LFSCK_TYPES_ALL)
2970                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2971
2972                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2973                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2974                         GOTO(out, rc = -ENOTSUPP);
2975                 }
2976
2977                 list_for_each_entry_safe(com, next,
2978                                          &lfsck->li_list_scan, lc_link) {
2979                         if (!(com->lc_type & start->ls_active)) {
2980                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2981                                                              false);
2982                                 if (rc != 0)
2983                                         GOTO(out, rc);
2984                         }
2985                 }
2986
2987                 while (start->ls_active != 0) {
2988                         if (type & start->ls_active) {
2989                                 com = __lfsck_component_find(lfsck, type,
2990                                                         &lfsck->li_list_idle);
2991                                 if (com != NULL)
2992                                         /* The component status will be updated
2993                                          * when its prep() is called later by
2994                                          * the LFSCK main engine. */
2995                                         list_move_tail(&com->lc_link,
2996                                                        &lfsck->li_list_scan);
2997                                 start->ls_active &= ~type;
2998                         }
2999                         type <<= 1;
3000                 }
3001         }
3002
3003         if (list_empty(&lfsck->li_list_scan)) {
3004                 /* The speed limit will be used to control both the LFSCK and
3005                  * low layer scrub (if applied), need to be handled firstly. */
3006                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3007                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3008                                 rc = lfsck_bookmark_store(env, lfsck);
3009                                 if (rc != 0)
3010                                         GOTO(out, rc);
3011                         }
3012                 }
3013
3014                 goto trigger;
3015         }
3016
3017         if (start->ls_flags & LPF_RESET)
3018                 flags |= DOIF_RESET;
3019
3020         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3021         if (rc != 0)
3022                 GOTO(out, rc);
3023
3024         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3025                 start->ls_active |= com->lc_type;
3026                 if (flags & DOIF_RESET) {
3027                         rc = com->lc_ops->lfsck_reset(env, com, false);
3028                         if (rc != 0)
3029                                 GOTO(out, rc);
3030                 }
3031         }
3032
3033 trigger:
3034         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3035         if (bk->lb_param & LPF_DRYRUN)
3036                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3037
3038         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3039                 valid |= DOIV_ERROR_HANDLE;
3040                 if (start->ls_flags & LPF_FAILOUT)
3041                         flags |= DOIF_FAILOUT;
3042         }
3043
3044         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3045                 valid |= DOIV_DRYRUN;
3046                 if (start->ls_flags & LPF_DRYRUN)
3047                         flags |= DOIF_DRYRUN;
3048         }
3049
3050         if (!list_empty(&lfsck->li_list_scan))
3051                 flags |= DOIF_OUTUSED;
3052
3053         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3054         thread_set_flags(thread, 0);
3055         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3056         if (IS_ERR(lta))
3057                 GOTO(out, rc = PTR_ERR(lta));
3058
3059         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3060         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3061         if (IS_ERR(task)) {
3062                 rc = PTR_ERR(task);
3063                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3064                        lfsck_lfsck2name(lfsck), rc);
3065                 lfsck_thread_args_fini(lta);
3066
3067                 GOTO(out, rc);
3068         }
3069
3070         l_wait_event(thread->t_ctl_waitq,
3071                      thread_is_running(thread) ||
3072                      thread_is_stopped(thread),
3073                      &lwi);
3074         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3075                 lfsck->li_start_unplug = 1;
3076                 wake_up_all(&thread->t_ctl_waitq);
3077
3078                 GOTO(out, rc = 0);
3079         }
3080
3081         /* release lfsck::li_mutex to avoid deadlock. */
3082         mutex_unlock(&lfsck->li_mutex);
3083         rc = lfsck_start_all(env, lfsck, start);
3084         if (rc != 0) {
3085                 spin_lock(&lfsck->li_lock);
3086                 if (thread_is_stopped(thread)) {
3087                         spin_unlock(&lfsck->li_lock);
3088                 } else {
3089                         lfsck->li_status = LS_FAILED;
3090                         lfsck->li_flags = 0;
3091                         thread_set_flags(thread, SVC_STOPPING);
3092                         spin_unlock(&lfsck->li_lock);
3093
3094                         lfsck->li_start_unplug = 1;
3095                         wake_up_all(&thread->t_ctl_waitq);
3096                         l_wait_event(thread->t_ctl_waitq,
3097                                      thread_is_stopped(thread),
3098                                      &lwi);
3099                 }
3100         } else {
3101                 lfsck->li_start_unplug = 1;
3102                 wake_up_all(&thread->t_ctl_waitq);
3103         }
3104
3105         GOTO(put, rc);
3106
3107 out:
3108         mutex_unlock(&lfsck->li_mutex);
3109
3110 put:
3111         lfsck_instance_put(env, lfsck);
3112
3113         return rc < 0 ? rc : 0;
3114 }
3115 EXPORT_SYMBOL(lfsck_start);
3116
3117 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3118                struct lfsck_stop *stop)
3119 {
3120         struct lfsck_instance   *lfsck;
3121         struct ptlrpc_thread    *thread;
3122         struct l_wait_info       lwi    = { 0 };
3123         int                      rc     = 0;
3124         int                      rc1    = 0;
3125         ENTRY;
3126
3127         lfsck = lfsck_instance_find(key, true, false);
3128         if (unlikely(lfsck == NULL))
3129                 RETURN(-ENXIO);
3130
3131         thread = &lfsck->li_thread;
3132         /* release lfsck::li_mutex to avoid deadlock. */
3133         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
3134                 if (!lfsck->li_master) {
3135                         CERROR("%s: only allow to specify '-A' via MDS\n",
3136                                lfsck_lfsck2name(lfsck));
3137
3138                         GOTO(out, rc = -EPERM);
3139                 }
3140
3141                 rc1 = lfsck_stop_all(env, lfsck, stop);
3142         }
3143
3144         mutex_lock(&lfsck->li_mutex);
3145         spin_lock(&lfsck->li_lock);
3146         /* no error if LFSCK is already stopped, or was never started */
3147         if (thread_is_init(thread) || thread_is_stopped(thread)) {
3148                 spin_unlock(&lfsck->li_lock);
3149                 GOTO(out, rc = 0);
3150         }
3151
3152         if (stop != NULL) {
3153                 lfsck->li_status = stop->ls_status;
3154                 lfsck->li_flags = stop->ls_flags;
3155         } else {
3156                 lfsck->li_status = LS_STOPPED;
3157                 lfsck->li_flags = 0;
3158         }
3159
3160         thread_set_flags(thread, SVC_STOPPING);
3161
3162         if (lfsck->li_master) {
3163                 struct lfsck_component *com;
3164                 struct lfsck_assistant_data *lad;
3165
3166                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3167                         lad = com->lc_data;
3168                         spin_lock(&lad->lad_lock);
3169                         if (lad->lad_task != NULL)
3170                                 force_sig(SIGINT, lad->lad_task);
3171                         spin_unlock(&lad->lad_lock);
3172                 }
3173
3174                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3175                         lad = com->lc_data;
3176                         spin_lock(&lad->lad_lock);
3177                         if (lad->lad_task != NULL)
3178                                 force_sig(SIGINT, lad->lad_task);
3179                         spin_unlock(&lad->lad_lock);
3180                 }
3181         }
3182
3183         spin_unlock(&lfsck->li_lock);
3184
3185         wake_up_all(&thread->t_ctl_waitq);
3186         l_wait_event(thread->t_ctl_waitq,
3187                      thread_is_stopped(thread),
3188                      &lwi);
3189
3190         GOTO(out, rc = 0);
3191
3192 out:
3193         mutex_unlock(&lfsck->li_mutex);
3194         lfsck_instance_put(env, lfsck);
3195
3196         return rc != 0 ? rc : rc1;
3197 }
3198 EXPORT_SYMBOL(lfsck_stop);
3199
3200 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3201                     struct lfsck_request *lr, struct thandle *th)
3202 {
3203         int rc = -EOPNOTSUPP;
3204         ENTRY;
3205
3206         switch (lr->lr_event) {
3207         case LE_START: {
3208                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3209                 struct lfsck_start_param  lsp;
3210
3211                 memset(start, 0, sizeof(*start));
3212                 start->ls_valid = lr->lr_valid;
3213                 start->ls_speed_limit = lr->lr_speed;
3214                 start->ls_version = lr->lr_version;
3215                 start->ls_active = lr->lr_active;
3216                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3217                 start->ls_async_windows = lr->lr_async_windows;
3218
3219                 lsp.lsp_start = start;
3220                 lsp.lsp_index = lr->lr_index;
3221                 lsp.lsp_index_valid = 1;
3222                 rc = lfsck_start(env, key, &lsp);
3223                 break;
3224         }
3225         case LE_STOP: {
3226                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3227
3228                 memset(stop, 0, sizeof(*stop));
3229                 stop->ls_status = lr->lr_status;
3230                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3231                 rc = lfsck_stop(env, key, stop);
3232                 break;
3233         }
3234         case LE_PHASE1_DONE:
3235         case LE_PHASE2_DONE:
3236         case LE_FID_ACCESSED:
3237         case LE_PEER_EXIT:
3238         case LE_CONDITIONAL_DESTROY:
3239         case LE_SKIP_NLINK_DECLARE:
3240         case LE_SKIP_NLINK:
3241         case LE_SET_LMV_MASTER:
3242         case LE_SET_LMV_SLAVE:
3243         case LE_PAIRS_VERIFY: {
3244                 struct lfsck_instance  *lfsck;
3245                 struct lfsck_component *com;
3246
3247                 lfsck = lfsck_instance_find(key, true, false);
3248                 if (unlikely(lfsck == NULL))
3249                         RETURN(-ENXIO);
3250
3251                 com = lfsck_component_find(lfsck, lr->lr_active);
3252                 if (likely(com != NULL)) {
3253                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3254                         lfsck_component_put(env, com);
3255                 }
3256
3257                 lfsck_instance_put(env, lfsck);
3258                 break;
3259         }
3260         default:
3261                 break;
3262         }
3263
3264         RETURN(rc);
3265 }
3266 EXPORT_SYMBOL(lfsck_in_notify);
3267
3268 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3269                 struct lfsck_request *lr)
3270 {
3271         struct lfsck_instance  *lfsck;
3272         struct lfsck_component *com;
3273         int                     rc;
3274         ENTRY;
3275
3276         lfsck = lfsck_instance_find(key, true, false);
3277         if (unlikely(lfsck == NULL))
3278                 RETURN(-ENXIO);
3279
3280         com = lfsck_component_find(lfsck, lr->lr_active);
3281         if (likely(com != NULL)) {
3282                 rc = com->lc_ops->lfsck_query(env, com);
3283                 lfsck_component_put(env, com);
3284         } else {
3285                 rc = -ENOTSUPP;
3286         }
3287
3288         lfsck_instance_put(env, lfsck);
3289
3290         RETURN(rc);
3291 }
3292
3293 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3294                              struct ldlm_namespace *ns)
3295 {
3296         struct lfsck_instance  *lfsck;
3297         int                     rc      = -ENXIO;
3298
3299         lfsck = lfsck_instance_find(key, true, false);
3300         if (likely(lfsck != NULL)) {
3301                 lfsck->li_namespace = ns;
3302                 lfsck_instance_put(env, lfsck);
3303                 rc = 0;
3304         }
3305
3306         return rc;
3307 }
3308 EXPORT_SYMBOL(lfsck_register_namespace);
3309
3310 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3311                    struct dt_device *next, struct obd_device *obd,
3312                    lfsck_out_notify notify, void *notify_data, bool master)
3313 {
3314         struct lfsck_instance   *lfsck;
3315         struct dt_object        *root  = NULL;
3316         struct dt_object        *obj   = NULL;
3317         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3318         int                      rc;
3319         ENTRY;
3320
3321         lfsck = lfsck_instance_find(key, false, false);
3322         if (unlikely(lfsck != NULL))
3323                 RETURN(-EEXIST);
3324
3325         OBD_ALLOC_PTR(lfsck);
3326         if (lfsck == NULL)
3327                 RETURN(-ENOMEM);
3328
3329         mutex_init(&lfsck->li_mutex);
3330         spin_lock_init(&lfsck->li_lock);
3331         INIT_LIST_HEAD(&lfsck->li_link);
3332         INIT_LIST_HEAD(&lfsck->li_list_scan);
3333         INIT_LIST_HEAD(&lfsck->li_list_dir);
3334         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3335         INIT_LIST_HEAD(&lfsck->li_list_idle);
3336         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3337         atomic_set(&lfsck->li_ref, 1);
3338         atomic_set(&lfsck->li_double_scan_count, 0);
3339         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3340         lfsck->li_out_notify = notify;
3341         lfsck->li_out_notify_data = notify_data;
3342         lfsck->li_next = next;
3343         lfsck->li_bottom = key;
3344         lfsck->li_obd = obd;
3345
3346         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3347         if (rc != 0)
3348                 GOTO(out, rc);
3349
3350         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3351         if (rc != 0)
3352                 GOTO(out, rc);
3353
3354         fid->f_seq = FID_SEQ_LOCAL_NAME;
3355         fid->f_oid = 1;
3356         fid->f_ver = 0;
3357         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3358         if (rc != 0)
3359                 GOTO(out, rc);
3360
3361         rc = dt_root_get(env, key, fid);
3362         if (rc != 0)
3363                 GOTO(out, rc);
3364
3365         root = dt_locate(env, key, fid);
3366         if (IS_ERR(root))
3367                 GOTO(out, rc = PTR_ERR(root));
3368
3369         if (unlikely(!dt_try_as_dir(env, root)))
3370                 GOTO(out, rc = -ENOTDIR);
3371
3372         lfsck->li_local_root_fid = *fid;
3373         if (master) {
3374                 lfsck->li_master = 1;
3375                 if (lfsck_dev_idx(lfsck) == 0) {
3376                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3377                         const struct lu_name *cname;
3378
3379                         rc = dt_lookup(env, root,
3380                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3381                                 (const struct dt_key *)"ROOT");
3382                         if (rc != 0)
3383                                 GOTO(out, rc);
3384
3385                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3386                         if (IS_ERR(obj))
3387                                 GOTO(out, rc = PTR_ERR(obj));
3388
3389                         if (unlikely(!dt_try_as_dir(env, obj)))
3390                                 GOTO(out, rc = -ENOTDIR);
3391
3392                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3393                                 (const struct dt_key *)dotlustre);
3394                         if (rc != 0)
3395                                 GOTO(out, rc);
3396
3397                         lfsck_object_put(env, obj);
3398                         obj = dt_locate(env, key, fid);
3399                         if (IS_ERR(obj))
3400                                 GOTO(out, rc = PTR_ERR(obj));
3401
3402                         cname = lfsck_name_get_const(env, dotlustre,
3403                                                      strlen(dotlustre));
3404                         rc = lfsck_verify_linkea(env, obj, cname,
3405                                                  &lfsck->li_global_root_fid);
3406                         if (rc != 0)
3407                                 GOTO(out, rc);
3408
3409                         if (unlikely(!dt_try_as_dir(env, obj)))
3410                                 GOTO(out, rc = -ENOTDIR);
3411
3412                         *pfid = *fid;
3413                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3414                                        (const struct dt_key *)lostfound);
3415                         if (rc != 0)
3416                                 GOTO(out, rc);
3417
3418                         lfsck_object_put(env, obj);
3419                         obj = dt_locate(env, key, fid);
3420                         if (IS_ERR(obj))
3421                                 GOTO(out, rc = PTR_ERR(obj));
3422
3423                         cname = lfsck_name_get_const(env, lostfound,
3424                                                      strlen(lostfound));
3425                         rc = lfsck_verify_linkea(env, obj, cname, pfid);
3426                         if (rc != 0)
3427                                 GOTO(out, rc);
3428
3429                         lfsck_object_put(env, obj);
3430                         obj = NULL;
3431                 }
3432         }
3433
3434         fid->f_seq = FID_SEQ_LOCAL_FILE;
3435         fid->f_oid = OTABLE_IT_OID;
3436         fid->f_ver = 0;
3437         obj = dt_locate(env, key, fid);
3438         if (IS_ERR(obj))
3439                 GOTO(out, rc = PTR_ERR(obj));
3440
3441         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3442         if (rc != 0)
3443                 GOTO(out, rc);
3444
3445         lfsck->li_obj_oit = obj;
3446         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3447                                         S_IFDIR | S_IRUGO | S_IWUSR);
3448         if (IS_ERR(obj))
3449                 GOTO(out, rc = PTR_ERR(obj));
3450
3451         lu_object_get(&obj->do_lu);
3452         lfsck->li_lfsck_dir = obj;
3453         rc = lfsck_bookmark_setup(env, lfsck);
3454         if (rc != 0)
3455                 GOTO(out, rc);
3456
3457         if (master) {
3458                 rc = lfsck_fid_init(lfsck);
3459                 if (rc < 0)
3460                         GOTO(out, rc);
3461
3462                 rc = lfsck_namespace_setup(env, lfsck);
3463                 if (rc < 0)
3464                         GOTO(out, rc);
3465         }
3466
3467         rc = lfsck_layout_setup(env, lfsck);
3468         if (rc < 0)
3469                 GOTO(out, rc);
3470
3471         /* XXX: more LFSCK components initialization to be added here. */
3472
3473         rc = lfsck_instance_add(lfsck);
3474         if (rc == 0)
3475                 rc = lfsck_add_target_from_orphan(env, lfsck);
3476 out:
3477         if (obj != NULL && !IS_ERR(obj))
3478                 lfsck_object_put(env, obj);
3479         if (root != NULL && !IS_ERR(root))
3480                 lfsck_object_put(env, root);
3481         if (rc != 0)
3482                 lfsck_instance_cleanup(env, lfsck);
3483         return rc;
3484 }
3485 EXPORT_SYMBOL(lfsck_register);
3486
3487 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3488 {
3489         struct lfsck_instance *lfsck;
3490
3491         lfsck = lfsck_instance_find(key, false, true);
3492         if (lfsck != NULL)
3493                 lfsck_instance_put(env, lfsck);
3494 }
3495 EXPORT_SYMBOL(lfsck_degister);
3496
3497 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3498                      struct dt_device *tgt, struct obd_export *exp,
3499                      __u32 index, bool for_ost)
3500 {
3501         struct lfsck_instance   *lfsck;
3502         struct lfsck_tgt_desc   *ltd;
3503         int                      rc;
3504         ENTRY;
3505
3506         OBD_ALLOC_PTR(ltd);
3507         if (ltd == NULL)
3508                 RETURN(-ENOMEM);
3509
3510         ltd->ltd_tgt = tgt;
3511         ltd->ltd_key = key;
3512         ltd->ltd_exp = exp;
3513         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3514         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3515         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3516         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3517         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3518         atomic_set(&ltd->ltd_ref, 1);
3519         ltd->ltd_index = index;
3520
3521         spin_lock(&lfsck_instance_lock);
3522         lfsck = __lfsck_instance_find(key, true, false);
3523         if (lfsck == NULL) {
3524                 if (for_ost)
3525                         list_add_tail(&ltd->ltd_orphan_list,
3526                                       &lfsck_ost_orphan_list);
3527                 else
3528                         list_add_tail(&ltd->ltd_orphan_list,
3529                                       &lfsck_mdt_orphan_list);
3530                 spin_unlock(&lfsck_instance_lock);
3531
3532                 RETURN(0);
3533         }
3534         spin_unlock(&lfsck_instance_lock);
3535
3536         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3537         if (rc != 0)
3538                 lfsck_tgt_put(ltd);
3539
3540         lfsck_instance_put(env, lfsck);
3541
3542         RETURN(rc);
3543 }
3544 EXPORT_SYMBOL(lfsck_add_target);
3545
3546 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3547                       struct dt_device *tgt, __u32 index, bool for_ost)
3548 {
3549         struct lfsck_instance   *lfsck;
3550         struct lfsck_tgt_descs  *ltds;
3551         struct lfsck_tgt_desc   *ltd;
3552         struct list_head        *head;
3553
3554         if (for_ost)
3555                 head = &lfsck_ost_orphan_list;
3556         else
3557                 head = &lfsck_mdt_orphan_list;
3558
3559         spin_lock(&lfsck_instance_lock);
3560         list_for_each_entry(ltd, head, ltd_orphan_list) {
3561                 if (ltd->ltd_tgt == tgt) {
3562                         list_del_init(&ltd->ltd_orphan_list);
3563                         spin_unlock(&lfsck_instance_lock);
3564                         lfsck_tgt_put(ltd);
3565
3566                         return;
3567                 }
3568         }
3569
3570         ltd = NULL;
3571         lfsck = __lfsck_instance_find(key, true, false);
3572         spin_unlock(&lfsck_instance_lock);
3573         if (unlikely(lfsck == NULL))
3574                 return;
3575
3576         if (for_ost)
3577                 ltds = &lfsck->li_ost_descs;
3578         else
3579                 ltds = &lfsck->li_mdt_descs;
3580
3581         down_write(&ltds->ltd_rw_sem);
3582         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3583
3584         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3585                 goto unlock;
3586
3587         ltd = lfsck_ltd2tgt(ltds, index);
3588         if (unlikely(ltd == NULL))
3589                 goto unlock;
3590
3591         LASSERT(ltds->ltd_tgtnr > 0);
3592
3593         ltds->ltd_tgtnr--;
3594         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3595         lfsck_assign_tgt(ltds, NULL, index);
3596
3597 unlock:
3598         if (ltd == NULL) {
3599                 if (for_ost)
3600                         head = &lfsck->li_ost_descs.ltd_orphan;
3601                 else
3602                         head = &lfsck->li_mdt_descs.ltd_orphan;
3603
3604                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3605                         if (ltd->ltd_tgt == tgt) {
3606                                 list_del_init(&ltd->ltd_orphan_list);
3607                                 break;
3608                         }
3609                 }
3610         }
3611
3612         up_write(&ltds->ltd_rw_sem);
3613         if (ltd != NULL) {
3614                 spin_lock(&ltds->ltd_lock);
3615                 ltd->ltd_dead = 1;
3616                 spin_unlock(&ltds->ltd_lock);
3617                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3618                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3619                 lfsck_tgt_put(ltd);
3620         }
3621
3622         lfsck_instance_put(env, lfsck);
3623 }
3624 EXPORT_SYMBOL(lfsck_del_target);
3625
3626 static int __init lfsck_init(void)
3627 {
3628         int rc;
3629
3630         INIT_LIST_HEAD(&lfsck_instance_list);
3631         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3632         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3633         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3634         rc = lu_context_key_register(&lfsck_thread_key);
3635         if (rc == 0) {
3636                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3637                 tgt_register_lfsck_query(lfsck_query);
3638         }
3639
3640         return rc;
3641 }
3642
3643 static void __exit lfsck_exit(void)
3644 {
3645         struct lfsck_tgt_desc *ltd;
3646         struct lfsck_tgt_desc *next;
3647
3648         LASSERT(list_empty(&lfsck_instance_list));
3649
3650         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3651                                  ltd_orphan_list) {
3652                 list_del_init(&ltd->ltd_orphan_list);
3653                 lfsck_tgt_put(ltd);
3654         }
3655
3656         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3657                                  ltd_orphan_list) {
3658                 list_del_init(&ltd->ltd_orphan_list);
3659                 lfsck_tgt_put(ltd);
3660         }
3661
3662         lu_context_key_degister(&lfsck_thread_key);
3663 }
3664
/* Module metadata, followed by the registration of lfsck_init() and
 * lfsck_exit() as the module's load-time and unload-time entry points. */
3665 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3666 MODULE_DESCRIPTION("Lustre File System Checker");
3667 MODULE_VERSION(LUSTRE_VERSION_STRING);
3668 MODULE_LICENSE("GPL");
3669
3670 module_init(lfsck_init);
3671 module_exit(lfsck_exit);