Whamcloud - gitweb
LU-6684 lfsck: set the lfsck notify as interruptable
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <linux/kthread.h>
34 #include <linux/sched.h>
35 #include <libcfs/list.h>
36 #include <lu_object.h>
37 #include <dt_object.h>
38 #include <md_object.h>
39 #include <lustre_fld.h>
40 #include <lustre_lib.h>
41 #include <lustre_net.h>
42 #include <lustre_lfsck.h>
43 #include <lustre/lustre_lfsck_user.h>
44
45 #include "lfsck_internal.h"
46
47 #define LFSCK_CHECKPOINT_SKIP   1
48
49 /* define lfsck thread key */
50 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
51
52 static void lfsck_key_fini(const struct lu_context *ctx,
53                            struct lu_context_key *key, void *data)
54 {
55         struct lfsck_thread_info *info = data;
56
57         lu_buf_free(&info->lti_linkea_buf);
58         lu_buf_free(&info->lti_linkea_buf2);
59         lu_buf_free(&info->lti_big_buf);
60         OBD_FREE_PTR(info);
61 }
62
63 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
64 LU_KEY_INIT_GENERIC(lfsck);
65
66 static struct list_head lfsck_instance_list;
67 static struct list_head lfsck_ost_orphan_list;
68 static struct list_head lfsck_mdt_orphan_list;
69 static DEFINE_SPINLOCK(lfsck_instance_lock);
70
71 static const char *lfsck_status_names[] = {
72         [LS_INIT]               = "init",
73         [LS_SCANNING_PHASE1]    = "scanning-phase1",
74         [LS_SCANNING_PHASE2]    = "scanning-phase2",
75         [LS_COMPLETED]          = "completed",
76         [LS_FAILED]             = "failed",
77         [LS_STOPPED]            = "stopped",
78         [LS_PAUSED]             = "paused",
79         [LS_CRASHED]            = "crashed",
80         [LS_PARTIAL]            = "partial",
81         [LS_CO_FAILED]          = "co-failed",
82         [LS_CO_STOPPED]         = "co-stopped",
83         [LS_CO_PAUSED]          = "co-paused"
84 };
85
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N - confirm against the
 * flags enum, which is not visible here. */
const char *lfsck_flags_names[] = {
        [0] = "scanned-once",
        [1] = "inconsistent",
        [2] = "upgrade",
        [3] = "incomplete",
        [4] = "crashed_lastid",
        [5] = NULL
};
94
/* Table of printable LFSCK parameter names.  Slot 0 is deliberately NULL
 * and the table is also NULL-terminated.
 * NOTE(review): presumably entry N names parameter bit N - confirm against
 * the parameter flags enum, which is not visible here. */
const char *lfsck_param_names[] = {
        [0] = NULL,
        [1] = "failout",
        [2] = "dryrun",
        [3] = "all_targets",
        [4] = "broadcast",
        [5] = "orphan",
        [6] = "create_ostobj",
        [7] = "create_mdtobj",
        [8] = NULL
};
106
/* How a lost+found verification is keyed.
 * NOTE(review): the consumers are outside this chunk; the meanings below
 * are read off the enumerator names only. */
enum lfsck_verify_lpf_types {
        /* verify by the FID recorded in the bookmark */
        LVLT_BY_BOOKMARK  = 0,
        /* verify by the name entry */
        LVLT_BY_NAMEENTRY = 1,
};
111
112 const char *lfsck_status2names(enum lfsck_status status)
113 {
114         if (unlikely(status < 0 || status >= LS_MAX))
115                 return "unknown";
116
117         return lfsck_status_names[status];
118 }
119
120 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
121 {
122         spin_lock_init(&ltds->ltd_lock);
123         init_rwsem(&ltds->ltd_rw_sem);
124         INIT_LIST_HEAD(&ltds->ltd_orphan);
125         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
126         if (ltds->ltd_tgts_bitmap == NULL)
127                 return -ENOMEM;
128
129         return 0;
130 }
131
132 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
133 {
134         struct lfsck_tgt_desc   *ltd;
135         struct lfsck_tgt_desc   *next;
136         int                      idx;
137
138         down_write(&ltds->ltd_rw_sem);
139
140         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
141                                  ltd_orphan_list) {
142                 list_del_init(&ltd->ltd_orphan_list);
143                 lfsck_tgt_put(ltd);
144         }
145
146         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
147                 up_write(&ltds->ltd_rw_sem);
148
149                 return;
150         }
151
152         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
153                 ltd = lfsck_ltd2tgt(ltds, idx);
154                 if (likely(ltd != NULL)) {
155                         LASSERT(list_empty(&ltd->ltd_layout_list));
156                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
157                         LASSERT(list_empty(&ltd->ltd_namespace_list));
158                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
159
160                         ltds->ltd_tgtnr--;
161                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
162                         lfsck_assign_tgt(ltds, NULL, idx);
163                         lfsck_tgt_put(ltd);
164                 }
165         }
166
167         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
168                  ltds->ltd_tgtnr);
169
170         for (idx = 0; idx < TGT_PTRS; idx++) {
171                 if (ltds->ltd_tgts_idx[idx] != NULL) {
172                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
173                         ltds->ltd_tgts_idx[idx] = NULL;
174                 }
175         }
176
177         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
178         ltds->ltd_tgts_bitmap = NULL;
179         up_write(&ltds->ltd_rw_sem);
180 }
181
182 static int __lfsck_add_target(const struct lu_env *env,
183                               struct lfsck_instance *lfsck,
184                               struct lfsck_tgt_desc *ltd,
185                               bool for_ost, bool locked)
186 {
187         struct lfsck_tgt_descs *ltds;
188         __u32                   index = ltd->ltd_index;
189         int                     rc    = 0;
190         ENTRY;
191
192         if (for_ost)
193                 ltds = &lfsck->li_ost_descs;
194         else
195                 ltds = &lfsck->li_mdt_descs;
196
197         if (!locked)
198                 down_write(&ltds->ltd_rw_sem);
199
200         LASSERT(ltds->ltd_tgts_bitmap != NULL);
201
202         if (index >= ltds->ltd_tgts_bitmap->size) {
203                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
204                                     (__u32)BITS_PER_LONG);
205                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
206                 cfs_bitmap_t *new_bitmap;
207
208                 while (newsize < index + 1)
209                         newsize <<= 1;
210
211                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
212                 if (new_bitmap == NULL)
213                         GOTO(unlock, rc = -ENOMEM);
214
215                 if (ltds->ltd_tgtnr > 0)
216                         cfs_bitmap_copy(new_bitmap, old_bitmap);
217                 ltds->ltd_tgts_bitmap = new_bitmap;
218                 CFS_FREE_BITMAP(old_bitmap);
219         }
220
221         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
222                 CERROR("%s: the device %s (%u) is registered already\n",
223                        lfsck_lfsck2name(lfsck),
224                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
225                 GOTO(unlock, rc = -EEXIST);
226         }
227
228         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
229                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
230                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
231                         GOTO(unlock, rc = -ENOMEM);
232         }
233
234         lfsck_assign_tgt(ltds, ltd, index);
235         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
236         ltds->ltd_tgtnr++;
237
238         GOTO(unlock, rc = 0);
239
240 unlock:
241         if (!locked)
242                 up_write(&ltds->ltd_rw_sem);
243
244         return rc;
245 }
246
247 static int lfsck_add_target_from_orphan(const struct lu_env *env,
248                                         struct lfsck_instance *lfsck)
249 {
250         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
251         struct lfsck_tgt_desc   *ltd;
252         struct lfsck_tgt_desc   *next;
253         struct list_head        *head    = &lfsck_ost_orphan_list;
254         int                      rc;
255         bool                     for_ost = true;
256
257 again:
258         spin_lock(&lfsck_instance_lock);
259         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
260                 if (ltd->ltd_key == lfsck->li_bottom)
261                         list_move_tail(&ltd->ltd_orphan_list,
262                                        &ltds->ltd_orphan);
263         }
264         spin_unlock(&lfsck_instance_lock);
265
266         down_write(&ltds->ltd_rw_sem);
267         while (!list_empty(&ltds->ltd_orphan)) {
268                 ltd = list_entry(ltds->ltd_orphan.next,
269                                  struct lfsck_tgt_desc,
270                                  ltd_orphan_list);
271                 list_del_init(&ltd->ltd_orphan_list);
272                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
273                 /* Do not hold the semaphore for too long time. */
274                 up_write(&ltds->ltd_rw_sem);
275                 if (rc != 0)
276                         return rc;
277
278                 down_write(&ltds->ltd_rw_sem);
279         }
280         up_write(&ltds->ltd_rw_sem);
281
282         if (for_ost) {
283                 ltds = &lfsck->li_mdt_descs;
284                 head = &lfsck_mdt_orphan_list;
285                 for_ost = false;
286                 goto again;
287         }
288
289         return 0;
290 }
291
292 static inline struct lfsck_component *
293 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
294                        struct list_head *list)
295 {
296         struct lfsck_component *com;
297
298         list_for_each_entry(com, list, lc_link) {
299                 if (com->lc_type == type)
300                         return com;
301         }
302         return NULL;
303 }
304
305 struct lfsck_component *
306 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
307 {
308         struct lfsck_component *com;
309
310         spin_lock(&lfsck->li_lock);
311         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
312         if (com != NULL)
313                 goto unlock;
314
315         com = __lfsck_component_find(lfsck, type,
316                                      &lfsck->li_list_double_scan);
317         if (com != NULL)
318                 goto unlock;
319
320         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
321
322 unlock:
323         if (com != NULL)
324                 lfsck_component_get(com);
325         spin_unlock(&lfsck->li_lock);
326         return com;
327 }
328
329 void lfsck_component_cleanup(const struct lu_env *env,
330                              struct lfsck_component *com)
331 {
332         if (!list_empty(&com->lc_link))
333                 list_del_init(&com->lc_link);
334         if (!list_empty(&com->lc_link_dir))
335                 list_del_init(&com->lc_link_dir);
336
337         lfsck_component_put(env, com);
338 }
339
340 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
341                     struct lu_fid *fid, bool locked)
342 {
343         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
344         int                      rc = 0;
345         ENTRY;
346
347         if (!locked)
348                 mutex_lock(&lfsck->li_mutex);
349
350         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
351         if (rc >= 0) {
352                 bk->lb_last_fid = *fid;
353                 /* We do not care about whether the subsequent sub-operations
354                  * failed or not. The worst case is that one FID is lost that
355                  * is not a big issue for the LFSCK since it is relative rare
356                  * for LFSCK create. */
357                 rc = lfsck_bookmark_store(env, lfsck);
358         }
359
360         if (!locked)
361                 mutex_unlock(&lfsck->li_mutex);
362
363         RETURN(rc);
364 }
365
366 static int __lfsck_ibits_lock(const struct lu_env *env,
367                               struct lfsck_instance *lfsck,
368                               struct dt_object *obj, struct ldlm_res_id *resid,
369                               struct lustre_handle *lh, __u64 bits,
370                               enum ldlm_mode mode)
371 {
372         struct lfsck_thread_info        *info   = lfsck_env_info(env);
373         union ldlm_policy_data          *policy = &info->lti_policy;
374         __u64                            flags  = LDLM_FL_ATOMIC_CB;
375         int                              rc;
376
377         LASSERT(lfsck->li_namespace != NULL);
378
379         memset(policy, 0, sizeof(*policy));
380         policy->l_inodebits.bits = bits;
381         if (dt_object_remote(obj)) {
382                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
383
384                 memset(einfo, 0, sizeof(*einfo));
385                 einfo->ei_type = LDLM_IBITS;
386                 einfo->ei_mode = mode;
387                 einfo->ei_cb_bl = ldlm_blocking_ast;
388                 einfo->ei_cb_cp = ldlm_completion_ast;
389                 einfo->ei_res_id = resid;
390
391                 rc = dt_object_lock(env, obj, lh, einfo, policy);
392         } else {
393                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
394                                             LDLM_IBITS, policy, mode,
395                                             &flags, ldlm_blocking_ast,
396                                             ldlm_completion_ast, NULL, NULL,
397                                             0, LVB_T_NONE, NULL, lh);
398         }
399
400         if (rc == ELDLM_OK) {
401                 rc = 0;
402         } else {
403                 memset(lh, 0, sizeof(*lh));
404                 rc = -EIO;
405         }
406
407         return rc;
408 }
409
410 /**
411  * Request the specified ibits lock for the given object.
412  *
413  * Before the LFSCK modifying on the namespace visible object,
414  * it needs to acquire related ibits ldlm lock.
415  *
416  * \param[in] env       pointer to the thread context
417  * \param[in] lfsck     pointer to the lfsck instance
418  * \param[in] obj       pointer to the dt_object to be locked
419  * \param[out] lh       pointer to the lock handle
420  * \param[in] bits      the bits for the ldlm lock to be acquired
421  * \param[in] mode      the mode for the ldlm lock to be acquired
422  *
423  * \retval              0 for success
424  * \retval              negative error number on failure
425  */
426 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
427                      struct dt_object *obj, struct lustre_handle *lh,
428                      __u64 bits, enum ldlm_mode mode)
429 {
430         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
431
432         LASSERT(!lustre_handle_is_used(lh));
433
434         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
435         return __lfsck_ibits_lock(env, lfsck, obj, resid, lh, bits, mode);
436 }
437
438 /**
439  * Release the the specified ibits lock.
440  *
441  * If the lock has been acquired before, release it
442  * and cleanup the handle. Otherwise, do nothing.
443  *
444  * \param[in] lh        pointer to the lock handle
445  * \param[in] mode      the mode for the ldlm lock to be released
446  */
447 void lfsck_ibits_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
448 {
449         if (lustre_handle_is_used(lh)) {
450                 ldlm_lock_decref(lh, mode);
451                 memset(lh, 0, sizeof(*lh));
452         }
453 }
454
455 /**
456  * Request compound ibits locks for the given <obj, name> pairs.
457  *
458  * Before the LFSCK modifying on the namespace visible object, it needs to
459  * acquire related ibits ldlm lock. Usually, we can use lfsck_ibits_lock for
460  * the lock purpose. But the simple lfsck_ibits_lock for directory-based
461  * modificationis (such as insert name entry to the directory) may be too
462  * coarse-grained and not efficient.
463  *
464  * The lfsck_lock() will request compound ibits locks on the specified
465  * <obj, name> pairs: the PDO (Parallel Directory Operations) ibits (UPDATE)
466  * lock on the directory object, and the regular ibits lock on the name hash.
467  *
468  * \param[in] env       pointer to the thread context
469  * \param[in] lfsck     pointer to the lfsck instance
470  * \param[in] obj       pointer to the dt_object to be locked
471  * \param[in] name      used for building the PDO lock resource
472  * \param[out] llh      pointer to the lfsck_lock_handle
473  * \param[in] bits      the bits for the ldlm lock to be acquired
474  * \param[in] mode      the mode for the ldlm lock to be acquired
475  *
476  * \retval              0 for success
477  * \retval              negative error number on failure
478  */
479 int lfsck_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
480                struct dt_object *obj, const char *name,
481                struct lfsck_lock_handle *llh, __u64 bits, enum ldlm_mode mode)
482 {
483         struct ldlm_res_id *resid = &lfsck_env_info(env)->lti_resid;
484         int                 rc;
485
486         LASSERT(S_ISDIR(lfsck_object_type(obj)));
487         LASSERT(name != NULL);
488         LASSERT(name[0] != 0);
489         LASSERT(!lustre_handle_is_used(&llh->llh_pdo_lh));
490         LASSERT(!lustre_handle_is_used(&llh->llh_reg_lh));
491
492         switch (mode) {
493         case LCK_EX:
494                 llh->llh_pdo_mode = LCK_EX;
495                 break;
496         case LCK_PW:
497                 llh->llh_pdo_mode = LCK_CW;
498                 break;
499         case LCK_PR:
500                 llh->llh_pdo_mode = LCK_CR;
501                 break;
502         default:
503                 CDEBUG(D_LFSCK, "%s: unexpected PDO lock mode %u on the obj "
504                        DFID"\n", lfsck_lfsck2name(lfsck), mode,
505                        PFID(lfsck_dto2fid(obj)));
506                 LBUG();
507         }
508
509         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
510         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_pdo_lh,
511                                 MDS_INODELOCK_UPDATE, llh->llh_pdo_mode);
512         if (rc != 0)
513                 return rc;
514
515         llh->llh_reg_mode = mode;
516         resid->name[LUSTRE_RES_ID_HSH_OFF] = full_name_hash(name, strlen(name));
517         LASSERT(resid->name[LUSTRE_RES_ID_HSH_OFF] != 0);
518         rc = __lfsck_ibits_lock(env, lfsck, obj, resid, &llh->llh_reg_lh,
519                                 bits, llh->llh_reg_mode);
520         if (rc != 0)
521                 lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
522
523         return rc;
524 }
525
526 /**
527  * Release the the compound ibits locks.
528  *
529  * \param[in] llh       pointer to the lfsck_lock_handle to be released
530  */
531 void lfsck_unlock(struct lfsck_lock_handle *llh)
532 {
533         lfsck_ibits_unlock(&llh->llh_reg_lh, llh->llh_reg_mode);
534         lfsck_ibits_unlock(&llh->llh_pdo_lh, llh->llh_pdo_mode);
535 }
536
537 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
538                               struct lfsck_instance *lfsck,
539                               const struct lu_fid *fid)
540 {
541         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
542         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
543         int                      rc;
544
545         fld_range_set_mdt(range);
546         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
547         if (rc == 0)
548                 rc = range->lsr_index;
549
550         return rc;
551 }
552
/* Well-known directory entry names.  "." and ".." have external linkage;
 * the two names below them are private to this file. */
const char dot[] = ".";
const char dotdot[] = "..";

static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
557
558 /**
559  * Remove the name entry from the .lustre/lost+found directory.
560  *
561  * No need to care about the object referenced by the name entry,
562  * either the name entry is invalid or redundant, or the referenced
563  * object has been processed or will be handled by others.
564  *
565  * \param[in] env       pointer to the thread context
566  * \param[in] lfsck     pointer to the lfsck instance
567  * \param[in] name      the name for the name entry to be removed
568  *
569  * \retval              0 for success
570  * \retval              negative error number on failure
571  */
572 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
573                                        struct lfsck_instance *lfsck,
574                                        const char *name)
575 {
576         struct dt_object        *parent = lfsck->li_lpf_root_obj;
577         struct dt_device        *dev    = lfsck_obj2dev(parent);
578         struct thandle          *th;
579         struct lfsck_lock_handle *llh   = &lfsck_env_info(env)->lti_llh;
580         int                      rc;
581         ENTRY;
582
583         rc = lfsck_lock(env, lfsck, parent, name, llh,
584                         MDS_INODELOCK_UPDATE, LCK_PW);
585         if (rc != 0)
586                 RETURN(rc);
587
588         th = dt_trans_create(env, dev);
589         if (IS_ERR(th))
590                 GOTO(unlock, rc = PTR_ERR(th));
591
592         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
593         if (rc != 0)
594                 GOTO(stop, rc);
595
596         rc = dt_declare_ref_del(env, parent, th);
597         if (rc != 0)
598                 GOTO(stop, rc);
599
600         rc = dt_trans_start_local(env, dev, th);
601         if (rc != 0)
602                 GOTO(stop, rc);
603
604         rc = dt_delete(env, parent, (const struct dt_key *)name, th);
605         if (rc != 0)
606                 GOTO(stop, rc);
607
608         dt_write_lock(env, parent, 0);
609         rc = dt_ref_del(env, parent, th);
610         dt_write_unlock(env, parent);
611
612         GOTO(stop, rc);
613
614 stop:
615         dt_trans_stop(env, dev, th);
616
617 unlock:
618         lfsck_unlock(llh);
619
620         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
621                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
622
623         return rc;
624 }
625
626 static int lfsck_create_lpf_local(const struct lu_env *env,
627                                   struct lfsck_instance *lfsck,
628                                   struct dt_object *child,
629                                   struct lu_attr *la,
630                                   struct dt_object_format *dof,
631                                   const char *name)
632 {
633         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
634         struct dt_object        *parent = lfsck->li_lpf_root_obj;
635         struct dt_device        *dev    = lfsck_obj2dev(child);
636         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
637         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
638         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
639         struct thandle          *th     = NULL;
640         struct linkea_data       ldata  = { NULL };
641         struct lu_buf            linkea_buf;
642         const struct lu_name    *cname;
643         loff_t                   pos    = 0;
644         int                      len    = sizeof(struct lfsck_bookmark);
645         int                      rc;
646         ENTRY;
647
648         rc = linkea_data_new(&ldata,
649                              &lfsck_env_info(env)->lti_linkea_buf2);
650         if (rc != 0)
651                 RETURN(rc);
652
653         cname = lfsck_name_get_const(env, name, strlen(name));
654         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
655         if (rc != 0)
656                 RETURN(rc);
657
658         th = dt_trans_create(env, dev);
659         if (IS_ERR(th))
660                 RETURN(PTR_ERR(th));
661
662         /* 1a. create child */
663         rc = dt_declare_create(env, child, la, NULL, dof, th);
664         if (rc != 0)
665                 GOTO(stop, rc);
666
667         if (!dt_try_as_dir(env, child))
668                 GOTO(stop, rc = -ENOTDIR);
669
670         /* 2a. increase child nlink */
671         rc = dt_declare_ref_add(env, child, th);
672         if (rc != 0)
673                 GOTO(stop, rc);
674
675         /* 3a. insert dot into child dir */
676         rec->rec_type = S_IFDIR;
677         rec->rec_fid = cfid;
678         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
679                                (const struct dt_key *)dot, th);
680         if (rc != 0)
681                 GOTO(stop, rc);
682
683         /* 4a. insert dotdot into child dir */
684         rec->rec_fid = &LU_LPF_FID;
685         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
686                                (const struct dt_key *)dotdot, th);
687         if (rc != 0)
688                 GOTO(stop, rc);
689
690         /* 5a. insert linkEA for child */
691         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
692                        ldata.ld_leh->leh_len);
693         rc = dt_declare_xattr_set(env, child, &linkea_buf,
694                                   XATTR_NAME_LINK, 0, th);
695         if (rc != 0)
696                 GOTO(stop, rc);
697
698         /* 6a. insert name into parent dir */
699         rec->rec_type = S_IFDIR;
700         rec->rec_fid = cfid;
701         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
702                                (const struct dt_key *)name, th);
703         if (rc != 0)
704                 GOTO(stop, rc);
705
706         /* 7a. increase parent nlink */
707         rc = dt_declare_ref_add(env, parent, th);
708         if (rc != 0)
709                 GOTO(stop, rc);
710
711         /* 8a. update bookmark */
712         rc = dt_declare_record_write(env, bk_obj,
713                                      lfsck_buf_get(env, bk, len), 0, th);
714         if (rc != 0)
715                 GOTO(stop, rc);
716
717         rc = dt_trans_start_local(env, dev, th);
718         if (rc != 0)
719                 GOTO(stop, rc);
720
721         dt_write_lock(env, child, 0);
722         /* 1b. create child */
723         rc = dt_create(env, child, la, NULL, dof, th);
724         if (rc != 0)
725                 GOTO(unlock, rc);
726
727         /* 2b. increase child nlink */
728         rc = dt_ref_add(env, child, th);
729         if (rc != 0)
730                 GOTO(unlock, rc);
731
732         /* 3b. insert dot into child dir */
733         rec->rec_fid = cfid;
734         rc = dt_insert(env, child, (const struct dt_rec *)rec,
735                        (const struct dt_key *)dot, th, 1);
736         if (rc != 0)
737                 GOTO(unlock, rc);
738
739         /* 4b. insert dotdot into child dir */
740         rec->rec_fid = &LU_LPF_FID;
741         rc = dt_insert(env, child, (const struct dt_rec *)rec,
742                        (const struct dt_key *)dotdot, th, 1);
743         if (rc != 0)
744                 GOTO(unlock, rc);
745
746         /* 5b. insert linkEA for child. */
747         rc = dt_xattr_set(env, child, &linkea_buf,
748                           XATTR_NAME_LINK, 0, th);
749         dt_write_unlock(env, child);
750         if (rc != 0)
751                 GOTO(stop, rc);
752
753         /* 6b. insert name into parent dir */
754         rec->rec_fid = cfid;
755         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
756                        (const struct dt_key *)name, th, 1);
757         if (rc != 0)
758                 GOTO(stop, rc);
759
760         dt_write_lock(env, parent, 0);
761         /* 7b. increase parent nlink */
762         rc = dt_ref_add(env, parent, th);
763         dt_write_unlock(env, parent);
764         if (rc != 0)
765                 GOTO(stop, rc);
766
767         bk->lb_lpf_fid = *cfid;
768         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
769
770         /* 8b. update bookmark */
771         rc = dt_record_write(env, bk_obj,
772                              lfsck_buf_get(env, bk, len), &pos, th);
773
774         GOTO(stop, rc);
775
776 unlock:
777         dt_write_unlock(env, child);
778
779 stop:
780         dt_trans_stop(env, dev, th);
781
782         return rc;
783 }
784
785 static int lfsck_create_lpf_remote(const struct lu_env *env,
786                                    struct lfsck_instance *lfsck,
787                                    struct dt_object *child,
788                                    struct lu_attr *la,
789                                    struct dt_object_format *dof,
790                                    const char *name)
791 {
792         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
793         struct dt_object        *parent = lfsck->li_lpf_root_obj;
794         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
795         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
796         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
797         struct thandle          *th     = NULL;
798         struct linkea_data       ldata  = { NULL };
799         struct lu_buf            linkea_buf;
800         const struct lu_name    *cname;
801         struct dt_device        *dev;
802         loff_t                   pos    = 0;
803         int                      len    = sizeof(struct lfsck_bookmark);
804         int                      rc;
805         ENTRY;
806
807         rc = linkea_data_new(&ldata,
808                              &lfsck_env_info(env)->lti_linkea_buf2);
809         if (rc != 0)
810                 RETURN(rc);
811
812         cname = lfsck_name_get_const(env, name, strlen(name));
813         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
814         if (rc != 0)
815                 RETURN(rc);
816
817         /* Create .lustre/lost+found/MDTxxxx. */
818
819         /* XXX: Currently, cross-MDT create operation needs to create the child
820          *      object firstly, then insert name into the parent directory. For
821          *      this case, the child object resides on current MDT (local), but
822          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
823          *      easy to contain all the sub-modifications orderly within single
824          *      transaction.
825          *
826          *      To avoid more inconsistency, we split the create operation into
827          *      two transactions:
828          *
829          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
830          *         locally.
831          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
832          *         remotely.
833          *
834          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
835          *      repair such inconsistency when LFSCK run next time. */
836
837         /* Transaction I: locally */
838
839         dev = lfsck_obj2dev(child);
840         th = dt_trans_create(env, dev);
841         if (IS_ERR(th))
842                 RETURN(PTR_ERR(th));
843
844         /* 1a. create child */
845         rc = dt_declare_create(env, child, la, NULL, dof, th);
846         if (rc != 0)
847                 GOTO(stop, rc);
848
849         if (!dt_try_as_dir(env, child))
850                 GOTO(stop, rc = -ENOTDIR);
851
852         /* 2a. increase child nlink */
853         rc = dt_declare_ref_add(env, child, th);
854         if (rc != 0)
855                 GOTO(stop, rc);
856
857         /* 3a. insert dot into child dir */
858         rec->rec_type = S_IFDIR;
859         rec->rec_fid = cfid;
860         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
861                                (const struct dt_key *)dot, th);
862         if (rc != 0)
863                 GOTO(stop, rc);
864
865         /* 4a. insert dotdot into child dir */
866         rec->rec_fid = &LU_LPF_FID;
867         rc = dt_declare_insert(env, child, (const struct dt_rec *)rec,
868                                (const struct dt_key *)dotdot, th);
869         if (rc != 0)
870                 GOTO(stop, rc);
871
872         /* 5a. insert linkEA for child */
873         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
874                        ldata.ld_leh->leh_len);
875         rc = dt_declare_xattr_set(env, child, &linkea_buf,
876                                   XATTR_NAME_LINK, 0, th);
877         if (rc != 0)
878                 GOTO(stop, rc);
879
880         /* 6a. update bookmark */
881         rc = dt_declare_record_write(env, bk_obj,
882                                      lfsck_buf_get(env, bk, len), 0, th);
883         if (rc != 0)
884                 GOTO(stop, rc);
885
886         rc = dt_trans_start_local(env, dev, th);
887         if (rc != 0)
888                 GOTO(stop, rc);
889
890         dt_write_lock(env, child, 0);
891         /* 1b. create child */
892         rc = dt_create(env, child, la, NULL, dof, th);
893         if (rc != 0)
894                 GOTO(unlock, rc);
895
896         /* 2b. increase child nlink */
897         rc = dt_ref_add(env, child, th);
898         if (rc != 0)
899                 GOTO(unlock, rc);
900
901         /* 3b. insert dot into child dir */
902         rec->rec_type = S_IFDIR;
903         rec->rec_fid = cfid;
904         rc = dt_insert(env, child, (const struct dt_rec *)rec,
905                        (const struct dt_key *)dot, th, 1);
906         if (rc != 0)
907                 GOTO(unlock, rc);
908
909         /* 4b. insert dotdot into child dir */
910         rec->rec_fid = &LU_LPF_FID;
911         rc = dt_insert(env, child, (const struct dt_rec *)rec,
912                        (const struct dt_key *)dotdot, th, 1);
913         if (rc != 0)
914                 GOTO(unlock, rc);
915
916         /* 5b. insert linkEA for child */
917         rc = dt_xattr_set(env, child, &linkea_buf,
918                           XATTR_NAME_LINK, 0, th);
919         if (rc != 0)
920                 GOTO(unlock, rc);
921
922         bk->lb_lpf_fid = *cfid;
923         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
924
925         /* 6b. update bookmark */
926         rc = dt_record_write(env, bk_obj,
927                              lfsck_buf_get(env, bk, len), &pos, th);
928
929         dt_write_unlock(env, child);
930         dt_trans_stop(env, dev, th);
931         if (rc != 0)
932                 RETURN(rc);
933
934         /* Transaction II: remotely */
935
936         dev = lfsck_obj2dev(parent);
937         th = dt_trans_create(env, dev);
938         if (IS_ERR(th))
939                 RETURN(PTR_ERR(th));
940
941         th->th_sync = 1;
942         /* 5a. insert name into parent dir */
943         rec->rec_fid = cfid;
944         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
945                                (const struct dt_key *)name, th);
946         if (rc != 0)
947                 GOTO(stop, rc);
948
949         /* 6a. increase parent nlink */
950         rc = dt_declare_ref_add(env, parent, th);
951         if (rc != 0)
952                 GOTO(stop, rc);
953
954         rc = dt_trans_start_local(env, dev, th);
955         if (rc != 0)
956                 GOTO(stop, rc);
957
958         /* 5b. insert name into parent dir */
959         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
960                        (const struct dt_key *)name, th, 1);
961         if (rc != 0)
962                 GOTO(stop, rc);
963
964         dt_write_lock(env, parent, 0);
965         /* 6b. increase parent nlink */
966         rc = dt_ref_add(env, parent, th);
967         dt_write_unlock(env, parent);
968
969         GOTO(stop, rc);
970
971 unlock:
972         dt_write_unlock(env, child);
973 stop:
974         dt_trans_stop(env, dev, th);
975
976         if (rc != 0 && dev == lfsck_obj2dev(parent))
977                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
978                        "for orphans, but failed to insert the name %s "
979                        "to the .lustre/lost+found/. Such inconsistency "
980                        "will be repaired when LFSCK run next time: rc = %d\n",
981                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
982
983         return rc;
984 }
985
986 /**
987  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
988  *
989  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
990  * orphans and other uncertain inconsistent objects found during the
991  * LFSCK. Such directory will be created by the LFSCK engine on the
992  * local MDT before the LFSCK scanning.
993  *
994  * \param[in] env       pointer to the thread context
995  * \param[in] lfsck     pointer to the lfsck instance
996  *
997  * \retval              0 for success
998  * \retval              negative error number on failure
999  */
1000 static int lfsck_create_lpf(const struct lu_env *env,
1001                             struct lfsck_instance *lfsck)
1002 {
1003         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
1004         struct lfsck_thread_info *info  = lfsck_env_info(env);
1005         struct lu_fid            *cfid  = &info->lti_fid2;
1006         struct lu_attr           *la    = &info->lti_la;
1007         struct dt_object_format  *dof   = &info->lti_dof;
1008         struct dt_object         *parent = lfsck->li_lpf_root_obj;
1009         struct dt_object         *child = NULL;
1010         struct lfsck_lock_handle *llh   = &info->lti_llh;
1011         char                      name[8];
1012         int                       node  = lfsck_dev_idx(lfsck);
1013         int                       rc    = 0;
1014         ENTRY;
1015
1016         LASSERT(lfsck->li_master);
1017         LASSERT(parent != NULL);
1018         LASSERT(lfsck->li_lpf_obj == NULL);
1019
1020         snprintf(name, 8, "MDT%04x", node);
1021         rc = lfsck_lock(env, lfsck, parent, name, llh,
1022                         MDS_INODELOCK_UPDATE, LCK_PW);
1023         if (rc != 0)
1024                 RETURN(rc);
1025
1026         if (fid_is_zero(&bk->lb_lpf_fid)) {
1027                 /* There is corner case that: in former LFSCK scanning we have
1028                  * created the .lustre/lost+found/MDTxxxx but failed to update
1029                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
1030                  * it from MDT0 firstly. */
1031                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1032                                (const struct dt_key *)name);
1033                 if (rc != 0 && rc != -ENOENT)
1034                         GOTO(unlock, rc);
1035
1036                 if (rc == 0) {
1037                         bk->lb_lpf_fid = *cfid;
1038                         rc = lfsck_bookmark_store(env, lfsck);
1039                 } else {
1040                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
1041                 }
1042                 if (rc != 0)
1043                         GOTO(unlock, rc);
1044         } else {
1045                 *cfid = bk->lb_lpf_fid;
1046         }
1047
1048         child = lfsck_object_find_bottom(env, lfsck, cfid);
1049         if (IS_ERR(child))
1050                 GOTO(unlock, rc = PTR_ERR(child));
1051
1052         if (dt_object_exists(child) != 0) {
1053                 if (unlikely(!dt_try_as_dir(env, child)))
1054                         rc = -ENOTDIR;
1055                 else
1056                         lfsck->li_lpf_obj = child;
1057
1058                 GOTO(unlock, rc);
1059         }
1060
1061         memset(la, 0, sizeof(*la));
1062         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
1063         la->la_mode = S_IFDIR | S_IRWXU;
1064         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1065                        LA_UID | LA_GID;
1066         memset(dof, 0, sizeof(*dof));
1067         dof->dof_type = dt_mode_to_dft(S_IFDIR);
1068
1069         if (node == 0)
1070                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
1071         else
1072                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
1073         if (rc == 0)
1074                 lfsck->li_lpf_obj = child;
1075
1076         GOTO(unlock, rc);
1077
1078 unlock:
1079         lfsck_unlock(llh);
1080         if (rc != 0 && child != NULL && !IS_ERR(child))
1081                 lfsck_object_put(env, child);
1082
1083         return rc;
1084 }
1085
1086 /**
1087  * Scan .lustre/lost+found for bad name entries and remove them.
1088  *
1089  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
1090  * index in the system. Any other formatted name is invalid and should be
1091  * removed.
1092  *
1093  * \param[in] env       pointer to the thread context
1094  * \param[in] lfsck     pointer to the lfsck instance
1095  *
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
1100                                       struct lfsck_instance *lfsck)
1101 {
1102         struct dt_object        *parent = lfsck->li_lpf_root_obj;
1103         struct lu_dirent        *ent    =
1104                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
1105         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
1106         struct dt_it            *it;
1107         int                      rc;
1108         ENTRY;
1109
1110         it = iops->init(env, parent, LUDA_64BITHASH);
1111         if (IS_ERR(it))
1112                 RETURN(PTR_ERR(it));
1113
1114         rc = iops->load(env, it, 0);
1115         if (rc == 0)
1116                 rc = iops->next(env, it);
1117         else if (rc > 0)
1118                 rc = 0;
1119
1120         while (rc == 0) {
1121                 int off = 3;
1122
1123                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
1124                 if (rc != 0)
1125                         break;
1126
1127                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1128                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1129                         goto next;
1130
1131                 /* name length must be strlen("MDTxxxx") */
1132                 if (ent->lde_namelen != 7)
1133                         goto remove;
1134
1135                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1136                         goto remove;
1137
1138                 while (off < 7 && isxdigit(ent->lde_name[off]))
1139                         off++;
1140
1141                 if (off != 7) {
1142
1143 remove:
1144                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1145                                                          ent->lde_name);
1146                         if (rc != 0)
1147                                 break;
1148                 }
1149
1150 next:
1151                 rc = iops->next(env, it);
1152         }
1153
1154         iops->put(env, it);
1155         iops->fini(env, it);
1156
1157         RETURN(rc > 0 ? 0 : rc);
1158 }
1159
1160 static int lfsck_update_lpf_entry(const struct lu_env *env,
1161                                   struct lfsck_instance *lfsck,
1162                                   struct dt_object *parent,
1163                                   struct dt_object *child,
1164                                   const char *name,
1165                                   enum lfsck_verify_lpf_types type)
1166 {
1167         int rc;
1168
1169         if (type == LVLT_BY_BOOKMARK) {
1170                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1171                                              lfsck_dto2fid(child), S_IFDIR);
1172         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1173                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1174                 rc = lfsck_bookmark_store(env, lfsck);
1175
1176                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1177                        " in the bookmark file: rc = %d\n",
1178                        lfsck_lfsck2name(lfsck),
1179                        PFID(lfsck_dto2fid(child)), rc);
1180         }
1181
1182         return rc;
1183 }
1184
1185 /**
1186  * Check whether the @child back references the @parent.
1187  *
1188  * Two cases:
1189  * 1) The child's FID is stored in the bookmark file. If the child back
1190  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1191  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1192  *    the child back references another parent2, then:
1193  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1194  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1195  *      references the child. So keep them there. As the LFSCK processing,
1196  *      the parent3 may be found, then when the LFSCK run next time, the
1197  *      inconsistency can be repaired.
1198  *
1199  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1200  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1201  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1202  *    back references another parent2, then:
1203  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1204  *      from .lustre/lost+found/;
1205  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1206  *      sub-directory name entry and update the child;
1207  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1208  *      or not, then keep them there.
1209  *
1210  * \param[in] env       pointer to the thread context
1211  * \param[in] lfsck     pointer to the lfsck instance
1212  * \param[in] child     pointer to the lost+found sub-directory object
1213  * \param[in] name      the name for lost+found sub-directory object
1214  * \param[out] fid      pointer to the buffer to hold the FID of the object
1215  *                      (called it as parent2) that is referenced via the
1216  *                      child's dotdot entry; it also can be the FID that
1217  *                      is referenced by the name entry under the parent2.
1218  * \param[in] type      to indicate where the child's FID is stored in
1219  *
1220  * \retval              positive number for uncertain inconsistency
1221  * \retval              0 for success
1222  * \retval              negative error number on failure
1223  */
1224 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1225                                   struct lfsck_instance *lfsck,
1226                                   struct dt_object *child, const char *name,
1227                                   struct lu_fid *fid,
1228                                   enum lfsck_verify_lpf_types type)
1229 {
1230         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1231         struct lfsck_thread_info *info    = lfsck_env_info(env);
1232         char                     *name2   = info->lti_key;
1233         struct lu_fid            *fid2    = &info->lti_fid3;
1234         struct dt_object         *parent2 = NULL;
1235         struct lustre_handle      lh      = { 0 };
1236         int                       rc;
1237         ENTRY;
1238
1239         fid_zero(fid);
1240         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1241                        (const struct dt_key *)dotdot);
1242         if (rc != 0)
1243                 GOTO(linkea, rc);
1244
1245         if (!fid_is_sane(fid))
1246                 GOTO(linkea, rc = -EINVAL);
1247
1248         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1249                 const struct lu_name *cname;
1250
1251                 if (lfsck->li_lpf_obj == NULL) {
1252                         lu_object_get(&child->do_lu);
1253                         lfsck->li_lpf_obj = child;
1254                 }
1255
1256                 cname = lfsck_name_get_const(env, name, strlen(name));
1257                 rc = lfsck_verify_linkea(env, child, cname, &LU_LPF_FID);
1258                 if (rc == 0)
1259                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1260                                                     name, type);
1261
1262                 GOTO(out_done, rc);
1263         }
1264
1265         parent2 = lfsck_object_find_bottom(env, lfsck, fid);
1266         if (IS_ERR(parent2))
1267                 GOTO(linkea, parent2);
1268
1269         if (!dt_object_exists(parent2)) {
1270                 lfsck_object_put(env, parent2);
1271
1272                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1273         }
1274
1275         if (!dt_try_as_dir(env, parent2)) {
1276                 lfsck_object_put(env, parent2);
1277
1278                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1279         }
1280
1281 linkea:
1282         /* To prevent rename/unlink race */
1283         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1284                               MDS_INODELOCK_UPDATE, LCK_PR);
1285         if (rc != 0)
1286                 GOTO(out_put, rc);
1287
1288         dt_read_lock(env, child, 0);
1289         rc = lfsck_links_get_first(env, child, name2, fid2);
1290         if (rc != 0) {
1291                 dt_read_unlock(env, child);
1292                 lfsck_ibits_unlock(&lh, LCK_PR);
1293
1294                 GOTO(out_put, rc = 1);
1295         }
1296
1297         /* It is almost impossible that the bookmark file (or the name entry)
1298          * and the linkEA hit the same data corruption. Trust the linkEA. */
1299         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1300                 dt_read_unlock(env, child);
1301                 lfsck_ibits_unlock(&lh, LCK_PR);
1302
1303                 *fid = *fid2;
1304                 if (lfsck->li_lpf_obj == NULL) {
1305                         lu_object_get(&child->do_lu);
1306                         lfsck->li_lpf_obj = child;
1307                 }
1308
1309                 /* Update the child's dotdot entry */
1310                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1311                                              &LU_LPF_FID, S_IFDIR);
1312                 if (rc == 0)
1313                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1314                                                     name, type);
1315
1316                 GOTO(out_put, rc);
1317         }
1318
1319         if (parent2 == NULL || IS_ERR(parent2)) {
1320                 dt_read_unlock(env, child);
1321                 lfsck_ibits_unlock(&lh, LCK_PR);
1322
1323                 GOTO(out_done, rc = 1);
1324         }
1325
1326         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1327                        (const struct dt_key *)name2);
1328         dt_read_unlock(env, child);
1329         lfsck_ibits_unlock(&lh, LCK_PR);
1330         if (rc != 0 && rc != -ENOENT)
1331                 GOTO(out_put, rc);
1332
1333         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1334                 if (type == LVLT_BY_BOOKMARK)
1335                         GOTO(out_put, rc = 1);
1336
1337                 /* Trust the name entry, update the child's dotdot entry. */
1338                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1339                                              &LU_LPF_FID, S_IFDIR);
1340
1341                 GOTO(out_put, rc);
1342         }
1343
1344         if (type == LVLT_BY_BOOKMARK) {
1345                 /* Invalid FID record in the bookmark file, reset it. */
1346                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1347                 rc = lfsck_bookmark_store(env, lfsck);
1348
1349                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1350                        " in the bookmark file: rc = %d\n",
1351                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1352         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1353                 /* The name entry is wrong, remove it. */
1354                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1355         }
1356
1357         GOTO(out_put, rc);
1358
1359 out_put:
1360         if (parent2 != NULL && !IS_ERR(parent2))
1361                 lfsck_object_put(env, parent2);
1362
1363 out_done:
1364         return rc;
1365 }
1366
1367 /**
1368  * Verify the /ROOT/.lustre/lost+found/ directory.
1369  *
1370  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1371  * the LFSCK does not exactly know how to handle, such as orphans. So before
1372  * the LFSCK scanning the system, the consistency of such directory needs to
1373  * be verified firstly to allow the users to use it during the LFSCK.
1374  *
1375  * \param[in] env       pointer to the thread context
1376  * \param[in] lfsck     pointer to the lfsck instance
1377  *
1378  * \retval              positive number for uncertain inconsistency
1379  * \retval              0 for success
1380  * \retval              negative error number on failure
1381  */
1382 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1383 {
1384         struct lfsck_thread_info *info   = lfsck_env_info(env);
1385         struct lu_fid            *pfid   = &info->lti_fid;
1386         struct lu_fid            *cfid   = &info->lti_fid2;
1387         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1388         struct dt_object         *parent;
1389         /* child1's FID is in the bookmark file. */
1390         struct dt_object         *child1 = NULL;
1391         /* child2's FID is in the name entry MDTxxxx. */
1392         struct dt_object         *child2 = NULL;
1393         const struct lu_name     *cname;
1394         char                      name[8];
1395         int                       node   = lfsck_dev_idx(lfsck);
1396         int                       rc     = 0;
1397         ENTRY;
1398
1399         LASSERT(lfsck->li_master);
1400
1401         if (lfsck->li_lpf_root_obj != NULL)
1402                 RETURN(0);
1403
1404         if (node == 0) {
1405                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
1406                                                   &LU_LPF_FID);
1407         } else {
1408                 struct lfsck_tgt_desc *ltd;
1409
1410                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1411                 if (unlikely(ltd == NULL))
1412                         RETURN(-ENXIO);
1413
1414                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1415                                                   &LU_LPF_FID);
1416                 lfsck_tgt_put(ltd);
1417         }
1418
1419         if (IS_ERR(parent))
1420                 RETURN(PTR_ERR(parent));
1421
1422         LASSERT(dt_object_exists(parent));
1423
1424         if (unlikely(!dt_try_as_dir(env, parent))) {
1425                 lfsck_object_put(env, parent);
1426
1427                 GOTO(put, rc = -ENOTDIR);
1428         }
1429
1430         lfsck->li_lpf_root_obj = parent;
1431         if (node == 0) {
1432                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1433                 if (rc != 0)
1434                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1435                                "for bad sub-directories: rc = %d\n",
1436                                lfsck_lfsck2name(lfsck), rc);
1437         }
1438
1439         /* child2 */
1440         snprintf(name, 8, "MDT%04x", node);
1441         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1442                        (const struct dt_key *)name);
1443         if (rc == -ENOENT) {
1444                 rc = 0;
1445                 goto find_child1;
1446         }
1447
1448         if (rc != 0)
1449                 GOTO(put, rc);
1450
1451         /* Invalid FID in the name entry, remove the name entry. */
1452         if (!fid_is_norm(cfid)) {
1453                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1454                 if (rc != 0)
1455                         GOTO(put, rc);
1456
1457                 goto find_child1;
1458         }
1459
1460         child2 = lfsck_object_find_bottom(env, lfsck, cfid);
1461         if (IS_ERR(child2))
1462                 GOTO(put, rc = PTR_ERR(child2));
1463
1464         if (unlikely(!dt_object_exists(child2) ||
1465                      dt_object_remote(child2)) ||
1466                      !S_ISDIR(lfsck_object_type(child2))) {
1467                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1468                 if (rc != 0)
1469                         GOTO(put, rc);
1470
1471                 goto find_child1;
1472         }
1473
1474         if (unlikely(!dt_try_as_dir(env, child2))) {
1475                 lfsck_object_put(env, child2);
1476                 child2 = NULL;
1477                 rc = -ENOTDIR;
1478         }
1479
1480 find_child1:
1481         if (fid_is_zero(&bk->lb_lpf_fid))
1482                 goto check_child2;
1483
1484         if (likely(lu_fid_eq(cfid, &bk->lb_lpf_fid))) {
1485                 if (lfsck->li_lpf_obj == NULL) {
1486                         lu_object_get(&child2->do_lu);
1487                         lfsck->li_lpf_obj = child2;
1488                 }
1489
1490                 cname = lfsck_name_get_const(env, name, strlen(name));
1491                 rc = lfsck_verify_linkea(env, child2, cname, &LU_LPF_FID);
1492
1493                 GOTO(put, rc);
1494         }
1495
1496         if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1497                 struct lu_fid tfid = bk->lb_lpf_fid;
1498
1499                 /* Invalid FID record in the bookmark file, reset it. */
1500                 fid_zero(&bk->lb_lpf_fid);
1501                 rc = lfsck_bookmark_store(env, lfsck);
1502
1503                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1504                        " in the bookmark file: rc = %d\n",
1505                        lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1506
1507                 if (rc != 0)
1508                         GOTO(put, rc);
1509
1510                 goto check_child2;
1511         }
1512
1513         child1 = lfsck_object_find_bottom(env, lfsck, &bk->lb_lpf_fid);
1514         if (IS_ERR(child1)) {
1515                 child1 = NULL;
1516                 goto check_child2;
1517         }
1518
1519         if (unlikely(!dt_object_exists(child1) ||
1520                      dt_object_remote(child1)) ||
1521                      !S_ISDIR(lfsck_object_type(child1))) {
1522                 /* Invalid FID record in the bookmark file, reset it. */
1523                 fid_zero(&bk->lb_lpf_fid);
1524                 rc = lfsck_bookmark_store(env, lfsck);
1525
1526                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1527                        " in the bookmark file: rc = %d\n",
1528                        lfsck_lfsck2name(lfsck),
1529                        PFID(lfsck_dto2fid(child1)), rc);
1530
1531                 if (rc != 0)
1532                         GOTO(put, rc);
1533
1534                 lfsck_object_put(env, child1);
1535                 child1 = NULL;
1536                 goto check_child2;
1537         }
1538
1539         if (unlikely(!dt_try_as_dir(env, child1))) {
1540                 lfsck_object_put(env, child1);
1541                 child1 = NULL;
1542                 rc = -ENOTDIR;
1543                 goto check_child2;
1544         }
1545
1546         rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name, pfid,
1547                                     LVLT_BY_BOOKMARK);
1548         if (lu_fid_eq(pfid, &LU_LPF_FID))
1549                 GOTO(put, rc);
1550
1551 check_child2:
1552         if (child2 != NULL)
1553                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1554                                             pfid, LVLT_BY_NAMEENTRY);
1555
1556         GOTO(put, rc);
1557
1558 put:
1559         if (lfsck->li_lpf_obj != NULL) {
1560                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1561                         lfsck_object_put(env, lfsck->li_lpf_obj);
1562                         lfsck->li_lpf_obj = NULL;
1563                         rc = -ENOTDIR;
1564                 }
1565         } else if (rc == 0) {
1566                 rc = lfsck_create_lpf(env, lfsck);
1567         }
1568
1569         if (child2 != NULL && !IS_ERR(child2))
1570                 lfsck_object_put(env, child2);
1571         if (child1 != NULL && !IS_ERR(child1))
1572                 lfsck_object_put(env, child1);
1573
1574         return rc;
1575 }
1576
1577 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1578 {
1579         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1580         struct seq_server_site  *ss     = lfsck_dev_site(lfsck);
1581         char                    *prefix;
1582         int                      rc     = 0;
1583         ENTRY;
1584
1585         if (unlikely(ss == NULL))
1586                 RETURN(-ENXIO);
1587
1588         OBD_ALLOC_PTR(lfsck->li_seq);
1589         if (lfsck->li_seq == NULL)
1590                 RETURN(-ENOMEM);
1591
1592         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1593         if (prefix == NULL)
1594                 GOTO(out, rc = -ENOMEM);
1595
1596         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1597         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1598                              ss->ss_server_seq);
1599         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1600         if (rc != 0)
1601                 GOTO(out, rc);
1602
1603         if (fid_is_sane(&bk->lb_last_fid))
1604                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1605
1606         RETURN(0);
1607
1608 out:
1609         OBD_FREE_PTR(lfsck->li_seq);
1610         lfsck->li_seq = NULL;
1611
1612         return rc;
1613 }
1614
1615 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1616 {
1617         if (lfsck->li_seq != NULL) {
1618                 seq_client_fini(lfsck->li_seq);
1619                 OBD_FREE_PTR(lfsck->li_seq);
1620                 lfsck->li_seq = NULL;
1621         }
1622 }
1623
1624 void lfsck_instance_cleanup(const struct lu_env *env,
1625                             struct lfsck_instance *lfsck)
1626 {
1627         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1628         struct lfsck_component  *com;
1629         struct lfsck_component  *next;
1630         struct lfsck_lmv_unit   *llu;
1631         struct lfsck_lmv_unit   *llu_next;
1632         struct lfsck_lmv        *llmv;
1633         ENTRY;
1634
1635         LASSERT(list_empty(&lfsck->li_link));
1636         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1637
1638         if (lfsck->li_obj_oit != NULL) {
1639                 lfsck_object_put(env, lfsck->li_obj_oit);
1640                 lfsck->li_obj_oit = NULL;
1641         }
1642
1643         LASSERT(lfsck->li_obj_dir == NULL);
1644         LASSERT(lfsck->li_lmv == NULL);
1645
1646         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1647                 llmv = &llu->llu_lmv;
1648
1649                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1650                          "still in using: %u\n",
1651                          atomic_read(&llmv->ll_ref));
1652
1653                 lfsck_lmv_put(env, llmv);
1654         }
1655
1656         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1657                 lfsck_component_cleanup(env, com);
1658         }
1659
1660         LASSERT(list_empty(&lfsck->li_list_dir));
1661
1662         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1663                                  lc_link) {
1664                 lfsck_component_cleanup(env, com);
1665         }
1666
1667         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1668                 lfsck_component_cleanup(env, com);
1669         }
1670
1671         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1672         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1673
1674         if (lfsck->li_lfsck_dir != NULL) {
1675                 lfsck_object_put(env, lfsck->li_lfsck_dir);
1676                 lfsck->li_lfsck_dir = NULL;
1677         }
1678
1679         if (lfsck->li_bookmark_obj != NULL) {
1680                 lfsck_object_put(env, lfsck->li_bookmark_obj);
1681                 lfsck->li_bookmark_obj = NULL;
1682         }
1683
1684         if (lfsck->li_lpf_obj != NULL) {
1685                 lfsck_object_put(env, lfsck->li_lpf_obj);
1686                 lfsck->li_lpf_obj = NULL;
1687         }
1688
1689         if (lfsck->li_lpf_root_obj != NULL) {
1690                 lfsck_object_put(env, lfsck->li_lpf_root_obj);
1691                 lfsck->li_lpf_root_obj = NULL;
1692         }
1693
1694         if (lfsck->li_los != NULL) {
1695                 local_oid_storage_fini(env, lfsck->li_los);
1696                 lfsck->li_los = NULL;
1697         }
1698
1699         lfsck_fid_fini(lfsck);
1700
1701         OBD_FREE_PTR(lfsck);
1702 }
1703
1704 static inline struct lfsck_instance *
1705 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1706 {
1707         struct lfsck_instance *lfsck;
1708
1709         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1710                 if (lfsck->li_bottom == key) {
1711                         if (ref)
1712                                 lfsck_instance_get(lfsck);
1713                         if (unlink)
1714                                 list_del_init(&lfsck->li_link);
1715
1716                         return lfsck;
1717                 }
1718         }
1719
1720         return NULL;
1721 }
1722
1723 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1724                                            bool unlink)
1725 {
1726         struct lfsck_instance *lfsck;
1727
1728         spin_lock(&lfsck_instance_lock);
1729         lfsck = __lfsck_instance_find(key, ref, unlink);
1730         spin_unlock(&lfsck_instance_lock);
1731
1732         return lfsck;
1733 }
1734
1735 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1736 {
1737         struct lfsck_instance *tmp;
1738
1739         spin_lock(&lfsck_instance_lock);
1740         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1741                 if (lfsck->li_bottom == tmp->li_bottom) {
1742                         spin_unlock(&lfsck_instance_lock);
1743                         return -EEXIST;
1744                 }
1745         }
1746
1747         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1748         spin_unlock(&lfsck_instance_lock);
1749         return 0;
1750 }
1751
1752 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
1753                     const char *prefix)
1754 {
1755         int flag;
1756         int i;
1757         bool newline = (bits != 0 ? false : true);
1758         int rc;
1759
1760         rc = seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
1761         if (rc < 0)
1762                 return rc;
1763
1764         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
1765                 if (flag & bits) {
1766                         bits &= ~flag;
1767                         if (names[i] != NULL) {
1768                                 if (bits == 0)
1769                                         newline = true;
1770
1771                                 rc = seq_printf(m, "%s%c", names[i],
1772                                                 newline ? '\n' : ',');
1773                                 if (rc < 0)
1774                                         return rc;
1775                         }
1776                 }
1777         }
1778
1779         if (!newline)
1780                 rc = seq_printf(m, "\n");
1781
1782         return rc;
1783 }
1784
1785 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *name)
1786 {
1787         int rc;
1788
1789         if (time == 0) {
1790                 rc = seq_printf(m, "%s_time: N/A\n", name);
1791                 if (rc == 0)
1792                         rc = seq_printf(m, "time_since_%s: N/A\n", name);
1793
1794                 return rc;
1795         }
1796
1797         rc = seq_printf(m, "%s_time: "LPU64"\n", name, time);
1798         if (rc == 0)
1799                 rc = seq_printf(m, "time_since_%s: "LPU64" seconds\n",
1800                                 name, cfs_time_current_sec() - time);
1801
1802         return rc;
1803 }
1804
1805 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1806                    const char *prefix)
1807 {
1808         if (fid_is_zero(&pos->lp_dir_parent)) {
1809                 if (pos->lp_oit_cookie == 0)
1810                         return seq_printf(m, "%s: N/A, N/A, N/A\n", prefix);
1811
1812                 return seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1813                                   prefix, pos->lp_oit_cookie);
1814         }
1815
1816         return seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1817                           prefix, pos->lp_oit_cookie,
1818                           PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1819 }
1820
1821 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1822                     struct lfsck_position *pos, bool init)
1823 {
1824         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1825
1826         if (unlikely(lfsck->li_di_oit == NULL)) {
1827                 memset(pos, 0, sizeof(*pos));
1828                 return;
1829         }
1830
1831         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1832         if (!lfsck->li_current_oit_processed && !init)
1833                 pos->lp_oit_cookie--;
1834
1835         LASSERT(pos->lp_oit_cookie > 0);
1836
1837         if (lfsck->li_di_dir != NULL) {
1838                 struct dt_object *dto = lfsck->li_obj_dir;
1839
1840                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1841                                                         lfsck->li_di_dir);
1842
1843                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1844                         fid_zero(&pos->lp_dir_parent);
1845                         pos->lp_dir_cookie = 0;
1846                 } else {
1847                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1848                 }
1849         } else {
1850                 fid_zero(&pos->lp_dir_parent);
1851                 pos->lp_dir_cookie = 0;
1852         }
1853 }
1854
1855 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1856 {
1857         bool dirty = false;
1858
1859         if (limit != LFSCK_SPEED_NO_LIMIT) {
1860                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1861                         lfsck->li_sleep_rate = limit /
1862                                                msecs_to_jiffies(MSEC_PER_SEC);
1863                         lfsck->li_sleep_jif = 1;
1864                 } else {
1865                         lfsck->li_sleep_rate = 1;
1866                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1867                                               limit;
1868                 }
1869         } else {
1870                 lfsck->li_sleep_jif = 0;
1871                 lfsck->li_sleep_rate = 0;
1872         }
1873
1874         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1875                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1876                 dirty = true;
1877         }
1878
1879         return dirty;
1880 }
1881
1882 void lfsck_control_speed(struct lfsck_instance *lfsck)
1883 {
1884         struct ptlrpc_thread *thread = &lfsck->li_thread;
1885         struct l_wait_info    lwi;
1886
1887         if (lfsck->li_sleep_jif > 0 &&
1888             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1889                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1890                                        LWI_ON_SIGNAL_NOOP, NULL);
1891
1892                 l_wait_event(thread->t_ctl_waitq,
1893                              !thread_is_running(thread),
1894                              &lwi);
1895                 lfsck->li_new_scanned = 0;
1896         }
1897 }
1898
1899 void lfsck_control_speed_by_self(struct lfsck_component *com)
1900 {
1901         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1902         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1903         struct l_wait_info       lwi;
1904
1905         if (lfsck->li_sleep_jif > 0 &&
1906             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1907                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1908                                        LWI_ON_SIGNAL_NOOP, NULL);
1909
1910                 l_wait_event(thread->t_ctl_waitq,
1911                              !thread_is_running(thread),
1912                              &lwi);
1913                 com->lc_new_scanned = 0;
1914         }
1915 }
1916
1917 static struct lfsck_thread_args *
1918 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1919                        struct lfsck_component *com,
1920                        struct lfsck_start_param *lsp)
1921 {
1922         struct lfsck_thread_args *lta;
1923         int                       rc;
1924
1925         OBD_ALLOC_PTR(lta);
1926         if (lta == NULL)
1927                 return ERR_PTR(-ENOMEM);
1928
1929         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1930         if (rc != 0) {
1931                 OBD_FREE_PTR(lta);
1932                 return ERR_PTR(rc);
1933         }
1934
1935         lta->lta_lfsck = lfsck_instance_get(lfsck);
1936         if (com != NULL)
1937                 lta->lta_com = lfsck_component_get(com);
1938
1939         lta->lta_lsp = lsp;
1940
1941         return lta;
1942 }
1943
1944 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1945 {
1946         if (lta->lta_com != NULL)
1947                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1948         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1949         lu_env_fini(&lta->lta_env);
1950         OBD_FREE_PTR(lta);
1951 }
1952
1953 struct lfsck_assistant_data *
1954 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1955                           const char *name)
1956 {
1957         struct lfsck_assistant_data *lad;
1958
1959         OBD_ALLOC_PTR(lad);
1960         if (lad != NULL) {
1961                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1962                 if (lad->lad_bitmap == NULL) {
1963                         OBD_FREE_PTR(lad);
1964                         return NULL;
1965                 }
1966
1967                 INIT_LIST_HEAD(&lad->lad_req_list);
1968                 spin_lock_init(&lad->lad_lock);
1969                 INIT_LIST_HEAD(&lad->lad_ost_list);
1970                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1971                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1972                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1973                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1974                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1975                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1976                 lad->lad_ops = lao;
1977                 lad->lad_name = name;
1978         }
1979
1980         return lad;
1981 }
1982
1983 struct lfsck_assistant_object *
1984 lfsck_assistant_object_init(const struct lu_env *env, const struct lu_fid *fid,
1985                             const struct lu_attr *attr, __u64 cookie,
1986                             bool is_dir)
1987 {
1988         struct lfsck_assistant_object   *lso;
1989
1990         OBD_ALLOC_PTR(lso);
1991         if (lso == NULL)
1992                 return ERR_PTR(-ENOMEM);
1993
1994         lso->lso_fid = *fid;
1995         if (attr != NULL)
1996                 lso->lso_attr = *attr;
1997
1998         atomic_set(&lso->lso_ref, 1);
1999         lso->lso_oit_cookie = cookie;
2000         if (is_dir)
2001                 lso->lso_is_dir = 1;
2002
2003         return lso;
2004 }
2005
2006 struct dt_object *
2007 lfsck_assistant_object_load(const struct lu_env *env,
2008                             struct lfsck_instance *lfsck,
2009                             struct lfsck_assistant_object *lso)
2010 {
2011         struct dt_object *obj;
2012
2013         obj = lfsck_object_find_bottom(env, lfsck, &lso->lso_fid);
2014         if (IS_ERR(obj))
2015                 return obj;
2016
2017         if (unlikely(!dt_object_exists(obj) || lfsck_is_dead_obj(obj))) {
2018                 lso->lso_dead = 1;
2019                 lfsck_object_put(env, obj);
2020
2021                 return ERR_PTR(-ENOENT);
2022         }
2023
2024         if (lso->lso_is_dir && unlikely(!dt_try_as_dir(env, obj))) {
2025                 lfsck_object_put(env, obj);
2026
2027                 return ERR_PTR(-ENOTDIR);
2028         }
2029
2030         return obj;
2031 }
2032
2033 /**
2034  * Generic LFSCK asynchronous communication interpretor function.
2035  * The LFSCK RPC reply for both the event notification and status
2036  * querying will be handled here.
2037  *
2038  * \param[in] env       pointer to the thread context
2039  * \param[in] req       pointer to the LFSCK request
2040  * \param[in] args      pointer to the lfsck_async_interpret_args
2041  * \param[in] rc        the result for handling the LFSCK request
2042  *
2043  * \retval              0 for success
2044  * \retval              negative error number on failure
2045  */
2046 int lfsck_async_interpret_common(const struct lu_env *env,
2047                                  struct ptlrpc_request *req,
2048                                  void *args, int rc)
2049 {
2050         struct lfsck_async_interpret_args *laia = args;
2051         struct lfsck_component            *com  = laia->laia_com;
2052         struct lfsck_assistant_data       *lad  = com->lc_data;
2053         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
2054         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
2055         struct lfsck_request              *lr   = laia->laia_lr;
2056
2057         LASSERT(com->lc_lfsck->li_master);
2058
2059         switch (lr->lr_event) {
2060         case LE_START:
2061                 if (rc != 0) {
2062                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
2063                                "start: rc = %d\n",
2064                                lfsck_lfsck2name(com->lc_lfsck),
2065                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2066                                ltd->ltd_index, lad->lad_name, rc);
2067
2068                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2069                                 struct lfsck_layout *lo = com->lc_file_ram;
2070
2071                                 if (lr->lr_flags & LEF_TO_OST)
2072                                         lfsck_lad_set_bitmap(env, com,
2073                                                              ltd->ltd_index);
2074                                 else
2075                                         lo->ll_flags |= LF_INCOMPLETE;
2076                         } else {
2077                                 struct lfsck_namespace *ns = com->lc_file_ram;
2078
2079                                 /* If some MDT does not join the namespace
2080                                  * LFSCK, then we cannot know whether there
2081                                  * is some name entry on such MDT that with
2082                                  * the referenced MDT-object on this MDT or
2083                                  * not. So the namespace LFSCK on this MDT
2084                                  * cannot handle orphan MDT-objects properly.
2085                                  * So we mark the LFSCK as LF_INCOMPLETE and
2086                                  * skip orphan MDT-objects handling. */
2087                                 ns->ln_flags |= LF_INCOMPLETE;
2088                         }
2089                         break;
2090                 }
2091
2092                 spin_lock(&ltds->ltd_lock);
2093                 if (ltd->ltd_dead) {
2094                         spin_unlock(&ltds->ltd_lock);
2095                         break;
2096                 }
2097
2098                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2099                         struct list_head *list;
2100                         struct list_head *phase_list;
2101
2102                         if (ltd->ltd_layout_done) {
2103                                 spin_unlock(&ltds->ltd_lock);
2104                                 break;
2105                         }
2106
2107                         if (lr->lr_flags & LEF_TO_OST) {
2108                                 list = &lad->lad_ost_list;
2109                                 phase_list = &lad->lad_ost_phase1_list;
2110                         } else {
2111                                 list = &lad->lad_mdt_list;
2112                                 phase_list = &lad->lad_mdt_phase1_list;
2113                         }
2114
2115                         if (list_empty(&ltd->ltd_layout_list))
2116                                 list_add_tail(&ltd->ltd_layout_list, list);
2117                         if (list_empty(&ltd->ltd_layout_phase_list))
2118                                 list_add_tail(&ltd->ltd_layout_phase_list,
2119                                               phase_list);
2120                 } else {
2121                         if (ltd->ltd_namespace_done) {
2122                                 spin_unlock(&ltds->ltd_lock);
2123                                 break;
2124                         }
2125
2126                         if (list_empty(&ltd->ltd_namespace_list))
2127                                 list_add_tail(&ltd->ltd_namespace_list,
2128                                               &lad->lad_mdt_list);
2129                         if (list_empty(&ltd->ltd_namespace_phase_list))
2130                                 list_add_tail(&ltd->ltd_namespace_phase_list,
2131                                               &lad->lad_mdt_phase1_list);
2132                 }
2133                 spin_unlock(&ltds->ltd_lock);
2134                 break;
2135         case LE_STOP:
2136         case LE_PHASE1_DONE:
2137         case LE_PHASE2_DONE:
2138         case LE_PEER_EXIT:
2139                 if (rc != 0 && rc != -EALREADY)
2140                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
2141                               "event = %d, rc = %d\n",
2142                               lfsck_lfsck2name(com->lc_lfsck),
2143                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2144                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
2145                 break;
2146         case LE_QUERY: {
2147                 struct lfsck_reply *reply;
2148                 struct list_head *list;
2149                 struct list_head *phase_list;
2150
2151                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2152                         list = &ltd->ltd_layout_list;
2153                         phase_list = &ltd->ltd_layout_phase_list;
2154                 } else {
2155                         list = &ltd->ltd_namespace_list;
2156                         phase_list = &ltd->ltd_namespace_phase_list;
2157                 }
2158
2159                 if (rc != 0) {
2160                         spin_lock(&ltds->ltd_lock);
2161                         list_del_init(phase_list);
2162                         list_del_init(list);
2163                         spin_unlock(&ltds->ltd_lock);
2164                         break;
2165                 }
2166
2167                 reply = req_capsule_server_get(&req->rq_pill,
2168                                                &RMF_LFSCK_REPLY);
2169                 if (reply == NULL) {
2170                         rc = -EPROTO;
2171                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
2172                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
2173                                lad->lad_name, rc);
2174                         spin_lock(&ltds->ltd_lock);
2175                         list_del_init(phase_list);
2176                         list_del_init(list);
2177                         spin_unlock(&ltds->ltd_lock);
2178                         break;
2179                 }
2180
2181                 switch (reply->lr_status) {
2182                 case LS_SCANNING_PHASE1:
2183                         break;
2184                 case LS_SCANNING_PHASE2:
2185                         spin_lock(&ltds->ltd_lock);
2186                         list_del_init(phase_list);
2187                         if (ltd->ltd_dead) {
2188                                 spin_unlock(&ltds->ltd_lock);
2189                                 break;
2190                         }
2191
2192                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2193                                 if (ltd->ltd_layout_done) {
2194                                         spin_unlock(&ltds->ltd_lock);
2195                                         break;
2196                                 }
2197
2198                                 if (lr->lr_flags & LEF_TO_OST)
2199                                         list_add_tail(phase_list,
2200                                                 &lad->lad_ost_phase2_list);
2201                                 else
2202                                         list_add_tail(phase_list,
2203                                                 &lad->lad_mdt_phase2_list);
2204                         } else {
2205                                 if (ltd->ltd_namespace_done) {
2206                                         spin_unlock(&ltds->ltd_lock);
2207                                         break;
2208                                 }
2209
2210                                 list_add_tail(phase_list,
2211                                               &lad->lad_mdt_phase2_list);
2212                         }
2213                         spin_unlock(&ltds->ltd_lock);
2214                         break;
2215                 default:
2216                         spin_lock(&ltds->ltd_lock);
2217                         list_del_init(phase_list);
2218                         list_del_init(list);
2219                         spin_unlock(&ltds->ltd_lock);
2220                         break;
2221                 }
2222                 break;
2223         }
2224         default:
2225                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2226                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2227                 break;
2228         }
2229
2230         if (!laia->laia_shared) {
2231                 lfsck_tgt_put(ltd);
2232                 lfsck_component_put(env, com);
2233         }
2234
2235         return 0;
2236 }
2237
2238 static void lfsck_interpret(const struct lu_env *env,
2239                             struct lfsck_instance *lfsck,
2240                             struct ptlrpc_request *req, void *args, int result)
2241 {
2242         struct lfsck_async_interpret_args *laia = args;
2243         struct lfsck_component            *com;
2244
2245         LASSERT(laia->laia_com == NULL);
2246         LASSERT(laia->laia_shared);
2247
2248         spin_lock(&lfsck->li_lock);
2249         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2250                 laia->laia_com = com;
2251                 lfsck_async_interpret_common(env, req, laia, result);
2252         }
2253
2254         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2255                 laia->laia_com = com;
2256                 lfsck_async_interpret_common(env, req, laia, result);
2257         }
2258         spin_unlock(&lfsck->li_lock);
2259 }
2260
2261 static int lfsck_stop_notify(const struct lu_env *env,
2262                              struct lfsck_instance *lfsck,
2263                              struct lfsck_tgt_descs *ltds,
2264                              struct lfsck_tgt_desc *ltd, __u16 type)
2265 {
2266         struct lfsck_component *com;
2267         int                     rc = 0;
2268         ENTRY;
2269
2270         LASSERT(lfsck->li_master);
2271
2272         spin_lock(&lfsck->li_lock);
2273         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2274         if (com == NULL)
2275                 com = __lfsck_component_find(lfsck, type,
2276                                              &lfsck->li_list_double_scan);
2277         if (com != NULL)
2278                 lfsck_component_get(com);
2279         spin_unlock(&lfsck->li_lock);
2280
2281         if (com != NULL) {
2282                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2283                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2284                 struct lfsck_request              *lr    = &info->lti_lr;
2285                 struct lfsck_assistant_data       *lad   = com->lc_data;
2286                 struct list_head                  *list;
2287                 struct list_head                  *phase_list;
2288                 struct ptlrpc_request_set         *set;
2289
2290                 set = ptlrpc_prep_set();
2291                 if (set == NULL) {
2292                         lfsck_component_put(env, com);
2293
2294                         RETURN(-ENOMEM);
2295                 }
2296
2297                 if (type == LFSCK_TYPE_LAYOUT) {
2298                         list = &ltd->ltd_layout_list;
2299                         phase_list = &ltd->ltd_layout_phase_list;
2300                 } else {
2301                         list = &ltd->ltd_namespace_list;
2302                         phase_list = &ltd->ltd_namespace_phase_list;
2303                 }
2304
2305                 spin_lock(&ltds->ltd_lock);
2306                 if (list_empty(list)) {
2307                         LASSERT(list_empty(phase_list));
2308                         spin_unlock(&ltds->ltd_lock);
2309                         ptlrpc_set_destroy(set);
2310
2311                         RETURN(0);
2312                 }
2313
2314                 list_del_init(phase_list);
2315                 list_del_init(list);
2316                 spin_unlock(&ltds->ltd_lock);
2317
2318                 memset(lr, 0, sizeof(*lr));
2319                 lr->lr_index = lfsck_dev_idx(lfsck);
2320                 lr->lr_event = LE_PEER_EXIT;
2321                 lr->lr_active = type;
2322                 lr->lr_status = LS_CO_PAUSED;
2323                 if (ltds == &lfsck->li_ost_descs)
2324                         lr->lr_flags = LEF_TO_OST;
2325
2326                 laia->laia_com = com;
2327                 laia->laia_ltds = ltds;
2328                 atomic_inc(&ltd->ltd_ref);
2329                 laia->laia_ltd = ltd;
2330                 laia->laia_lr = lr;
2331                 laia->laia_shared = 0;
2332
2333                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2334                                          lfsck_async_interpret_common,
2335                                          laia, LFSCK_NOTIFY);
2336                 if (rc != 0) {
2337                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2338                                "co-stop for %s: rc = %d\n",
2339                                lfsck_lfsck2name(lfsck),
2340                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2341                                ltd->ltd_index, lad->lad_name, rc);
2342                         lfsck_tgt_put(ltd);
2343                 } else {
2344                         rc = ptlrpc_set_wait(set);
2345                 }
2346
2347                 ptlrpc_set_destroy(set);
2348                 lfsck_component_put(env, com);
2349         }
2350
2351         RETURN(rc);
2352 }
2353
2354 static int lfsck_async_interpret(const struct lu_env *env,
2355                                  struct ptlrpc_request *req,
2356                                  void *args, int rc)
2357 {
2358         struct lfsck_async_interpret_args *laia = args;
2359         struct lfsck_instance             *lfsck;
2360
2361         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2362                               li_mdt_descs);
2363         lfsck_interpret(env, lfsck, req, laia, rc);
2364         lfsck_tgt_put(laia->laia_ltd);
2365         if (rc != 0 && laia->laia_result != -EALREADY)
2366                 laia->laia_result = rc;
2367
2368         return 0;
2369 }
2370
2371 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2372                         struct lfsck_request *lr,
2373                         struct ptlrpc_request_set *set,
2374                         ptlrpc_interpterer_t interpreter,
2375                         void *args, int request)
2376 {
2377         struct lfsck_async_interpret_args *laia;
2378         struct ptlrpc_request             *req;
2379         struct lfsck_request              *tmp;
2380         struct req_format                 *format;
2381         int                                rc;
2382
2383         switch (request) {
2384         case LFSCK_NOTIFY:
2385                 format = &RQF_LFSCK_NOTIFY;
2386                 break;
2387         case LFSCK_QUERY:
2388                 format = &RQF_LFSCK_QUERY;
2389                 break;
2390         default:
2391                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2392                        exp->exp_obd->obd_name, request, -EINVAL);
2393                 return -EINVAL;
2394         }
2395
2396         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2397         if (req == NULL)
2398                 return -ENOMEM;
2399
2400         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2401         if (rc != 0) {
2402                 ptlrpc_request_free(req);
2403
2404                 return rc;
2405         }
2406
2407         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2408         *tmp = *lr;
2409         ptlrpc_request_set_replen(req);
2410
2411         laia = ptlrpc_req_async_args(req);
2412         *laia = *(struct lfsck_async_interpret_args *)args;
2413         if (laia->laia_com != NULL)
2414                 lfsck_component_get(laia->laia_com);
2415         req->rq_interpret_reply = interpreter;
2416         req->rq_allow_intr = 1;
2417         ptlrpc_set_add_req(set, req);
2418
2419         return 0;
2420 }
2421
2422 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2423                           struct lfsck_start_param *lsp)
2424 {
2425         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2426         struct lfsck_assistant_data     *lad     = com->lc_data;
2427         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2428         struct ptlrpc_thread            *athread = &lad->lad_thread;
2429         struct lfsck_thread_args        *lta;
2430         struct task_struct              *task;
2431         int                              rc;
2432         ENTRY;
2433
2434         lad->lad_assistant_status = 0;
2435         lad->lad_post_result = 0;
2436         lad->lad_to_post = 0;
2437         lad->lad_to_double_scan = 0;
2438         lad->lad_in_double_scan = 0;
2439         lad->lad_exit = 0;
2440         lad->lad_advance_lock = false;
2441         thread_set_flags(athread, 0);
2442
2443         lta = lfsck_thread_args_init(lfsck, com, lsp);
2444         if (IS_ERR(lta))
2445                 RETURN(PTR_ERR(lta));
2446
2447         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2448         if (IS_ERR(task)) {
2449                 rc = PTR_ERR(task);
2450                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2451                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2452                 lfsck_thread_args_fini(lta);
2453         } else {
2454                 struct l_wait_info lwi = { 0 };
2455
2456                 l_wait_event(mthread->t_ctl_waitq,
2457                              thread_is_running(athread) ||
2458                              thread_is_stopped(athread),
2459                              &lwi);
2460                 if (unlikely(!thread_is_running(athread)))
2461                         rc = lad->lad_assistant_status;
2462                 else
2463                         rc = 0;
2464         }
2465
2466         RETURN(rc);
2467 }
2468
2469 int lfsck_checkpoint_generic(const struct lu_env *env,
2470                              struct lfsck_component *com)
2471 {
2472         struct lfsck_assistant_data     *lad     = com->lc_data;
2473         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2474         struct ptlrpc_thread            *athread = &lad->lad_thread;
2475         struct l_wait_info               lwi     = { 0 };
2476
2477         l_wait_event(mthread->t_ctl_waitq,
2478                      list_empty(&lad->lad_req_list) ||
2479                      !thread_is_running(mthread) ||
2480                      thread_is_stopped(athread),
2481                      &lwi);
2482
2483         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2484                 return LFSCK_CHECKPOINT_SKIP;
2485
2486         return 0;
2487 }
2488
2489 void lfsck_post_generic(const struct lu_env *env,
2490                         struct lfsck_component *com, int *result)
2491 {
2492         struct lfsck_assistant_data     *lad     = com->lc_data;
2493         struct ptlrpc_thread            *athread = &lad->lad_thread;
2494         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2495         struct l_wait_info               lwi     = { 0 };
2496
2497         lad->lad_post_result = *result;
2498         if (*result <= 0)
2499                 lad->lad_exit = 1;
2500         lad->lad_to_post = 1;
2501
2502         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s post, rc = %d\n",
2503                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2504
2505         wake_up_all(&athread->t_ctl_waitq);
2506         l_wait_event(mthread->t_ctl_waitq,
2507                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2508                      thread_is_stopped(athread),
2509                      &lwi);
2510
2511         if (lad->lad_assistant_status < 0)
2512                 *result = lad->lad_assistant_status;
2513
2514         CDEBUG(D_LFSCK, "%s: the assistant has done %s post, rc = %d\n",
2515                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, *result);
2516 }
2517
2518 int lfsck_double_scan_generic(const struct lu_env *env,
2519                               struct lfsck_component *com, int status)
2520 {
2521         struct lfsck_assistant_data     *lad     = com->lc_data;
2522         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2523         struct ptlrpc_thread            *athread = &lad->lad_thread;
2524         struct l_wait_info               lwi     = { 0 };
2525
2526         if (status != LS_SCANNING_PHASE2)
2527                 lad->lad_exit = 1;
2528         else
2529                 lad->lad_to_double_scan = 1;
2530
2531         CDEBUG(D_LFSCK, "%s: waiting for assistant to do %s double_scan, "
2532                "status %d\n",
2533                lfsck_lfsck2name(com->lc_lfsck), lad->lad_name, status);
2534
2535         wake_up_all(&athread->t_ctl_waitq);
2536         l_wait_event(mthread->t_ctl_waitq,
2537                      lad->lad_in_double_scan ||
2538                      thread_is_stopped(athread),
2539                      &lwi);
2540
2541         CDEBUG(D_LFSCK, "%s: the assistant has done %s double_scan, "
2542                "status %d\n", lfsck_lfsck2name(com->lc_lfsck), lad->lad_name,
2543                lad->lad_assistant_status);
2544
2545         if (lad->lad_assistant_status < 0)
2546                 return lad->lad_assistant_status;
2547
2548         return 0;
2549 }
2550
2551 void lfsck_quit_generic(const struct lu_env *env,
2552                         struct lfsck_component *com)
2553 {
2554         struct lfsck_assistant_data     *lad     = com->lc_data;
2555         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2556         struct ptlrpc_thread            *athread = &lad->lad_thread;
2557         struct l_wait_info               lwi     = { 0 };
2558
2559         lad->lad_exit = 1;
2560         wake_up_all(&athread->t_ctl_waitq);
2561         l_wait_event(mthread->t_ctl_waitq,
2562                      thread_is_init(athread) ||
2563                      thread_is_stopped(athread),
2564                      &lwi);
2565 }
2566
2567 /* external interfaces */
2568
2569 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2570 {
2571         struct lu_env           env;
2572         struct lfsck_instance  *lfsck;
2573         int                     rc;
2574         ENTRY;
2575
2576         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2577         if (rc != 0)
2578                 RETURN(rc);
2579
2580         lfsck = lfsck_instance_find(key, true, false);
2581         if (likely(lfsck != NULL)) {
2582                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2583                 lfsck_instance_put(&env, lfsck);
2584         } else {
2585                 rc = -ENXIO;
2586         }
2587
2588         lu_env_fini(&env);
2589
2590         RETURN(rc);
2591 }
2592 EXPORT_SYMBOL(lfsck_get_speed);
2593
2594 int lfsck_set_speed(struct dt_device *key, int val)
2595 {
2596         struct lu_env           env;
2597         struct lfsck_instance  *lfsck;
2598         int                     rc;
2599         ENTRY;
2600
2601         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2602         if (rc != 0)
2603                 RETURN(rc);
2604
2605         lfsck = lfsck_instance_find(key, true, false);
2606         if (likely(lfsck != NULL)) {
2607                 mutex_lock(&lfsck->li_mutex);
2608                 if (__lfsck_set_speed(lfsck, val))
2609                         rc = lfsck_bookmark_store(&env, lfsck);
2610                 mutex_unlock(&lfsck->li_mutex);
2611                 lfsck_instance_put(&env, lfsck);
2612         } else {
2613                 rc = -ENXIO;
2614         }
2615
2616         lu_env_fini(&env);
2617
2618         RETURN(rc);
2619 }
2620 EXPORT_SYMBOL(lfsck_set_speed);
2621
2622 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2623 {
2624         struct lu_env           env;
2625         struct lfsck_instance  *lfsck;
2626         int                     rc;
2627         ENTRY;
2628
2629         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2630         if (rc != 0)
2631                 RETURN(rc);
2632
2633         lfsck = lfsck_instance_find(key, true, false);
2634         if (likely(lfsck != NULL)) {
2635                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2636                 lfsck_instance_put(&env, lfsck);
2637         } else {
2638                 rc = -ENXIO;
2639         }
2640
2641         lu_env_fini(&env);
2642
2643         RETURN(rc);
2644 }
2645 EXPORT_SYMBOL(lfsck_get_windows);
2646
2647 int lfsck_set_windows(struct dt_device *key, int val)
2648 {
2649         struct lu_env           env;
2650         struct lfsck_instance  *lfsck;
2651         int                     rc;
2652         ENTRY;
2653
2654         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2655         if (rc != 0)
2656                 RETURN(rc);
2657
2658         lfsck = lfsck_instance_find(key, true, false);
2659         if (likely(lfsck != NULL)) {
2660                 if (val < 1 || val > LFSCK_ASYNC_WIN_MAX) {
2661                         CWARN("%s: invalid async windows size that may "
2662                               "cause memory issues. The valid range is "
2663                               "[1 - %u].\n",
2664                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2665                         rc = -EINVAL;
2666                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2667                         mutex_lock(&lfsck->li_mutex);
2668                         lfsck->li_bookmark_ram.lb_async_windows = val;
2669                         rc = lfsck_bookmark_store(&env, lfsck);
2670                         mutex_unlock(&lfsck->li_mutex);
2671                 }
2672                 lfsck_instance_put(&env, lfsck);
2673         } else {
2674                 rc = -ENXIO;
2675         }
2676
2677         lu_env_fini(&env);
2678
2679         RETURN(rc);
2680 }
2681 EXPORT_SYMBOL(lfsck_set_windows);
2682
2683 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2684 {
2685         struct lu_env           env;
2686         struct lfsck_instance  *lfsck;
2687         struct lfsck_component *com;
2688         int                     rc;
2689         ENTRY;
2690
2691         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2692         if (rc != 0)
2693                 RETURN(rc);
2694
2695         lfsck = lfsck_instance_find(key, true, false);
2696         if (likely(lfsck != NULL)) {
2697                 com = lfsck_component_find(lfsck, type);
2698                 if (likely(com != NULL)) {
2699                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2700                         lfsck_component_put(&env, com);
2701                 } else {
2702                         rc = -ENOTSUPP;
2703                 }
2704
2705                 lfsck_instance_put(&env, lfsck);
2706         } else {
2707                 rc = -ENXIO;
2708         }
2709
2710         lu_env_fini(&env);
2711
2712         RETURN(rc);
2713 }
2714 EXPORT_SYMBOL(lfsck_dump);
2715
2716 static int lfsck_stop_all(const struct lu_env *env,
2717                           struct lfsck_instance *lfsck,
2718                           struct lfsck_stop *stop)
2719 {
2720         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2721         struct lfsck_request              *lr     = &info->lti_lr;
2722         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2723         struct ptlrpc_request_set         *set;
2724         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2725         struct lfsck_tgt_desc             *ltd;
2726         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2727         __u32                              idx;
2728         int                                rc     = 0;
2729         int                                rc1    = 0;
2730         ENTRY;
2731
2732         LASSERT(stop->ls_flags & LPF_BROADCAST);
2733
2734         set = ptlrpc_prep_set();
2735         if (unlikely(set == NULL))
2736                 RETURN(-ENOMEM);
2737
2738         memset(lr, 0, sizeof(*lr));
2739         lr->lr_event = LE_STOP;
2740         lr->lr_index = lfsck_dev_idx(lfsck);
2741         lr->lr_status = stop->ls_status;
2742         lr->lr_version = bk->lb_version;
2743         lr->lr_active = LFSCK_TYPES_ALL;
2744         lr->lr_param = stop->ls_flags;
2745
2746         laia->laia_com = NULL;
2747         laia->laia_ltds = ltds;
2748         laia->laia_lr = lr;
2749         laia->laia_result = 0;
2750         laia->laia_shared = 1;
2751
2752         down_read(&ltds->ltd_rw_sem);
2753         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2754                 ltd = lfsck_tgt_get(ltds, idx);
2755                 LASSERT(ltd != NULL);
2756
2757                 laia->laia_ltd = ltd;
2758                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2759                                          lfsck_async_interpret, laia,
2760                                          LFSCK_NOTIFY);
2761                 if (rc != 0) {
2762                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2763                         lfsck_tgt_put(ltd);
2764                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2765                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2766                         rc1 = rc;
2767                 }
2768         }
2769         up_read(&ltds->ltd_rw_sem);
2770
2771         rc = ptlrpc_set_wait(set);
2772         ptlrpc_set_destroy(set);
2773
2774         if (rc == 0)
2775                 rc = laia->laia_result;
2776
2777         if (rc == -EALREADY)
2778                 rc = 0;
2779
2780         if (rc != 0)
2781                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2782                        lfsck_lfsck2name(lfsck), rc);
2783
2784         RETURN(rc != 0 ? rc : rc1);
2785 }
2786
2787 static int lfsck_start_all(const struct lu_env *env,
2788                            struct lfsck_instance *lfsck,
2789                            struct lfsck_start *start)
2790 {
2791         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2792         struct lfsck_request              *lr     = &info->lti_lr;
2793         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2794         struct ptlrpc_request_set         *set;
2795         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2796         struct lfsck_tgt_desc             *ltd;
2797         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2798         __u32                              idx;
2799         int                                rc     = 0;
2800         ENTRY;
2801
2802         LASSERT(start->ls_flags & LPF_BROADCAST);
2803
2804         set = ptlrpc_prep_set();
2805         if (unlikely(set == NULL))
2806                 RETURN(-ENOMEM);
2807
2808         memset(lr, 0, sizeof(*lr));
2809         lr->lr_event = LE_START;
2810         lr->lr_index = lfsck_dev_idx(lfsck);
2811         lr->lr_speed = bk->lb_speed_limit;
2812         lr->lr_version = bk->lb_version;
2813         lr->lr_active = start->ls_active;
2814         lr->lr_param = start->ls_flags;
2815         lr->lr_async_windows = bk->lb_async_windows;
2816         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2817                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2818                        LSV_CREATE_MDTOBJ;
2819
2820         laia->laia_com = NULL;
2821         laia->laia_ltds = ltds;
2822         laia->laia_lr = lr;
2823         laia->laia_result = 0;
2824         laia->laia_shared = 1;
2825
2826         down_read(&ltds->ltd_rw_sem);
2827         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2828                 ltd = lfsck_tgt_get(ltds, idx);
2829                 LASSERT(ltd != NULL);
2830
2831                 laia->laia_ltd = ltd;
2832                 ltd->ltd_layout_done = 0;
2833                 ltd->ltd_namespace_done = 0;
2834                 ltd->ltd_synced_failures = 0;
2835                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2836                                          lfsck_async_interpret, laia,
2837                                          LFSCK_NOTIFY);
2838                 if (rc != 0) {
2839                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2840                         lfsck_tgt_put(ltd);
2841                         CERROR("%s: cannot notify MDT %x for LFSCK "
2842                                "start, failout: rc = %d\n",
2843                                lfsck_lfsck2name(lfsck), idx, rc);
2844                         break;
2845                 }
2846         }
2847         up_read(&ltds->ltd_rw_sem);
2848
2849         if (rc != 0) {
2850                 ptlrpc_set_destroy(set);
2851
2852                 RETURN(rc);
2853         }
2854
2855         rc = ptlrpc_set_wait(set);
2856         ptlrpc_set_destroy(set);
2857
2858         if (rc == 0)
2859                 rc = laia->laia_result;
2860
2861         if (rc != 0) {
2862                 struct lfsck_stop *stop = &info->lti_stop;
2863
2864                 CERROR("%s: cannot start LFSCK on some MDTs, "
2865                        "stop all: rc = %d\n",
2866                        lfsck_lfsck2name(lfsck), rc);
2867                 if (rc != -EALREADY) {
2868                         stop->ls_status = LS_FAILED;
2869                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2870                         lfsck_stop_all(env, lfsck, stop);
2871                 }
2872         }
2873
2874         RETURN(rc);
2875 }
2876
/**
 * Start LFSCK on the device \a key, or let the request join the LFSCK that
 * is already active there.
 *
 * \param[in] env	execution environment
 * \param[in] key	device used to look up the LFSCK instance
 * \param[in] lsp	start parameters; lsp->lsp_start == NULL means an
 *			automatic trigger
 *
 * \retval 0		success, or nothing to do for an automatic trigger;
 *			a positive intermediate result is reported as 0 too
 * \retval -ENXIO	no LFSCK instance is found for \a key
 * \retval -EAGAIN	lfsck->li_namespace is not set yet
 * \retval -EALREADY	the LFSCK thread is neither in init nor in stopped
 *			state, unless the last lfsck_join() call in the join
 *			loop returned 0
 * \retval -EOPNOTSUPP	while LFSCK is active, a requested type has no
 *			component on the scan or double scan list
 * \retval -EPERM	LPF_BROADCAST is requested on a non-master instance
 * \retval -ENOTSUPP	start->ls_active holds bits out of
 *			LFSCK_TYPES_SUPPORTED
 * \retval negative	other failures from the helpers called here
 */
2877 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2878                 struct lfsck_start_param *lsp)
2879 {
2880         struct lfsck_start              *start  = lsp->lsp_start;
2881         struct lfsck_instance           *lfsck;
2882         struct lfsck_bookmark           *bk;
2883         struct ptlrpc_thread            *thread;
2884         struct lfsck_component          *com;
2885         struct l_wait_info               lwi    = { 0 };
2886         struct lfsck_thread_args        *lta;
2887         struct task_struct              *task;
2888         int                              rc     = 0;
2889         __u16                            valid  = 0;
2890         __u16                            flags  = 0;
2891         __u16                            type   = 1;
2892         ENTRY;
2893
2894         lfsck = lfsck_instance_find(key, true, false);
2895         if (unlikely(lfsck == NULL))
2896                 RETURN(-ENXIO);
2897
2898         /* System is not ready, try again later. */
2899         if (unlikely(lfsck->li_namespace == NULL))
2900                 GOTO(put, rc = -EAGAIN);
2901
2902         /* start == NULL means auto trigger paused LFSCK. */
2903         if ((start == NULL) &&
2904             (list_empty(&lfsck->li_list_scan) ||
2905              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2906                 GOTO(put, rc = 0);
2907
2908         bk = &lfsck->li_bookmark_ram;
2909         thread = &lfsck->li_thread;
2910         mutex_lock(&lfsck->li_mutex);
2911         spin_lock(&lfsck->li_lock);
             /* The LFSCK thread is already active: instead of starting a new
              * thread, try to let each requested type join the current run. */
2912         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2913                 rc = -EALREADY;
2914                 if (unlikely(start == NULL)) {
2915                         spin_unlock(&lfsck->li_lock);
2916                         GOTO(out, rc);
2917                 }
2918
                     /* Walk ls_active bit by bit; each handled bit is cleared
                      * from ls_active, so the loop ends once all are done. */
2919                 while (start->ls_active != 0) {
2920                         if (!(type & start->ls_active)) {
2921                                 type <<= 1;
2922                                 continue;
2923                         }
2924
2925                         com = __lfsck_component_find(lfsck, type,
2926                                                      &lfsck->li_list_scan);
2927                         if (com == NULL)
2928                                 com = __lfsck_component_find(lfsck, type,
2929                                                 &lfsck->li_list_double_scan);
2930                         if (com == NULL) {
2931                                 rc = -EOPNOTSUPP;
2932                                 break;
2933                         }
2934
2935                         if (com->lc_ops->lfsck_join != NULL) {
2936                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2937                                 if (rc != 0 && rc != -EALREADY)
2938                                         break;
2939                         }
2940                         start->ls_active &= ~type;
2941                         type <<= 1;
2942                 }
2943                 spin_unlock(&lfsck->li_lock);
2944                 GOTO(out, rc);
2945         }
2946         spin_unlock(&lfsck->li_lock);
2947
             /* A new run: reset the per-run state of the instance. */
2948         lfsck->li_status = 0;
2949         lfsck->li_oit_over = 0;
2950         lfsck->li_start_unplug = 0;
2951         lfsck->li_drop_dryrun = 0;
2952         lfsck->li_new_scanned = 0;
2953
2954         /* For auto trigger. */
2955         if (start == NULL)
2956                 goto trigger;
2957
2958         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2959                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2960                        lfsck_lfsck2name(lfsck));
2961
2962                 GOTO(out, rc = -EPERM);
2963         }
2964
2965         start->ls_version = bk->lb_version;
2966
2967         if (start->ls_active != 0) {
2968                 struct lfsck_component *next;
2969
2970                 if (start->ls_active == LFSCK_TYPES_ALL)
2971                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2972
                     /* On failure only the unsupported bits are left in
                      * ls_active -- presumably for the caller to report. */
2973                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2974                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2975                         GOTO(out, rc = -ENOTSUPP);
2976                 }
2977
                     /* Call lfsck_post() on every component on the scan list
                      * whose type is not requested this time. */
2978                 list_for_each_entry_safe(com, next,
2979                                          &lfsck->li_list_scan, lc_link) {
2980                         if (!(com->lc_type & start->ls_active)) {
2981                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2982                                                              false);
2983                                 if (rc != 0)
2984                                         GOTO(out, rc);
2985                         }
2986                 }
2987
                     /* Move the requested components that sit on the idle
                      * list over to the scan list. */
2988                 while (start->ls_active != 0) {
2989                         if (type & start->ls_active) {
2990                                 com = __lfsck_component_find(lfsck, type,
2991                                                         &lfsck->li_list_idle);
2992                                 if (com != NULL)
2993                                         /* The component status will be updated
2994                                          * when its prep() is called later by
2995                                          * the LFSCK main engine. */
2996                                         list_move_tail(&com->lc_link,
2997                                                        &lfsck->li_list_scan);
2998                                 start->ls_active &= ~type;
2999                         }
3000                         type <<= 1;
3001                 }
3002         }
3003
3004         if (list_empty(&lfsck->li_list_scan)) {
3005                 /* The speed limit will be used to control both the LFSCK and
3006                  * low layer scrub (if applied), need to be handled firstly. */
3007                 if (start->ls_valid & LSV_SPEED_LIMIT) {
3008                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
3009                                 rc = lfsck_bookmark_store(env, lfsck);
3010                                 if (rc != 0)
3011                                         GOTO(out, rc);
3012                         }
3013                 }
3014
3015                 goto trigger;
3016         }
3017
3018         if (start->ls_flags & LPF_RESET)
3019                 flags |= DOIF_RESET;
3020
3021         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
3022         if (rc != 0)
3023                 GOTO(out, rc);
3024
             /* Rebuild ls_active from the components that will be scanned,
              * resetting each of them if LPF_RESET was given. */
3025         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3026                 start->ls_active |= com->lc_type;
3027                 if (flags & DOIF_RESET) {
3028                         rc = com->lc_ops->lfsck_reset(env, com, false);
3029                         if (rc != 0)
3030                                 GOTO(out, rc);
3031                 }
3032         }
3033
3034 trigger:
             /* Translate the start options into the iteration arguments
              * stored in li_args_dir and li_args_oit. */
3035         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
3036         if (bk->lb_param & LPF_DRYRUN)
3037                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
3038
3039         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
3040                 valid |= DOIV_ERROR_HANDLE;
3041                 if (start->ls_flags & LPF_FAILOUT)
3042                         flags |= DOIF_FAILOUT;
3043         }
3044
3045         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
3046                 valid |= DOIV_DRYRUN;
3047                 if (start->ls_flags & LPF_DRYRUN)
3048                         flags |= DOIF_DRYRUN;
3049         }
3050
3051         if (!list_empty(&lfsck->li_list_scan))
3052                 flags |= DOIF_OUTUSED;
3053
3054         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
3055         thread_set_flags(thread, 0);
3056         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
3057         if (IS_ERR(lta))
3058                 GOTO(out, rc = PTR_ERR(lta));
3059
3060         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
3061         task = kthread_run(lfsck_master_engine, lta, "lfsck");
3062         if (IS_ERR(task)) {
3063                 rc = PTR_ERR(task);
3064                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
3065                        lfsck_lfsck2name(lfsck), rc);
3066                 lfsck_thread_args_fini(lta);
3067
3068                 GOTO(out, rc);
3069         }
3070
             /* Wait until the master engine is either running or stopped. */
3071         l_wait_event(thread->t_ctl_waitq,
3072                      thread_is_running(thread) ||
3073                      thread_is_stopped(thread),
3074                      &lwi);
             /* No broadcast is needed: set li_start_unplug and wake up the
              * waiters on the thread's control wait queue right away. */
3075         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
3076                 lfsck->li_start_unplug = 1;
3077                 wake_up_all(&thread->t_ctl_waitq);
3078
3079                 GOTO(out, rc = 0);
3080         }
3081
3082         /* release lfsck::li_mutex to avoid deadlock. */
3083         mutex_unlock(&lfsck->li_mutex);
3084         rc = lfsck_start_all(env, lfsck, start);
3085         if (rc != 0) {
                     /* The broadcast failed: unless the engine has stopped
                      * already, mark it as failed and stopping, then wait
                      * for it to stop. */
3086                 spin_lock(&lfsck->li_lock);
3087                 if (thread_is_stopped(thread)) {
3088                         spin_unlock(&lfsck->li_lock);
3089                 } else {
3090                         lfsck->li_status = LS_FAILED;
3091                         lfsck->li_flags = 0;
3092                         thread_set_flags(thread, SVC_STOPPING);
3093                         spin_unlock(&lfsck->li_lock);
3094
3095                         lfsck->li_start_unplug = 1;
3096                         wake_up_all(&thread->t_ctl_waitq);
3097                         l_wait_event(thread->t_ctl_waitq,
3098                                      thread_is_stopped(thread),
3099                                      &lwi);
3100                 }
3101         } else {
3102                 lfsck->li_start_unplug = 1;
3103                 wake_up_all(&thread->t_ctl_waitq);
3104         }
3105
             /* li_mutex was released above, so skip the unlock at "out". */
3106         GOTO(put, rc);
3107
     /* "out" is for the paths that still hold lfsck::li_mutex. */
3108 out:
3109         mutex_unlock(&lfsck->li_mutex);
3110
     /* "put" is for the paths that do not hold lfsck::li_mutex. */
3111 put:
3112         lfsck_instance_put(env, lfsck);
3113
             /* Only failures are reported; positive values become 0. */
3114         return rc < 0 ? rc : 0;
3115 }
3116 EXPORT_SYMBOL(lfsck_start);
3117
3118 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
3119                struct lfsck_stop *stop)
3120 {
3121         struct lfsck_instance   *lfsck;
3122         struct ptlrpc_thread    *thread;
3123         struct l_wait_info       lwi    = { 0 };
3124         int                      rc     = 0;
3125         int                      rc1    = 0;
3126         ENTRY;
3127
3128         lfsck = lfsck_instance_find(key, true, false);
3129         if (unlikely(lfsck == NULL))
3130                 RETURN(-ENXIO);
3131
3132         thread = &lfsck->li_thread;
3133         /* release lfsck::li_mutex to avoid deadlock. */
3134         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
3135                 if (!lfsck->li_master) {
3136                         CERROR("%s: only allow to specify '-A' via MDS\n",
3137                                lfsck_lfsck2name(lfsck));
3138
3139                         GOTO(out, rc = -EPERM);
3140                 }
3141
3142                 rc1 = lfsck_stop_all(env, lfsck, stop);
3143         }
3144
3145         mutex_lock(&lfsck->li_mutex);
3146         spin_lock(&lfsck->li_lock);
3147         /* no error if LFSCK is already stopped, or was never started */
3148         if (thread_is_init(thread) || thread_is_stopped(thread)) {
3149                 spin_unlock(&lfsck->li_lock);
3150                 GOTO(out, rc = 0);
3151         }
3152
3153         if (stop != NULL) {
3154                 lfsck->li_status = stop->ls_status;
3155                 lfsck->li_flags = stop->ls_flags;
3156         } else {
3157                 lfsck->li_status = LS_STOPPED;
3158                 lfsck->li_flags = 0;
3159         }
3160
3161         thread_set_flags(thread, SVC_STOPPING);
3162
3163         if (lfsck->li_master) {
3164                 struct lfsck_component *com;
3165                 struct lfsck_assistant_data *lad;
3166
3167                 list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
3168                         lad = com->lc_data;
3169                         spin_lock(&lad->lad_lock);
3170                         if (lad->lad_task != NULL)
3171                                 force_sig(SIGINT, lad->lad_task);
3172                         spin_unlock(&lad->lad_lock);
3173                 }
3174
3175                 list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
3176                         lad = com->lc_data;
3177                         spin_lock(&lad->lad_lock);
3178                         if (lad->lad_task != NULL)
3179                                 force_sig(SIGINT, lad->lad_task);
3180                         spin_unlock(&lad->lad_lock);
3181                 }
3182         }
3183
3184         spin_unlock(&lfsck->li_lock);
3185
3186         wake_up_all(&thread->t_ctl_waitq);
3187         l_wait_event(thread->t_ctl_waitq,
3188                      thread_is_stopped(thread),
3189                      &lwi);
3190
3191         GOTO(out, rc = 0);
3192
3193 out:
3194         mutex_unlock(&lfsck->li_mutex);
3195         lfsck_instance_put(env, lfsck);
3196
3197         return rc != 0 ? rc : rc1;
3198 }
3199 EXPORT_SYMBOL(lfsck_stop);
3200
3201 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
3202                     struct lfsck_request *lr, struct thandle *th)
3203 {
3204         int rc = -EOPNOTSUPP;
3205         ENTRY;
3206
3207         switch (lr->lr_event) {
3208         case LE_START: {
3209                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
3210                 struct lfsck_start_param  lsp;
3211
3212                 memset(start, 0, sizeof(*start));
3213                 start->ls_valid = lr->lr_valid;
3214                 start->ls_speed_limit = lr->lr_speed;
3215                 start->ls_version = lr->lr_version;