Whamcloud - gitweb
LU-5820 lfsck: use multiple namespace LFSCK trace files
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_CHECKPOINT_SKIP   1
46
47 /* define lfsck thread key */
48 LU_KEY_INIT(lfsck, struct lfsck_thread_info);
49
50 static void lfsck_key_fini(const struct lu_context *ctx,
51                            struct lu_context_key *key, void *data)
52 {
53         struct lfsck_thread_info *info = data;
54
55         lu_buf_free(&info->lti_linkea_buf);
56         lu_buf_free(&info->lti_linkea_buf2);
57         lu_buf_free(&info->lti_big_buf);
58         OBD_FREE_PTR(info);
59 }
60
61 LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
62 LU_KEY_INIT_GENERIC(lfsck);
63
64 static struct list_head lfsck_instance_list;
65 static struct list_head lfsck_ost_orphan_list;
66 static struct list_head lfsck_mdt_orphan_list;
67 static DEFINE_SPINLOCK(lfsck_instance_lock);
68
69 static const char *lfsck_status_names[] = {
70         [LS_INIT]               = "init",
71         [LS_SCANNING_PHASE1]    = "scanning-phase1",
72         [LS_SCANNING_PHASE2]    = "scanning-phase2",
73         [LS_COMPLETED]          = "completed",
74         [LS_FAILED]             = "failed",
75         [LS_STOPPED]            = "stopped",
76         [LS_PAUSED]             = "paused",
77         [LS_CRASHED]            = "crashed",
78         [LS_PARTIAL]            = "partial",
79         [LS_CO_FAILED]          = "co-failed",
80         [LS_CO_STOPPED]         = "co-stopped",
81         [LS_CO_PAUSED]          = "co-paused"
82 };
83
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry i names flag bit (1 << i) - confirm
 * against the users of this table. */
const char *lfsck_flags_names[] = {
        "scanned-once", "inconsistent", "upgrade", "incomplete",
        "crashed_lastid",
        NULL,
};
92
/* Table of printable LFSCK parameter names.  The first slot is NULL (no
 * name) and the table ends with a NULL terminator.
 * NOTE(review): presumably entry i names parameter bit (1 << i), with
 * bit 0 having no printable name - confirm against the users. */
const char *lfsck_param_names[] = {
        NULL,                   /* slot 0 has no name */
        "failout", "dryrun", "all_targets", "broadcast", "orphan",
        "create_ostobj", "create_mdtobj",
        NULL,
};
104
/* Selector for how the lost+found (LPF) object is verified.
 * NOTE(review): judging by the names, the object to verify is found
 * either through the bookmark or through the name entry - confirm with
 * the users of this enum. */
enum lfsck_verify_lpf_types {
        LVLT_BY_BOOKMARK  = 0,
        LVLT_BY_NAMEENTRY = 1,
};
109
110 const char *lfsck_status2names(enum lfsck_status status)
111 {
112         if (unlikely(status < 0 || status >= LS_MAX))
113                 return "unknown";
114
115         return lfsck_status_names[status];
116 }
117
118 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
119 {
120         spin_lock_init(&ltds->ltd_lock);
121         init_rwsem(&ltds->ltd_rw_sem);
122         INIT_LIST_HEAD(&ltds->ltd_orphan);
123         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
124         if (ltds->ltd_tgts_bitmap == NULL)
125                 return -ENOMEM;
126
127         return 0;
128 }
129
130 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
131 {
132         struct lfsck_tgt_desc   *ltd;
133         struct lfsck_tgt_desc   *next;
134         int                      idx;
135
136         down_write(&ltds->ltd_rw_sem);
137
138         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
139                                  ltd_orphan_list) {
140                 list_del_init(&ltd->ltd_orphan_list);
141                 lfsck_tgt_put(ltd);
142         }
143
144         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
145                 up_write(&ltds->ltd_rw_sem);
146
147                 return;
148         }
149
150         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
151                 ltd = LTD_TGT(ltds, idx);
152                 if (likely(ltd != NULL)) {
153                         LASSERT(list_empty(&ltd->ltd_layout_list));
154                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
155                         LASSERT(list_empty(&ltd->ltd_namespace_list));
156                         LASSERT(list_empty(&ltd->ltd_namespace_phase_list));
157
158                         ltds->ltd_tgtnr--;
159                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
160                         LTD_TGT(ltds, idx) = NULL;
161                         lfsck_tgt_put(ltd);
162                 }
163         }
164
165         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
166                  ltds->ltd_tgtnr);
167
168         for (idx = 0; idx < TGT_PTRS; idx++) {
169                 if (ltds->ltd_tgts_idx[idx] != NULL) {
170                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
171                         ltds->ltd_tgts_idx[idx] = NULL;
172                 }
173         }
174
175         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
176         ltds->ltd_tgts_bitmap = NULL;
177         up_write(&ltds->ltd_rw_sem);
178 }
179
180 static int __lfsck_add_target(const struct lu_env *env,
181                               struct lfsck_instance *lfsck,
182                               struct lfsck_tgt_desc *ltd,
183                               bool for_ost, bool locked)
184 {
185         struct lfsck_tgt_descs *ltds;
186         __u32                   index = ltd->ltd_index;
187         int                     rc    = 0;
188         ENTRY;
189
190         if (for_ost)
191                 ltds = &lfsck->li_ost_descs;
192         else
193                 ltds = &lfsck->li_mdt_descs;
194
195         if (!locked)
196                 down_write(&ltds->ltd_rw_sem);
197
198         LASSERT(ltds->ltd_tgts_bitmap != NULL);
199
200         if (index >= ltds->ltd_tgts_bitmap->size) {
201                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
202                                     (__u32)BITS_PER_LONG);
203                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
204                 cfs_bitmap_t *new_bitmap;
205
206                 while (newsize < index + 1)
207                         newsize <<= 1;
208
209                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
210                 if (new_bitmap == NULL)
211                         GOTO(unlock, rc = -ENOMEM);
212
213                 if (ltds->ltd_tgtnr > 0)
214                         cfs_bitmap_copy(new_bitmap, old_bitmap);
215                 ltds->ltd_tgts_bitmap = new_bitmap;
216                 CFS_FREE_BITMAP(old_bitmap);
217         }
218
219         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
220                 CERROR("%s: the device %s (%u) is registered already\n",
221                        lfsck_lfsck2name(lfsck),
222                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
223                 GOTO(unlock, rc = -EEXIST);
224         }
225
226         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
227                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
228                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
229                         GOTO(unlock, rc = -ENOMEM);
230         }
231
232         LTD_TGT(ltds, index) = ltd;
233         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
234         ltds->ltd_tgtnr++;
235
236         GOTO(unlock, rc = 0);
237
238 unlock:
239         if (!locked)
240                 up_write(&ltds->ltd_rw_sem);
241
242         return rc;
243 }
244
245 static int lfsck_add_target_from_orphan(const struct lu_env *env,
246                                         struct lfsck_instance *lfsck)
247 {
248         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
249         struct lfsck_tgt_desc   *ltd;
250         struct lfsck_tgt_desc   *next;
251         struct list_head        *head    = &lfsck_ost_orphan_list;
252         int                      rc;
253         bool                     for_ost = true;
254
255 again:
256         spin_lock(&lfsck_instance_lock);
257         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
258                 if (ltd->ltd_key == lfsck->li_bottom)
259                         list_move_tail(&ltd->ltd_orphan_list,
260                                        &ltds->ltd_orphan);
261         }
262         spin_unlock(&lfsck_instance_lock);
263
264         down_write(&ltds->ltd_rw_sem);
265         while (!list_empty(&ltds->ltd_orphan)) {
266                 ltd = list_entry(ltds->ltd_orphan.next,
267                                  struct lfsck_tgt_desc,
268                                  ltd_orphan_list);
269                 list_del_init(&ltd->ltd_orphan_list);
270                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
271                 /* Do not hold the semaphore for too long time. */
272                 up_write(&ltds->ltd_rw_sem);
273                 if (rc != 0)
274                         return rc;
275
276                 down_write(&ltds->ltd_rw_sem);
277         }
278         up_write(&ltds->ltd_rw_sem);
279
280         if (for_ost) {
281                 ltds = &lfsck->li_mdt_descs;
282                 head = &lfsck_mdt_orphan_list;
283                 for_ost = false;
284                 goto again;
285         }
286
287         return 0;
288 }
289
290 static inline struct lfsck_component *
291 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
292                        struct list_head *list)
293 {
294         struct lfsck_component *com;
295
296         list_for_each_entry(com, list, lc_link) {
297                 if (com->lc_type == type)
298                         return com;
299         }
300         return NULL;
301 }
302
303 struct lfsck_component *
304 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
305 {
306         struct lfsck_component *com;
307
308         spin_lock(&lfsck->li_lock);
309         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
310         if (com != NULL)
311                 goto unlock;
312
313         com = __lfsck_component_find(lfsck, type,
314                                      &lfsck->li_list_double_scan);
315         if (com != NULL)
316                 goto unlock;
317
318         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
319
320 unlock:
321         if (com != NULL)
322                 lfsck_component_get(com);
323         spin_unlock(&lfsck->li_lock);
324         return com;
325 }
326
327 void lfsck_component_cleanup(const struct lu_env *env,
328                              struct lfsck_component *com)
329 {
330         if (!list_empty(&com->lc_link))
331                 list_del_init(&com->lc_link);
332         if (!list_empty(&com->lc_link_dir))
333                 list_del_init(&com->lc_link_dir);
334
335         lfsck_component_put(env, com);
336 }
337
338 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
339                     struct lu_fid *fid, bool locked)
340 {
341         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
342         int                      rc = 0;
343         ENTRY;
344
345         if (!locked)
346                 mutex_lock(&lfsck->li_mutex);
347
348         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
349         if (rc >= 0) {
350                 bk->lb_last_fid = *fid;
351                 /* We do not care about whether the subsequent sub-operations
352                  * failed or not. The worst case is that one FID is lost that
353                  * is not a big issue for the LFSCK since it is relative rare
354                  * for LFSCK create. */
355                 rc = lfsck_bookmark_store(env, lfsck);
356         }
357
358         if (!locked)
359                 mutex_unlock(&lfsck->li_mutex);
360
361         RETURN(rc);
362 }
363
364 /**
365  * Request the specified ibits lock for the given object.
366  *
367  * Before the LFSCK modifying on the namespace visible object,
368  * it needs to acquire related ibits ldlm lock.
369  *
370  * \param[in] env       pointer to the thread context
371  * \param[in] lfsck     pointer to the lfsck instance
372  * \param[in] obj       pointer to the dt_object to be locked
373  * \param[out] lh       pointer to the lock handle
374  * \param[in] ibits     the bits for the ldlm lock to be acquired
375  * \param[in] mode      the mode for the ldlm lock to be acquired
376  *
377  * \retval              0 for success
378  * \retval              negative error number on failure
379  */
380 int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
381                      struct dt_object *obj, struct lustre_handle *lh,
382                      __u64 bits, ldlm_mode_t mode)
383 {
384         struct lfsck_thread_info        *info   = lfsck_env_info(env);
385         ldlm_policy_data_t              *policy = &info->lti_policy;
386         struct ldlm_res_id              *resid  = &info->lti_resid;
387         __u64                            flags  = LDLM_FL_ATOMIC_CB;
388         int                              rc;
389
390         LASSERT(lfsck->li_namespace != NULL);
391
392         memset(policy, 0, sizeof(*policy));
393         policy->l_inodebits.bits = bits;
394         fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
395         if (dt_object_remote(obj)) {
396                 struct ldlm_enqueue_info *einfo = &info->lti_einfo;
397
398                 memset(einfo, 0, sizeof(*einfo));
399                 einfo->ei_type = LDLM_IBITS;
400                 einfo->ei_mode = mode;
401                 einfo->ei_cb_bl = ldlm_blocking_ast;
402                 einfo->ei_cb_cp = ldlm_completion_ast;
403                 einfo->ei_res_id = resid;
404
405                 rc = dt_object_lock(env, obj, lh, einfo, policy);
406         } else {
407                 rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
408                                             LDLM_IBITS, policy, mode,
409                                             &flags, ldlm_blocking_ast,
410                                             ldlm_completion_ast, NULL, NULL,
411                                             0, LVB_T_NONE, NULL, lh);
412         }
413
414         if (rc == ELDLM_OK) {
415                 rc = 0;
416         } else {
417                 memset(lh, 0, sizeof(*lh));
418                 rc = -EIO;
419         }
420
421         return rc;
422 }
423
424 /**
425  * Release the the specified ibits lock.
426  *
427  * If the lock has been acquired before, release it
428  * and cleanup the handle. Otherwise, do nothing.
429  *
430  * \param[in] lh        pointer to the lock handle
431  * \param[in] mode      the mode for the ldlm lock to be released
432  */
433 void lfsck_ibits_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
434 {
435         if (lustre_handle_is_used(lh)) {
436                 ldlm_lock_decref(lh, mode);
437                 memset(lh, 0, sizeof(*lh));
438         }
439 }
440
441 int lfsck_find_mdt_idx_by_fid(const struct lu_env *env,
442                               struct lfsck_instance *lfsck,
443                               const struct lu_fid *fid)
444 {
445         struct seq_server_site  *ss     =
446                         lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
447         struct lu_seq_range     *range  = &lfsck_env_info(env)->lti_range;
448         int                      rc;
449
450         fld_range_set_mdt(range);
451         rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
452         if (rc == 0)
453                 rc = range->lsr_index;
454
455         return rc;
456 }
457
/* Directory entry names used by the LFSCK. */
const char dot[] = ".";                         /* the directory itself */
const char dotdot[] = "..";                     /* the parent directory */
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
462
463 /**
464  * Remove the name entry from the .lustre/lost+found directory.
465  *
466  * No need to care about the object referenced by the name entry,
467  * either the name entry is invalid or redundant, or the referenced
468  * object has been processed or will be handled by others.
469  *
470  * \param[in] env       pointer to the thread context
471  * \param[in] lfsck     pointer to the lfsck instance
472  * \param[in] name      the name for the name entry to be removed
473  *
474  * \retval              0 for success
475  * \retval              negative error number on failure
476  */
477 static int lfsck_lpf_remove_name_entry(const struct lu_env *env,
478                                        struct lfsck_instance *lfsck,
479                                        const char *name)
480 {
481         struct dt_object        *parent = lfsck->li_lpf_root_obj;
482         struct dt_device        *dev    = lfsck->li_next;
483         struct thandle          *th;
484         struct lustre_handle     lh     = { 0 };
485         int                      rc;
486         ENTRY;
487
488         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
489                               MDS_INODELOCK_UPDATE, LCK_EX);
490         if (rc != 0)
491                 RETURN(rc);
492
493         th = dt_trans_create(env, dev);
494         if (IS_ERR(th))
495                 GOTO(unlock, rc = PTR_ERR(th));
496
497         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
498         if (rc != 0)
499                 GOTO(stop, rc);
500
501         rc = dt_declare_ref_del(env, parent, th);
502         if (rc != 0)
503                 GOTO(stop, rc);
504
505         rc = dt_trans_start(env, dev, th);
506         if (rc != 0)
507                 GOTO(stop, rc);
508
509         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
510                        BYPASS_CAPA);
511         if (rc != 0)
512                 GOTO(stop, rc);
513
514         dt_write_lock(env, parent, 0);
515         rc = dt_ref_del(env, parent, th);
516         dt_write_unlock(env, parent);
517
518         GOTO(stop, rc);
519
520 stop:
521         dt_trans_stop(env, dev, th);
522
523 unlock:
524         lfsck_ibits_unlock(&lh, LCK_EX);
525
526         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s: rc = %d\n",
527                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(parent)), name, rc);
528
529         return rc;
530 }
531
532 static int lfsck_create_lpf_local(const struct lu_env *env,
533                                   struct lfsck_instance *lfsck,
534                                   struct dt_object *child,
535                                   struct lu_attr *la,
536                                   struct dt_object_format *dof,
537                                   const char *name)
538 {
539         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
540         struct dt_object        *parent = lfsck->li_lpf_root_obj;
541         struct dt_device        *dev    = lfsck->li_bottom;
542         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
543         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
544         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
545         struct thandle          *th     = NULL;
546         struct linkea_data       ldata  = { NULL };
547         struct lu_buf            linkea_buf;
548         const struct lu_name    *cname;
549         loff_t                   pos    = 0;
550         int                      len    = sizeof(struct lfsck_bookmark);
551         int                      rc;
552         ENTRY;
553
554         rc = linkea_data_new(&ldata,
555                              &lfsck_env_info(env)->lti_linkea_buf2);
556         if (rc != 0)
557                 RETURN(rc);
558
559         cname = lfsck_name_get_const(env, name, strlen(name));
560         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
561         if (rc != 0)
562                 RETURN(rc);
563
564         th = dt_trans_create(env, dev);
565         if (IS_ERR(th))
566                 RETURN(PTR_ERR(th));
567
568         /* 1a. create child */
569         rc = dt_declare_create(env, child, la, NULL, dof, th);
570         if (rc != 0)
571                 GOTO(stop, rc);
572
573         /* 2a. increase child nlink */
574         rc = dt_declare_ref_add(env, child, th);
575         if (rc != 0)
576                 GOTO(stop, rc);
577
578         /* 3a. insert linkEA for child */
579         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
580                        ldata.ld_leh->leh_len);
581         rc = dt_declare_xattr_set(env, child, &linkea_buf,
582                                   XATTR_NAME_LINK, 0, th);
583         if (rc != 0)
584                 GOTO(stop, rc);
585
586         /* 4a. insert name into parent dir */
587         rec->rec_type = S_IFDIR;
588         rec->rec_fid = cfid;
589         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
590                                (const struct dt_key *)name, th);
591         if (rc != 0)
592                 GOTO(stop, rc);
593
594         /* 5a. increase parent nlink */
595         rc = dt_declare_ref_add(env, parent, th);
596         if (rc != 0)
597                 GOTO(stop, rc);
598
599         /* 6a. update bookmark */
600         rc = dt_declare_record_write(env, bk_obj,
601                                      lfsck_buf_get(env, bk, len), 0, th);
602         if (rc != 0)
603                 GOTO(stop, rc);
604
605         rc = dt_trans_start_local(env, dev, th);
606         if (rc != 0)
607                 GOTO(stop, rc);
608
609         dt_write_lock(env, child, 0);
610         /* 1b.1. create child */
611         rc = dt_create(env, child, la, NULL, dof, th);
612         if (rc != 0)
613                 GOTO(unlock, rc);
614
615         if (unlikely(!dt_try_as_dir(env, child)))
616                 GOTO(unlock, rc = -ENOTDIR);
617
618         /* 1b.2. insert dot into child dir */
619         rec->rec_fid = cfid;
620         rc = dt_insert(env, child, (const struct dt_rec *)rec,
621                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
622         if (rc != 0)
623                 GOTO(unlock, rc);
624
625         /* 1b.3. insert dotdot into child dir */
626         rec->rec_fid = &LU_LPF_FID;
627         rc = dt_insert(env, child, (const struct dt_rec *)rec,
628                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
629         if (rc != 0)
630                 GOTO(unlock, rc);
631
632         /* 2b. increase child nlink */
633         rc = dt_ref_add(env, child, th);
634         if (rc != 0)
635                 GOTO(unlock, rc);
636
637         /* 3b. insert linkEA for child. */
638         rc = dt_xattr_set(env, child, &linkea_buf,
639                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
640         dt_write_unlock(env, child);
641         if (rc != 0)
642                 GOTO(stop, rc);
643
644         /* 4b. insert name into parent dir */
645         rec->rec_fid = cfid;
646         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
647                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
648         if (rc != 0)
649                 GOTO(stop, rc);
650
651         dt_write_lock(env, parent, 0);
652         /* 5b. increase parent nlink */
653         rc = dt_ref_add(env, parent, th);
654         dt_write_unlock(env, parent);
655         if (rc != 0)
656                 GOTO(stop, rc);
657
658         bk->lb_lpf_fid = *cfid;
659         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
660
661         /* 6b. update bookmark */
662         rc = dt_record_write(env, bk_obj,
663                              lfsck_buf_get(env, bk, len), &pos, th);
664
665         GOTO(stop, rc);
666
667 unlock:
668         dt_write_unlock(env, child);
669
670 stop:
671         dt_trans_stop(env, dev, th);
672
673         return rc;
674 }
675
676 static int lfsck_create_lpf_remote(const struct lu_env *env,
677                                    struct lfsck_instance *lfsck,
678                                    struct dt_object *child,
679                                    struct lu_attr *la,
680                                    struct dt_object_format *dof,
681                                    const char *name)
682 {
683         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
684         struct dt_object        *parent = lfsck->li_lpf_root_obj;
685         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
686         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
687         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
688         struct thandle          *th     = NULL;
689         struct linkea_data       ldata  = { NULL };
690         struct lu_buf            linkea_buf;
691         const struct lu_name    *cname;
692         struct dt_device        *dev;
693         loff_t                   pos    = 0;
694         int                      len    = sizeof(struct lfsck_bookmark);
695         int                      rc;
696         ENTRY;
697
698         rc = linkea_data_new(&ldata,
699                              &lfsck_env_info(env)->lti_linkea_buf2);
700         if (rc != 0)
701                 RETURN(rc);
702
703         cname = lfsck_name_get_const(env, name, strlen(name));
704         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
705         if (rc != 0)
706                 RETURN(rc);
707
708         /* Create .lustre/lost+found/MDTxxxx. */
709
710         /* XXX: Currently, cross-MDT create operation needs to create the child
711          *      object firstly, then insert name into the parent directory. For
712          *      this case, the child object resides on current MDT (local), but
713          *      the parent ".lustre/lost+found" may be on remote MDT. It is not
714          *      easy to contain all the sub-modifications orderly within single
715          *      transaction.
716          *
717          *      To avoid more inconsistency, we split the create operation into
718          *      two transactions:
719          *
720          *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
721          *         locally.
722          *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
723          *         remotely.
724          *
725          *      If 1) done, but 2) failed, then go ahead, the LFSCK will try to
726          *      repair such inconsistency when LFSCK run next time. */
727
728         /* Transaction I: locally */
729
730         dev = lfsck->li_bottom;
731         th = dt_trans_create(env, dev);
732         if (IS_ERR(th))
733                 RETURN(PTR_ERR(th));
734
735         /* 1a. create child */
736         rc = dt_declare_create(env, child, la, NULL, dof, th);
737         if (rc != 0)
738                 GOTO(stop, rc);
739
740         /* 2a. increase child nlink */
741         rc = dt_declare_ref_add(env, child, th);
742         if (rc != 0)
743                 GOTO(stop, rc);
744
745         /* 3a. insert linkEA for child */
746         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
747                        ldata.ld_leh->leh_len);
748         rc = dt_declare_xattr_set(env, child, &linkea_buf,
749                                   XATTR_NAME_LINK, 0, th);
750         if (rc != 0)
751                 GOTO(stop, rc);
752
753         /* 4a. update bookmark */
754         rc = dt_declare_record_write(env, bk_obj,
755                                      lfsck_buf_get(env, bk, len), 0, th);
756         if (rc != 0)
757                 GOTO(stop, rc);
758
759         rc = dt_trans_start_local(env, dev, th);
760         if (rc != 0)
761                 GOTO(stop, rc);
762
763         dt_write_lock(env, child, 0);
764         /* 1b.1. create child */
765         rc = dt_create(env, child, la, NULL, dof, th);
766         if (rc != 0)
767                 GOTO(unlock, rc);
768
769         if (unlikely(!dt_try_as_dir(env, child)))
770                 GOTO(unlock, rc = -ENOTDIR);
771
772         /* 1b.2. insert dot into child dir */
773         rec->rec_type = S_IFDIR;
774         rec->rec_fid = cfid;
775         rc = dt_insert(env, child, (const struct dt_rec *)rec,
776                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
777         if (rc != 0)
778                 GOTO(unlock, rc);
779
780         /* 1b.3. insert dotdot into child dir */
781         rec->rec_fid = &LU_LPF_FID;
782         rc = dt_insert(env, child, (const struct dt_rec *)rec,
783                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
784         if (rc != 0)
785                 GOTO(unlock, rc);
786
787         /* 2b. increase child nlink */
788         rc = dt_ref_add(env, child, th);
789         if (rc != 0)
790                 GOTO(unlock, rc);
791
792         /* 3b. insert linkEA for child */
793         rc = dt_xattr_set(env, child, &linkea_buf,
794                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
795         if (rc != 0)
796                 GOTO(unlock, rc);
797
798         bk->lb_lpf_fid = *cfid;
799         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
800
801         /* 4b. update bookmark */
802         rc = dt_record_write(env, bk_obj,
803                              lfsck_buf_get(env, bk, len), &pos, th);
804
805         dt_write_unlock(env, child);
806         dt_trans_stop(env, dev, th);
807         if (rc != 0)
808                 RETURN(rc);
809
810         /* Transaction II: remotely */
811
812         dev = lfsck->li_next;
813         th = dt_trans_create(env, dev);
814         if (IS_ERR(th))
815                 RETURN(PTR_ERR(th));
816
817         /* 5a. insert name into parent dir */
818         rec->rec_fid = cfid;
819         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
820                                (const struct dt_key *)name, th);
821         if (rc != 0)
822                 GOTO(stop, rc);
823
824         /* 6a. increase parent nlink */
825         rc = dt_declare_ref_add(env, parent, th);
826         if (rc != 0)
827                 GOTO(stop, rc);
828
829         rc = dt_trans_start(env, dev, th);
830         if (rc != 0)
831                 GOTO(stop, rc);
832
833         /* 5b. insert name into parent dir */
834         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
835                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
836         if (rc != 0)
837                 GOTO(stop, rc);
838
839         dt_write_lock(env, parent, 0);
840         /* 6b. increase parent nlink */
841         rc = dt_ref_add(env, parent, th);
842         dt_write_unlock(env, parent);
843
844         GOTO(stop, rc);
845
846 unlock:
847         dt_write_unlock(env, child);
848 stop:
849         dt_trans_stop(env, dev, th);
850
851         if (rc != 0 && dev == lfsck->li_next)
852                 CDEBUG(D_LFSCK, "%s: partially created the object "DFID
853                        "for orphans, but failed to insert the name %s "
854                        "to the .lustre/lost+found/. Such inconsistency "
855                        "will be repaired when LFSCK run next time: rc = %d\n",
856                        lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);
857
858         return rc;
859 }
860
861 /**
862  * Create the MDTxxxx directory under /ROOT/.lustre/lost+found/
863  *
864  * The /ROOT/.lustre/lost+found/MDTxxxx/ directory is used for holding
865  * orphans and other uncertain inconsistent objects found during the
866  * LFSCK. Such directory will be created by the LFSCK engine on the
867  * local MDT before the LFSCK scanning.
868  *
869  * \param[in] env       pointer to the thread context
870  * \param[in] lfsck     pointer to the lfsck instance
871  *
872  * \retval              0 for success
873  * \retval              negative error number on failure
874  */
875 static int lfsck_create_lpf(const struct lu_env *env,
876                             struct lfsck_instance *lfsck)
877 {
878         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
879         struct lfsck_thread_info *info  = lfsck_env_info(env);
880         struct lu_fid            *cfid  = &info->lti_fid2;
881         struct lu_attr           *la    = &info->lti_la;
882         struct dt_object_format  *dof   = &info->lti_dof;
883         struct dt_object         *parent = lfsck->li_lpf_root_obj;
884         struct dt_object         *child = NULL;
885         struct lustre_handle      lh    = { 0 };
886         char                      name[8];
887         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
888         int                       rc    = 0;
889         ENTRY;
890
891         LASSERT(lfsck->li_master);
892         LASSERT(parent != NULL);
893         LASSERT(lfsck->li_lpf_obj == NULL);
894
895         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
896                               MDS_INODELOCK_UPDATE, LCK_EX);
897         if (rc != 0)
898                 RETURN(rc);
899
900         snprintf(name, 8, "MDT%04x", node);
901         if (fid_is_zero(&bk->lb_lpf_fid)) {
902                 /* There is corner case that: in former LFSCK scanning we have
903                  * created the .lustre/lost+found/MDTxxxx but failed to update
904                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
905                  * it from MDT0 firstly. */
906                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
907                                (const struct dt_key *)name, BYPASS_CAPA);
908                 if (rc != 0 && rc != -ENOENT)
909                         GOTO(unlock, rc);
910
911                 if (rc == 0) {
912                         bk->lb_lpf_fid = *cfid;
913                         rc = lfsck_bookmark_store(env, lfsck);
914                 } else {
915                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
916                 }
917                 if (rc != 0)
918                         GOTO(unlock, rc);
919         } else {
920                 *cfid = bk->lb_lpf_fid;
921         }
922
923         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
924         if (IS_ERR(child))
925                 GOTO(unlock, rc = PTR_ERR(child));
926
927         if (dt_object_exists(child) != 0) {
928                 if (unlikely(!dt_try_as_dir(env, child)))
929                         rc = -ENOTDIR;
930                 else
931                         lfsck->li_lpf_obj = child;
932
933                 GOTO(unlock, rc);
934         }
935
936         memset(la, 0, sizeof(*la));
937         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
938         la->la_mode = S_IFDIR | S_IRWXU;
939         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
940                        LA_UID | LA_GID;
941         memset(dof, 0, sizeof(*dof));
942         dof->dof_type = dt_mode_to_dft(S_IFDIR);
943
944         if (node == 0)
945                 rc = lfsck_create_lpf_local(env, lfsck, child, la, dof, name);
946         else
947                 rc = lfsck_create_lpf_remote(env, lfsck, child, la, dof, name);
948         if (rc == 0)
949                 lfsck->li_lpf_obj = child;
950
951         GOTO(unlock, rc);
952
953 unlock:
954         lfsck_ibits_unlock(&lh, LCK_EX);
955         if (rc != 0 && child != NULL && !IS_ERR(child))
956                 lu_object_put(env, &child->do_lu);
957
958         return rc;
959 }
960
961 /**
962  * Scan .lustre/lost+found for bad name entries and remove them.
963  *
964  * The valid name entry should be "MDTxxxx", the "xxxx" is the MDT device
965  * index in the system. Any other formatted name is invalid and should be
966  * removed.
967  *
968  * \param[in] env       pointer to the thread context
969  * \param[in] lfsck     pointer to the lfsck instance
970  *
971  * \retval              0 for success
972  * \retval              negative error number on failure
973  */
974 static int lfsck_scan_lpf_bad_entries(const struct lu_env *env,
975                                       struct lfsck_instance *lfsck)
976 {
977         struct dt_object        *parent = lfsck->li_lpf_root_obj;
978         struct lu_dirent        *ent    =
979                         (struct lu_dirent *)lfsck_env_info(env)->lti_key;
980         const struct dt_it_ops  *iops   = &parent->do_index_ops->dio_it;
981         struct dt_it            *it;
982         int                      rc;
983         ENTRY;
984
985         it = iops->init(env, parent, LUDA_64BITHASH, BYPASS_CAPA);
986         if (IS_ERR(it))
987                 RETURN(PTR_ERR(it));
988
989         rc = iops->load(env, it, 0);
990         if (rc == 0)
991                 rc = iops->next(env, it);
992         else if (rc > 0)
993                 rc = 0;
994
995         while (rc == 0) {
996                 int off = 3;
997
998                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
999                 if (rc != 0)
1000                         break;
1001
1002                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
1003                 if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
1004                         goto next;
1005
1006                 /* name length must be strlen("MDTxxxx") */
1007                 if (ent->lde_namelen != 7)
1008                         goto remove;
1009
1010                 if (memcmp(ent->lde_name, "MDT", off) != 0)
1011                         goto remove;
1012
1013                 while (off < 7 && isxdigit(ent->lde_name[off]))
1014                         off++;
1015
1016                 if (off != 7) {
1017
1018 remove:
1019                         rc = lfsck_lpf_remove_name_entry(env, lfsck,
1020                                                          ent->lde_name);
1021                         if (rc != 0)
1022                                 break;
1023                 }
1024
1025 next:
1026                 rc = iops->next(env, it);
1027         }
1028
1029         iops->put(env, it);
1030         iops->fini(env, it);
1031
1032         RETURN(rc > 0 ? 0 : rc);
1033 }
1034
1035 static int lfsck_update_lpf_entry(const struct lu_env *env,
1036                                   struct lfsck_instance *lfsck,
1037                                   struct dt_object *parent,
1038                                   struct dt_object *child,
1039                                   const char *name,
1040                                   enum lfsck_verify_lpf_types type)
1041 {
1042         int rc;
1043
1044         if (type == LVLT_BY_BOOKMARK) {
1045                 rc = lfsck_update_name_entry(env, lfsck, parent, name,
1046                                              lfsck_dto2fid(child), S_IFDIR);
1047         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1048                 lfsck->li_bookmark_ram.lb_lpf_fid = *lfsck_dto2fid(child);
1049                 rc = lfsck_bookmark_store(env, lfsck);
1050
1051                 CDEBUG(D_LFSCK, "%s: update LPF fid "DFID
1052                        " in the bookmark file: rc = %d\n",
1053                        lfsck_lfsck2name(lfsck),
1054                        PFID(lfsck_dto2fid(child)), rc);
1055         }
1056
1057         return rc;
1058 }
1059
1060 /**
1061  * Check whether the @child back references the @parent.
1062  *
1063  * Two cases:
1064  * 1) The child's FID is stored in the bookmark file. If the child back
1065  *    references the parent (LU_LPF_FID object) via its ".." entry, then
1066  *    insert the name (MDTxxxx) to the .lustre/lost+found; otherwise, if
1067  *    the child back references another parent2, then:
1068  * 1.1) If the parent2 recognizes the child, then update the bookmark file;
1069  * 1.2) Otherwise, the LFSCK cannot know whether there will be parent3 that
1070  *      references the child. So keep them there. As the LFSCK processing,
1071  *      the parent3 may be found, then when the LFSCK run next time, the
1072  *      inconsistency can be repaired.
1073  *
1074  * 2) The child's FID is stored in the .lustre/lost+found/ sub-directory name
1075  *    entry (MDTxxxx). If the child back references the parent (LU_LPF_FID obj)
1076  *    via its ".." entry, then update the bookmark file, otherwise, if the child
1077  *    back references another parent2, then:
1078  * 2.1) If the parent2 recognizes the child, then remove the sub-directory
1079  *      from .lustre/lost+found/;
1080  * 2.2) Otherwise, if the parent2 does not recognizes the child, trust the
1081  *      sub-directory name entry and update the child;
1082  * 2.3) Otherwise, if we do not know whether the parent2 recognizes the child
1083  *      or not, then keep them there.
1084  *
1085  * \param[in] env       pointer to the thread context
1086  * \param[in] lfsck     pointer to the lfsck instance
1087  * \param[in] child     pointer to the lost+found sub-directory object
1088  * \param[in] name      the name for lost+found sub-directory object
1089  * \param[out] fid      pointer to the buffer to hold the FID of the object
1090  *                      (called it as parent2) that is referenced via the
1091  *                      child's dotdot entry; it also can be the FID that
1092  *                      is referenced by the name entry under the parent2.
1093  * \param[in] type      to indicate where the child's FID is stored in
1094  *
1095  * \retval              positive number for uncertain inconsistency
1096  * \retval              0 for success
1097  * \retval              negative error number on failure
1098  */
1099 static int lfsck_verify_lpf_pairs(const struct lu_env *env,
1100                                   struct lfsck_instance *lfsck,
1101                                   struct dt_object *child, const char *name,
1102                                   struct lu_fid *fid,
1103                                   enum lfsck_verify_lpf_types type)
1104 {
1105         struct dt_object         *parent  = lfsck->li_lpf_root_obj;
1106         struct lfsck_thread_info *info    = lfsck_env_info(env);
1107         char                     *name2   = info->lti_key;
1108         struct lu_fid            *fid2    = &info->lti_fid3;
1109         struct dt_object         *parent2 = NULL;
1110         struct lustre_handle      lh      = { 0 };
1111         int                       rc;
1112         ENTRY;
1113
1114         fid_zero(fid);
1115         rc = dt_lookup(env, child, (struct dt_rec *)fid,
1116                        (const struct dt_key *)dotdot, BYPASS_CAPA);
1117         if (rc != 0)
1118                 GOTO(linkea, rc);
1119
1120         if (!fid_is_sane(fid))
1121                 GOTO(linkea, rc = -EINVAL);
1122
1123         if (lu_fid_eq(fid, &LU_LPF_FID)) {
1124                 const struct lu_name *cname;
1125
1126                 if (lfsck->li_lpf_obj == NULL) {
1127                         lu_object_get(&child->do_lu);
1128                         lfsck->li_lpf_obj = child;
1129                 }
1130
1131                 cname = lfsck_name_get_const(env, name, strlen(name));
1132                 rc = lfsck_verify_linkea(env, lfsck->li_bottom, child, cname,
1133                                          &LU_LPF_FID);
1134                 if (rc == 0)
1135                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1136                                                     name, type);
1137
1138                 GOTO(out_done, rc);
1139         }
1140
1141         parent2 = lfsck_object_find_by_dev(env, lfsck->li_next, fid);
1142         if (IS_ERR(parent2))
1143                 GOTO(linkea, parent2);
1144
1145         if (!dt_object_exists(parent2)) {
1146                 lu_object_put(env, &parent2->do_lu);
1147
1148                 GOTO(linkea, parent2 = ERR_PTR(-ENOENT));
1149         }
1150
1151         if (!dt_try_as_dir(env, parent2)) {
1152                 lu_object_put(env, &parent2->do_lu);
1153
1154                 GOTO(linkea, parent2 = ERR_PTR(-ENOTDIR));
1155         }
1156
1157 linkea:
1158         /* To prevent rename/unlink race */
1159         rc = lfsck_ibits_lock(env, lfsck, child, &lh,
1160                               MDS_INODELOCK_UPDATE, LCK_PR);
1161         if (rc != 0)
1162                 GOTO(out_put, rc);
1163
1164         dt_read_lock(env, child, 0);
1165         rc = lfsck_links_get_first(env, child, name2, fid2);
1166         if (rc != 0) {
1167                 dt_read_unlock(env, child);
1168                 lfsck_ibits_unlock(&lh, LCK_PR);
1169
1170                 GOTO(out_put, rc = 1);
1171         }
1172
1173         /* It is almost impossible that the bookmark file (or the name entry)
1174          * and the linkEA hit the same data corruption. Trust the linkEA. */
1175         if (lu_fid_eq(fid2, &LU_LPF_FID) && strcmp(name, name2) == 0) {
1176                 dt_read_unlock(env, child);
1177                 lfsck_ibits_unlock(&lh, LCK_PR);
1178
1179                 *fid = *fid2;
1180                 if (lfsck->li_lpf_obj == NULL) {
1181                         lu_object_get(&child->do_lu);
1182                         lfsck->li_lpf_obj = child;
1183                 }
1184
1185                 /* Update the child's dotdot entry */
1186                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1187                                              &LU_LPF_FID, S_IFDIR);
1188                 if (rc == 0)
1189                         rc = lfsck_update_lpf_entry(env, lfsck, parent, child,
1190                                                     name, type);
1191
1192                 GOTO(out_put, rc);
1193         }
1194
1195         if (parent2 == NULL || IS_ERR(parent2)) {
1196                 dt_read_unlock(env, child);
1197                 lfsck_ibits_unlock(&lh, LCK_PR);
1198
1199                 GOTO(out_done, rc = 1);
1200         }
1201
1202         rc = dt_lookup(env, parent2, (struct dt_rec *)fid,
1203                        (const struct dt_key *)name2, BYPASS_CAPA);
1204         dt_read_unlock(env, child);
1205         lfsck_ibits_unlock(&lh, LCK_PR);
1206         if (rc != 0 && rc != -ENOENT)
1207                 GOTO(out_put, rc);
1208
1209         if (rc == -ENOENT || !lu_fid_eq(fid, lfsck_dto2fid(child))) {
1210                 if (type == LVLT_BY_BOOKMARK)
1211                         GOTO(out_put, rc = 1);
1212
1213                 /* Trust the name entry, update the child's dotdot entry. */
1214                 rc = lfsck_update_name_entry(env, lfsck, child, dotdot,
1215                                              &LU_LPF_FID, S_IFDIR);
1216
1217                 GOTO(out_put, rc);
1218         }
1219
1220         if (type == LVLT_BY_BOOKMARK) {
1221                 /* Invalid FID record in the bookmark file, reset it. */
1222                 fid_zero(&lfsck->li_bookmark_ram.lb_lpf_fid);
1223                 rc = lfsck_bookmark_store(env, lfsck);
1224
1225                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1226                        " in the bookmark file: rc = %d\n",
1227                        lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)), rc);
1228         } else /* if (type == LVLT_BY_NAMEENTRY) */ {
1229                 /* The name entry is wrong, remove it. */
1230                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1231         }
1232
1233         GOTO(out_put, rc);
1234
1235 out_put:
1236         if (parent2 != NULL && !IS_ERR(parent2))
1237                 lu_object_put(env, &parent2->do_lu);
1238
1239 out_done:
1240         return rc;
1241 }
1242
1243 /**
1244  * Verify the /ROOT/.lustre/lost+found/ directory.
1245  *
1246  * /ROOT/.lustre/lost+found/ is a special directory to hold the objects that
1247  * the LFSCK does not exactly know how to handle, such as orphans. So before
1248  * the LFSCK scanning the system, the consistency of such directory needs to
1249  * be verified firstly to allow the users to use it during the LFSCK.
1250  *
1251  * \param[in] env       pointer to the thread context
1252  * \param[in] lfsck     pointer to the lfsck instance
1253  *
1254  * \retval              positive number for uncertain inconsistency
1255  * \retval              0 for success
1256  * \retval              negative error number on failure
1257  */
1258 int lfsck_verify_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
1259 {
1260         struct lfsck_thread_info *info   = lfsck_env_info(env);
1261         struct lu_fid            *pfid   = &info->lti_fid;
1262         struct lu_fid            *cfid   = &info->lti_fid2;
1263         struct lfsck_bookmark    *bk     = &lfsck->li_bookmark_ram;
1264         struct dt_object         *parent;
1265         /* child1's FID is in the bookmark file. */
1266         struct dt_object         *child1 = NULL;
1267         /* child2's FID is in the name entry MDTxxxx. */
1268         struct dt_object         *child2 = NULL;
1269         struct dt_device         *dev    = lfsck->li_bottom;
1270         const struct lu_name     *cname;
1271         char                      name[8];
1272         int                       node   = lfsck_dev_idx(dev);
1273         int                       rc     = 0;
1274         ENTRY;
1275
1276         LASSERT(lfsck->li_master);
1277
1278         if (lfsck->li_lpf_root_obj != NULL)
1279                 RETURN(0);
1280
1281         if (node == 0) {
1282                 parent = lfsck_object_find_by_dev(env, dev, &LU_LPF_FID);
1283         } else {
1284                 struct lfsck_tgt_desc *ltd;
1285
1286                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
1287                 if (unlikely(ltd == NULL))
1288                         RETURN(-ENXIO);
1289
1290                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
1291                                                   &LU_LPF_FID);
1292                 lfsck_tgt_put(ltd);
1293         }
1294
1295         if (IS_ERR(parent))
1296                 RETURN(PTR_ERR(parent));
1297
1298         LASSERT(dt_object_exists(parent));
1299
1300         if (unlikely(!dt_try_as_dir(env, parent))) {
1301                 lu_object_put(env, &parent->do_lu);
1302
1303                 GOTO(put, rc = -ENOTDIR);
1304         }
1305
1306         lfsck->li_lpf_root_obj = parent;
1307         if (node == 0) {
1308                 rc = lfsck_scan_lpf_bad_entries(env, lfsck);
1309                 if (rc != 0)
1310                         CDEBUG(D_LFSCK, "%s: scan .lustre/lost+found/ "
1311                                "for bad sub-directories: rc = %d\n",
1312                                lfsck_lfsck2name(lfsck), rc);
1313         }
1314
1315         if (!fid_is_zero(&bk->lb_lpf_fid)) {
1316                 if (unlikely(!fid_is_norm(&bk->lb_lpf_fid))) {
1317                         struct lu_fid tfid = bk->lb_lpf_fid;
1318
1319                         /* Invalid FID record in the bookmark file, reset it. */
1320                         fid_zero(&bk->lb_lpf_fid);
1321                         rc = lfsck_bookmark_store(env, lfsck);
1322
1323                         CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1324                                " in the bookmark file: rc = %d\n",
1325                                lfsck_lfsck2name(lfsck), PFID(&tfid), rc);
1326
1327                         if (rc != 0)
1328                                 GOTO(put, rc);
1329                 } else {
1330                         child1 = lfsck_object_find_by_dev(env, dev,
1331                                                           &bk->lb_lpf_fid);
1332                         if (IS_ERR(child1)) {
1333                                 child1 = NULL;
1334                                 goto find_child2;
1335                         }
1336
1337                         if (unlikely(!dt_object_exists(child1) ||
1338                                      dt_object_remote(child1)) ||
1339                                      !S_ISDIR(lfsck_object_type(child1))) {
1340                                 /* Invalid FID record in the bookmark file,
1341                                  * reset it. */
1342                                 fid_zero(&bk->lb_lpf_fid);
1343                                 rc = lfsck_bookmark_store(env, lfsck);
1344
1345                                 CDEBUG(D_LFSCK, "%s: reset invalid LPF fid "DFID
1346                                        " in the bookmark file: rc = %d\n",
1347                                        lfsck_lfsck2name(lfsck),
1348                                        PFID(lfsck_dto2fid(child1)), rc);
1349
1350                                 if (rc != 0)
1351                                         GOTO(put, rc);
1352
1353                                 lu_object_put(env, &child1->do_lu);
1354                                 child1 = NULL;
1355                         } else if (unlikely(!dt_try_as_dir(env, child1))) {
1356                                 GOTO(put, rc = -ENOTDIR);
1357                         }
1358                 }
1359         }
1360
1361 find_child2:
1362         snprintf(name, 8, "MDT%04x", node);
1363         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
1364                        (const struct dt_key *)name, BYPASS_CAPA);
1365         if (rc == -ENOENT) {
1366                 if (!fid_is_zero(&bk->lb_lpf_fid))
1367                         goto check_child1;
1368
1369                 GOTO(put, rc = 0);
1370         }
1371
1372         if (rc != 0)
1373                 GOTO(put, rc);
1374
1375         /* Invalid FID in the name entry, remove the name entry. */
1376         if (!fid_is_norm(cfid)) {
1377                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1378                 if (rc != 0)
1379                         GOTO(put, rc);
1380
1381                 goto check_child1;
1382         }
1383
1384         child2 = lfsck_object_find_by_dev(env, dev, cfid);
1385         if (IS_ERR(child2))
1386                 GOTO(put, rc = PTR_ERR(child2));
1387
1388         if (unlikely(!dt_object_exists(child2) ||
1389                      dt_object_remote(child2)) ||
1390                      !S_ISDIR(lfsck_object_type(child2))) {
1391                 rc = lfsck_lpf_remove_name_entry(env, lfsck, name);
1392                 if (rc != 0)
1393                         GOTO(put, rc);
1394
1395                 goto check_child1;
1396         }
1397
1398         if (unlikely(!dt_try_as_dir(env, child2)))
1399                 GOTO(put, rc = -ENOTDIR);
1400
1401         if (child1 == NULL) {
1402                 rc = lfsck_verify_lpf_pairs(env, lfsck, child2, name,
1403                                             pfid, LVLT_BY_NAMEENTRY);
1404         } else if (!lu_fid_eq(cfid, &bk->lb_lpf_fid)) {
1405                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1406                                             pfid, LVLT_BY_BOOKMARK);
1407                 if (!lu_fid_eq(pfid, &LU_LPF_FID))
1408                         rc = lfsck_verify_lpf_pairs(env, lfsck, child2,
1409                                                     name, pfid,
1410                                                     LVLT_BY_NAMEENTRY);
1411         } else {
1412                 if (lfsck->li_lpf_obj == NULL) {
1413                         lu_object_get(&child2->do_lu);
1414                         lfsck->li_lpf_obj = child2;
1415                 }
1416
1417                 cname = lfsck_name_get_const(env, name, strlen(name));
1418                 rc = lfsck_verify_linkea(env, dev, child2, cname, &LU_LPF_FID);
1419         }
1420
1421         GOTO(put, rc);
1422
1423 check_child1:
1424         if (child1 != NULL)
1425                 rc = lfsck_verify_lpf_pairs(env, lfsck, child1, name,
1426                                             pfid, LVLT_BY_BOOKMARK);
1427
1428         GOTO(put, rc);
1429
1430 put:
1431         if (lfsck->li_lpf_obj != NULL) {
1432                 if (unlikely(!dt_try_as_dir(env, lfsck->li_lpf_obj))) {
1433                         lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1434                         lfsck->li_lpf_obj = NULL;
1435                         rc = -ENOTDIR;
1436                 }
1437         } else if (rc == 0) {
1438                 rc = lfsck_create_lpf(env, lfsck);
1439         }
1440
1441         if (child2 != NULL && !IS_ERR(child2))
1442                 lu_object_put(env, &child2->do_lu);
1443         if (child1 != NULL && !IS_ERR(child1))
1444                 lu_object_put(env, &child1->do_lu);
1445
1446         return rc;
1447 }
1448
1449 static int lfsck_fid_init(struct lfsck_instance *lfsck)
1450 {
1451         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1452         struct seq_server_site  *ss;
1453         char                    *prefix;
1454         int                      rc     = 0;
1455         ENTRY;
1456
1457         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
1458         if (unlikely(ss == NULL))
1459                 RETURN(-ENXIO);
1460
1461         OBD_ALLOC_PTR(lfsck->li_seq);
1462         if (lfsck->li_seq == NULL)
1463                 RETURN(-ENOMEM);
1464
1465         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
1466         if (prefix == NULL)
1467                 GOTO(out, rc = -ENOMEM);
1468
1469         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
1470         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
1471                              ss->ss_server_seq);
1472         OBD_FREE(prefix, MAX_OBD_NAME + 7);
1473         if (rc != 0)
1474                 GOTO(out, rc);
1475
1476         if (fid_is_sane(&bk->lb_last_fid))
1477                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
1478
1479         RETURN(0);
1480
1481 out:
1482         OBD_FREE_PTR(lfsck->li_seq);
1483         lfsck->li_seq = NULL;
1484
1485         return rc;
1486 }
1487
1488 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
1489 {
1490         if (lfsck->li_seq != NULL) {
1491                 seq_client_fini(lfsck->li_seq);
1492                 OBD_FREE_PTR(lfsck->li_seq);
1493                 lfsck->li_seq = NULL;
1494         }
1495 }
1496
1497 void lfsck_instance_cleanup(const struct lu_env *env,
1498                             struct lfsck_instance *lfsck)
1499 {
1500         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1501         struct lfsck_component  *com;
1502         struct lfsck_component  *next;
1503         struct lfsck_lmv_unit   *llu;
1504         struct lfsck_lmv_unit   *llu_next;
1505         struct lfsck_lmv        *llmv;
1506         ENTRY;
1507
1508         LASSERT(list_empty(&lfsck->li_link));
1509         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
1510
1511         if (lfsck->li_obj_oit != NULL) {
1512                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
1513                 lfsck->li_obj_oit = NULL;
1514         }
1515
1516         LASSERT(lfsck->li_obj_dir == NULL);
1517         LASSERT(lfsck->li_lmv == NULL);
1518
1519         list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
1520                 llmv = &llu->llu_lmv;
1521
1522                 LASSERTF(atomic_read(&llmv->ll_ref) == 1,
1523                          "still in using: %u\n",
1524                          atomic_read(&llmv->ll_ref));
1525
1526                 lfsck_lmv_put(env, llmv);
1527         }
1528
1529         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
1530                 lfsck_component_cleanup(env, com);
1531         }
1532
1533         LASSERT(list_empty(&lfsck->li_list_dir));
1534
1535         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
1536                                  lc_link) {
1537                 lfsck_component_cleanup(env, com);
1538         }
1539
1540         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
1541                 lfsck_component_cleanup(env, com);
1542         }
1543
1544         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
1545         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
1546
1547         if (lfsck->li_lfsck_dir != NULL) {
1548                 lu_object_put_nocache(env, &lfsck->li_lfsck_dir->do_lu);
1549                 lfsck->li_lfsck_dir = NULL;
1550         }
1551
1552         if (lfsck->li_bookmark_obj != NULL) {
1553                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
1554                 lfsck->li_bookmark_obj = NULL;
1555         }
1556
1557         if (lfsck->li_lpf_obj != NULL) {
1558                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
1559                 lfsck->li_lpf_obj = NULL;
1560         }
1561
1562         if (lfsck->li_lpf_root_obj != NULL) {
1563                 lu_object_put(env, &lfsck->li_lpf_root_obj->do_lu);
1564                 lfsck->li_lpf_root_obj = NULL;
1565         }
1566
1567         if (lfsck->li_los != NULL) {
1568                 local_oid_storage_fini(env, lfsck->li_los);
1569                 lfsck->li_los = NULL;
1570         }
1571
1572         lfsck_fid_fini(lfsck);
1573
1574         OBD_FREE_PTR(lfsck);
1575 }
1576
1577 static inline struct lfsck_instance *
1578 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
1579 {
1580         struct lfsck_instance *lfsck;
1581
1582         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
1583                 if (lfsck->li_bottom == key) {
1584                         if (ref)
1585                                 lfsck_instance_get(lfsck);
1586                         if (unlink)
1587                                 list_del_init(&lfsck->li_link);
1588
1589                         return lfsck;
1590                 }
1591         }
1592
1593         return NULL;
1594 }
1595
1596 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
1597                                            bool unlink)
1598 {
1599         struct lfsck_instance *lfsck;
1600
1601         spin_lock(&lfsck_instance_lock);
1602         lfsck = __lfsck_instance_find(key, ref, unlink);
1603         spin_unlock(&lfsck_instance_lock);
1604
1605         return lfsck;
1606 }
1607
1608 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
1609 {
1610         struct lfsck_instance *tmp;
1611
1612         spin_lock(&lfsck_instance_lock);
1613         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
1614                 if (lfsck->li_bottom == tmp->li_bottom) {
1615                         spin_unlock(&lfsck_instance_lock);
1616                         return -EEXIST;
1617                 }
1618         }
1619
1620         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
1621         spin_unlock(&lfsck_instance_lock);
1622         return 0;
1623 }
1624
/**
 * Print the names of the bits set in @bits as one "prefix: a,b,c" line.
 *
 * The bits are visited from the lowest to the highest; bit i is printed as
 * names[i], and a set bit whose name is NULL is silently skipped. With no
 * bit set only "prefix:" is printed. The line is always terminated by a
 * newline.
 *
 * The walk is done on unsigned values, so that reaching bit 31 does not
 * shift a signed 1 into the sign bit.
 *
 * \param[in] m         the seq_file to print into
 * \param[in] bits      the bit mask to be dumped
 * \param[in] names     the bit names, indexed by bit number; the caller is
 *                      presumably responsible for it covering every bit
 *                      that can be set in \a bits (not checked here)
 * \param[in] prefix    the text printed in front of the names
 *
 * \retval              always 0
 */
int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
                    const char *prefix)
{
        unsigned int left = bits;
        unsigned int flag;
        int i;
        bool newline = (left == 0);

        seq_printf(m, "%s:%c", prefix, left != 0 ? ' ' : '\n');

        for (i = 0; left != 0; i++) {
                flag = 1U << i;
                if (!(left & flag))
                        continue;

                left &= ~flag;
                if (names[i] == NULL)
                        continue;

                /* The name of the highest set bit ends the line. */
                if (left == 0)
                        newline = true;

                seq_printf(m, "%s%c", names[i], newline ? '\n' : ',');
        }

        /* The highest set bit had no name: the line is still open. */
        if (!newline)
                seq_printf(m, "\n");
        return 0;
}
1651
1652 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
1653 {
1654         if (time != 0)
1655                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
1656                           cfs_time_current_sec() - time);
1657         else
1658                 seq_printf(m, "%s: N/A\n", prefix);
1659         return 0;
1660 }
1661
1662 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
1663                    const char *prefix)
1664 {
1665         if (fid_is_zero(&pos->lp_dir_parent)) {
1666                 if (pos->lp_oit_cookie == 0)
1667                         seq_printf(m, "%s: N/A, N/A, N/A\n",
1668                                    prefix);
1669                 else
1670                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
1671                                    prefix, pos->lp_oit_cookie);
1672         } else {
1673                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
1674                            prefix, pos->lp_oit_cookie,
1675                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
1676         }
1677         return 0;
1678 }
1679
1680 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1681                     struct lfsck_position *pos, bool init)
1682 {
1683         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1684
1685         if (unlikely(lfsck->li_di_oit == NULL)) {
1686                 memset(pos, 0, sizeof(*pos));
1687                 return;
1688         }
1689
1690         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1691         if (!lfsck->li_current_oit_processed && !init)
1692                 pos->lp_oit_cookie--;
1693
1694         LASSERT(pos->lp_oit_cookie > 0);
1695
1696         if (lfsck->li_di_dir != NULL) {
1697                 struct dt_object *dto = lfsck->li_obj_dir;
1698
1699                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1700                                                         lfsck->li_di_dir);
1701
1702                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1703                         fid_zero(&pos->lp_dir_parent);
1704                         pos->lp_dir_cookie = 0;
1705                 } else {
1706                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1707                 }
1708         } else {
1709                 fid_zero(&pos->lp_dir_parent);
1710                 pos->lp_dir_cookie = 0;
1711         }
1712 }
1713
1714 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1715 {
1716         bool dirty = false;
1717
1718         if (limit != LFSCK_SPEED_NO_LIMIT) {
1719                 if (limit > msecs_to_jiffies(MSEC_PER_SEC)) {
1720                         lfsck->li_sleep_rate = limit /
1721                                                msecs_to_jiffies(MSEC_PER_SEC);
1722                         lfsck->li_sleep_jif = 1;
1723                 } else {
1724                         lfsck->li_sleep_rate = 1;
1725                         lfsck->li_sleep_jif = msecs_to_jiffies(MSEC_PER_SEC) /
1726                                               limit;
1727                 }
1728         } else {
1729                 lfsck->li_sleep_jif = 0;
1730                 lfsck->li_sleep_rate = 0;
1731         }
1732
1733         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1734                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1735                 dirty = true;
1736         }
1737
1738         return dirty;
1739 }
1740
1741 void lfsck_control_speed(struct lfsck_instance *lfsck)
1742 {
1743         struct ptlrpc_thread *thread = &lfsck->li_thread;
1744         struct l_wait_info    lwi;
1745
1746         if (lfsck->li_sleep_jif > 0 &&
1747             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1748                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1749                                        LWI_ON_SIGNAL_NOOP, NULL);
1750
1751                 l_wait_event(thread->t_ctl_waitq,
1752                              !thread_is_running(thread),
1753                              &lwi);
1754                 lfsck->li_new_scanned = 0;
1755         }
1756 }
1757
1758 void lfsck_control_speed_by_self(struct lfsck_component *com)
1759 {
1760         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1761         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1762         struct l_wait_info       lwi;
1763
1764         if (lfsck->li_sleep_jif > 0 &&
1765             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1766                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1767                                        LWI_ON_SIGNAL_NOOP, NULL);
1768
1769                 l_wait_event(thread->t_ctl_waitq,
1770                              !thread_is_running(thread),
1771                              &lwi);
1772                 com->lc_new_scanned = 0;
1773         }
1774 }
1775
1776 static struct lfsck_thread_args *
1777 lfsck_thread_args_init(struct lfsck_instance *lfsck,
1778                        struct lfsck_component *com,
1779                        struct lfsck_start_param *lsp)
1780 {
1781         struct lfsck_thread_args *lta;
1782         int                       rc;
1783
1784         OBD_ALLOC_PTR(lta);
1785         if (lta == NULL)
1786                 return ERR_PTR(-ENOMEM);
1787
1788         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1789         if (rc != 0) {
1790                 OBD_FREE_PTR(lta);
1791                 return ERR_PTR(rc);
1792         }
1793
1794         lta->lta_lfsck = lfsck_instance_get(lfsck);
1795         if (com != NULL)
1796                 lta->lta_com = lfsck_component_get(com);
1797
1798         lta->lta_lsp = lsp;
1799
1800         return lta;
1801 }
1802
1803 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1804 {
1805         if (lta->lta_com != NULL)
1806                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1807         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1808         lu_env_fini(&lta->lta_env);
1809         OBD_FREE_PTR(lta);
1810 }
1811
1812 struct lfsck_assistant_data *
1813 lfsck_assistant_data_init(struct lfsck_assistant_operations *lao,
1814                           const char *name)
1815 {
1816         struct lfsck_assistant_data *lad;
1817
1818         OBD_ALLOC_PTR(lad);
1819         if (lad != NULL) {
1820                 lad->lad_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
1821                 if (lad->lad_bitmap == NULL) {
1822                         OBD_FREE_PTR(lad);
1823                         return NULL;
1824                 }
1825
1826                 INIT_LIST_HEAD(&lad->lad_req_list);
1827                 spin_lock_init(&lad->lad_lock);
1828                 INIT_LIST_HEAD(&lad->lad_ost_list);
1829                 INIT_LIST_HEAD(&lad->lad_ost_phase1_list);
1830                 INIT_LIST_HEAD(&lad->lad_ost_phase2_list);
1831                 INIT_LIST_HEAD(&lad->lad_mdt_list);
1832                 INIT_LIST_HEAD(&lad->lad_mdt_phase1_list);
1833                 INIT_LIST_HEAD(&lad->lad_mdt_phase2_list);
1834                 init_waitqueue_head(&lad->lad_thread.t_ctl_waitq);
1835                 lad->lad_ops = lao;
1836                 lad->lad_name = name;
1837         }
1838
1839         return lad;
1840 }
1841
1842 /**
1843  * Generic LFSCK asynchronous communication interpretor function.
1844  * The LFSCK RPC reply for both the event notification and status
1845  * querying will be handled here.
1846  *
1847  * \param[in] env       pointer to the thread context
1848  * \param[in] req       pointer to the LFSCK request
1849  * \param[in] args      pointer to the lfsck_async_interpret_args
1850  * \param[in] rc        the result for handling the LFSCK request
1851  *
1852  * \retval              0 for success
1853  * \retval              negative error number on failure
1854  */
1855 int lfsck_async_interpret_common(const struct lu_env *env,
1856                                  struct ptlrpc_request *req,
1857                                  void *args, int rc)
1858 {
1859         struct lfsck_async_interpret_args *laia = args;
1860         struct lfsck_component            *com  = laia->laia_com;
1861         struct lfsck_assistant_data       *lad  = com->lc_data;
1862         struct lfsck_tgt_descs            *ltds = laia->laia_ltds;
1863         struct lfsck_tgt_desc             *ltd  = laia->laia_ltd;
1864         struct lfsck_request              *lr   = laia->laia_lr;
1865
1866         LASSERT(com->lc_lfsck->li_master);
1867
1868         switch (lr->lr_event) {
1869         case LE_START:
1870                 if (rc != 0) {
1871                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s "
1872                                "start: rc = %d\n",
1873                                lfsck_lfsck2name(com->lc_lfsck),
1874                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1875                                ltd->ltd_index, lad->lad_name, rc);
1876
1877                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1878                                 struct lfsck_layout *lo = com->lc_file_ram;
1879
1880                                 if (lr->lr_flags & LEF_TO_OST)
1881                                         lfsck_lad_set_bitmap(env, com,
1882                                                              ltd->ltd_index);
1883                                 else
1884                                         lo->ll_flags |= LF_INCOMPLETE;
1885                         } else {
1886                                 struct lfsck_namespace *ns = com->lc_file_ram;
1887
1888                                 /* If some MDT does not join the namespace
1889                                  * LFSCK, then we cannot know whether there
1890                                  * is some name entry on such MDT that with
1891                                  * the referenced MDT-object on this MDT or
1892                                  * not. So the namespace LFSCK on this MDT
1893                                  * cannot handle orphan MDT-objects properly.
1894                                  * So we mark the LFSCK as LF_INCOMPLETE and
1895                                  * skip orphan MDT-objects handling. */
1896                                 ns->ln_flags |= LF_INCOMPLETE;
1897                         }
1898                         break;
1899                 }
1900
1901                 spin_lock(&ltds->ltd_lock);
1902                 if (ltd->ltd_dead) {
1903                         spin_unlock(&ltds->ltd_lock);
1904                         break;
1905                 }
1906
1907                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1908                         struct list_head *list;
1909                         struct list_head *phase_list;
1910
1911                         if (ltd->ltd_layout_done) {
1912                                 spin_unlock(&ltds->ltd_lock);
1913                                 break;
1914                         }
1915
1916                         if (lr->lr_flags & LEF_TO_OST) {
1917                                 list = &lad->lad_ost_list;
1918                                 phase_list = &lad->lad_ost_phase1_list;
1919                         } else {
1920                                 list = &lad->lad_mdt_list;
1921                                 phase_list = &lad->lad_mdt_phase1_list;
1922                         }
1923
1924                         if (list_empty(&ltd->ltd_layout_list))
1925                                 list_add_tail(&ltd->ltd_layout_list, list);
1926                         if (list_empty(&ltd->ltd_layout_phase_list))
1927                                 list_add_tail(&ltd->ltd_layout_phase_list,
1928                                               phase_list);
1929                 } else {
1930                         if (ltd->ltd_namespace_done) {
1931                                 spin_unlock(&ltds->ltd_lock);
1932                                 break;
1933                         }
1934
1935                         if (list_empty(&ltd->ltd_namespace_list))
1936                                 list_add_tail(&ltd->ltd_namespace_list,
1937                                               &lad->lad_mdt_list);
1938                         if (list_empty(&ltd->ltd_namespace_phase_list))
1939                                 list_add_tail(&ltd->ltd_namespace_phase_list,
1940                                               &lad->lad_mdt_phase1_list);
1941                 }
1942                 spin_unlock(&ltds->ltd_lock);
1943                 break;
1944         case LE_STOP:
1945         case LE_PHASE1_DONE:
1946         case LE_PHASE2_DONE:
1947         case LE_PEER_EXIT:
1948                 if (rc != 0 && rc != -EALREADY)
1949                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for %s: "
1950                               "event = %d, rc = %d\n",
1951                               lfsck_lfsck2name(com->lc_lfsck),
1952                               (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
1953                               ltd->ltd_index, lad->lad_name, lr->lr_event, rc);
1954                 break;
1955         case LE_QUERY: {
1956                 struct lfsck_reply *reply;
1957                 struct list_head *list;
1958                 struct list_head *phase_list;
1959
1960                 if (com->lc_type == LFSCK_TYPE_LAYOUT) {
1961                         list = &ltd->ltd_layout_list;
1962                         phase_list = &ltd->ltd_layout_phase_list;
1963                 } else {
1964                         list = &ltd->ltd_namespace_list;
1965                         phase_list = &ltd->ltd_namespace_phase_list;
1966                 }
1967
1968                 if (rc != 0) {
1969                         spin_lock(&ltds->ltd_lock);
1970                         list_del_init(phase_list);
1971                         list_del_init(list);
1972                         spin_unlock(&ltds->ltd_lock);
1973                         break;
1974                 }
1975
1976                 reply = req_capsule_server_get(&req->rq_pill,
1977                                                &RMF_LFSCK_REPLY);
1978                 if (reply == NULL) {
1979                         rc = -EPROTO;
1980                         CDEBUG(D_LFSCK, "%s: invalid query reply for %s: "
1981                                "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck),
1982                                lad->lad_name, rc);
1983                         spin_lock(&ltds->ltd_lock);
1984                         list_del_init(phase_list);
1985                         list_del_init(list);
1986                         spin_unlock(&ltds->ltd_lock);
1987                         break;
1988                 }
1989
1990                 switch (reply->lr_status) {
1991                 case LS_SCANNING_PHASE1:
1992                         break;
1993                 case LS_SCANNING_PHASE2:
1994                         spin_lock(&ltds->ltd_lock);
1995                         list_del_init(phase_list);
1996                         if (ltd->ltd_dead) {
1997                                 spin_unlock(&ltds->ltd_lock);
1998                                 break;
1999                         }
2000
2001                         if (com->lc_type == LFSCK_TYPE_LAYOUT) {
2002                                 if (ltd->ltd_layout_done) {
2003                                         spin_unlock(&ltds->ltd_lock);
2004                                         break;
2005                                 }
2006
2007                                 if (lr->lr_flags & LEF_TO_OST)
2008                                         list_add_tail(phase_list,
2009                                                 &lad->lad_ost_phase2_list);
2010                                 else
2011                                         list_add_tail(phase_list,
2012                                                 &lad->lad_mdt_phase2_list);
2013                         } else {
2014                                 if (ltd->ltd_namespace_done) {
2015                                         spin_unlock(&ltds->ltd_lock);
2016                                         break;
2017                                 }
2018
2019                                 list_add_tail(phase_list,
2020                                               &lad->lad_mdt_phase2_list);
2021                         }
2022                         spin_unlock(&ltds->ltd_lock);
2023                         break;
2024                 default:
2025                         spin_lock(&ltds->ltd_lock);
2026                         list_del_init(phase_list);
2027                         list_del_init(list);
2028                         spin_unlock(&ltds->ltd_lock);
2029                         break;
2030                 }
2031                 break;
2032         }
2033         default:
2034                 CDEBUG(D_LFSCK, "%s: unexpected event: rc = %d\n",
2035                        lfsck_lfsck2name(com->lc_lfsck), lr->lr_event);
2036                 break;
2037         }
2038
2039         if (!laia->laia_shared) {
2040                 lfsck_tgt_put(ltd);
2041                 lfsck_component_put(env, com);
2042         }
2043
2044         return 0;
2045 }
2046
2047 static void lfsck_interpret(const struct lu_env *env,
2048                             struct lfsck_instance *lfsck,
2049                             struct ptlrpc_request *req, void *args, int result)
2050 {
2051         struct lfsck_async_interpret_args *laia = args;
2052         struct lfsck_component            *com;
2053
2054         LASSERT(laia->laia_com == NULL);
2055         LASSERT(laia->laia_shared);
2056
2057         spin_lock(&lfsck->li_lock);
2058         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2059                 laia->laia_com = com;
2060                 lfsck_async_interpret_common(env, req, laia, result);
2061         }
2062
2063         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
2064                 laia->laia_com = com;
2065                 lfsck_async_interpret_common(env, req, laia, result);
2066         }
2067         spin_unlock(&lfsck->li_lock);
2068 }
2069
2070 static int lfsck_stop_notify(const struct lu_env *env,
2071                              struct lfsck_instance *lfsck,
2072                              struct lfsck_tgt_descs *ltds,
2073                              struct lfsck_tgt_desc *ltd, __u16 type)
2074 {
2075         struct lfsck_component *com;
2076         int                     rc = 0;
2077         ENTRY;
2078
2079         LASSERT(lfsck->li_master);
2080
2081         spin_lock(&lfsck->li_lock);
2082         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
2083         if (com == NULL)
2084                 com = __lfsck_component_find(lfsck, type,
2085                                              &lfsck->li_list_double_scan);
2086         if (com != NULL)
2087                 lfsck_component_get(com);
2088         spin_unlock(&lfsck->li_lock);
2089
2090         if (com != NULL) {
2091                 struct lfsck_thread_info          *info  = lfsck_env_info(env);
2092                 struct lfsck_async_interpret_args *laia  = &info->lti_laia;
2093                 struct lfsck_request              *lr    = &info->lti_lr;
2094                 struct lfsck_assistant_data       *lad   = com->lc_data;
2095                 struct list_head                  *list;
2096                 struct list_head                  *phase_list;
2097                 struct ptlrpc_request_set         *set;
2098
2099                 set = ptlrpc_prep_set();
2100                 if (set == NULL) {
2101                         lfsck_component_put(env, com);
2102
2103                         RETURN(-ENOMEM);
2104                 }
2105
2106                 if (type == LFSCK_TYPE_LAYOUT) {
2107                         list = &ltd->ltd_layout_list;
2108                         phase_list = &ltd->ltd_layout_phase_list;
2109                 } else {
2110                         list = &ltd->ltd_namespace_list;
2111                         phase_list = &ltd->ltd_namespace_phase_list;
2112                 }
2113
2114                 spin_lock(&ltds->ltd_lock);
2115                 if (list_empty(list)) {
2116                         LASSERT(list_empty(phase_list));
2117                         spin_unlock(&ltds->ltd_lock);
2118                         ptlrpc_set_destroy(set);
2119
2120                         RETURN(0);
2121                 }
2122
2123                 list_del_init(phase_list);
2124                 list_del_init(list);
2125                 spin_unlock(&ltds->ltd_lock);
2126
2127                 memset(lr, 0, sizeof(*lr));
2128                 lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2129                 lr->lr_event = LE_PEER_EXIT;
2130                 lr->lr_active = type;
2131                 lr->lr_status = LS_CO_PAUSED;
2132                 if (ltds == &lfsck->li_ost_descs)
2133                         lr->lr_flags = LEF_TO_OST;
2134
2135                 laia->laia_com = com;
2136                 laia->laia_ltds = ltds;
2137                 atomic_inc(&ltd->ltd_ref);
2138                 laia->laia_ltd = ltd;
2139                 laia->laia_lr = lr;
2140                 laia->laia_shared = 0;
2141
2142                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2143                                          lfsck_async_interpret_common,
2144                                          laia, LFSCK_NOTIFY);
2145                 if (rc != 0) {
2146                         CDEBUG(D_LFSCK, "%s: fail to notify %s %x for "
2147                                "co-stop for %s: rc = %d\n",
2148                                lfsck_lfsck2name(lfsck),
2149                                (lr->lr_flags & LEF_TO_OST) ? "OST" : "MDT",
2150                                ltd->ltd_index, lad->lad_name, rc);
2151                         lfsck_tgt_put(ltd);
2152                 } else {
2153                         rc = ptlrpc_set_wait(set);
2154                 }
2155
2156                 ptlrpc_set_destroy(set);
2157                 lfsck_component_put(env, com);
2158         }
2159
2160         RETURN(rc);
2161 }
2162
2163 static int lfsck_async_interpret(const struct lu_env *env,
2164                                  struct ptlrpc_request *req,
2165                                  void *args, int rc)
2166 {
2167         struct lfsck_async_interpret_args *laia = args;
2168         struct lfsck_instance             *lfsck;
2169
2170         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
2171                               li_mdt_descs);
2172         lfsck_interpret(env, lfsck, req, laia, rc);
2173         lfsck_tgt_put(laia->laia_ltd);
2174         if (rc != 0 && laia->laia_result != -EALREADY)
2175                 laia->laia_result = rc;
2176
2177         return 0;
2178 }
2179
2180 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
2181                         struct lfsck_request *lr,
2182                         struct ptlrpc_request_set *set,
2183                         ptlrpc_interpterer_t interpreter,
2184                         void *args, int request)
2185 {
2186         struct lfsck_async_interpret_args *laia;
2187         struct ptlrpc_request             *req;
2188         struct lfsck_request              *tmp;
2189         struct req_format                 *format;
2190         int                                rc;
2191
2192         switch (request) {
2193         case LFSCK_NOTIFY:
2194                 format = &RQF_LFSCK_NOTIFY;
2195                 break;
2196         case LFSCK_QUERY:
2197                 format = &RQF_LFSCK_QUERY;
2198                 break;
2199         default:
2200                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
2201                        exp->exp_obd->obd_name, request, -EINVAL);
2202                 return -EINVAL;
2203         }
2204
2205         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
2206         if (req == NULL)
2207                 return -ENOMEM;
2208
2209         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
2210         if (rc != 0) {
2211                 ptlrpc_request_free(req);
2212
2213                 return rc;
2214         }
2215
2216         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
2217         *tmp = *lr;
2218         ptlrpc_request_set_replen(req);
2219
2220         laia = ptlrpc_req_async_args(req);
2221         *laia = *(struct lfsck_async_interpret_args *)args;
2222         if (laia->laia_com != NULL)
2223                 lfsck_component_get(laia->laia_com);
2224         req->rq_interpret_reply = interpreter;
2225         ptlrpc_set_add_req(set, req);
2226
2227         return 0;
2228 }
2229
2230 int lfsck_start_assistant(const struct lu_env *env, struct lfsck_component *com,
2231                           struct lfsck_start_param *lsp)
2232 {
2233         struct lfsck_instance           *lfsck   = com->lc_lfsck;
2234         struct lfsck_assistant_data     *lad     = com->lc_data;
2235         struct ptlrpc_thread            *mthread = &lfsck->li_thread;
2236         struct ptlrpc_thread            *athread = &lad->lad_thread;
2237         struct lfsck_thread_args        *lta;
2238         struct task_struct              *task;
2239         int                              rc;
2240         ENTRY;
2241
2242         lad->lad_assistant_status = 0;
2243         lad->lad_post_result = 0;
2244         lad->lad_to_post = 0;
2245         lad->lad_to_double_scan = 0;
2246         lad->lad_in_double_scan = 0;
2247         lad->lad_exit = 0;
2248         thread_set_flags(athread, 0);
2249
2250         lta = lfsck_thread_args_init(lfsck, com, lsp);
2251         if (IS_ERR(lta))
2252                 RETURN(PTR_ERR(lta));
2253
2254         task = kthread_run(lfsck_assistant_engine, lta, lad->lad_name);
2255         if (IS_ERR(task)) {
2256                 rc = PTR_ERR(task);
2257                 CERROR("%s: cannot start LFSCK assistant thread for %s: "
2258                        "rc = %d\n", lfsck_lfsck2name(lfsck), lad->lad_name, rc);
2259                 lfsck_thread_args_fini(lta);
2260         } else {
2261                 struct l_wait_info lwi = { 0 };
2262
2263                 l_wait_event(mthread->t_ctl_waitq,
2264                              thread_is_running(athread) ||
2265                              thread_is_stopped(athread),
2266                              &lwi);
2267                 if (unlikely(!thread_is_running(athread)))
2268                         rc = lad->lad_assistant_status;
2269                 else
2270                         rc = 0;
2271         }
2272
2273         RETURN(rc);
2274 }
2275
2276 int lfsck_checkpoint_generic(const struct lu_env *env,
2277                              struct lfsck_component *com)
2278 {
2279         struct lfsck_assistant_data     *lad     = com->lc_data;
2280         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2281         struct ptlrpc_thread            *athread = &lad->lad_thread;
2282         struct l_wait_info               lwi     = { 0 };
2283
2284         if (com->lc_new_checked == 0)
2285                 return LFSCK_CHECKPOINT_SKIP;
2286
2287         l_wait_event(mthread->t_ctl_waitq,
2288                      list_empty(&lad->lad_req_list) ||
2289                      !thread_is_running(mthread) ||
2290                      thread_is_stopped(athread),
2291                      &lwi);
2292
2293         if (!thread_is_running(mthread) || thread_is_stopped(athread))
2294                 return LFSCK_CHECKPOINT_SKIP;
2295
2296         return 0;
2297 }
2298
2299 void lfsck_post_generic(const struct lu_env *env,
2300                         struct lfsck_component *com, int *result)
2301 {
2302         struct lfsck_assistant_data     *lad     = com->lc_data;
2303         struct ptlrpc_thread            *athread = &lad->lad_thread;
2304         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2305         struct l_wait_info               lwi     = { 0 };
2306
2307         lad->lad_post_result = *result;
2308         if (*result <= 0)
2309                 lad->lad_exit = 1;
2310         lad->lad_to_post = 1;
2311
2312         wake_up_all(&athread->t_ctl_waitq);
2313         l_wait_event(mthread->t_ctl_waitq,
2314                      (*result > 0 && list_empty(&lad->lad_req_list)) ||
2315                      thread_is_stopped(athread),
2316                      &lwi);
2317
2318         if (lad->lad_assistant_status < 0)
2319                 *result = lad->lad_assistant_status;
2320 }
2321
2322 int lfsck_double_scan_generic(const struct lu_env *env,
2323                               struct lfsck_component *com, int status)
2324 {
2325         struct lfsck_assistant_data     *lad     = com->lc_data;
2326         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2327         struct ptlrpc_thread            *athread = &lad->lad_thread;
2328         struct l_wait_info               lwi     = { 0 };
2329
2330         if (status != LS_SCANNING_PHASE2)
2331                 lad->lad_exit = 1;
2332         else
2333                 lad->lad_to_double_scan = 1;
2334
2335         wake_up_all(&athread->t_ctl_waitq);
2336         l_wait_event(mthread->t_ctl_waitq,
2337                      lad->lad_in_double_scan ||
2338                      thread_is_stopped(athread),
2339                      &lwi);
2340
2341         if (lad->lad_assistant_status < 0)
2342                 return lad->lad_assistant_status;
2343
2344         return 0;
2345 }
2346
2347 void lfsck_quit_generic(const struct lu_env *env,
2348                         struct lfsck_component *com)
2349 {
2350         struct lfsck_assistant_data     *lad     = com->lc_data;
2351         struct ptlrpc_thread            *mthread = &com->lc_lfsck->li_thread;
2352         struct ptlrpc_thread            *athread = &lad->lad_thread;
2353         struct l_wait_info               lwi     = { 0 };
2354
2355         lad->lad_exit = 1;
2356         wake_up_all(&athread->t_ctl_waitq);
2357         l_wait_event(mthread->t_ctl_waitq,
2358                      thread_is_init(athread) ||
2359                      thread_is_stopped(athread),
2360                      &lwi);
2361 }
2362
2363 /* external interfaces */
2364
2365 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
2366 {
2367         struct lu_env           env;
2368         struct lfsck_instance  *lfsck;
2369         int                     rc;
2370         ENTRY;
2371
2372         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2373         if (rc != 0)
2374                 RETURN(rc);
2375
2376         lfsck = lfsck_instance_find(key, true, false);
2377         if (likely(lfsck != NULL)) {
2378                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
2379                 lfsck_instance_put(&env, lfsck);
2380         } else {
2381                 rc = -ENXIO;
2382         }
2383
2384         lu_env_fini(&env);
2385
2386         RETURN(rc);
2387 }
2388 EXPORT_SYMBOL(lfsck_get_speed);
2389
2390 int lfsck_set_speed(struct dt_device *key, int val)
2391 {
2392         struct lu_env           env;
2393         struct lfsck_instance  *lfsck;
2394         int                     rc;
2395         ENTRY;
2396
2397         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2398         if (rc != 0)
2399                 RETURN(rc);
2400
2401         lfsck = lfsck_instance_find(key, true, false);
2402         if (likely(lfsck != NULL)) {
2403                 mutex_lock(&lfsck->li_mutex);
2404                 if (__lfsck_set_speed(lfsck, val))
2405                         rc = lfsck_bookmark_store(&env, lfsck);
2406                 mutex_unlock(&lfsck->li_mutex);
2407                 lfsck_instance_put(&env, lfsck);
2408         } else {
2409                 rc = -ENXIO;
2410         }
2411
2412         lu_env_fini(&env);
2413
2414         RETURN(rc);
2415 }
2416 EXPORT_SYMBOL(lfsck_set_speed);
2417
2418 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
2419 {
2420         struct lu_env           env;
2421         struct lfsck_instance  *lfsck;
2422         int                     rc;
2423         ENTRY;
2424
2425         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2426         if (rc != 0)
2427                 RETURN(rc);
2428
2429         lfsck = lfsck_instance_find(key, true, false);
2430         if (likely(lfsck != NULL)) {
2431                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
2432                 lfsck_instance_put(&env, lfsck);
2433         } else {
2434                 rc = -ENXIO;
2435         }
2436
2437         lu_env_fini(&env);
2438
2439         RETURN(rc);
2440 }
2441 EXPORT_SYMBOL(lfsck_get_windows);
2442
2443 int lfsck_set_windows(struct dt_device *key, int val)
2444 {
2445         struct lu_env           env;
2446         struct lfsck_instance  *lfsck;
2447         int                     rc;
2448         ENTRY;
2449
2450         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2451         if (rc != 0)
2452                 RETURN(rc);
2453
2454         lfsck = lfsck_instance_find(key, true, false);
2455         if (likely(lfsck != NULL)) {
2456                 if (val > LFSCK_ASYNC_WIN_MAX) {
2457                         CWARN("%s: Too large async window size, which "
2458                               "may cause memory issues. The valid range "
2459                               "is [0 - %u]. If you do not want to restrict "
2460                               "the window size for async requests pipeline, "
2461                               "just set it as 0.\n",
2462                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
2463                         rc = -EINVAL;
2464                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
2465                         mutex_lock(&lfsck->li_mutex);
2466                         lfsck->li_bookmark_ram.lb_async_windows = val;
2467                         rc = lfsck_bookmark_store(&env, lfsck);
2468                         mutex_unlock(&lfsck->li_mutex);
2469                 }
2470                 lfsck_instance_put(&env, lfsck);
2471         } else {
2472                 rc = -ENXIO;
2473         }
2474
2475         lu_env_fini(&env);
2476
2477         RETURN(rc);
2478 }
2479 EXPORT_SYMBOL(lfsck_set_windows);
2480
2481 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
2482 {
2483         struct lu_env           env;
2484         struct lfsck_instance  *lfsck;
2485         struct lfsck_component *com;
2486         int                     rc;
2487         ENTRY;
2488
2489         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
2490         if (rc != 0)
2491                 RETURN(rc);
2492
2493         lfsck = lfsck_instance_find(key, true, false);
2494         if (likely(lfsck != NULL)) {
2495                 com = lfsck_component_find(lfsck, type);
2496                 if (likely(com != NULL)) {
2497                         rc = com->lc_ops->lfsck_dump(&env, com, m);
2498                         lfsck_component_put(&env, com);
2499                 } else {
2500                         rc = -ENOTSUPP;
2501                 }
2502
2503                 lfsck_instance_put(&env, lfsck);
2504         } else {
2505                 rc = -ENXIO;
2506         }
2507
2508         lu_env_fini(&env);
2509
2510         RETURN(rc);
2511 }
2512 EXPORT_SYMBOL(lfsck_dump);
2513
2514 static int lfsck_stop_all(const struct lu_env *env,
2515                           struct lfsck_instance *lfsck,
2516                           struct lfsck_stop *stop)
2517 {
2518         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2519         struct lfsck_request              *lr     = &info->lti_lr;
2520         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2521         struct ptlrpc_request_set         *set;
2522         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2523         struct lfsck_tgt_desc             *ltd;
2524         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2525         __u32                              idx;
2526         int                                rc     = 0;
2527         int                                rc1    = 0;
2528         ENTRY;
2529
2530         LASSERT(stop->ls_flags & LPF_BROADCAST);
2531
2532         set = ptlrpc_prep_set();
2533         if (unlikely(set == NULL))
2534                 RETURN(-ENOMEM);
2535
2536         memset(lr, 0, sizeof(*lr));
2537         lr->lr_event = LE_STOP;
2538         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2539         lr->lr_status = stop->ls_status;
2540         lr->lr_version = bk->lb_version;
2541         lr->lr_active = LFSCK_TYPES_ALL;
2542         lr->lr_param = stop->ls_flags;
2543
2544         laia->laia_com = NULL;
2545         laia->laia_ltds = ltds;
2546         laia->laia_lr = lr;
2547         laia->laia_result = 0;
2548         laia->laia_shared = 1;
2549
2550         down_read(&ltds->ltd_rw_sem);
2551         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2552                 ltd = lfsck_tgt_get(ltds, idx);
2553                 LASSERT(ltd != NULL);
2554
2555                 laia->laia_ltd = ltd;
2556                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2557                                          lfsck_async_interpret, laia,
2558                                          LFSCK_NOTIFY);
2559                 if (rc != 0) {
2560                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2561                         lfsck_tgt_put(ltd);
2562                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
2563                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
2564                         rc1 = rc;
2565                 }
2566         }
2567         up_read(&ltds->ltd_rw_sem);
2568
2569         rc = ptlrpc_set_wait(set);
2570         ptlrpc_set_destroy(set);
2571
2572         if (rc == 0)
2573                 rc = laia->laia_result;
2574
2575         if (rc == -EALREADY)
2576                 rc = 0;
2577
2578         if (rc != 0)
2579                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
2580                        lfsck_lfsck2name(lfsck), rc);
2581
2582         RETURN(rc != 0 ? rc : rc1);
2583 }
2584
2585 static int lfsck_start_all(const struct lu_env *env,
2586                            struct lfsck_instance *lfsck,
2587                            struct lfsck_start *start)
2588 {
2589         struct lfsck_thread_info          *info   = lfsck_env_info(env);
2590         struct lfsck_request              *lr     = &info->lti_lr;
2591         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
2592         struct ptlrpc_request_set         *set;
2593         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
2594         struct lfsck_tgt_desc             *ltd;
2595         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
2596         __u32                              idx;
2597         int                                rc     = 0;
2598         ENTRY;
2599
2600         LASSERT(start->ls_flags & LPF_BROADCAST);
2601
2602         set = ptlrpc_prep_set();
2603         if (unlikely(set == NULL))
2604                 RETURN(-ENOMEM);
2605
2606         memset(lr, 0, sizeof(*lr));
2607         lr->lr_event = LE_START;
2608         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
2609         lr->lr_speed = bk->lb_speed_limit;
2610         lr->lr_version = bk->lb_version;
2611         lr->lr_active = start->ls_active;
2612         lr->lr_param = start->ls_flags;
2613         lr->lr_async_windows = bk->lb_async_windows;
2614         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
2615                        LSV_ASYNC_WINDOWS | LSV_CREATE_OSTOBJ |
2616                        LSV_CREATE_MDTOBJ;
2617
2618         laia->laia_com = NULL;
2619         laia->laia_ltds = ltds;
2620         laia->laia_lr = lr;
2621         laia->laia_result = 0;
2622         laia->laia_shared = 1;
2623
2624         down_read(&ltds->ltd_rw_sem);
2625         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
2626                 ltd = lfsck_tgt_get(ltds, idx);
2627                 LASSERT(ltd != NULL);
2628
2629                 laia->laia_ltd = ltd;
2630                 ltd->ltd_layout_done = 0;
2631                 ltd->ltd_namespace_done = 0;
2632                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
2633                                          lfsck_async_interpret, laia,
2634                                          LFSCK_NOTIFY);
2635                 if (rc != 0) {
2636                         lfsck_interpret(env, lfsck, NULL, laia, rc);
2637                         lfsck_tgt_put(ltd);
2638                         CERROR("%s: cannot notify MDT %x for LFSCK "
2639                                "start, failout: rc = %d\n",
2640                                lfsck_lfsck2name(lfsck), idx, rc);
2641                         break;
2642                 }
2643         }
2644         up_read(&ltds->ltd_rw_sem);
2645
2646         if (rc != 0) {
2647                 ptlrpc_set_destroy(set);
2648
2649                 RETURN(rc);
2650         }
2651
2652         rc = ptlrpc_set_wait(set);
2653         ptlrpc_set_destroy(set);
2654
2655         if (rc == 0)
2656                 rc = laia->laia_result;
2657
2658         if (rc != 0) {
2659                 struct lfsck_stop *stop = &info->lti_stop;
2660
2661                 CERROR("%s: cannot start LFSCK on some MDTs, "
2662                        "stop all: rc = %d\n",
2663                        lfsck_lfsck2name(lfsck), rc);
2664                 if (rc != -EALREADY) {
2665                         stop->ls_status = LS_FAILED;
2666                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
2667                         lfsck_stop_all(env, lfsck, stop);
2668                 }
2669         }
2670
2671         RETURN(rc);
2672 }
2673
2674 int lfsck_start(const struct lu_env *env, struct dt_device *key,
2675                 struct lfsck_start_param *lsp)
2676 {
2677         struct lfsck_start              *start  = lsp->lsp_start;
2678         struct lfsck_instance           *lfsck;
2679         struct lfsck_bookmark           *bk;
2680         struct ptlrpc_thread            *thread;
2681         struct lfsck_component          *com;
2682         struct l_wait_info               lwi    = { 0 };
2683         struct lfsck_thread_args        *lta;
2684         struct task_struct              *task;
2685         int                              rc     = 0;
2686         __u16                            valid  = 0;
2687         __u16                            flags  = 0;
2688         __u16                            type   = 1;
2689         ENTRY;
2690
2691         lfsck = lfsck_instance_find(key, true, false);
2692         if (unlikely(lfsck == NULL))
2693                 RETURN(-ENXIO);
2694
2695         /* System is not ready, try again later. */
2696         if (unlikely(lfsck->li_namespace == NULL))
2697                 GOTO(put, rc = -EAGAIN);
2698
2699         /* start == NULL means auto trigger paused LFSCK. */
2700         if ((start == NULL) &&
2701             (list_empty(&lfsck->li_list_scan) ||
2702              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
2703                 GOTO(put, rc = 0);
2704
2705         bk = &lfsck->li_bookmark_ram;
2706         thread = &lfsck->li_thread;
2707         mutex_lock(&lfsck->li_mutex);
2708         spin_lock(&lfsck->li_lock);
2709         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
2710                 rc = -EALREADY;
2711                 if (unlikely(start == NULL)) {
2712                         spin_unlock(&lfsck->li_lock);
2713                         GOTO(out, rc);
2714                 }
2715
2716                 while (start->ls_active != 0) {
2717                         if (!(type & start->ls_active)) {
2718                                 type <<= 1;
2719                                 continue;
2720                         }
2721
2722                         com = __lfsck_component_find(lfsck, type,
2723                                                      &lfsck->li_list_scan);
2724                         if (com == NULL)
2725                                 com = __lfsck_component_find(lfsck, type,
2726                                                 &lfsck->li_list_double_scan);
2727                         if (com == NULL) {
2728                                 rc = -EOPNOTSUPP;
2729                                 break;
2730                         }
2731
2732                         if (com->lc_ops->lfsck_join != NULL) {
2733                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
2734                                 if (rc != 0 && rc != -EALREADY)
2735                                         break;
2736                         }
2737                         start->ls_active &= ~type;
2738                         type <<= 1;
2739                 }
2740                 spin_unlock(&lfsck->li_lock);
2741                 GOTO(out, rc);
2742         }
2743         spin_unlock(&lfsck->li_lock);
2744
2745         lfsck->li_status = 0;
2746         lfsck->li_oit_over = 0;
2747         lfsck->li_start_unplug = 0;
2748         lfsck->li_drop_dryrun = 0;
2749         lfsck->li_new_scanned = 0;
2750
2751         /* For auto trigger. */
2752         if (start == NULL)
2753                 goto trigger;
2754
2755         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
2756                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
2757                        lfsck_lfsck2name(lfsck));
2758
2759                 GOTO(out, rc = -EPERM);
2760         }
2761
2762         start->ls_version = bk->lb_version;
2763
2764         if (start->ls_active != 0) {
2765                 struct lfsck_component *next;
2766
2767                 if (start->ls_active == LFSCK_TYPES_ALL)
2768                         start->ls_active = LFSCK_TYPES_SUPPORTED;
2769
2770                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
2771                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
2772                         GOTO(out, rc = -ENOTSUPP);
2773                 }
2774
2775                 list_for_each_entry_safe(com, next,
2776                                          &lfsck->li_list_scan, lc_link) {
2777                         if (!(com->lc_type & start->ls_active)) {
2778                                 rc = com->lc_ops->lfsck_post(env, com, 0,
2779                                                              false);
2780                                 if (rc != 0)
2781                                         GOTO(out, rc);
2782                         }
2783                 }
2784
2785                 while (start->ls_active != 0) {
2786                         if (type & start->ls_active) {
2787                                 com = __lfsck_component_find(lfsck, type,
2788                                                         &lfsck->li_list_idle);
2789                                 if (com != NULL)
2790                                         /* The component status will be updated
2791                                          * when its prep() is called later by
2792                                          * the LFSCK main engine. */
2793                                         list_move_tail(&com->lc_link,
2794                                                        &lfsck->li_list_scan);
2795                                 start->ls_active &= ~type;
2796                         }
2797                         type <<= 1;
2798                 }
2799         }
2800
2801         if (list_empty(&lfsck->li_list_scan)) {
2802                 /* The speed limit will be used to control both the LFSCK and
2803                  * low layer scrub (if applied), need to be handled firstly. */
2804                 if (start->ls_valid & LSV_SPEED_LIMIT) {
2805                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
2806                                 rc = lfsck_bookmark_store(env, lfsck);
2807                                 if (rc != 0)
2808                                         GOTO(out, rc);
2809                         }
2810                 }
2811
2812                 goto trigger;
2813         }
2814
2815         if (start->ls_flags & LPF_RESET)
2816                 flags |= DOIF_RESET;
2817
2818         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
2819         if (rc != 0)
2820                 GOTO(out, rc);
2821
2822         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
2823                 start->ls_active |= com->lc_type;
2824                 if (flags & DOIF_RESET) {
2825                         rc = com->lc_ops->lfsck_reset(env, com, false);
2826                         if (rc != 0)
2827                                 GOTO(out, rc);
2828                 }
2829         }
2830
2831 trigger:
2832         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY | LUDA_TYPE;
2833         if (bk->lb_param & LPF_DRYRUN)
2834                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
2835
2836         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
2837                 valid |= DOIV_ERROR_HANDLE;
2838                 if (start->ls_flags & LPF_FAILOUT)
2839                         flags |= DOIF_FAILOUT;
2840         }
2841
2842         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
2843                 valid |= DOIV_DRYRUN;
2844                 if (start->ls_flags & LPF_DRYRUN)
2845                         flags |= DOIF_DRYRUN;
2846         }
2847
2848         if (!list_empty(&lfsck->li_list_scan))
2849                 flags |= DOIF_OUTUSED;
2850
2851         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
2852         thread_set_flags(thread, 0);
2853         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
2854         if (IS_ERR(lta))
2855                 GOTO(out, rc = PTR_ERR(lta));
2856
2857         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
2858         task = kthread_run(lfsck_master_engine, lta, "lfsck");
2859         if (IS_ERR(task)) {
2860                 rc = PTR_ERR(task);
2861                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
2862                        lfsck_lfsck2name(lfsck), rc);
2863                 lfsck_thread_args_fini(lta);
2864
2865                 GOTO(out, rc);
2866         }
2867
2868         l_wait_event(thread->t_ctl_waitq,
2869                      thread_is_running(thread) ||
2870                      thread_is_stopped(thread),
2871                      &lwi);
2872         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
2873                 lfsck->li_start_unplug = 1;
2874                 wake_up_all(&thread->t_ctl_waitq);
2875
2876                 GOTO(out, rc = 0);
2877         }
2878
2879         /* release lfsck::li_mutex to avoid deadlock. */
2880         mutex_unlock(&lfsck->li_mutex);
2881         rc = lfsck_start_all(env, lfsck, start);
2882         if (rc != 0) {
2883                 spin_lock(&lfsck->li_lock);
2884                 if (thread_is_stopped(thread)) {
2885                         spin_unlock(&lfsck->li_lock);
2886                 } else {
2887                         lfsck->li_status = LS_FAILED;
2888                         lfsck->li_flags = 0;
2889                         thread_set_flags(thread, SVC_STOPPING);
2890                         spin_unlock(&lfsck->li_lock);
2891
2892                         lfsck->li_start_unplug = 1;
2893                         wake_up_all(&thread->t_ctl_waitq);
2894                         l_wait_event(thread->t_ctl_waitq,
2895                                      thread_is_stopped(thread),
2896                                      &lwi);
2897                 }
2898         } else {
2899                 lfsck->li_start_unplug = 1;
2900                 wake_up_all(&thread->t_ctl_waitq);
2901         }
2902
2903         GOTO(put, rc);
2904
2905 out:
2906         mutex_unlock(&lfsck->li_mutex);
2907
2908 put:
2909         lfsck_instance_put(env, lfsck);
2910
2911         return rc < 0 ? rc : 0;
2912 }
2913 EXPORT_SYMBOL(lfsck_start);
2914
2915 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
2916                struct lfsck_stop *stop)
2917 {
2918         struct lfsck_instance   *lfsck;
2919         struct ptlrpc_thread    *thread;
2920         struct l_wait_info       lwi    = { 0 };
2921         int                      rc     = 0;
2922         int                      rc1    = 0;
2923         ENTRY;
2924
2925         lfsck = lfsck_instance_find(key, true, false);
2926         if (unlikely(lfsck == NULL))
2927                 RETURN(-ENXIO);
2928
2929         thread = &lfsck->li_thread;
2930         /* release lfsck::li_mutex to avoid deadlock. */
2931         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
2932                 if (!lfsck->li_master) {
2933                         CERROR("%s: only allow to specify '-A' via MDS\n",
2934                                lfsck_lfsck2name(lfsck));
2935
2936                         GOTO(out, rc = -EPERM);
2937                 }
2938
2939                 rc1 = lfsck_stop_all(env, lfsck, stop);
2940         }
2941
2942         mutex_lock(&lfsck->li_mutex);
2943         spin_lock(&lfsck->li_lock);
2944         /* no error if LFSCK is already stopped, or was never started */
2945         if (thread_is_init(thread) || thread_is_stopped(thread)) {
2946                 spin_unlock(&lfsck->li_lock);
2947                 GOTO(out, rc = 0);
2948         }
2949
2950         if (stop != NULL) {
2951                 lfsck->li_status = stop->ls_status;
2952                 lfsck->li_flags = stop->ls_flags;
2953         } else {
2954                 lfsck->li_status = LS_STOPPED;
2955                 lfsck->li_flags = 0;
2956         }
2957
2958         thread_set_flags(thread, SVC_STOPPING);
2959         spin_unlock(&lfsck->li_lock);
2960
2961         wake_up_all(&thread->t_ctl_waitq);
2962         l_wait_event(thread->t_ctl_waitq,
2963                      thread_is_stopped(thread),
2964                      &lwi);
2965
2966         GOTO(out, rc = 0);
2967
2968 out:
2969         mutex_unlock(&lfsck->li_mutex);
2970         lfsck_instance_put(env, lfsck);
2971
2972         return rc != 0 ? rc : rc1;
2973 }
2974 EXPORT_SYMBOL(lfsck_stop);
2975
2976 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
2977                     struct lfsck_request *lr, struct thandle *th)
2978 {
2979         int rc = -EOPNOTSUPP;
2980         ENTRY;
2981
2982         switch (lr->lr_event) {
2983         case LE_START: {
2984                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
2985                 struct lfsck_start_param  lsp;
2986
2987                 memset(start, 0, sizeof(*start));
2988                 start->ls_valid = lr->lr_valid;
2989                 start->ls_speed_limit = lr->lr_speed;
2990                 start->ls_version = lr->lr_version;
2991                 start->ls_active = lr->lr_active;
2992                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
2993                 start->ls_async_windows = lr->lr_async_windows;
2994
2995                 lsp.lsp_start = start;
2996                 lsp.lsp_index = lr->lr_index;
2997                 lsp.lsp_index_valid = 1;
2998                 rc = lfsck_start(env, key, &lsp);
2999                 break;
3000         }
3001         case LE_STOP: {
3002                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
3003
3004                 memset(stop, 0, sizeof(*stop));
3005                 stop->ls_status = lr->lr_status;
3006                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
3007                 rc = lfsck_stop(env, key, stop);
3008                 break;
3009         }
3010         case LE_PHASE1_DONE:
3011         case LE_PHASE2_DONE:
3012         case LE_FID_ACCESSED:
3013         case LE_PEER_EXIT:
3014         case LE_CONDITIONAL_DESTROY:
3015         case LE_SKIP_NLINK_DECLARE:
3016         case LE_SKIP_NLINK:
3017         case LE_SET_LMV_MASTER:
3018         case LE_SET_LMV_SLAVE:
3019         case LE_PAIRS_VERIFY: {
3020                 struct lfsck_instance  *lfsck;
3021                 struct lfsck_component *com;
3022
3023                 lfsck = lfsck_instance_find(key, true, false);
3024                 if (unlikely(lfsck == NULL))
3025                         RETURN(-ENXIO);
3026
3027                 com = lfsck_component_find(lfsck, lr->lr_active);
3028                 if (likely(com != NULL)) {
3029                         rc = com->lc_ops->lfsck_in_notify(env, com, lr, th);
3030                         lfsck_component_put(env, com);
3031                 }
3032
3033                 lfsck_instance_put(env, lfsck);
3034                 break;
3035         }
3036         default:
3037                 break;
3038         }
3039
3040         RETURN(rc);
3041 }
3042 EXPORT_SYMBOL(lfsck_in_notify);
3043
3044 int lfsck_query(const struct lu_env *env, struct dt_device *key,
3045                 struct lfsck_request *lr)
3046 {
3047         struct lfsck_instance  *lfsck;
3048         struct lfsck_component *com;
3049         int                     rc;
3050         ENTRY;
3051
3052         lfsck = lfsck_instance_find(key, true, false);
3053         if (unlikely(lfsck == NULL))
3054                 RETURN(-ENXIO);
3055
3056         com = lfsck_component_find(lfsck, lr->lr_active);
3057         if (likely(com != NULL)) {
3058                 rc = com->lc_ops->lfsck_query(env, com);
3059                 lfsck_component_put(env, com);
3060         } else {
3061                 rc = -ENOTSUPP;
3062         }
3063
3064         lfsck_instance_put(env, lfsck);
3065
3066         RETURN(rc);
3067 }
3068
3069 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
3070                              struct ldlm_namespace *ns)
3071 {
3072         struct lfsck_instance  *lfsck;
3073         int                     rc      = -ENXIO;
3074
3075         lfsck = lfsck_instance_find(key, true, false);
3076         if (likely(lfsck != NULL)) {
3077                 lfsck->li_namespace = ns;
3078                 lfsck_instance_put(env, lfsck);
3079                 rc = 0;
3080         }
3081
3082         return rc;
3083 }
3084 EXPORT_SYMBOL(lfsck_register_namespace);
3085
3086 int lfsck_register(const struct lu_env *env, struct dt_device *key,
3087                    struct dt_device *next, struct obd_device *obd,
3088                    lfsck_out_notify notify, void *notify_data, bool master)
3089 {
3090         struct lfsck_instance   *lfsck;
3091         struct dt_object        *root  = NULL;
3092         struct dt_object        *obj   = NULL;
3093         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
3094         int                      rc;
3095         ENTRY;
3096
3097         lfsck = lfsck_instance_find(key, false, false);
3098         if (unlikely(lfsck != NULL))
3099                 RETURN(-EEXIST);
3100
3101         OBD_ALLOC_PTR(lfsck);
3102         if (lfsck == NULL)
3103                 RETURN(-ENOMEM);
3104
3105         mutex_init(&lfsck->li_mutex);
3106         spin_lock_init(&lfsck->li_lock);
3107         INIT_LIST_HEAD(&lfsck->li_link);
3108         INIT_LIST_HEAD(&lfsck->li_list_scan);
3109         INIT_LIST_HEAD(&lfsck->li_list_dir);
3110         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
3111         INIT_LIST_HEAD(&lfsck->li_list_idle);
3112         INIT_LIST_HEAD(&lfsck->li_list_lmv);
3113         atomic_set(&lfsck->li_ref, 1);
3114         atomic_set(&lfsck->li_double_scan_count, 0);
3115         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
3116         lfsck->li_out_notify = notify;
3117         lfsck->li_out_notify_data = notify_data;
3118         lfsck->li_next = next;
3119         lfsck->li_bottom = key;
3120         lfsck->li_obd = obd;
3121
3122         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
3123         if (rc != 0)
3124                 GOTO(out, rc);
3125
3126         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
3127         if (rc != 0)
3128                 GOTO(out, rc);
3129
3130         fid->f_seq = FID_SEQ_LOCAL_NAME;
3131         fid->f_oid = 1;
3132         fid->f_ver = 0;
3133         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
3134         if (rc != 0)
3135                 GOTO(out, rc);
3136
3137         rc = dt_root_get(env, key, fid);
3138         if (rc != 0)
3139                 GOTO(out, rc);
3140
3141         root = dt_locate(env, key, fid);
3142         if (IS_ERR(root))
3143                 GOTO(out, rc = PTR_ERR(root));
3144
3145         if (unlikely(!dt_try_as_dir(env, root)))
3146                 GOTO(out, rc = -ENOTDIR);
3147
3148         lfsck->li_local_root_fid = *fid;
3149         if (master) {
3150                 lfsck->li_master = 1;
3151                 if (lfsck_dev_idx(key) == 0) {
3152                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
3153                         const struct lu_name *cname;
3154
3155                         rc = dt_lookup(env, root,
3156                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
3157                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
3158                         if (rc != 0)
3159                                 GOTO(out, rc);
3160
3161                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
3162                         if (IS_ERR(obj))
3163                                 GOTO(out, rc = PTR_ERR(obj));
3164
3165                         if (unlikely(!dt_try_as_dir(env, obj)))
3166                                 GOTO(out, rc = -ENOTDIR);
3167
3168                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3169                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
3170                         if (rc != 0)
3171                                 GOTO(out, rc);
3172
3173                         lu_object_put(env, &obj->do_lu);
3174                         obj = dt_locate(env, key, fid);
3175                         if (IS_ERR(obj))
3176                                 GOTO(out, rc = PTR_ERR(obj));
3177
3178                         cname = lfsck_name_get_const(env, dotlustre,
3179                                                      strlen(dotlustre));
3180                         rc = lfsck_verify_linkea(env, key, obj, cname,
3181                                                  &lfsck->li_global_root_fid);
3182                         if (rc != 0)
3183                                 GOTO(out, rc);
3184
3185                         if (unlikely(!dt_try_as_dir(env, obj)))
3186                                 GOTO(out, rc = -ENOTDIR);
3187
3188                         *pfid = *fid;
3189                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
3190                                        (const struct dt_key *)lostfound,
3191                                        BYPASS_CAPA);
3192                         if (rc != 0)
3193                                 GOTO(out, rc);
3194
3195                         lu_object_put(env, &obj->do_lu);
3196                         obj = dt_locate(env, key, fid);
3197                         if (IS_ERR(obj))
3198                                 GOTO(out, rc = PTR_ERR(obj));
3199
3200                         cname = lfsck_name_get_const(env, lostfound,
3201                                                      strlen(lostfound));
3202                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
3203                         if (rc != 0)
3204                                 GOTO(out, rc);
3205
3206                         lu_object_put(env, &obj->do_lu);
3207                         obj = NULL;
3208                 }
3209         }
3210
3211         fid->f_seq = FID_SEQ_LOCAL_FILE;
3212         fid->f_oid = OTABLE_IT_OID;
3213         fid->f_ver = 0;
3214         obj = dt_locate(env, key, fid);
3215         if (IS_ERR(obj))
3216                 GOTO(out, rc = PTR_ERR(obj));
3217
3218         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
3219         if (rc != 0)
3220                 GOTO(out, rc);
3221
3222         lfsck->li_obj_oit = obj;
3223         obj = local_file_find_or_create(env, lfsck->li_los, root, LFSCK_DIR,
3224                                         S_IFDIR | S_IRUGO | S_IWUSR);
3225         if (IS_ERR(obj))
3226                 GOTO(out, rc = PTR_ERR(obj));
3227
3228         lu_object_get(&obj->do_lu);
3229         lfsck->li_lfsck_dir = obj;
3230         rc = lfsck_bookmark_setup(env, lfsck);
3231         if (rc != 0)
3232                 GOTO(out, rc);
3233
3234         if (master) {
3235                 rc = lfsck_fid_init(lfsck);
3236                 if (rc < 0)
3237                         GOTO(out, rc);
3238
3239                 rc = lfsck_namespace_setup(env, lfsck);
3240                 if (rc < 0)
3241                         GOTO(out, rc);
3242         }
3243
3244         rc = lfsck_layout_setup(env, lfsck);
3245         if (rc < 0)
3246                 GOTO(out, rc);
3247
3248         /* XXX: more LFSCK components initialization to be added here. */
3249
3250         rc = lfsck_instance_add(lfsck);
3251         if (rc == 0)
3252                 rc = lfsck_add_target_from_orphan(env, lfsck);
3253 out:
3254         if (obj != NULL && !IS_ERR(obj))
3255                 lu_object_put(env, &obj->do_lu);
3256         if (root != NULL && !IS_ERR(root))
3257                 lu_object_put(env, &root->do_lu);
3258         if (rc != 0)
3259                 lfsck_instance_cleanup(env, lfsck);
3260         return rc;
3261 }
3262 EXPORT_SYMBOL(lfsck_register);
3263
3264 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
3265 {
3266         struct lfsck_instance *lfsck;
3267
3268         lfsck = lfsck_instance_find(key, false, true);
3269         if (lfsck != NULL)
3270                 lfsck_instance_put(env, lfsck);
3271 }
3272 EXPORT_SYMBOL(lfsck_degister);
3273
3274 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
3275                      struct dt_device *tgt, struct obd_export *exp,
3276                      __u32 index, bool for_ost)
3277 {
3278         struct lfsck_instance   *lfsck;
3279         struct lfsck_tgt_desc   *ltd;
3280         int                      rc;
3281         ENTRY;
3282
3283         OBD_ALLOC_PTR(ltd);
3284         if (ltd == NULL)
3285                 RETURN(-ENOMEM);
3286
3287         ltd->ltd_tgt = tgt;
3288         ltd->ltd_key = key;
3289         ltd->ltd_exp = exp;
3290         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
3291         INIT_LIST_HEAD(&ltd->ltd_layout_list);
3292         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
3293         INIT_LIST_HEAD(&ltd->ltd_namespace_list);
3294         INIT_LIST_HEAD(&ltd->ltd_namespace_phase_list);
3295         atomic_set(&ltd->ltd_ref, 1);
3296         ltd->ltd_index = index;
3297
3298         spin_lock(&lfsck_instance_lock);
3299         lfsck = __lfsck_instance_find(key, true, false);
3300         if (lfsck == NULL) {
3301                 if (for_ost)
3302                         list_add_tail(&ltd->ltd_orphan_list,
3303                                       &lfsck_ost_orphan_list);
3304                 else
3305                         list_add_tail(&ltd->ltd_orphan_list,
3306                                       &lfsck_mdt_orphan_list);
3307                 spin_unlock(&lfsck_instance_lock);
3308
3309                 RETURN(0);
3310         }
3311         spin_unlock(&lfsck_instance_lock);
3312
3313         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
3314         if (rc != 0)
3315                 lfsck_tgt_put(ltd);
3316
3317         lfsck_instance_put(env, lfsck);
3318
3319         RETURN(rc);
3320 }
3321 EXPORT_SYMBOL(lfsck_add_target);
3322
3323 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
3324                       struct dt_device *tgt, __u32 index, bool for_ost)
3325 {
3326         struct lfsck_instance   *lfsck;
3327         struct lfsck_tgt_descs  *ltds;
3328         struct lfsck_tgt_desc   *ltd;
3329         struct list_head        *head;
3330
3331         if (for_ost)
3332                 head = &lfsck_ost_orphan_list;
3333         else
3334                 head = &lfsck_mdt_orphan_list;
3335
3336         spin_lock(&lfsck_instance_lock);
3337         list_for_each_entry(ltd, head, ltd_orphan_list) {
3338                 if (ltd->ltd_tgt == tgt) {
3339                         list_del_init(&ltd->ltd_orphan_list);
3340                         spin_unlock(&lfsck_instance_lock);
3341                         lfsck_tgt_put(ltd);
3342
3343                         return;
3344                 }
3345         }
3346
3347         ltd = NULL;
3348         lfsck = __lfsck_instance_find(key, true, false);
3349         spin_unlock(&lfsck_instance_lock);
3350         if (unlikely(lfsck == NULL))
3351                 return;
3352
3353         if (for_ost)
3354                 ltds = &lfsck->li_ost_descs;
3355         else
3356                 ltds = &lfsck->li_mdt_descs;
3357
3358         down_write(&ltds->ltd_rw_sem);
3359         LASSERT(ltds->ltd_tgts_bitmap != NULL);
3360
3361         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
3362                 goto unlock;
3363
3364         ltd = LTD_TGT(ltds, index);
3365         if (unlikely(ltd == NULL))
3366                 goto unlock;
3367
3368         LASSERT(ltds->ltd_tgtnr > 0);
3369
3370         ltds->ltd_tgtnr--;
3371         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
3372         LTD_TGT(ltds, index) = NULL;
3373
3374 unlock:
3375         if (ltd == NULL) {
3376                 if (for_ost)
3377                         head = &lfsck->li_ost_descs.ltd_orphan;
3378                 else
3379                         head = &lfsck->li_mdt_descs.ltd_orphan;
3380
3381                 list_for_each_entry(ltd, head, ltd_orphan_list) {
3382                         if (ltd->ltd_tgt == tgt) {
3383                                 list_del_init(&ltd->ltd_orphan_list);
3384                                 break;
3385                         }
3386                 }
3387         }
3388
3389         up_write(&ltds->ltd_rw_sem);
3390         if (ltd != NULL) {
3391                 spin_lock(&ltds->ltd_lock);
3392                 ltd->ltd_dead = 1;
3393                 spin_unlock(&ltds->ltd_lock);
3394                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_NAMESPACE);
3395                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
3396                 lfsck_tgt_put(ltd);
3397         }
3398
3399         lfsck_instance_put(env, lfsck);
3400 }
3401 EXPORT_SYMBOL(lfsck_del_target);
3402
3403 static int __init lfsck_init(void)
3404 {
3405         int rc;
3406
3407         INIT_LIST_HEAD(&lfsck_instance_list);
3408         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
3409         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
3410         lfsck_key_init_generic(&lfsck_thread_key, NULL);
3411         rc = lu_context_key_register(&lfsck_thread_key);
3412         if (rc == 0) {
3413                 tgt_register_lfsck_in_notify(lfsck_in_notify);
3414                 tgt_register_lfsck_query(lfsck_query);
3415         }
3416
3417         return rc;
3418 }
3419
3420 static void __exit lfsck_exit(void)
3421 {
3422         struct lfsck_tgt_desc *ltd;
3423         struct lfsck_tgt_desc *next;
3424
3425         LASSERT(list_empty(&lfsck_instance_list));
3426
3427         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
3428                                  ltd_orphan_list) {
3429                 list_del_init(&ltd->ltd_orphan_list);
3430                 lfsck_tgt_put(ltd);
3431         }
3432
3433         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
3434                                  ltd_orphan_list) {
3435                 list_del_init(&ltd->ltd_orphan_list);
3436                 lfsck_tgt_put(ltd);
3437         }
3438
3439         lu_context_key_degister(&lfsck_thread_key);
3440 }
3441
3442 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
3443 MODULE_DESCRIPTION("LFSCK");
3444 MODULE_LICENSE("GPL");
3445
3446 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);