Whamcloud - gitweb
LU-4788 lfsck: LFSCK code framework adjustment (1)
[fs/lustre-release.git] / lustre / lfsck / lfsck_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_lib.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <libcfs/list.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fld.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre_lfsck.h>
41 #include <lustre/lustre_lfsck_user.h>
42
43 #include "lfsck_internal.h"
44
/* Define the lfsck thread key: instantiate the key-init hook for the
 * per-thread scratch area, struct lfsck_thread_info.
 * NOTE(review): presumably LU_KEY_INIT generates lfsck_key_init(), which
 * allocates one lfsck_thread_info per context -- confirm against the macro
 * definition. The matching release hook is lfsck_key_fini(). */
LU_KEY_INIT(lfsck, struct lfsck_thread_info);
47
48 static void lfsck_key_fini(const struct lu_context *ctx,
49                            struct lu_context_key *key, void *data)
50 {
51         struct lfsck_thread_info *info = data;
52
53         lu_buf_free(&info->lti_linkea_buf);
54         lu_buf_free(&info->lti_big_buf);
55         OBD_FREE_PTR(info);
56 }
57
/* Define the lfsck context key for both MD and DT service threads.
 * NOTE(review): presumably this emits lfsck_thread_key wired to the
 * lfsck_key_init()/lfsck_key_fini() hooks -- confirm against the macros. */
LU_CONTEXT_KEY_DEFINE(lfsck, LCT_MD_THREAD | LCT_DT_THREAD);
LU_KEY_INIT_GENERIC(lfsck);

/* List of lfsck instances, linked through lfsck_instance::li_link. */
static struct list_head lfsck_instance_list;
/* Target descriptors (linked through ltd_orphan_list) that are not attached
 * to an instance yet; lfsck_add_target_from_orphan() claims the entries
 * whose ltd_key matches the instance's li_bottom. One list per target kind. */
static struct list_head lfsck_ost_orphan_list;
static struct list_head lfsck_mdt_orphan_list;
/* Held while lfsck_add_target_from_orphan() walks the two orphan lists.
 * NOTE(review): presumably it also guards lfsck_instance_list -- confirm. */
static DEFINE_SPINLOCK(lfsck_instance_lock);
65
/* Printable name of every LFSCK status, indexed by the enum lfsck_status
 * value itself. Consumers should go through lfsck_status2names(), which
 * range-checks the index before the lookup. */
static const char *lfsck_status_names[] = {
        [LS_INIT]               = "init",
        [LS_SCANNING_PHASE1]    = "scanning-phase1",
        [LS_SCANNING_PHASE2]    = "scanning-phase2",
        [LS_COMPLETED]          = "completed",
        [LS_FAILED]             = "failed",
        [LS_STOPPED]            = "stopped",
        [LS_PAUSED]             = "paused",
        [LS_CRASHED]            = "crashed",
        [LS_PARTIAL]            = "partial",
        [LS_CO_FAILED]          = "co-failed",
        [LS_CO_STOPPED]         = "co-stopped",
        [LS_CO_PAUSED]          = "co-paused"
};
80
/* NULL-terminated table of printable LFSCK flag names.
 * NOTE(review): presumably entry N names flag bit N -- confirm against the
 * code that walks this table before reordering or inserting entries. */
const char *lfsck_flags_names[] = {
        "scanned-once",
        "inconsistent",
        "upgrade",
        "incomplete",
        "crashed_lastid",
        NULL
};
89
/* Table of printable LFSCK parameter names, closed by a NULL sentinel.
 * Slot 0 is NULL as well, so a "stop at the first NULL" walk from index 0
 * would see nothing.
 * NOTE(review): presumably the table is indexed by parameter bit position
 * and bit 0 has no printable name -- confirm against the users. */
const char *lfsck_param_names[] = {
        NULL,
        "failout",
        "dryrun",
        "all_targets",
        "broadcast",
        "orphan",
        "create_ostobj",
        NULL
};
100
101 const char *lfsck_status2names(enum lfsck_status status)
102 {
103         if (unlikely(status < 0 || status >= LS_MAX))
104                 return "unknown";
105
106         return lfsck_status_names[status];
107 }
108
109 static int lfsck_tgt_descs_init(struct lfsck_tgt_descs *ltds)
110 {
111         spin_lock_init(&ltds->ltd_lock);
112         init_rwsem(&ltds->ltd_rw_sem);
113         INIT_LIST_HEAD(&ltds->ltd_orphan);
114         ltds->ltd_tgts_bitmap = CFS_ALLOCATE_BITMAP(BITS_PER_LONG);
115         if (ltds->ltd_tgts_bitmap == NULL)
116                 return -ENOMEM;
117
118         return 0;
119 }
120
121 static void lfsck_tgt_descs_fini(struct lfsck_tgt_descs *ltds)
122 {
123         struct lfsck_tgt_desc   *ltd;
124         struct lfsck_tgt_desc   *next;
125         int                      idx;
126
127         down_write(&ltds->ltd_rw_sem);
128
129         list_for_each_entry_safe(ltd, next, &ltds->ltd_orphan,
130                                  ltd_orphan_list) {
131                 list_del_init(&ltd->ltd_orphan_list);
132                 lfsck_tgt_put(ltd);
133         }
134
135         if (unlikely(ltds->ltd_tgts_bitmap == NULL)) {
136                 up_write(&ltds->ltd_rw_sem);
137
138                 return;
139         }
140
141         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
142                 ltd = LTD_TGT(ltds, idx);
143                 if (likely(ltd != NULL)) {
144                         LASSERT(list_empty(&ltd->ltd_layout_list));
145                         LASSERT(list_empty(&ltd->ltd_layout_phase_list));
146
147                         ltds->ltd_tgtnr--;
148                         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, idx);
149                         LTD_TGT(ltds, idx) = NULL;
150                         lfsck_tgt_put(ltd);
151                 }
152         }
153
154         LASSERTF(ltds->ltd_tgtnr == 0, "tgt count unmatched: %d\n",
155                  ltds->ltd_tgtnr);
156
157         for (idx = 0; idx < TGT_PTRS; idx++) {
158                 if (ltds->ltd_tgts_idx[idx] != NULL) {
159                         OBD_FREE_PTR(ltds->ltd_tgts_idx[idx]);
160                         ltds->ltd_tgts_idx[idx] = NULL;
161                 }
162         }
163
164         CFS_FREE_BITMAP(ltds->ltd_tgts_bitmap);
165         ltds->ltd_tgts_bitmap = NULL;
166         up_write(&ltds->ltd_rw_sem);
167 }
168
169 static int __lfsck_add_target(const struct lu_env *env,
170                               struct lfsck_instance *lfsck,
171                               struct lfsck_tgt_desc *ltd,
172                               bool for_ost, bool locked)
173 {
174         struct lfsck_tgt_descs *ltds;
175         __u32                   index = ltd->ltd_index;
176         int                     rc    = 0;
177         ENTRY;
178
179         if (for_ost)
180                 ltds = &lfsck->li_ost_descs;
181         else
182                 ltds = &lfsck->li_mdt_descs;
183
184         if (!locked)
185                 down_write(&ltds->ltd_rw_sem);
186
187         LASSERT(ltds->ltd_tgts_bitmap != NULL);
188
189         if (index >= ltds->ltd_tgts_bitmap->size) {
190                 __u32 newsize = max((__u32)ltds->ltd_tgts_bitmap->size,
191                                     (__u32)BITS_PER_LONG);
192                 cfs_bitmap_t *old_bitmap = ltds->ltd_tgts_bitmap;
193                 cfs_bitmap_t *new_bitmap;
194
195                 while (newsize < index + 1)
196                         newsize <<= 1;
197
198                 new_bitmap = CFS_ALLOCATE_BITMAP(newsize);
199                 if (new_bitmap == NULL)
200                         GOTO(unlock, rc = -ENOMEM);
201
202                 if (ltds->ltd_tgtnr > 0)
203                         cfs_bitmap_copy(new_bitmap, old_bitmap);
204                 ltds->ltd_tgts_bitmap = new_bitmap;
205                 CFS_FREE_BITMAP(old_bitmap);
206         }
207
208         if (cfs_bitmap_check(ltds->ltd_tgts_bitmap, index)) {
209                 CERROR("%s: the device %s (%u) is registered already\n",
210                        lfsck_lfsck2name(lfsck),
211                        ltd->ltd_tgt->dd_lu_dev.ld_obd->obd_name, index);
212                 GOTO(unlock, rc = -EEXIST);
213         }
214
215         if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL) {
216                 OBD_ALLOC_PTR(ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK]);
217                 if (ltds->ltd_tgts_idx[index / TGT_PTRS_PER_BLOCK] == NULL)
218                         GOTO(unlock, rc = -ENOMEM);
219         }
220
221         LTD_TGT(ltds, index) = ltd;
222         cfs_bitmap_set(ltds->ltd_tgts_bitmap, index);
223         ltds->ltd_tgtnr++;
224
225         GOTO(unlock, rc = 0);
226
227 unlock:
228         if (!locked)
229                 up_write(&ltds->ltd_rw_sem);
230
231         return rc;
232 }
233
234 static int lfsck_add_target_from_orphan(const struct lu_env *env,
235                                         struct lfsck_instance *lfsck)
236 {
237         struct lfsck_tgt_descs  *ltds    = &lfsck->li_ost_descs;
238         struct lfsck_tgt_desc   *ltd;
239         struct lfsck_tgt_desc   *next;
240         struct list_head        *head    = &lfsck_ost_orphan_list;
241         int                      rc;
242         bool                     for_ost = true;
243
244 again:
245         spin_lock(&lfsck_instance_lock);
246         list_for_each_entry_safe(ltd, next, head, ltd_orphan_list) {
247                 if (ltd->ltd_key == lfsck->li_bottom)
248                         list_move_tail(&ltd->ltd_orphan_list,
249                                        &ltds->ltd_orphan);
250         }
251         spin_unlock(&lfsck_instance_lock);
252
253         down_write(&ltds->ltd_rw_sem);
254         while (!list_empty(&ltds->ltd_orphan)) {
255                 ltd = list_entry(ltds->ltd_orphan.next,
256                                  struct lfsck_tgt_desc,
257                                  ltd_orphan_list);
258                 list_del_init(&ltd->ltd_orphan_list);
259                 rc = __lfsck_add_target(env, lfsck, ltd, for_ost, true);
260                 /* Do not hold the semaphore for too long time. */
261                 up_write(&ltds->ltd_rw_sem);
262                 if (rc != 0)
263                         return rc;
264
265                 down_write(&ltds->ltd_rw_sem);
266         }
267         up_write(&ltds->ltd_rw_sem);
268
269         if (for_ost) {
270                 ltds = &lfsck->li_mdt_descs;
271                 head = &lfsck_mdt_orphan_list;
272                 for_ost = false;
273                 goto again;
274         }
275
276         return 0;
277 }
278
279 static inline struct lfsck_component *
280 __lfsck_component_find(struct lfsck_instance *lfsck, __u16 type,
281                        struct list_head *list)
282 {
283         struct lfsck_component *com;
284
285         list_for_each_entry(com, list, lc_link) {
286                 if (com->lc_type == type)
287                         return com;
288         }
289         return NULL;
290 }
291
292 struct lfsck_component *
293 lfsck_component_find(struct lfsck_instance *lfsck, __u16 type)
294 {
295         struct lfsck_component *com;
296
297         spin_lock(&lfsck->li_lock);
298         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
299         if (com != NULL)
300                 goto unlock;
301
302         com = __lfsck_component_find(lfsck, type,
303                                      &lfsck->li_list_double_scan);
304         if (com != NULL)
305                 goto unlock;
306
307         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_idle);
308
309 unlock:
310         if (com != NULL)
311                 lfsck_component_get(com);
312         spin_unlock(&lfsck->li_lock);
313         return com;
314 }
315
316 void lfsck_component_cleanup(const struct lu_env *env,
317                              struct lfsck_component *com)
318 {
319         if (!list_empty(&com->lc_link))
320                 list_del_init(&com->lc_link);
321         if (!list_empty(&com->lc_link_dir))
322                 list_del_init(&com->lc_link_dir);
323
324         lfsck_component_put(env, com);
325 }
326
327 int lfsck_fid_alloc(const struct lu_env *env, struct lfsck_instance *lfsck,
328                     struct lu_fid *fid, bool locked)
329 {
330         struct lfsck_bookmark   *bk = &lfsck->li_bookmark_ram;
331         int                      rc = 0;
332         ENTRY;
333
334         if (!locked)
335                 mutex_lock(&lfsck->li_mutex);
336
337         rc = seq_client_alloc_fid(env, lfsck->li_seq, fid);
338         if (rc >= 0) {
339                 bk->lb_last_fid = *fid;
340                 /* We do not care about whether the subsequent sub-operations
341                  * failed or not. The worst case is that one FID is lost that
342                  * is not a big issue for the LFSCK since it is relative rare
343                  * for LFSCK create. */
344                 rc = lfsck_bookmark_store(env, lfsck);
345         }
346
347         if (!locked)
348                 mutex_unlock(&lfsck->li_mutex);
349
350         RETURN(rc);
351 }
352
/* Well-known directory entry names. "dot" and "dotdot" are used as the
 * insert keys for the self and parent entries when the lost+found
 * sub-directory is created.
 * NOTE(review): presumably "dotlustre" and "lostfound" serve later lookups
 * of ".lustre/lost+found" -- confirm against their users. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char dotlustre[] = ".lustre";
static const char lostfound[] = "lost+found";
357
358 static int lfsck_create_lpf_local(const struct lu_env *env,
359                                   struct lfsck_instance *lfsck,
360                                   struct dt_object *parent,
361                                   struct dt_object *child,
362                                   struct lu_attr *la,
363                                   struct dt_object_format *dof,
364                                   const char *name)
365 {
366         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
367         struct dt_device        *dev    = lfsck->li_bottom;
368         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
369         struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
370         const struct lu_fid     *cfid   = lfsck_dto2fid(child);
371         struct thandle          *th     = NULL;
372         struct linkea_data       ldata  = { 0 };
373         struct lu_buf            linkea_buf;
374         const struct lu_name    *cname;
375         loff_t                   pos    = 0;
376         int                      len    = sizeof(struct lfsck_bookmark);
377         int                      rc;
378         ENTRY;
379
380         rc = linkea_data_new(&ldata,
381                              &lfsck_env_info(env)->lti_linkea_buf);
382         if (rc != 0)
383                 RETURN(rc);
384
385         cname = lfsck_name_get_const(env, name, strlen(name));
386         rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
387         if (rc != 0)
388                 RETURN(rc);
389
390         th = dt_trans_create(env, dev);
391         if (IS_ERR(th))
392                 RETURN(PTR_ERR(th));
393
394         /* 1a. create child */
395         rc = dt_declare_create(env, child, la, NULL, dof, th);
396         if (rc != 0)
397                 GOTO(stop, rc);
398
399         /* 2a. increase child nlink */
400         rc = dt_declare_ref_add(env, child, th);
401         if (rc != 0)
402                 GOTO(stop, rc);
403
404         /* 3a. insert linkEA for child */
405         linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
406         linkea_buf.lb_len = ldata.ld_leh->leh_len;
407         rc = dt_declare_xattr_set(env, child, &linkea_buf,
408                                   XATTR_NAME_LINK, 0, th);
409         if (rc != 0)
410                 GOTO(stop, rc);
411
412         /* 4a. insert name into parent dir */
413         rec->rec_type = S_IFDIR;
414         rec->rec_fid = cfid;
415         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
416                                (const struct dt_key *)name, th);
417         if (rc != 0)
418                 GOTO(stop, rc);
419
420         /* 5a. increase parent nlink */
421         rc = dt_declare_ref_add(env, parent, th);
422         if (rc != 0)
423                 GOTO(stop, rc);
424
425         /* 6a. update bookmark */
426         rc = dt_declare_record_write(env, bk_obj,
427                                      lfsck_buf_get(env, bk, len), 0, th);
428         if (rc != 0)
429                 GOTO(stop, rc);
430
431         rc = dt_trans_start_local(env, dev, th);
432         if (rc != 0)
433                 GOTO(stop, rc);
434
435         dt_write_lock(env, child, 0);
436         /* 1b.1. create child */
437         rc = dt_create(env, child, la, NULL, dof, th);
438         if (rc != 0)
439                 GOTO(unlock, rc);
440
441         if (unlikely(!dt_try_as_dir(env, child)))
442                 GOTO(unlock, rc = -ENOTDIR);
443
444         /* 1b.2. insert dot into child dir */
445         rec->rec_fid = cfid;
446         rc = dt_insert(env, child, (const struct dt_rec *)rec,
447                        (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
448         if (rc != 0)
449                 GOTO(unlock, rc);
450
451         /* 1b.3. insert dotdot into child dir */
452         rec->rec_fid = &LU_LPF_FID;
453         rc = dt_insert(env, child, (const struct dt_rec *)rec,
454                        (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
455         if (rc != 0)
456                 GOTO(unlock, rc);
457
458         /* 2b. increase child nlink */
459         rc = dt_ref_add(env, child, th);
460         if (rc != 0)
461                 GOTO(unlock, rc);
462
463         /* 3b. insert linkEA for child. */
464         rc = dt_xattr_set(env, child, &linkea_buf,
465                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
466         dt_write_unlock(env, child);
467         if (rc != 0)
468                 GOTO(stop, rc);
469
470         /* 4b. insert name into parent dir */
471         rec->rec_fid = cfid;
472         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
473                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
474         if (rc != 0)
475                 GOTO(stop, rc);
476
477         dt_write_lock(env, parent, 0);
478         /* 5b. increase parent nlink */
479         rc = dt_ref_add(env, parent, th);
480         dt_write_unlock(env, parent);
481         if (rc != 0)
482                 GOTO(stop, rc);
483
484         bk->lb_lpf_fid = *cfid;
485         lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);
486
487         /* 6b. update bookmark */
488         rc = dt_record_write(env, bk_obj,
489                              lfsck_buf_get(env, bk, len), &pos, th);
490
491         GOTO(stop, rc);
492
493 unlock:
494         dt_write_unlock(env, child);
495
496 stop:
497         dt_trans_stop(env, dev, th);
498
499         return rc;
500 }
501
/**
 * Create the directory \a child locally and link it under \a parent, using
 * two separate transactions.
 *
 * Transaction I runs on lfsck->li_bottom (started with
 * dt_trans_start_local()): it creates the child with its "." and ".."
 * entries (".." points at LU_LPF_FID), stores the linkEA, and records the
 * child FID in the bookmark (lb_lpf_fid), written at offset 0 of the
 * bookmark object.
 *
 * Transaction II runs on lfsck->li_next (started with dt_trans_start()):
 * it inserts \a name into \a parent and bumps the parent's link count.
 *
 * "dev" and "th" are reused by both transactions, so the shared "stop"
 * label always stops whichever transaction is current, and "dev ==
 * lfsck->li_next" there identifies a failure inside transaction II. A
 * failure to create transaction II returns directly, without that message.
 *
 * \param[in] env       execution environment
 * \param[in] lfsck     the instance that owns the bookmark
 * \param[in] parent    the directory that receives the name entry
 * \param[in] child     the directory object to create
 * \param[in] la        attributes for the new directory
 * \param[in] dof       object format for the new directory
 * \param[in] name      the entry name of the child
 *
 * \retval              0 on success
 * \retval              negative errno on failure; if it comes from
 *                      transaction II, the child and the bookmark update
 *                      of transaction I have been done already
 */
static int lfsck_create_lpf_remote(const struct lu_env *env,
                                   struct lfsck_instance *lfsck,
                                   struct dt_object *parent,
                                   struct dt_object *child,
                                   struct lu_attr *la,
                                   struct dt_object_format *dof,
                                   const char *name)
{
        struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
        struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
        struct dt_object        *bk_obj = lfsck->li_bookmark_obj;
        const struct lu_fid     *cfid   = lfsck_dto2fid(child);
        struct thandle          *th     = NULL;
        struct linkea_data       ldata  = { 0 };
        struct lu_buf            linkea_buf;
        const struct lu_name    *cname;
        struct dt_device        *dev;
        loff_t                   pos    = 0;
        int                      len    = sizeof(struct lfsck_bookmark);
        int                      rc;
        ENTRY;

        /* Build the linkEA <parent FID, name> before any transaction. */
        rc = linkea_data_new(&ldata,
                             &lfsck_env_info(env)->lti_linkea_buf);
        if (rc != 0)
                RETURN(rc);

        cname = lfsck_name_get_const(env, name, strlen(name));
        rc = linkea_add_buf(&ldata, cname, lfsck_dto2fid(parent));
        if (rc != 0)
                RETURN(rc);

        /* Create .lustre/lost+found/MDTxxxx. */

        /* XXX: Currently, a cross-MDT create operation needs to create the
         *      child object first, then insert the name into the parent
         *      directory. For this case, the child object resides on the
         *      current MDT (local), but the parent ".lustre/lost+found" may
         *      be on a remote MDT. It is not easy to contain all the
         *      sub-modifications in order within a single transaction.
         *
         *      To avoid more inconsistency, we split the create operation
         *      into two transactions:
         *
         *      1) create the child and update the lfsck_bookmark::lb_lpf_fid
         *         locally.
         *      2) insert the name "MDTXXXX" in the parent ".lustre/lost+found"
         *         remotely.
         *
         *      If 1) is done but 2) failed, then go ahead; the LFSCK will
         *      try to repair such inconsistency when it runs next time. */

        /* Transaction I: locally */

        dev = lfsck->li_bottom;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 1a. create child */
        rc = dt_declare_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 2a. increase child nlink */
        rc = dt_declare_ref_add(env, child, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 3a. insert linkEA for child */
        linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
        linkea_buf.lb_len = ldata.ld_leh->leh_len;
        rc = dt_declare_xattr_set(env, child, &linkea_buf,
                                  XATTR_NAME_LINK, 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 4a. update bookmark */
        rc = dt_declare_record_write(env, bk_obj,
                                     lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start_local(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* The child stays write-locked until transaction I is finished. */
        dt_write_lock(env, child, 0);
        /* 1b.1. create child */
        rc = dt_create(env, child, la, NULL, dof, th);
        if (rc != 0)
                GOTO(unlock, rc);

        if (unlikely(!dt_try_as_dir(env, child)))
                GOTO(unlock, rc = -ENOTDIR);

        /* 1b.2. insert dot into child dir */
        rec->rec_type = S_IFDIR;
        rec->rec_fid = cfid;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 1b.3. insert dotdot into child dir */
        rec->rec_fid = &LU_LPF_FID;
        rc = dt_insert(env, child, (const struct dt_rec *)rec,
                       (const struct dt_key *)dotdot, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 2b. increase child nlink */
        rc = dt_ref_add(env, child, th);
        if (rc != 0)
                GOTO(unlock, rc);

        /* 3b. insert linkEA for child */
        rc = dt_xattr_set(env, child, &linkea_buf,
                          XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
        if (rc != 0)
                GOTO(unlock, rc);

        bk->lb_lpf_fid = *cfid;
        lfsck_bookmark_cpu_to_le(&lfsck->li_bookmark_disk, bk);

        /* 4b. update bookmark */
        rc = dt_record_write(env, bk_obj,
                             lfsck_buf_get(env, bk, len), &pos, th);

        /* End of transaction I, whatever the result of the bookmark write;
         * only a success lets us continue with transaction II. */
        dt_write_unlock(env, child);
        dt_trans_stop(env, dev, th);
        if (rc != 0)
                RETURN(rc);

        /* Transaction II: remotely */

        dev = lfsck->li_next;
        th = dt_trans_create(env, dev);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* 5a. insert name into parent dir; rec->rec_type is still S_IFDIR
         *     from step 1b.2. */
        rec->rec_fid = cfid;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 6a. increase parent nlink */
        rc = dt_declare_ref_add(env, parent, th);
        if (rc != 0)
                GOTO(stop, rc);

        rc = dt_trans_start(env, dev, th);
        if (rc != 0)
                GOTO(stop, rc);

        /* 5b. insert name into parent dir */
        rc = dt_insert(env, parent, (const struct dt_rec *)rec,
                       (const struct dt_key *)name, th, BYPASS_CAPA, 1);
        if (rc != 0)
                GOTO(stop, rc);

        dt_write_lock(env, parent, 0);
        /* 6b. increase parent nlink */
        rc = dt_ref_add(env, parent, th);
        dt_write_unlock(env, parent);

        GOTO(stop, rc);

unlock:
        /* Only reached from transaction I, with the child write-locked. */
        dt_write_unlock(env, child);
stop:
        /* Stops transaction I or II, whichever "dev"/"th" refer to. */
        dt_trans_stop(env, dev, th);

        /* A failure inside transaction II leaves the child created but
         * unnamed in the parent. */
        if (rc != 0 && dev == lfsck->li_next)
                CDEBUG(D_LFSCK, "%s: partially created the object "DFID
                       "for orphans, but failed to insert the name %s "
                       "to the .lustre/lost+found/. Such inconsistency "
                       "will be repaired when LFSCK run next time: rc = %d\n",
                       lfsck_lfsck2name(lfsck), PFID(cfid), name, rc);

        return rc;
}
686
687 /* Do NOT create .lustre/lost+found/MDTxxxx when register the lfsck instance,
688  * because the MDT0 maybe not reaady for sequence allocation yet. We do that
689  * only when it is required, such as orphan OST-objects repairing. */
690 int lfsck_create_lpf(const struct lu_env *env, struct lfsck_instance *lfsck)
691 {
692         struct lfsck_bookmark    *bk    = &lfsck->li_bookmark_ram;
693         struct lfsck_thread_info *info  = lfsck_env_info(env);
694         struct lu_fid            *cfid  = &info->lti_fid2;
695         struct lu_attr           *la    = &info->lti_la;
696         struct dt_object_format  *dof   = &info->lti_dof;
697         struct dt_object         *parent = NULL;
698         struct dt_object         *child = NULL;
699         char                      name[8];
700         int                       node  = lfsck_dev_idx(lfsck->li_bottom);
701         int                       rc    = 0;
702         ENTRY;
703
704         LASSERT(lfsck->li_master);
705
706         sprintf(name, "MDT%04x", node);
707         if (node == 0) {
708                 parent = lfsck_object_find_by_dev(env, lfsck->li_bottom,
709                                                   &LU_LPF_FID);
710         } else {
711                 struct lfsck_tgt_desc *ltd;
712
713                 ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, 0);
714                 if (unlikely(ltd == NULL))
715                         RETURN(-ENXIO);
716
717                 parent = lfsck_object_find_by_dev(env, ltd->ltd_tgt,
718                                                   &LU_LPF_FID);
719                 lfsck_tgt_put(ltd);
720         }
721         if (IS_ERR(parent))
722                 RETURN(PTR_ERR(parent));
723
724         if (unlikely(!dt_try_as_dir(env, parent)))
725                 GOTO(out, rc = -ENOTDIR);
726
727         mutex_lock(&lfsck->li_mutex);
728         if (lfsck->li_lpf_obj != NULL)
729                 GOTO(unlock, rc = 0);
730
731         if (fid_is_zero(&bk->lb_lpf_fid)) {
732                 /* There is corner case that: in former LFSCK scanning we have
733                  * created the .lustre/lost+found/MDTxxxx but failed to update
734                  * the lfsck_bookmark::lb_lpf_fid successfully. So need lookup
735                  * it from MDT0 firstly. */
736                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
737                                (const struct dt_key *)name, BYPASS_CAPA);
738                 if (rc != 0 && rc != -ENOENT)
739                         GOTO(unlock, rc);
740
741                 if (rc == 0) {
742                         bk->lb_lpf_fid = *cfid;
743                         rc = lfsck_bookmark_store(env, lfsck);
744                 } else {
745                         rc = lfsck_fid_alloc(env, lfsck, cfid, true);
746                 }
747                 if (rc != 0)
748                         GOTO(unlock, rc);
749         } else {
750                 *cfid = bk->lb_lpf_fid;
751         }
752
753         child = lfsck_object_find_by_dev(env, lfsck->li_bottom, cfid);
754         if (IS_ERR(child))
755                 GOTO(unlock, rc = PTR_ERR(child));
756
757         if (dt_object_exists(child) != 0) {
758                 if (unlikely(!dt_try_as_dir(env, child)))
759                         rc = -ENOTDIR;
760                 else
761                         lfsck->li_lpf_obj = child;
762
763                 GOTO(unlock, rc);
764         }
765
766         memset(la, 0, sizeof(*la));
767         la->la_atime = la->la_mtime = la->la_ctime = cfs_time_current_sec();
768         la->la_mode = S_IFDIR | S_IRWXU;
769         la->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
770                        LA_UID | LA_GID;
771         memset(dof, 0, sizeof(*dof));
772         dof->dof_type = dt_mode_to_dft(S_IFDIR);
773
774         if (node == 0)
775                 rc = lfsck_create_lpf_local(env, lfsck, parent, child, la,
776                                             dof, name);
777         else
778                 rc = lfsck_create_lpf_remote(env, lfsck, parent, child, la,
779                                              dof, name);
780         if (rc == 0)
781                 lfsck->li_lpf_obj = child;
782
783         GOTO(unlock, rc);
784
785 unlock:
786         mutex_unlock(&lfsck->li_mutex);
787         if (rc != 0 && child != NULL && !IS_ERR(child))
788                 lu_object_put(env, &child->do_lu);
789 out:
790         if (parent != NULL && !IS_ERR(parent))
791                 lu_object_put(env, &parent->do_lu);
792
793         return rc;
794 }
795
796 static int lfsck_fid_init(struct lfsck_instance *lfsck)
797 {
798         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
799         struct seq_server_site  *ss;
800         char                    *prefix;
801         int                      rc     = 0;
802         ENTRY;
803
804         ss = lu_site2seq(lfsck->li_bottom->dd_lu_dev.ld_site);
805         if (unlikely(ss == NULL))
806                 RETURN(-ENXIO);
807
808         OBD_ALLOC_PTR(lfsck->li_seq);
809         if (lfsck->li_seq == NULL)
810                 RETURN(-ENOMEM);
811
812         OBD_ALLOC(prefix, MAX_OBD_NAME + 7);
813         if (prefix == NULL)
814                 GOTO(out, rc = -ENOMEM);
815
816         snprintf(prefix, MAX_OBD_NAME + 7, "lfsck-%s", lfsck_lfsck2name(lfsck));
817         rc = seq_client_init(lfsck->li_seq, NULL, LUSTRE_SEQ_METADATA, prefix,
818                              ss->ss_server_seq);
819         OBD_FREE(prefix, MAX_OBD_NAME + 7);
820         if (rc != 0)
821                 GOTO(out, rc);
822
823         if (fid_is_sane(&bk->lb_last_fid))
824                 lfsck->li_seq->lcs_fid = bk->lb_last_fid;
825
826         RETURN(0);
827
828 out:
829         OBD_FREE_PTR(lfsck->li_seq);
830         lfsck->li_seq = NULL;
831
832         return rc;
833 }
834
835 static void lfsck_fid_fini(struct lfsck_instance *lfsck)
836 {
837         if (lfsck->li_seq != NULL) {
838                 seq_client_fini(lfsck->li_seq);
839                 OBD_FREE_PTR(lfsck->li_seq);
840                 lfsck->li_seq = NULL;
841         }
842 }
843
844 void lfsck_instance_cleanup(const struct lu_env *env,
845                             struct lfsck_instance *lfsck)
846 {
847         struct ptlrpc_thread    *thread = &lfsck->li_thread;
848         struct lfsck_component  *com;
849         struct lfsck_component  *next;
850         ENTRY;
851
852         LASSERT(list_empty(&lfsck->li_link));
853         LASSERT(thread_is_init(thread) || thread_is_stopped(thread));
854
855         if (lfsck->li_obj_oit != NULL) {
856                 lu_object_put_nocache(env, &lfsck->li_obj_oit->do_lu);
857                 lfsck->li_obj_oit = NULL;
858         }
859
860         LASSERT(lfsck->li_obj_dir == NULL);
861
862         list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
863                 lfsck_component_cleanup(env, com);
864         }
865
866         LASSERT(list_empty(&lfsck->li_list_dir));
867
868         list_for_each_entry_safe(com, next, &lfsck->li_list_double_scan,
869                                  lc_link) {
870                 lfsck_component_cleanup(env, com);
871         }
872
873         list_for_each_entry_safe(com, next, &lfsck->li_list_idle, lc_link) {
874                 lfsck_component_cleanup(env, com);
875         }
876
877         lfsck_tgt_descs_fini(&lfsck->li_ost_descs);
878         lfsck_tgt_descs_fini(&lfsck->li_mdt_descs);
879
880         if (lfsck->li_bookmark_obj != NULL) {
881                 lu_object_put_nocache(env, &lfsck->li_bookmark_obj->do_lu);
882                 lfsck->li_bookmark_obj = NULL;
883         }
884
885         if (lfsck->li_lpf_obj != NULL) {
886                 lu_object_put(env, &lfsck->li_lpf_obj->do_lu);
887                 lfsck->li_lpf_obj = NULL;
888         }
889
890         if (lfsck->li_los != NULL) {
891                 local_oid_storage_fini(env, lfsck->li_los);
892                 lfsck->li_los = NULL;
893         }
894
895         lfsck_fid_fini(lfsck);
896
897         OBD_FREE_PTR(lfsck);
898 }
899
900 static inline struct lfsck_instance *
901 __lfsck_instance_find(struct dt_device *key, bool ref, bool unlink)
902 {
903         struct lfsck_instance *lfsck;
904
905         list_for_each_entry(lfsck, &lfsck_instance_list, li_link) {
906                 if (lfsck->li_bottom == key) {
907                         if (ref)
908                                 lfsck_instance_get(lfsck);
909                         if (unlink)
910                                 list_del_init(&lfsck->li_link);
911
912                         return lfsck;
913                 }
914         }
915
916         return NULL;
917 }
918
919 struct lfsck_instance *lfsck_instance_find(struct dt_device *key, bool ref,
920                                            bool unlink)
921 {
922         struct lfsck_instance *lfsck;
923
924         spin_lock(&lfsck_instance_lock);
925         lfsck = __lfsck_instance_find(key, ref, unlink);
926         spin_unlock(&lfsck_instance_lock);
927
928         return lfsck;
929 }
930
931 static inline int lfsck_instance_add(struct lfsck_instance *lfsck)
932 {
933         struct lfsck_instance *tmp;
934
935         spin_lock(&lfsck_instance_lock);
936         list_for_each_entry(tmp, &lfsck_instance_list, li_link) {
937                 if (lfsck->li_bottom == tmp->li_bottom) {
938                         spin_unlock(&lfsck_instance_lock);
939                         return -EEXIST;
940                 }
941         }
942
943         list_add_tail(&lfsck->li_link, &lfsck_instance_list);
944         spin_unlock(&lfsck_instance_lock);
945         return 0;
946 }
947
948 int lfsck_bits_dump(struct seq_file *m, int bits, const char *names[],
949                     const char *prefix)
950 {
951         int flag;
952         int i;
953         bool newline = (bits != 0 ? false : true);
954
955         seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
956
957         for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
958                 if (flag & bits) {
959                         bits &= ~flag;
960                         if (names[i] != NULL) {
961                                 if (bits == 0)
962                                         newline = true;
963
964                                 seq_printf(m, "%s%c", names[i],
965                                            newline ? '\n' : ',');
966                         }
967                 }
968         }
969
970         if (!newline)
971                 seq_printf(m, "\n");
972         return 0;
973 }
974
975 int lfsck_time_dump(struct seq_file *m, __u64 time, const char *prefix)
976 {
977         if (time != 0)
978                 seq_printf(m, "%s: "LPU64" seconds\n", prefix,
979                           cfs_time_current_sec() - time);
980         else
981                 seq_printf(m, "%s: N/A\n", prefix);
982         return 0;
983 }
984
985 int lfsck_pos_dump(struct seq_file *m, struct lfsck_position *pos,
986                    const char *prefix)
987 {
988         if (fid_is_zero(&pos->lp_dir_parent)) {
989                 if (pos->lp_oit_cookie == 0)
990                         seq_printf(m, "%s: N/A, N/A, N/A\n",
991                                    prefix);
992                 else
993                         seq_printf(m, "%s: "LPU64", N/A, N/A\n",
994                                    prefix, pos->lp_oit_cookie);
995         } else {
996                 seq_printf(m, "%s: "LPU64", "DFID", "LPX64"\n",
997                            prefix, pos->lp_oit_cookie,
998                            PFID(&pos->lp_dir_parent), pos->lp_dir_cookie);
999         }
1000         return 0;
1001 }
1002
1003 void lfsck_pos_fill(const struct lu_env *env, struct lfsck_instance *lfsck,
1004                     struct lfsck_position *pos, bool init)
1005 {
1006         const struct dt_it_ops *iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1007
1008         if (unlikely(lfsck->li_di_oit == NULL)) {
1009                 memset(pos, 0, sizeof(*pos));
1010                 return;
1011         }
1012
1013         pos->lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1014         if (!lfsck->li_current_oit_processed && !init)
1015                 pos->lp_oit_cookie--;
1016
1017         LASSERT(pos->lp_oit_cookie > 0);
1018
1019         if (lfsck->li_di_dir != NULL) {
1020                 struct dt_object *dto = lfsck->li_obj_dir;
1021
1022                 pos->lp_dir_cookie = dto->do_index_ops->dio_it.store(env,
1023                                                         lfsck->li_di_dir);
1024
1025                 if (pos->lp_dir_cookie >= MDS_DIR_END_OFF) {
1026                         fid_zero(&pos->lp_dir_parent);
1027                         pos->lp_dir_cookie = 0;
1028                 } else {
1029                         pos->lp_dir_parent = *lfsck_dto2fid(dto);
1030                 }
1031         } else {
1032                 fid_zero(&pos->lp_dir_parent);
1033                 pos->lp_dir_cookie = 0;
1034         }
1035 }
1036
1037 bool __lfsck_set_speed(struct lfsck_instance *lfsck, __u32 limit)
1038 {
1039         bool dirty = false;
1040
1041         if (limit != LFSCK_SPEED_NO_LIMIT) {
1042                 if (limit > HZ) {
1043                         lfsck->li_sleep_rate = limit / HZ;
1044                         lfsck->li_sleep_jif = 1;
1045                 } else {
1046                         lfsck->li_sleep_rate = 1;
1047                         lfsck->li_sleep_jif = HZ / limit;
1048                 }
1049         } else {
1050                 lfsck->li_sleep_jif = 0;
1051                 lfsck->li_sleep_rate = 0;
1052         }
1053
1054         if (lfsck->li_bookmark_ram.lb_speed_limit != limit) {
1055                 lfsck->li_bookmark_ram.lb_speed_limit = limit;
1056                 dirty = true;
1057         }
1058
1059         return dirty;
1060 }
1061
1062 void lfsck_control_speed(struct lfsck_instance *lfsck)
1063 {
1064         struct ptlrpc_thread *thread = &lfsck->li_thread;
1065         struct l_wait_info    lwi;
1066
1067         if (lfsck->li_sleep_jif > 0 &&
1068             lfsck->li_new_scanned >= lfsck->li_sleep_rate) {
1069                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1070                                        LWI_ON_SIGNAL_NOOP, NULL);
1071
1072                 l_wait_event(thread->t_ctl_waitq,
1073                              !thread_is_running(thread),
1074                              &lwi);
1075                 lfsck->li_new_scanned = 0;
1076         }
1077 }
1078
1079 void lfsck_control_speed_by_self(struct lfsck_component *com)
1080 {
1081         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1082         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1083         struct l_wait_info       lwi;
1084
1085         if (lfsck->li_sleep_jif > 0 &&
1086             com->lc_new_scanned >= lfsck->li_sleep_rate) {
1087                 lwi = LWI_TIMEOUT_INTR(lfsck->li_sleep_jif, NULL,
1088                                        LWI_ON_SIGNAL_NOOP, NULL);
1089
1090                 l_wait_event(thread->t_ctl_waitq,
1091                              !thread_is_running(thread),
1092                              &lwi);
1093                 com->lc_new_scanned = 0;
1094         }
1095 }
1096
1097 struct lfsck_thread_args *lfsck_thread_args_init(struct lfsck_instance *lfsck,
1098                                                  struct lfsck_component *com,
1099                                                  struct lfsck_start_param *lsp)
1100 {
1101         struct lfsck_thread_args *lta;
1102         int                       rc;
1103
1104         OBD_ALLOC_PTR(lta);
1105         if (lta == NULL)
1106                 return ERR_PTR(-ENOMEM);
1107
1108         rc = lu_env_init(&lta->lta_env, LCT_MD_THREAD | LCT_DT_THREAD);
1109         if (rc != 0) {
1110                 OBD_FREE_PTR(lta);
1111                 return ERR_PTR(rc);
1112         }
1113
1114         lta->lta_lfsck = lfsck_instance_get(lfsck);
1115         if (com != NULL)
1116                 lta->lta_com = lfsck_component_get(com);
1117
1118         lta->lta_lsp = lsp;
1119
1120         return lta;
1121 }
1122
1123 void lfsck_thread_args_fini(struct lfsck_thread_args *lta)
1124 {
1125         if (lta->lta_com != NULL)
1126                 lfsck_component_put(&lta->lta_env, lta->lta_com);
1127         lfsck_instance_put(&lta->lta_env, lta->lta_lfsck);
1128         lu_env_fini(&lta->lta_env);
1129         OBD_FREE_PTR(lta);
1130 }
1131
1132 static void lfsck_interpret(const struct lu_env *env,
1133                             struct lfsck_instance *lfsck,
1134                             struct ptlrpc_request *req, void *args, int result)
1135 {
1136         struct lfsck_async_interpret_args *laia = args;
1137         struct lfsck_component            *com;
1138
1139         LASSERT(laia->laia_com == NULL);
1140         LASSERT(laia->laia_shared);
1141
1142         spin_lock(&lfsck->li_lock);
1143         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1144                 if (com->lc_ops->lfsck_interpret != NULL) {
1145                         laia->laia_com = com;
1146                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1147                 }
1148         }
1149
1150         list_for_each_entry(com, &lfsck->li_list_double_scan, lc_link) {
1151                 if (com->lc_ops->lfsck_interpret != NULL) {
1152                         laia->laia_com = com;
1153                         com->lc_ops->lfsck_interpret(env, req, laia, result);
1154                 }
1155         }
1156         spin_unlock(&lfsck->li_lock);
1157 }
1158
1159 static int lfsck_stop_notify(const struct lu_env *env,
1160                              struct lfsck_instance *lfsck,
1161                              struct lfsck_tgt_descs *ltds,
1162                              struct lfsck_tgt_desc *ltd, __u16 type)
1163 {
1164         struct ptlrpc_request_set *set;
1165         struct lfsck_component    *com;
1166         int                        rc  = 0;
1167         ENTRY;
1168
1169         spin_lock(&lfsck->li_lock);
1170         com = __lfsck_component_find(lfsck, type, &lfsck->li_list_scan);
1171         if (com == NULL)
1172                 com = __lfsck_component_find(lfsck, type,
1173                                              &lfsck->li_list_double_scan);
1174         if (com != NULL)
1175                 lfsck_component_get(com);
1176         spin_unlock(&lfsck->li_lock);
1177
1178         if (com != NULL) {
1179                 if (com->lc_ops->lfsck_stop_notify != NULL) {
1180                         set = ptlrpc_prep_set();
1181                         if (set == NULL) {
1182                                 lfsck_component_put(env, com);
1183
1184                                 RETURN(-ENOMEM);
1185                         }
1186
1187                         rc = com->lc_ops->lfsck_stop_notify(env, com, ltds,
1188                                                             ltd, set);
1189                         if (rc == 0)
1190                                 rc = ptlrpc_set_wait(set);
1191
1192                         ptlrpc_set_destroy(set);
1193                 }
1194
1195                 lfsck_component_put(env, com);
1196         }
1197
1198         RETURN(rc);
1199 }
1200
1201 static int lfsck_async_interpret(const struct lu_env *env,
1202                                  struct ptlrpc_request *req,
1203                                  void *args, int rc)
1204 {
1205         struct lfsck_async_interpret_args *laia = args;
1206         struct lfsck_instance             *lfsck;
1207
1208         lfsck = container_of0(laia->laia_ltds, struct lfsck_instance,
1209                               li_mdt_descs);
1210         lfsck_interpret(env, lfsck, req, laia, rc);
1211         lfsck_tgt_put(laia->laia_ltd);
1212         if (rc != 0 && laia->laia_result != -EALREADY)
1213                 laia->laia_result = rc;
1214
1215         return 0;
1216 }
1217
1218 int lfsck_async_request(const struct lu_env *env, struct obd_export *exp,
1219                         struct lfsck_request *lr,
1220                         struct ptlrpc_request_set *set,
1221                         ptlrpc_interpterer_t interpreter,
1222                         void *args, int request)
1223 {
1224         struct lfsck_async_interpret_args *laia;
1225         struct ptlrpc_request             *req;
1226         struct lfsck_request              *tmp;
1227         struct req_format                 *format;
1228         int                                rc;
1229
1230         switch (request) {
1231         case LFSCK_NOTIFY:
1232                 format = &RQF_LFSCK_NOTIFY;
1233                 break;
1234         case LFSCK_QUERY:
1235                 format = &RQF_LFSCK_QUERY;
1236                 break;
1237         default:
1238                 CDEBUG(D_LFSCK, "%s: unknown async request %d: rc = %d\n",
1239                        exp->exp_obd->obd_name, request, -EINVAL);
1240                 return -EINVAL;
1241         }
1242
1243         req = ptlrpc_request_alloc(class_exp2cliimp(exp), format);
1244         if (req == NULL)
1245                 return -ENOMEM;
1246
1247         rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, request);
1248         if (rc != 0) {
1249                 ptlrpc_request_free(req);
1250
1251                 return rc;
1252         }
1253
1254         tmp = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
1255         *tmp = *lr;
1256         ptlrpc_request_set_replen(req);
1257
1258         laia = ptlrpc_req_async_args(req);
1259         *laia = *(struct lfsck_async_interpret_args *)args;
1260         if (laia->laia_com != NULL)
1261                 lfsck_component_get(laia->laia_com);
1262         req->rq_interpret_reply = interpreter;
1263         ptlrpc_set_add_req(set, req);
1264
1265         return 0;
1266 }
1267
1268 /* external interfaces */
1269
1270 int lfsck_get_speed(struct seq_file *m, struct dt_device *key)
1271 {
1272         struct lu_env           env;
1273         struct lfsck_instance  *lfsck;
1274         int                     rc;
1275         ENTRY;
1276
1277         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1278         if (rc != 0)
1279                 RETURN(rc);
1280
1281         lfsck = lfsck_instance_find(key, true, false);
1282         if (likely(lfsck != NULL)) {
1283                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_speed_limit);
1284                 lfsck_instance_put(&env, lfsck);
1285         } else {
1286                 rc = -ENXIO;
1287         }
1288
1289         lu_env_fini(&env);
1290
1291         RETURN(rc);
1292 }
1293 EXPORT_SYMBOL(lfsck_get_speed);
1294
1295 int lfsck_set_speed(struct dt_device *key, int val)
1296 {
1297         struct lu_env           env;
1298         struct lfsck_instance  *lfsck;
1299         int                     rc;
1300         ENTRY;
1301
1302         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1303         if (rc != 0)
1304                 RETURN(rc);
1305
1306         lfsck = lfsck_instance_find(key, true, false);
1307         if (likely(lfsck != NULL)) {
1308                 mutex_lock(&lfsck->li_mutex);
1309                 if (__lfsck_set_speed(lfsck, val))
1310                         rc = lfsck_bookmark_store(&env, lfsck);
1311                 mutex_unlock(&lfsck->li_mutex);
1312                 lfsck_instance_put(&env, lfsck);
1313         } else {
1314                 rc = -ENXIO;
1315         }
1316
1317         lu_env_fini(&env);
1318
1319         RETURN(rc);
1320 }
1321 EXPORT_SYMBOL(lfsck_set_speed);
1322
1323 int lfsck_get_windows(struct seq_file *m, struct dt_device *key)
1324 {
1325         struct lu_env           env;
1326         struct lfsck_instance  *lfsck;
1327         int                     rc;
1328         ENTRY;
1329
1330         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1331         if (rc != 0)
1332                 RETURN(rc);
1333
1334         lfsck = lfsck_instance_find(key, true, false);
1335         if (likely(lfsck != NULL)) {
1336                 seq_printf(m, "%u\n", lfsck->li_bookmark_ram.lb_async_windows);
1337                 lfsck_instance_put(&env, lfsck);
1338         } else {
1339                 rc = -ENXIO;
1340         }
1341
1342         lu_env_fini(&env);
1343
1344         RETURN(rc);
1345 }
1346 EXPORT_SYMBOL(lfsck_get_windows);
1347
1348 int lfsck_set_windows(struct dt_device *key, int val)
1349 {
1350         struct lu_env           env;
1351         struct lfsck_instance  *lfsck;
1352         int                     rc;
1353         ENTRY;
1354
1355         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1356         if (rc != 0)
1357                 RETURN(rc);
1358
1359         lfsck = lfsck_instance_find(key, true, false);
1360         if (likely(lfsck != NULL)) {
1361                 if (val > LFSCK_ASYNC_WIN_MAX) {
1362                         CWARN("%s: Too large async window size, which "
1363                               "may cause memory issues. The valid range "
1364                               "is [0 - %u]. If you do not want to restrict "
1365                               "the window size for async requests pipeline, "
1366                               "just set it as 0.\n",
1367                               lfsck_lfsck2name(lfsck), LFSCK_ASYNC_WIN_MAX);
1368                         rc = -EINVAL;
1369                 } else if (lfsck->li_bookmark_ram.lb_async_windows != val) {
1370                         mutex_lock(&lfsck->li_mutex);
1371                         lfsck->li_bookmark_ram.lb_async_windows = val;
1372                         rc = lfsck_bookmark_store(&env, lfsck);
1373                         mutex_unlock(&lfsck->li_mutex);
1374                 }
1375                 lfsck_instance_put(&env, lfsck);
1376         } else {
1377                 rc = -ENXIO;
1378         }
1379
1380         lu_env_fini(&env);
1381
1382         RETURN(rc);
1383 }
1384 EXPORT_SYMBOL(lfsck_set_windows);
1385
1386 int lfsck_dump(struct seq_file *m, struct dt_device *key, enum lfsck_type type)
1387 {
1388         struct lu_env           env;
1389         struct lfsck_instance  *lfsck;
1390         struct lfsck_component *com;
1391         int                     rc;
1392         ENTRY;
1393
1394         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD);
1395         if (rc != 0)
1396                 RETURN(rc);
1397
1398         lfsck = lfsck_instance_find(key, true, false);
1399         if (likely(lfsck != NULL)) {
1400                 com = lfsck_component_find(lfsck, type);
1401                 if (likely(com != NULL)) {
1402                         rc = com->lc_ops->lfsck_dump(&env, com, m);
1403                         lfsck_component_put(&env, com);
1404                 } else {
1405                         rc = -ENOTSUPP;
1406                 }
1407
1408                 lfsck_instance_put(&env, lfsck);
1409         } else {
1410                 rc = -ENXIO;
1411         }
1412
1413         lu_env_fini(&env);
1414
1415         RETURN(rc);
1416 }
1417 EXPORT_SYMBOL(lfsck_dump);
1418
1419 static int lfsck_stop_all(const struct lu_env *env,
1420                           struct lfsck_instance *lfsck,
1421                           struct lfsck_stop *stop)
1422 {
1423         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1424         struct lfsck_request              *lr     = &info->lti_lr;
1425         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1426         struct ptlrpc_request_set         *set;
1427         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1428         struct lfsck_tgt_desc             *ltd;
1429         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1430         __u32                              idx;
1431         int                                rc     = 0;
1432         int                                rc1    = 0;
1433         ENTRY;
1434
1435         LASSERT(stop->ls_flags & LPF_BROADCAST);
1436
1437         set = ptlrpc_prep_set();
1438         if (unlikely(set == NULL))
1439                 RETURN(-ENOMEM);
1440
1441         memset(lr, 0, sizeof(*lr));
1442         lr->lr_event = LE_STOP;
1443         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1444         lr->lr_status = stop->ls_status;
1445         lr->lr_version = bk->lb_version;
1446         lr->lr_active = LFSCK_TYPES_ALL;
1447         lr->lr_param = stop->ls_flags;
1448
1449         laia->laia_com = NULL;
1450         laia->laia_ltds = ltds;
1451         laia->laia_lr = lr;
1452         laia->laia_result = 0;
1453         laia->laia_shared = 1;
1454
1455         down_read(&ltds->ltd_rw_sem);
1456         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1457                 ltd = lfsck_tgt_get(ltds, idx);
1458                 LASSERT(ltd != NULL);
1459
1460                 laia->laia_ltd = ltd;
1461                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1462                                          lfsck_async_interpret, laia,
1463                                          LFSCK_NOTIFY);
1464                 if (rc != 0) {
1465                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1466                         lfsck_tgt_put(ltd);
1467                         CERROR("%s: cannot notify MDT %x for LFSCK stop: "
1468                                "rc = %d\n", lfsck_lfsck2name(lfsck), idx, rc);
1469                         rc1 = rc;
1470                 }
1471         }
1472         up_read(&ltds->ltd_rw_sem);
1473
1474         rc = ptlrpc_set_wait(set);
1475         ptlrpc_set_destroy(set);
1476
1477         if (rc == 0)
1478                 rc = laia->laia_result;
1479
1480         if (rc == -EALREADY)
1481                 rc = 0;
1482
1483         if (rc != 0)
1484                 CERROR("%s: fail to stop LFSCK on some MDTs: rc = %d\n",
1485                        lfsck_lfsck2name(lfsck), rc);
1486
1487         RETURN(rc != 0 ? rc : rc1);
1488 }
1489
1490 static int lfsck_start_all(const struct lu_env *env,
1491                            struct lfsck_instance *lfsck,
1492                            struct lfsck_start *start)
1493 {
1494         struct lfsck_thread_info          *info   = lfsck_env_info(env);
1495         struct lfsck_request              *lr     = &info->lti_lr;
1496         struct lfsck_async_interpret_args *laia   = &info->lti_laia;
1497         struct ptlrpc_request_set         *set;
1498         struct lfsck_tgt_descs            *ltds   = &lfsck->li_mdt_descs;
1499         struct lfsck_tgt_desc             *ltd;
1500         struct lfsck_bookmark             *bk     = &lfsck->li_bookmark_ram;
1501         __u32                              idx;
1502         int                                rc     = 0;
1503         ENTRY;
1504
1505         LASSERT(start->ls_flags & LPF_BROADCAST);
1506
1507         set = ptlrpc_prep_set();
1508         if (unlikely(set == NULL))
1509                 RETURN(-ENOMEM);
1510
1511         memset(lr, 0, sizeof(*lr));
1512         lr->lr_event = LE_START;
1513         lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
1514         lr->lr_speed = bk->lb_speed_limit;
1515         lr->lr_version = bk->lb_version;
1516         lr->lr_active = start->ls_active;
1517         lr->lr_param = start->ls_flags;
1518         lr->lr_async_windows = bk->lb_async_windows;
1519         lr->lr_valid = LSV_SPEED_LIMIT | LSV_ERROR_HANDLE | LSV_DRYRUN |
1520                        LSV_ASYNC_WINDOWS;
1521
1522         laia->laia_com = NULL;
1523         laia->laia_ltds = ltds;
1524         laia->laia_lr = lr;
1525         laia->laia_result = 0;
1526         laia->laia_shared = 1;
1527
1528         down_read(&ltds->ltd_rw_sem);
1529         cfs_foreach_bit(ltds->ltd_tgts_bitmap, idx) {
1530                 ltd = lfsck_tgt_get(ltds, idx);
1531                 LASSERT(ltd != NULL);
1532
1533                 laia->laia_ltd = ltd;
1534                 ltd->ltd_layout_done = 0;
1535                 rc = lfsck_async_request(env, ltd->ltd_exp, lr, set,
1536                                          lfsck_async_interpret, laia,
1537                                          LFSCK_NOTIFY);
1538                 if (rc != 0) {
1539                         lfsck_interpret(env, lfsck, NULL, laia, rc);
1540                         lfsck_tgt_put(ltd);
1541                         CERROR("%s: cannot notify MDT %x for LFSCK "
1542                                "start, failout: rc = %d\n",
1543                                lfsck_lfsck2name(lfsck), idx, rc);
1544                         break;
1545                 }
1546         }
1547         up_read(&ltds->ltd_rw_sem);
1548
1549         if (rc != 0) {
1550                 ptlrpc_set_destroy(set);
1551
1552                 RETURN(rc);
1553         }
1554
1555         rc = ptlrpc_set_wait(set);
1556         ptlrpc_set_destroy(set);
1557
1558         if (rc == 0)
1559                 rc = laia->laia_result;
1560
1561         if (rc != 0) {
1562                 struct lfsck_stop *stop = &info->lti_stop;
1563
1564                 CERROR("%s: cannot start LFSCK on some MDTs, "
1565                        "stop all: rc = %d\n",
1566                        lfsck_lfsck2name(lfsck), rc);
1567                 if (rc != -EALREADY) {
1568                         stop->ls_status = LS_FAILED;
1569                         stop->ls_flags = LPF_ALL_TGT | LPF_BROADCAST;
1570                         lfsck_stop_all(env, lfsck, stop);
1571                 }
1572         }
1573
1574         RETURN(rc);
1575 }
1576
1577 int lfsck_start(const struct lu_env *env, struct dt_device *key,
1578                 struct lfsck_start_param *lsp)
1579 {
1580         struct lfsck_start              *start  = lsp->lsp_start;
1581         struct lfsck_instance           *lfsck;
1582         struct lfsck_bookmark           *bk;
1583         struct ptlrpc_thread            *thread;
1584         struct lfsck_component          *com;
1585         struct l_wait_info               lwi    = { 0 };
1586         struct lfsck_thread_args        *lta;
1587         struct task_struct              *task;
1588         int                              rc     = 0;
1589         __u16                            valid  = 0;
1590         __u16                            flags  = 0;
1591         __u16                            type   = 1;
1592         ENTRY;
1593
1594         lfsck = lfsck_instance_find(key, true, false);
1595         if (unlikely(lfsck == NULL))
1596                 RETURN(-ENXIO);
1597
1598         /* System is not ready, try again later. */
1599         if (unlikely(lfsck->li_namespace == NULL))
1600                 GOTO(put, rc = -EAGAIN);
1601
1602         /* start == NULL means auto trigger paused LFSCK. */
1603         if ((start == NULL) &&
1604             (list_empty(&lfsck->li_list_scan) ||
1605              OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_AUTO)))
1606                 GOTO(put, rc = 0);
1607
1608         bk = &lfsck->li_bookmark_ram;
1609         thread = &lfsck->li_thread;
1610         mutex_lock(&lfsck->li_mutex);
1611         spin_lock(&lfsck->li_lock);
1612         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1613                 rc = -EALREADY;
1614                 if (unlikely(start == NULL)) {
1615                         spin_unlock(&lfsck->li_lock);
1616                         GOTO(out, rc);
1617                 }
1618
1619                 while (start->ls_active != 0) {
1620                         if (!(type & start->ls_active)) {
1621                                 type <<= 1;
1622                                 continue;
1623                         }
1624
1625                         com = __lfsck_component_find(lfsck, type,
1626                                                      &lfsck->li_list_scan);
1627                         if (com == NULL)
1628                                 com = __lfsck_component_find(lfsck, type,
1629                                                 &lfsck->li_list_double_scan);
1630                         if (com == NULL) {
1631                                 rc = -EOPNOTSUPP;
1632                                 break;
1633                         }
1634
1635                         if (com->lc_ops->lfsck_join != NULL) {
1636                                 rc = com->lc_ops->lfsck_join( env, com, lsp);
1637                                 if (rc != 0 && rc != -EALREADY)
1638                                         break;
1639                         }
1640                         start->ls_active &= ~type;
1641                         type <<= 1;
1642                 }
1643                 spin_unlock(&lfsck->li_lock);
1644                 GOTO(out, rc);
1645         }
1646         spin_unlock(&lfsck->li_lock);
1647
1648         lfsck->li_status = 0;
1649         lfsck->li_oit_over = 0;
1650         lfsck->li_start_unplug = 0;
1651         lfsck->li_drop_dryrun = 0;
1652         lfsck->li_new_scanned = 0;
1653
1654         /* For auto trigger. */
1655         if (start == NULL)
1656                 goto trigger;
1657
1658         if (start->ls_flags & LPF_BROADCAST && !lfsck->li_master) {
1659                 CERROR("%s: only allow to specify '-A | -o' via MDS\n",
1660                        lfsck_lfsck2name(lfsck));
1661
1662                 GOTO(out, rc = -EPERM);
1663         }
1664
1665         start->ls_version = bk->lb_version;
1666
1667         if (start->ls_active != 0) {
1668                 struct lfsck_component *next;
1669
1670                 if (start->ls_active == LFSCK_TYPES_ALL)
1671                         start->ls_active = LFSCK_TYPES_SUPPORTED;
1672
1673                 if (start->ls_active & ~LFSCK_TYPES_SUPPORTED) {
1674                         start->ls_active &= ~LFSCK_TYPES_SUPPORTED;
1675                         GOTO(out, rc = -ENOTSUPP);
1676                 }
1677
1678                 list_for_each_entry_safe(com, next,
1679                                          &lfsck->li_list_scan, lc_link) {
1680                         if (!(com->lc_type & start->ls_active)) {
1681                                 rc = com->lc_ops->lfsck_post(env, com, 0,
1682                                                              false);
1683                                 if (rc != 0)
1684                                         GOTO(out, rc);
1685                         }
1686                 }
1687
1688                 while (start->ls_active != 0) {
1689                         if (type & start->ls_active) {
1690                                 com = __lfsck_component_find(lfsck, type,
1691                                                         &lfsck->li_list_idle);
1692                                 if (com != NULL)
1693                                         /* The component status will be updated
1694                                          * when its prep() is called later by
1695                                          * the LFSCK main engine. */
1696                                         list_move_tail(&com->lc_link,
1697                                                        &lfsck->li_list_scan);
1698                                 start->ls_active &= ~type;
1699                         }
1700                         type <<= 1;
1701                 }
1702         }
1703
1704         if (list_empty(&lfsck->li_list_scan)) {
1705                 /* The speed limit will be used to control both the LFSCK and
1706                  * low layer scrub (if applied), need to be handled firstly. */
1707                 if (start->ls_valid & LSV_SPEED_LIMIT) {
1708                         if (__lfsck_set_speed(lfsck, start->ls_speed_limit)) {
1709                                 rc = lfsck_bookmark_store(env, lfsck);
1710                                 if (rc != 0)
1711                                         GOTO(out, rc);
1712                         }
1713                 }
1714
1715                 goto trigger;
1716         }
1717
1718         if (start->ls_flags & LPF_RESET)
1719                 flags |= DOIF_RESET;
1720
1721         rc = lfsck_set_param(env, lfsck, start, !!(flags & DOIF_RESET));
1722         if (rc != 0)
1723                 GOTO(out, rc);
1724
1725         list_for_each_entry(com, &lfsck->li_list_scan, lc_link) {
1726                 start->ls_active |= com->lc_type;
1727                 if (flags & DOIF_RESET) {
1728                         rc = com->lc_ops->lfsck_reset(env, com, false);
1729                         if (rc != 0)
1730                                 GOTO(out, rc);
1731                 }
1732         }
1733
1734 trigger:
1735         lfsck->li_args_dir = LUDA_64BITHASH | LUDA_VERIFY;
1736         if (bk->lb_param & LPF_DRYRUN)
1737                 lfsck->li_args_dir |= LUDA_VERIFY_DRYRUN;
1738
1739         if (start != NULL && start->ls_valid & LSV_ERROR_HANDLE) {
1740                 valid |= DOIV_ERROR_HANDLE;
1741                 if (start->ls_flags & LPF_FAILOUT)
1742                         flags |= DOIF_FAILOUT;
1743         }
1744
1745         if (start != NULL && start->ls_valid & LSV_DRYRUN) {
1746                 valid |= DOIV_DRYRUN;
1747                 if (start->ls_flags & LPF_DRYRUN)
1748                         flags |= DOIF_DRYRUN;
1749         }
1750
1751         if (!list_empty(&lfsck->li_list_scan))
1752                 flags |= DOIF_OUTUSED;
1753
1754         lfsck->li_args_oit = (flags << DT_OTABLE_IT_FLAGS_SHIFT) | valid;
1755         thread_set_flags(thread, 0);
1756         lta = lfsck_thread_args_init(lfsck, NULL, lsp);
1757         if (IS_ERR(lta))
1758                 GOTO(out, rc = PTR_ERR(lta));
1759
1760         __lfsck_set_speed(lfsck, bk->lb_speed_limit);
1761         task = kthread_run(lfsck_master_engine, lta, "lfsck");
1762         if (IS_ERR(task)) {
1763                 rc = PTR_ERR(task);
1764                 CERROR("%s: cannot start LFSCK thread: rc = %d\n",
1765                        lfsck_lfsck2name(lfsck), rc);
1766                 lfsck_thread_args_fini(lta);
1767
1768                 GOTO(out, rc);
1769         }
1770
1771         l_wait_event(thread->t_ctl_waitq,
1772                      thread_is_running(thread) ||
1773                      thread_is_stopped(thread),
1774                      &lwi);
1775         if (start == NULL || !(start->ls_flags & LPF_BROADCAST)) {
1776                 lfsck->li_start_unplug = 1;
1777                 wake_up_all(&thread->t_ctl_waitq);
1778
1779                 GOTO(out, rc = 0);
1780         }
1781
1782         /* release lfsck::li_mutex to avoid deadlock. */
1783         mutex_unlock(&lfsck->li_mutex);
1784         rc = lfsck_start_all(env, lfsck, start);
1785         if (rc != 0) {
1786                 spin_lock(&lfsck->li_lock);
1787                 if (thread_is_stopped(thread)) {
1788                         spin_unlock(&lfsck->li_lock);
1789                 } else {
1790                         lfsck->li_status = LS_FAILED;
1791                         lfsck->li_flags = 0;
1792                         thread_set_flags(thread, SVC_STOPPING);
1793                         spin_unlock(&lfsck->li_lock);
1794
1795                         lfsck->li_start_unplug = 1;
1796                         wake_up_all(&thread->t_ctl_waitq);
1797                         l_wait_event(thread->t_ctl_waitq,
1798                                      thread_is_stopped(thread),
1799                                      &lwi);
1800                 }
1801         } else {
1802                 lfsck->li_start_unplug = 1;
1803                 wake_up_all(&thread->t_ctl_waitq);
1804         }
1805
1806         GOTO(put, rc);
1807
1808 out:
1809         mutex_unlock(&lfsck->li_mutex);
1810
1811 put:
1812         lfsck_instance_put(env, lfsck);
1813
1814         return rc < 0 ? rc : 0;
1815 }
1816 EXPORT_SYMBOL(lfsck_start);
1817
1818 int lfsck_stop(const struct lu_env *env, struct dt_device *key,
1819                struct lfsck_stop *stop)
1820 {
1821         struct lfsck_instance   *lfsck;
1822         struct ptlrpc_thread    *thread;
1823         struct l_wait_info       lwi    = { 0 };
1824         int                      rc     = 0;
1825         int                      rc1    = 0;
1826         ENTRY;
1827
1828         lfsck = lfsck_instance_find(key, true, false);
1829         if (unlikely(lfsck == NULL))
1830                 RETURN(-ENXIO);
1831
1832         thread = &lfsck->li_thread;
1833         /* release lfsck::li_mutex to avoid deadlock. */
1834         if (stop != NULL && stop->ls_flags & LPF_BROADCAST) {
1835                 if (!lfsck->li_master) {
1836                         CERROR("%s: only allow to specify '-A' via MDS\n",
1837                                lfsck_lfsck2name(lfsck));
1838
1839                         GOTO(out, rc = -EPERM);
1840                 }
1841
1842                 rc1 = lfsck_stop_all(env, lfsck, stop);
1843         }
1844
1845         mutex_lock(&lfsck->li_mutex);
1846         spin_lock(&lfsck->li_lock);
1847         /* no error if LFSCK is already stopped, or was never started */
1848         if (thread_is_init(thread) || thread_is_stopped(thread)) {
1849                 spin_unlock(&lfsck->li_lock);
1850                 GOTO(out, rc = 0);
1851         }
1852
1853         if (stop != NULL) {
1854                 lfsck->li_status = stop->ls_status;
1855                 lfsck->li_flags = stop->ls_flags;
1856         } else {
1857                 lfsck->li_status = LS_STOPPED;
1858                 lfsck->li_flags = 0;
1859         }
1860
1861         thread_set_flags(thread, SVC_STOPPING);
1862         spin_unlock(&lfsck->li_lock);
1863
1864         wake_up_all(&thread->t_ctl_waitq);
1865         l_wait_event(thread->t_ctl_waitq,
1866                      thread_is_stopped(thread),
1867                      &lwi);
1868
1869         GOTO(out, rc = 0);
1870
1871 out:
1872         mutex_unlock(&lfsck->li_mutex);
1873         lfsck_instance_put(env, lfsck);
1874
1875         return rc != 0 ? rc : rc1;
1876 }
1877 EXPORT_SYMBOL(lfsck_stop);
1878
1879 int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
1880                     struct lfsck_request *lr)
1881 {
1882         int rc = -EOPNOTSUPP;
1883         ENTRY;
1884
1885         switch (lr->lr_event) {
1886         case LE_START: {
1887                 struct lfsck_start       *start = &lfsck_env_info(env)->lti_start;
1888                 struct lfsck_start_param  lsp;
1889
1890                 memset(start, 0, sizeof(*start));
1891                 start->ls_valid = lr->lr_valid;
1892                 start->ls_speed_limit = lr->lr_speed;
1893                 start->ls_version = lr->lr_version;
1894                 start->ls_active = lr->lr_active;
1895                 start->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1896                 start->ls_async_windows = lr->lr_async_windows;
1897
1898                 lsp.lsp_start = start;
1899                 lsp.lsp_index = lr->lr_index;
1900                 lsp.lsp_index_valid = 1;
1901                 rc = lfsck_start(env, key, &lsp);
1902                 break;
1903         }
1904         case LE_STOP: {
1905                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1906
1907                 memset(stop, 0, sizeof(*stop));
1908                 stop->ls_status = lr->lr_status;
1909                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1910                 rc = lfsck_stop(env, key, stop);
1911                 break;
1912         }
1913         case LE_PHASE1_DONE:
1914         case LE_PHASE2_DONE:
1915         case LE_FID_ACCESSED:
1916         case LE_PEER_EXIT:
1917         case LE_CONDITIONAL_DESTROY:
1918         case LE_PAIRS_VERIFY: {
1919                 struct lfsck_instance  *lfsck;
1920                 struct lfsck_component *com;
1921
1922                 lfsck = lfsck_instance_find(key, true, false);
1923                 if (unlikely(lfsck == NULL))
1924                         RETURN(-ENXIO);
1925
1926                 com = lfsck_component_find(lfsck, lr->lr_active);
1927                 if (likely(com != NULL)) {
1928                         rc = com->lc_ops->lfsck_in_notify(env, com, lr);
1929                         lfsck_component_put(env, com);
1930                 }
1931
1932                 lfsck_instance_put(env, lfsck);
1933                 break;
1934         }
1935         default:
1936                 break;
1937         }
1938
1939         RETURN(rc);
1940 }
1941 EXPORT_SYMBOL(lfsck_in_notify);
1942
1943 int lfsck_query(const struct lu_env *env, struct dt_device *key,
1944                 struct lfsck_request *lr)
1945 {
1946         struct lfsck_instance  *lfsck;
1947         struct lfsck_component *com;
1948         int                     rc;
1949         ENTRY;
1950
1951         lfsck = lfsck_instance_find(key, true, false);
1952         if (unlikely(lfsck == NULL))
1953                 RETURN(-ENXIO);
1954
1955         com = lfsck_component_find(lfsck, lr->lr_active);
1956         if (likely(com != NULL)) {
1957                 rc = com->lc_ops->lfsck_query(env, com);
1958                 lfsck_component_put(env, com);
1959         } else {
1960                 rc = -ENOTSUPP;
1961         }
1962
1963         lfsck_instance_put(env, lfsck);
1964
1965         RETURN(rc);
1966 }
1967 EXPORT_SYMBOL(lfsck_query);
1968
1969 int lfsck_register_namespace(const struct lu_env *env, struct dt_device *key,
1970                              struct ldlm_namespace *ns)
1971 {
1972         struct lfsck_instance  *lfsck;
1973         int                     rc      = -ENXIO;
1974
1975         lfsck = lfsck_instance_find(key, true, false);
1976         if (likely(lfsck != NULL)) {
1977                 lfsck->li_namespace = ns;
1978                 lfsck_instance_put(env, lfsck);
1979                 rc = 0;
1980         }
1981
1982         return rc;
1983 }
1984 EXPORT_SYMBOL(lfsck_register_namespace);
1985
1986 int lfsck_register(const struct lu_env *env, struct dt_device *key,
1987                    struct dt_device *next, struct obd_device *obd,
1988                    lfsck_out_notify notify, void *notify_data, bool master)
1989 {
1990         struct lfsck_instance   *lfsck;
1991         struct dt_object        *root  = NULL;
1992         struct dt_object        *obj   = NULL;
1993         struct lu_fid           *fid   = &lfsck_env_info(env)->lti_fid;
1994         int                      rc;
1995         ENTRY;
1996
1997         lfsck = lfsck_instance_find(key, false, false);
1998         if (unlikely(lfsck != NULL))
1999                 RETURN(-EEXIST);
2000
2001         OBD_ALLOC_PTR(lfsck);
2002         if (lfsck == NULL)
2003                 RETURN(-ENOMEM);
2004
2005         mutex_init(&lfsck->li_mutex);
2006         spin_lock_init(&lfsck->li_lock);
2007         INIT_LIST_HEAD(&lfsck->li_link);
2008         INIT_LIST_HEAD(&lfsck->li_list_scan);
2009         INIT_LIST_HEAD(&lfsck->li_list_dir);
2010         INIT_LIST_HEAD(&lfsck->li_list_double_scan);
2011         INIT_LIST_HEAD(&lfsck->li_list_idle);
2012         atomic_set(&lfsck->li_ref, 1);
2013         atomic_set(&lfsck->li_double_scan_count, 0);
2014         init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
2015         lfsck->li_out_notify = notify;
2016         lfsck->li_out_notify_data = notify_data;
2017         lfsck->li_next = next;
2018         lfsck->li_bottom = key;
2019         lfsck->li_obd = obd;
2020
2021         rc = lfsck_tgt_descs_init(&lfsck->li_ost_descs);
2022         if (rc != 0)
2023                 GOTO(out, rc);
2024
2025         rc = lfsck_tgt_descs_init(&lfsck->li_mdt_descs);
2026         if (rc != 0)
2027                 GOTO(out, rc);
2028
2029         fid->f_seq = FID_SEQ_LOCAL_NAME;
2030         fid->f_oid = 1;
2031         fid->f_ver = 0;
2032         rc = local_oid_storage_init(env, key, fid, &lfsck->li_los);
2033         if (rc != 0)
2034                 GOTO(out, rc);
2035
2036         rc = dt_root_get(env, key, fid);
2037         if (rc != 0)
2038                 GOTO(out, rc);
2039
2040         root = dt_locate(env, key, fid);
2041         if (IS_ERR(root))
2042                 GOTO(out, rc = PTR_ERR(root));
2043
2044         if (unlikely(!dt_try_as_dir(env, root)))
2045                 GOTO(out, rc = -ENOTDIR);
2046
2047         lfsck->li_local_root_fid = *fid;
2048         if (master) {
2049                 lfsck->li_master = 1;
2050                 if (lfsck_dev_idx(key) == 0) {
2051                         struct lu_fid *pfid = &lfsck_env_info(env)->lti_fid2;
2052                         const struct lu_name *cname;
2053
2054                         rc = dt_lookup(env, root,
2055                                 (struct dt_rec *)(&lfsck->li_global_root_fid),
2056                                 (const struct dt_key *)"ROOT", BYPASS_CAPA);
2057                         if (rc != 0)
2058                                 GOTO(out, rc);
2059
2060                         obj = dt_locate(env, key, &lfsck->li_global_root_fid);
2061                         if (IS_ERR(obj))
2062                                 GOTO(out, rc = PTR_ERR(obj));
2063
2064                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2065                                 (const struct dt_key *)dotlustre, BYPASS_CAPA);
2066                         if (rc != 0)
2067                                 GOTO(out, rc);
2068
2069                         lu_object_put(env, &obj->do_lu);
2070                         obj = dt_locate(env, key, fid);
2071                         if (IS_ERR(obj))
2072                                 GOTO(out, rc = PTR_ERR(obj));
2073
2074                         cname = lfsck_name_get_const(env, dotlustre,
2075                                                      strlen(dotlustre));
2076                         rc = lfsck_verify_linkea(env, key, obj, cname,
2077                                                  &lfsck->li_global_root_fid);
2078                         if (rc != 0)
2079                                 GOTO(out, rc);
2080
2081                         *pfid = *fid;
2082                         rc = dt_lookup(env, obj, (struct dt_rec *)fid,
2083                                        (const struct dt_key *)lostfound,
2084                                        BYPASS_CAPA);
2085                         if (rc != 0)
2086                                 GOTO(out, rc);
2087
2088                         lu_object_put(env, &obj->do_lu);
2089                         obj = dt_locate(env, key, fid);
2090                         if (IS_ERR(obj))
2091                                 GOTO(out, rc = PTR_ERR(obj));
2092
2093                         cname = lfsck_name_get_const(env, lostfound,
2094                                                      strlen(lostfound));
2095                         rc = lfsck_verify_linkea(env, key, obj, cname, pfid);
2096                         if (rc != 0)
2097                                 GOTO(out, rc);
2098
2099                         lu_object_put(env, &obj->do_lu);
2100                         obj = NULL;
2101                 }
2102         }
2103
2104         fid->f_seq = FID_SEQ_LOCAL_FILE;
2105         fid->f_oid = OTABLE_IT_OID;
2106         fid->f_ver = 0;
2107         obj = dt_locate(env, key, fid);
2108         if (IS_ERR(obj))
2109                 GOTO(out, rc = PTR_ERR(obj));
2110
2111         lu_object_get(&obj->do_lu);
2112         lfsck->li_obj_oit = obj;
2113         rc = obj->do_ops->do_index_try(env, obj, &dt_otable_features);
2114         if (rc != 0)
2115                 GOTO(out, rc);
2116
2117         rc = lfsck_bookmark_setup(env, lfsck);
2118         if (rc != 0)
2119                 GOTO(out, rc);
2120
2121         if (master) {
2122                 rc = lfsck_fid_init(lfsck);
2123                 if (rc < 0)
2124                         GOTO(out, rc);
2125
2126                 rc = lfsck_namespace_setup(env, lfsck);
2127                 if (rc < 0)
2128                         GOTO(out, rc);
2129         }
2130
2131         rc = lfsck_layout_setup(env, lfsck);
2132         if (rc < 0)
2133                 GOTO(out, rc);
2134
2135         /* XXX: more LFSCK components initialization to be added here. */
2136
2137         rc = lfsck_instance_add(lfsck);
2138         if (rc == 0)
2139                 rc = lfsck_add_target_from_orphan(env, lfsck);
2140 out:
2141         if (obj != NULL && !IS_ERR(obj))
2142                 lu_object_put(env, &obj->do_lu);
2143         if (root != NULL && !IS_ERR(root))
2144                 lu_object_put(env, &root->do_lu);
2145         if (rc != 0)
2146                 lfsck_instance_cleanup(env, lfsck);
2147         return rc;
2148 }
2149 EXPORT_SYMBOL(lfsck_register);
2150
2151 void lfsck_degister(const struct lu_env *env, struct dt_device *key)
2152 {
2153         struct lfsck_instance *lfsck;
2154
2155         lfsck = lfsck_instance_find(key, false, true);
2156         if (lfsck != NULL)
2157                 lfsck_instance_put(env, lfsck);
2158 }
2159 EXPORT_SYMBOL(lfsck_degister);
2160
2161 int lfsck_add_target(const struct lu_env *env, struct dt_device *key,
2162                      struct dt_device *tgt, struct obd_export *exp,
2163                      __u32 index, bool for_ost)
2164 {
2165         struct lfsck_instance   *lfsck;
2166         struct lfsck_tgt_desc   *ltd;
2167         int                      rc;
2168         ENTRY;
2169
2170         OBD_ALLOC_PTR(ltd);
2171         if (ltd == NULL)
2172                 RETURN(-ENOMEM);
2173
2174         ltd->ltd_tgt = tgt;
2175         ltd->ltd_key = key;
2176         ltd->ltd_exp = exp;
2177         INIT_LIST_HEAD(&ltd->ltd_orphan_list);
2178         INIT_LIST_HEAD(&ltd->ltd_layout_list);
2179         INIT_LIST_HEAD(&ltd->ltd_layout_phase_list);
2180         atomic_set(&ltd->ltd_ref, 1);
2181         ltd->ltd_index = index;
2182
2183         spin_lock(&lfsck_instance_lock);
2184         lfsck = __lfsck_instance_find(key, true, false);
2185         if (lfsck == NULL) {
2186                 if (for_ost)
2187                         list_add_tail(&ltd->ltd_orphan_list,
2188                                       &lfsck_ost_orphan_list);
2189                 else
2190                         list_add_tail(&ltd->ltd_orphan_list,
2191                                       &lfsck_mdt_orphan_list);
2192                 spin_unlock(&lfsck_instance_lock);
2193
2194                 RETURN(0);
2195         }
2196         spin_unlock(&lfsck_instance_lock);
2197
2198         rc = __lfsck_add_target(env, lfsck, ltd, for_ost, false);
2199         if (rc != 0)
2200                 lfsck_tgt_put(ltd);
2201
2202         lfsck_instance_put(env, lfsck);
2203
2204         RETURN(rc);
2205 }
2206 EXPORT_SYMBOL(lfsck_add_target);
2207
2208 void lfsck_del_target(const struct lu_env *env, struct dt_device *key,
2209                       struct dt_device *tgt, __u32 index, bool for_ost)
2210 {
2211         struct lfsck_instance   *lfsck;
2212         struct lfsck_tgt_descs  *ltds;
2213         struct lfsck_tgt_desc   *ltd;
2214         struct list_head        *head;
2215
2216         if (for_ost)
2217                 head = &lfsck_ost_orphan_list;
2218         else
2219                 head = &lfsck_mdt_orphan_list;
2220
2221         spin_lock(&lfsck_instance_lock);
2222         list_for_each_entry(ltd, head, ltd_orphan_list) {
2223                 if (ltd->ltd_tgt == tgt) {
2224                         list_del_init(&ltd->ltd_orphan_list);
2225                         spin_unlock(&lfsck_instance_lock);
2226                         lfsck_tgt_put(ltd);
2227
2228                         return;
2229                 }
2230         }
2231
2232         ltd = NULL;
2233         lfsck = __lfsck_instance_find(key, true, false);
2234         spin_unlock(&lfsck_instance_lock);
2235         if (unlikely(lfsck == NULL))
2236                 return;
2237
2238         if (for_ost)
2239                 ltds = &lfsck->li_ost_descs;
2240         else
2241                 ltds = &lfsck->li_mdt_descs;
2242
2243         down_write(&ltds->ltd_rw_sem);
2244         LASSERT(ltds->ltd_tgts_bitmap != NULL);
2245
2246         if (unlikely(index >= ltds->ltd_tgts_bitmap->size))
2247                 goto unlock;
2248
2249         ltd = LTD_TGT(ltds, index);
2250         if (unlikely(ltd == NULL))
2251                 goto unlock;
2252
2253         LASSERT(ltds->ltd_tgtnr > 0);
2254
2255         ltds->ltd_tgtnr--;
2256         cfs_bitmap_clear(ltds->ltd_tgts_bitmap, index);
2257         LTD_TGT(ltds, index) = NULL;
2258
2259 unlock:
2260         if (ltd == NULL) {
2261                 if (for_ost)
2262                         head = &lfsck->li_ost_descs.ltd_orphan;
2263                 else
2264                         head = &lfsck->li_mdt_descs.ltd_orphan;
2265
2266                 list_for_each_entry(ltd, head, ltd_orphan_list) {
2267                         if (ltd->ltd_tgt == tgt) {
2268                                 list_del_init(&ltd->ltd_orphan_list);
2269                                 break;
2270                         }
2271                 }
2272         }
2273
2274         up_write(&ltds->ltd_rw_sem);
2275         if (ltd != NULL) {
2276                 spin_lock(&ltds->ltd_lock);
2277                 ltd->ltd_dead = 1;
2278                 spin_unlock(&ltds->ltd_lock);
2279                 lfsck_stop_notify(env, lfsck, ltds, ltd, LFSCK_TYPE_LAYOUT);
2280                 lfsck_tgt_put(ltd);
2281         }
2282
2283         lfsck_instance_put(env, lfsck);
2284 }
2285 EXPORT_SYMBOL(lfsck_del_target);
2286
2287 static int __init lfsck_init(void)
2288 {
2289         int rc;
2290
2291         INIT_LIST_HEAD(&lfsck_instance_list);
2292         INIT_LIST_HEAD(&lfsck_ost_orphan_list);
2293         INIT_LIST_HEAD(&lfsck_mdt_orphan_list);
2294         lfsck_key_init_generic(&lfsck_thread_key, NULL);
2295         rc = lu_context_key_register(&lfsck_thread_key);
2296         if (rc == 0) {
2297                 tgt_register_lfsck_in_notify(lfsck_in_notify);
2298                 tgt_register_lfsck_query(lfsck_query);
2299         }
2300
2301         return rc;
2302 }
2303
2304 static void __exit lfsck_exit(void)
2305 {
2306         struct lfsck_tgt_desc *ltd;
2307         struct lfsck_tgt_desc *next;
2308
2309         LASSERT(list_empty(&lfsck_instance_list));
2310
2311         list_for_each_entry_safe(ltd, next, &lfsck_ost_orphan_list,
2312                                  ltd_orphan_list) {
2313                 list_del_init(&ltd->ltd_orphan_list);
2314                 lfsck_tgt_put(ltd);
2315         }
2316
2317         list_for_each_entry_safe(ltd, next, &lfsck_mdt_orphan_list,
2318                                  ltd_orphan_list) {
2319                 list_del_init(&ltd->ltd_orphan_list);
2320                 lfsck_tgt_put(ltd);
2321         }
2322
2323         lu_context_key_degister(&lfsck_thread_key);
2324 }
2325
2326 MODULE_AUTHOR("Intel Corporation <http://www.intel.com/>");
2327 MODULE_DESCRIPTION("LFSCK");
2328 MODULE_LICENSE("GPL");
2329
2330 cfs_module(lfsck, LUSTRE_VERSION_STRING, lfsck_init, lfsck_exit);