Whamcloud - gitweb
LU-4788 lfsck: namespace LFSCK uses assistant thread
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
44 #define LFSCK_NAMESPACE_MAGIC   0xA0629D03
45
/* Positive verdicts reported by lfsck_namespace_check_exist() when the name
 * entry being verified is no longer usable as-is. */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,
        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,
        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};
51
/* Name of the namespace LFSCK trace file, looked up under the local root. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 struct lfsck_namespace_req {
55         struct lfsck_assistant_req       lnr_lar;
56         struct dt_object                *lnr_obj;
57         struct lu_fid                    lnr_fid;
58         __u64                            lnr_oit_cookie;
59         __u64                            lnr_dir_cookie;
60         __u32                            lnr_attr;
61         __u32                            lnr_size;
62         __u16                            lnr_type;
63         __u16                            lnr_namelen;
64         char                             lnr_name[0];
65 };
66
67 static struct lfsck_namespace_req *
68 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
69                                    struct lu_dirent *ent, __u16 type)
70 {
71         struct lfsck_namespace_req *lnr;
72         int                         size;
73
74         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
75         OBD_ALLOC(lnr, size);
76         if (lnr == NULL)
77                 return ERR_PTR(-ENOMEM);
78
79         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
80         lu_object_get(&lfsck->li_obj_dir->do_lu);
81         lnr->lnr_obj = lfsck->li_obj_dir;
82         lnr->lnr_fid = ent->lde_fid;
83         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
84         lnr->lnr_dir_cookie = ent->lde_hash;
85         lnr->lnr_attr = ent->lde_attrs;
86         lnr->lnr_size = size;
87         lnr->lnr_type = type;
88         lnr->lnr_namelen = ent->lde_namelen;
89         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
90
91         return lnr;
92 }
93
94 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
95                                                struct lfsck_assistant_req *lar)
96 {
97         struct lfsck_namespace_req *lnr =
98                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
99
100         lu_object_put(env, &lnr->lnr_obj->do_lu);
101         OBD_FREE(lnr, lnr->lnr_size);
102 }
103
104 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
105                                       struct lfsck_namespace *src)
106 {
107         dst->ln_magic = le32_to_cpu(src->ln_magic);
108         dst->ln_status = le32_to_cpu(src->ln_status);
109         dst->ln_flags = le32_to_cpu(src->ln_flags);
110         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
111         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
112         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
113         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
114         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
115         dst->ln_time_last_checkpoint =
116                                 le64_to_cpu(src->ln_time_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
118                                  &src->ln_pos_latest_start);
119         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
120                                  &src->ln_pos_last_checkpoint);
121         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
122                                  &src->ln_pos_first_inconsistent);
123         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
124         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
125         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
126         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
127         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
128         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
129         dst->ln_objs_repaired_phase2 =
130                                 le64_to_cpu(src->ln_objs_repaired_phase2);
131         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
132         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
133         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
134         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
135                       &src->ln_fid_latest_scanned_phase2);
136         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
137         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
138 }
139
140 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
141                                       struct lfsck_namespace *src)
142 {
143         dst->ln_magic = cpu_to_le32(src->ln_magic);
144         dst->ln_status = cpu_to_le32(src->ln_status);
145         dst->ln_flags = cpu_to_le32(src->ln_flags);
146         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
147         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
148         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
149         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
150         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
151         dst->ln_time_last_checkpoint =
152                                 cpu_to_le64(src->ln_time_last_checkpoint);
153         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
154                                  &src->ln_pos_latest_start);
155         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
156                                  &src->ln_pos_last_checkpoint);
157         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
158                                  &src->ln_pos_first_inconsistent);
159         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
160         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
161         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
162         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
163         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
164         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
165         dst->ln_objs_repaired_phase2 =
166                                 cpu_to_le64(src->ln_objs_repaired_phase2);
167         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
168         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
169         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
170         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
171                       &src->ln_fid_latest_scanned_phase2);
172         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
173         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
174 }
175
176 static void lfsck_namespace_record_failure(const struct lu_env *env,
177                                            struct lfsck_instance *lfsck,
178                                            struct lfsck_namespace *ns)
179 {
180         struct lfsck_position pos;
181
182         ns->ln_items_failed++;
183         lfsck_pos_fill(env, lfsck, &pos, false);
184         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
185             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
186                 ns->ln_pos_first_inconsistent = pos;
187
188                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
189                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
190                        lfsck_lfsck2name(lfsck),
191                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
192                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
193                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
194         }
195 }
196
197 /**
198  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
199  * \retval 0: succeed.
200  * \retval -ve: failed cases.
201  */
202 static int lfsck_namespace_load(const struct lu_env *env,
203                                 struct lfsck_component *com)
204 {
205         int len = com->lc_file_size;
206         int rc;
207
208         rc = dt_xattr_get(env, com->lc_obj,
209                           lfsck_buf_get(env, com->lc_file_disk, len),
210                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
211         if (rc == len) {
212                 struct lfsck_namespace *ns = com->lc_file_ram;
213
214                 lfsck_namespace_le_to_cpu(ns,
215                                 (struct lfsck_namespace *)com->lc_file_disk);
216                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
217                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
218                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
219                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
220                         rc = 1;
221                 } else {
222                         rc = 0;
223                 }
224         } else if (rc != -ENODATA) {
225                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
226                        "expected = %d: rc = %d\n",
227                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
228                 if (rc >= 0)
229                         rc = 1;
230         }
231         return rc;
232 }
233
234 static int lfsck_namespace_store(const struct lu_env *env,
235                                  struct lfsck_component *com, bool init)
236 {
237         struct dt_object        *obj    = com->lc_obj;
238         struct lfsck_instance   *lfsck  = com->lc_lfsck;
239         struct thandle          *handle;
240         int                      len    = com->lc_file_size;
241         int                      rc;
242         ENTRY;
243
244         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
245                                   (struct lfsck_namespace *)com->lc_file_ram);
246         handle = dt_trans_create(env, lfsck->li_bottom);
247         if (IS_ERR(handle))
248                 GOTO(log, rc = PTR_ERR(handle));
249
250         rc = dt_declare_xattr_set(env, obj,
251                                   lfsck_buf_get(env, com->lc_file_disk, len),
252                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
253         if (rc != 0)
254                 GOTO(out, rc);
255
256         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         rc = dt_xattr_set(env, obj,
261                           lfsck_buf_get(env, com->lc_file_disk, len),
262                           XATTR_NAME_LFSCK_NAMESPACE,
263                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
264                           handle, BYPASS_CAPA);
265
266         GOTO(out, rc);
267
268 out:
269         dt_trans_stop(env, lfsck->li_bottom, handle);
270
271 log:
272         if (rc != 0)
273                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
274                        lfsck_lfsck2name(lfsck), rc);
275         return rc;
276 }
277
278 static int lfsck_namespace_init(const struct lu_env *env,
279                                 struct lfsck_component *com)
280 {
281         struct lfsck_namespace *ns = com->lc_file_ram;
282         int rc;
283
284         memset(ns, 0, sizeof(*ns));
285         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
286         ns->ln_status = LS_INIT;
287         down_write(&com->lc_sem);
288         rc = lfsck_namespace_store(env, com, true);
289         up_write(&com->lc_sem);
290         return rc;
291 }
292
293 static int lfsck_namespace_lookup(const struct lu_env *env,
294                                   struct lfsck_component *com,
295                                   const struct lu_fid *fid, __u8 *flags)
296 {
297         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
298         int            rc;
299
300         fid_cpu_to_be(key, fid);
301         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
302                        (const struct dt_key *)key, BYPASS_CAPA);
303         return rc;
304 }
305
306 static int lfsck_namespace_delete(const struct lu_env *env,
307                                   struct lfsck_component *com,
308                                   const struct lu_fid *fid)
309 {
310         struct lfsck_instance   *lfsck  = com->lc_lfsck;
311         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
312         struct thandle          *handle;
313         struct dt_object        *obj    = com->lc_obj;
314         int                      rc;
315         ENTRY;
316
317         handle = dt_trans_create(env, lfsck->li_bottom);
318         if (IS_ERR(handle))
319                 RETURN(PTR_ERR(handle));
320
321         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
322         if (rc != 0)
323                 GOTO(out, rc);
324
325         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
326         if (rc != 0)
327                 GOTO(out, rc);
328
329         fid_cpu_to_be(key, fid);
330         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
331                        BYPASS_CAPA);
332
333         GOTO(out, rc);
334
335 out:
336         dt_trans_stop(env, lfsck->li_bottom, handle);
337         return rc;
338 }
339
340 static int lfsck_namespace_update(const struct lu_env *env,
341                                   struct lfsck_component *com,
342                                   const struct lu_fid *fid,
343                                   __u8 flags, bool force)
344 {
345         struct lfsck_instance   *lfsck  = com->lc_lfsck;
346         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
347         struct thandle          *handle;
348         struct dt_object        *obj    = com->lc_obj;
349         int                      rc;
350         bool                     exist  = false;
351         __u8                     tf;
352         ENTRY;
353
354         rc = lfsck_namespace_lookup(env, com, fid, &tf);
355         if (rc != 0 && rc != -ENOENT)
356                 RETURN(rc);
357
358         if (rc == 0) {
359                 if (!force || flags == tf)
360                         RETURN(0);
361
362                 exist = true;
363                 handle = dt_trans_create(env, lfsck->li_bottom);
364                 if (IS_ERR(handle))
365                         RETURN(PTR_ERR(handle));
366
367                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
368                                        handle);
369                 if (rc != 0)
370                         GOTO(out, rc);
371         } else {
372                 handle = dt_trans_create(env, lfsck->li_bottom);
373                 if (IS_ERR(handle))
374                         RETURN(PTR_ERR(handle));
375         }
376
377         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
378                                (const struct dt_key *)fid, handle);
379         if (rc != 0)
380                 GOTO(out, rc);
381
382         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
383         if (rc != 0)
384                 GOTO(out, rc);
385
386         fid_cpu_to_be(key, fid);
387         if (exist) {
388                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
389                                BYPASS_CAPA);
390                 if (rc != 0)
391                         GOTO(out, rc);
392         }
393
394         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
395                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
396
397         GOTO(out, rc);
398
399 out:
400         dt_trans_stop(env, lfsck->li_bottom, handle);
401         return rc;
402 }
403
404 static int lfsck_namespace_check_exist(const struct lu_env *env,
405                                        struct dt_object *dir,
406                                        struct dt_object *obj, const char *name)
407 {
408         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
409         int               rc;
410         ENTRY;
411
412         if (unlikely(lfsck_is_dead_obj(obj)))
413                 RETURN(LFSCK_NAMEENTRY_DEAD);
414
415         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
416                        (const struct dt_key *)name, BYPASS_CAPA);
417         if (rc == -ENOENT)
418                 RETURN(LFSCK_NAMEENTRY_REMOVED);
419
420         if (rc < 0)
421                 RETURN(rc);
422
423         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
424                 RETURN(LFSCK_NAMEENTRY_RECREATED);
425
426         RETURN(0);
427 }
428
429 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
430                                             struct dt_object *obj,
431                                             struct thandle *handle)
432 {
433         int rc;
434
435         /* For destroying all invalid linkEA entries. */
436         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
437         if (rc != 0)
438                 return rc;
439
440         /* For insert new linkEA entry. */
441         rc = dt_declare_xattr_set(env, obj,
442                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
443                         XATTR_NAME_LINK, 0, handle);
444         return rc;
445 }
446
447 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
448                             struct linkea_data *ldata)
449 {
450         int rc;
451
452         ldata->ld_buf =
453                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
454                                        PAGE_CACHE_SIZE);
455         if (ldata->ld_buf->lb_buf == NULL)
456                 return -ENOMEM;
457
458         if (!dt_object_exists(obj))
459                 return -ENODATA;
460
461         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
462         if (rc == -ERANGE) {
463                 /* Buf was too small, figure out what we need. */
464                 lu_buf_free(ldata->ld_buf);
465                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
466                                   BYPASS_CAPA);
467                 if (rc < 0)
468                         return rc;
469
470                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
471                 if (ldata->ld_buf->lb_buf == NULL)
472                         return -ENOMEM;
473
474                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
475                                   BYPASS_CAPA);
476         }
477         if (rc < 0)
478                 return rc;
479
480         linkea_init(ldata);
481
482         return 0;
483 }
484
485 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
486                              struct linkea_data *ldata, struct thandle *handle)
487 {
488         const struct lu_buf *buf = lfsck_buf_get_const(env,
489                                                        ldata->ld_buf->lb_buf,
490                                                        ldata->ld_leh->leh_len);
491
492         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
493                             BYPASS_CAPA);
494 }
495
496 /**
497  * \retval ve: removed entries
498  */
499 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
500                                      struct linkea_data *ldata,
501                                      struct lu_name *cname,
502                                      struct lu_fid *pfid)
503 {
504         struct link_ea_entry    *oldlee;
505         int                      oldlen;
506         int                      removed = 0;
507
508         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
509         oldlee = ldata->ld_lee;
510         oldlen = ldata->ld_reclen;
511         linkea_next_entry(ldata);
512         while (ldata->ld_lee != NULL) {
513                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
514                                    ldata->ld_lee->lee_reclen[1];
515                 if (unlikely(ldata->ld_reclen == oldlen &&
516                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
517                         linkea_del_buf(ldata, cname);
518                         removed++;
519                 } else {
520                         linkea_next_entry(ldata);
521                 }
522         }
523         ldata->ld_lee = oldlee;
524         ldata->ld_reclen = oldlen;
525         return removed;
526 }
527
528 /**
529  * \retval +ve  repaired
530  * \retval 0    no need to repair
531  * \retval -ve  error cases
532  */
533 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
534                                            struct lfsck_component *com,
535                                            struct dt_object *child, __u8 flags)
536 {
537         struct lfsck_thread_info *info    = lfsck_env_info(env);
538         struct lu_attr           *la      = &info->lti_la;
539         struct lu_name           *cname   = &info->lti_name;
540         struct lu_fid            *pfid    = &info->lti_fid;
541         struct lu_fid            *cfid    = &info->lti_fid2;
542         struct lfsck_instance   *lfsck    = com->lc_lfsck;
543         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
544         struct lfsck_namespace  *ns       = com->lc_file_ram;
545         struct linkea_data       ldata    = { 0 };
546         struct thandle          *handle   = NULL;
547         bool                     locked   = false;
548         bool                     update   = false;
549         int                      rc;
550         ENTRY;
551
552         if (com->lc_journal) {
553
554 again:
555                 LASSERT(!locked);
556
557                 update = false;
558                 com->lc_journal = 1;
559                 handle = dt_trans_create(env, lfsck->li_next);
560                 if (IS_ERR(handle))
561                         RETURN(rc = PTR_ERR(handle));
562
563                 rc = dt_declare_xattr_set(env, child,
564                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
565                         XATTR_NAME_LINK, 0, handle);
566                 if (rc != 0)
567                         GOTO(stop, rc);
568
569                 rc = dt_trans_start(env, lfsck->li_next, handle);
570                 if (rc != 0)
571                         GOTO(stop, rc);
572
573                 dt_write_lock(env, child, MOR_TGT_CHILD);
574                 locked = true;
575         }
576
577         if (unlikely(lfsck_is_dead_obj(child)))
578                 GOTO(stop, rc = 0);
579
580         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
581         if (rc == 0)
582                 rc = lfsck_links_read(env, child, &ldata);
583         if (rc != 0) {
584                 if ((bk->lb_param & LPF_DRYRUN) &&
585                     (rc == -EINVAL || rc == -ENODATA))
586                         rc = 1;
587
588                 GOTO(stop, rc);
589         }
590
591         linkea_first_entry(&ldata);
592         while (ldata.ld_lee != NULL) {
593                 struct dt_object *parent = NULL;
594
595                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
596                 if (rc > 0)
597                         update = true;
598
599                 if (!fid_is_sane(pfid))
600                         goto shrink;
601
602                 parent = lfsck_object_find(env, lfsck, pfid);
603                 if (IS_ERR(parent))
604                         GOTO(stop, rc = PTR_ERR(parent));
605
606                 if (!dt_object_exists(parent))
607                         goto shrink;
608
609                 /* XXX: Currently, skip remote object, the consistency for
610                  *      remote object will be processed in LFSCK phase III. */
611                 if (dt_object_remote(parent)) {
612                         lfsck_object_put(env, parent);
613                         linkea_next_entry(&ldata);
614                         continue;
615                 }
616
617                 if (unlikely(!dt_try_as_dir(env, parent)))
618                         goto shrink;
619
620                 /* To guarantee the 'name' is terminated with '0'. */
621                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
622                 info->lti_key[cname->ln_namelen] = 0;
623                 cname->ln_name = info->lti_key;
624                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
625                                (const struct dt_key *)cname->ln_name,
626                                BYPASS_CAPA);
627                 if (rc != 0 && rc != -ENOENT) {
628                         lfsck_object_put(env, parent);
629                         GOTO(stop, rc);
630                 }
631
632                 if (rc == 0) {
633                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
634                                 lfsck_object_put(env, parent);
635                                 linkea_next_entry(&ldata);
636                                 continue;
637                         }
638
639                         goto shrink;
640                 }
641
642                 /* If there is no name entry in the parent dir and the object
643                  * link count is less than the linkea entries count, then the
644                  * linkea entry should be removed. */
645                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
646                         goto shrink;
647
648                 /* XXX: For the case of there is a linkea entry, but without
649                  *      name entry pointing to the object and its hard links
650                  *      count is not less than the object name entries count,
651                  *      then seems we should add the 'missed' name entry back
652                  *      to namespace, but before LFSCK phase III finished, we
653                  *      do not know whether the object has some inconsistency
654                  *      on other MDTs. So now, do NOT add the name entry back
655                  *      to the namespace, but keep the linkEA entry. LU-2914 */
656                 lfsck_object_put(env, parent);
657                 linkea_next_entry(&ldata);
658                 continue;
659
660 shrink:
661                 if (parent != NULL)
662                         lfsck_object_put(env, parent);
663                 if (bk->lb_param & LPF_DRYRUN)
664                         RETURN(1);
665
666                 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
667                       "for the object: "DFID", parent "DFID", name %.*s\n",
668                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
669                       PFID(pfid), cname->ln_namelen, cname->ln_name);
670
671                 linkea_del_buf(&ldata, cname);
672                 update = true;
673         }
674
675         if (update) {
676                 if (!com->lc_journal) {
677                         com->lc_journal = 1;
678                         goto again;
679                 }
680
681                 rc = lfsck_links_write(env, child, &ldata, handle);
682         }
683
684         GOTO(stop, rc);
685
686 stop:
687         if (locked) {
688         /* XXX: For the case linkea entries count does not match the object hard
689          *      links count, we cannot update the later one simply. Before LFSCK
690          *      phase III finished, we cannot know whether there are some remote
691          *      name entries to be repaired or not. LU-2914 */
692                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
693                     ldata.ld_leh != NULL &&
694                     ldata.ld_leh->leh_reccount != la->la_nlink)
695                         CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
696                                "count %u may not match its hardlink count %u\n",
697                                lfsck_lfsck2name(lfsck), PFID(cfid),
698                                ldata.ld_leh->leh_reccount, la->la_nlink);
699
700                 dt_write_unlock(env, child);
701         }
702
703         if (handle != NULL)
704                 dt_trans_stop(env, lfsck->li_next, handle);
705
706         if (rc == 0 && update) {
707                 ns->ln_objs_nlink_repaired++;
708                 rc = 1;
709         }
710
711         return rc;
712 }
713
714 /* namespace APIs */
715
716 static int lfsck_namespace_reset(const struct lu_env *env,
717                                  struct lfsck_component *com, bool init)
718 {
719         struct lfsck_instance   *lfsck = com->lc_lfsck;
720         struct lfsck_namespace  *ns    = com->lc_file_ram;
721         struct dt_object        *root;
722         struct dt_object        *dto;
723         int                      rc;
724         ENTRY;
725
726         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
727         if (IS_ERR(root))
728                 GOTO(log, rc = PTR_ERR(root));
729
730         if (unlikely(!dt_try_as_dir(env, root)))
731                 GOTO(put, rc = -ENOTDIR);
732
733         down_write(&com->lc_sem);
734         if (init) {
735                 memset(ns, 0, sizeof(*ns));
736         } else {
737                 __u32 count = ns->ln_success_count;
738                 __u64 last_time = ns->ln_time_last_complete;
739
740                 memset(ns, 0, sizeof(*ns));
741                 ns->ln_success_count = count;
742                 ns->ln_time_last_complete = last_time;
743         }
744         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
745         ns->ln_status = LS_INIT;
746
747         rc = local_object_unlink(env, lfsck->li_bottom, root,
748                                  lfsck_namespace_name);
749         if (rc != 0)
750                 GOTO(out, rc);
751
752         lfsck_object_put(env, com->lc_obj);
753         com->lc_obj = NULL;
754         dto = local_index_find_or_create(env, lfsck->li_los, root,
755                                          lfsck_namespace_name,
756                                          S_IFREG | S_IRUGO | S_IWUSR,
757                                          &dt_lfsck_features);
758         if (IS_ERR(dto))
759                 GOTO(out, rc = PTR_ERR(dto));
760
761         com->lc_obj = dto;
762         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
763         if (rc != 0)
764                 GOTO(out, rc);
765
766         rc = lfsck_namespace_store(env, com, true);
767
768         GOTO(out, rc);
769
770 out:
771         up_write(&com->lc_sem);
772
773 put:
774         lu_object_put(env, &root->do_lu);
775 log:
776         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
777                lfsck_lfsck2name(lfsck), rc);
778         return rc;
779 }
780
781 static void
782 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
783                      bool new_checked)
784 {
785         struct lfsck_namespace *ns = com->lc_file_ram;
786
787         down_write(&com->lc_sem);
788         if (new_checked)
789                 com->lc_new_checked++;
790         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
791         up_write(&com->lc_sem);
792 }
793
794 static int lfsck_namespace_checkpoint(const struct lu_env *env,
795                                       struct lfsck_component *com, bool init)
796 {
797         struct lfsck_instance   *lfsck = com->lc_lfsck;
798         struct lfsck_namespace  *ns    = com->lc_file_ram;
799         int                      rc;
800
801         if (!init) {
802                 rc = lfsck_checkpoint_generic(env, com);
803                 if (rc != 0)
804                         goto log;
805         }
806
807         down_write(&com->lc_sem);
808         if (init) {
809                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
810         } else {
811                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
812                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
813                                 HALF_SEC - lfsck->li_time_last_checkpoint);
814                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
815                 ns->ln_items_checked += com->lc_new_checked;
816                 com->lc_new_checked = 0;
817         }
818
819         rc = lfsck_namespace_store(env, com, false);
820         up_write(&com->lc_sem);
821
822 log:
823         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
824                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
825                lfsck->li_pos_current.lp_oit_cookie,
826                PFID(&lfsck->li_pos_current.lp_dir_parent),
827                lfsck->li_pos_current.lp_dir_cookie, rc);
828
829         return rc > 0 ? 0 : rc;
830 }
831
832 static int lfsck_namespace_prep(const struct lu_env *env,
833                                 struct lfsck_component *com,
834                                 struct lfsck_start_param *lsp)
835 {
836         struct lfsck_instance   *lfsck  = com->lc_lfsck;
837         struct lfsck_namespace  *ns     = com->lc_file_ram;
838         struct lfsck_position   *pos    = &com->lc_pos_start;
839         int                      rc;
840
841         if (ns->ln_status == LS_COMPLETED) {
842                 rc = lfsck_namespace_reset(env, com, false);
843                 if (rc == 0)
844                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
845
846                 if (rc != 0) {
847                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
848                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
849
850                         return rc;
851                 }
852         }
853
854         down_write(&com->lc_sem);
855         ns->ln_time_latest_start = cfs_time_current_sec();
856         spin_lock(&lfsck->li_lock);
857
858         if (ns->ln_flags & LF_SCANNED_ONCE) {
859                 if (!lfsck->li_drop_dryrun ||
860                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
861                         ns->ln_status = LS_SCANNING_PHASE2;
862                         list_move_tail(&com->lc_link,
863                                        &lfsck->li_list_double_scan);
864                         if (!list_empty(&com->lc_link_dir))
865                                 list_del_init(&com->lc_link_dir);
866                         lfsck_pos_set_zero(pos);
867                 } else {
868                         ns->ln_status = LS_SCANNING_PHASE1;
869                         ns->ln_run_time_phase1 = 0;
870                         ns->ln_run_time_phase2 = 0;
871                         ns->ln_items_checked = 0;
872                         ns->ln_items_repaired = 0;
873                         ns->ln_items_failed = 0;
874                         ns->ln_dirs_checked = 0;
875                         ns->ln_mlinked_checked = 0;
876                         ns->ln_objs_checked_phase2 = 0;
877                         ns->ln_objs_repaired_phase2 = 0;
878                         ns->ln_objs_failed_phase2 = 0;
879                         ns->ln_objs_nlink_repaired = 0;
880                         ns->ln_objs_lost_found = 0;
881                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
882                         if (list_empty(&com->lc_link_dir))
883                                 list_add_tail(&com->lc_link_dir,
884                                               &lfsck->li_list_dir);
885                         *pos = ns->ln_pos_first_inconsistent;
886                 }
887         } else {
888                 ns->ln_status = LS_SCANNING_PHASE1;
889                 if (list_empty(&com->lc_link_dir))
890                         list_add_tail(&com->lc_link_dir,
891                                       &lfsck->li_list_dir);
892                 if (!lfsck->li_drop_dryrun ||
893                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
894                         *pos = ns->ln_pos_last_checkpoint;
895                         pos->lp_oit_cookie++;
896                 } else {
897                         *pos = ns->ln_pos_first_inconsistent;
898                 }
899         }
900
901         spin_unlock(&lfsck->li_lock);
902         up_write(&com->lc_sem);
903
904         rc = lfsck_start_assistant(env, com, lsp);
905
906         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
907                DFID", "LPX64"]: rc = %d\n",
908                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
909                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
910
911         return rc;
912 }
913
914 static int lfsck_namespace_exec_oit(const struct lu_env *env,
915                                     struct lfsck_component *com,
916                                     struct dt_object *obj)
917 {
918         down_write(&com->lc_sem);
919         com->lc_new_checked++;
920         if (S_ISDIR(lfsck_object_type(obj)))
921                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
922         up_write(&com->lc_sem);
923         return 0;
924 }
925
926 static int lfsck_namespace_exec_dir(const struct lu_env *env,
927                                     struct lfsck_component *com,
928                                     struct lu_dirent *ent, __u16 type)
929 {
930         struct lfsck_assistant_data     *lad    = com->lc_data;
931         struct lfsck_namespace_req      *lnr;
932         bool                             wakeup = false;
933
934         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
935         if (IS_ERR(lnr)) {
936                 struct lfsck_namespace *ns = com->lc_file_ram;
937
938                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
939                 return PTR_ERR(lnr);
940         }
941
942         spin_lock(&lad->lad_lock);
943         if (lad->lad_assistant_status < 0) {
944                 spin_unlock(&lad->lad_lock);
945                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
946                 return lad->lad_assistant_status;
947         }
948
949         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
950         if (lad->lad_prefetched == 0)
951                 wakeup = true;
952
953         lad->lad_prefetched++;
954         spin_unlock(&lad->lad_lock);
955         if (wakeup)
956                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
957
958         down_write(&com->lc_sem);
959         com->lc_new_checked++;
960         up_write(&com->lc_sem);
961
962         return 0;
963 }
964
965 static int lfsck_namespace_post(const struct lu_env *env,
966                                 struct lfsck_component *com,
967                                 int result, bool init)
968 {
969         struct lfsck_instance   *lfsck = com->lc_lfsck;
970         struct lfsck_namespace  *ns    = com->lc_file_ram;
971         int                      rc;
972         ENTRY;
973
974         lfsck_post_generic(env, com, &result);
975
976         down_write(&com->lc_sem);
977         spin_lock(&lfsck->li_lock);
978         if (!init)
979                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
980         if (result > 0) {
981                 ns->ln_status = LS_SCANNING_PHASE2;
982                 ns->ln_flags |= LF_SCANNED_ONCE;
983                 ns->ln_flags &= ~LF_UPGRADE;
984                 list_del_init(&com->lc_link_dir);
985                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
986         } else if (result == 0) {
987                 ns->ln_status = lfsck->li_status;
988                 if (ns->ln_status == 0)
989                         ns->ln_status = LS_STOPPED;
990                 if (ns->ln_status != LS_PAUSED) {
991                         list_del_init(&com->lc_link_dir);
992                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
993                 }
994         } else {
995                 ns->ln_status = LS_FAILED;
996                 list_del_init(&com->lc_link_dir);
997                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
998         }
999         spin_unlock(&lfsck->li_lock);
1000
1001         if (!init) {
1002                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1003                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1004                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1005                 ns->ln_items_checked += com->lc_new_checked;
1006                 com->lc_new_checked = 0;
1007         }
1008
1009         rc = lfsck_namespace_store(env, com, false);
1010         up_write(&com->lc_sem);
1011
1012         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1013                lfsck_lfsck2name(lfsck), rc);
1014
1015         RETURN(rc);
1016 }
1017
1018 static int
1019 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1020                      struct seq_file *m)
1021 {
1022         struct lfsck_instance   *lfsck = com->lc_lfsck;
1023         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1024         struct lfsck_namespace  *ns    = com->lc_file_ram;
1025         int                      rc;
1026
1027         down_read(&com->lc_sem);
1028         seq_printf(m, "name: lfsck_namespace\n"
1029                    "magic: %#x\n"
1030                    "version: %d\n"
1031                    "status: %s\n",
1032                    ns->ln_magic,
1033                    bk->lb_version,
1034                    lfsck_status2names(ns->ln_status));
1035
1036         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1037         if (rc < 0)
1038                 goto out;
1039
1040         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1041         if (rc < 0)
1042                 goto out;
1043
1044         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1045                              "time_since_last_completed");
1046         if (rc < 0)
1047                 goto out;
1048
1049         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1050                              "time_since_latest_start");
1051         if (rc < 0)
1052                 goto out;
1053
1054         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1055                              "time_since_last_checkpoint");
1056         if (rc < 0)
1057                 goto out;
1058
1059         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1060                             "latest_start_position");
1061         if (rc < 0)
1062                 goto out;
1063
1064         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1065                             "last_checkpoint_position");
1066         if (rc < 0)
1067                 goto out;
1068
1069         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1070                             "first_failure_position");
1071         if (rc < 0)
1072                 goto out;
1073
1074         if (ns->ln_status == LS_SCANNING_PHASE1) {
1075                 struct lfsck_position pos;
1076                 const struct dt_it_ops *iops;
1077                 cfs_duration_t duration = cfs_time_current() -
1078                                           lfsck->li_time_last_checkpoint;
1079                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1080                 __u64 speed = checked;
1081                 __u64 new_checked = com->lc_new_checked * HZ;
1082                 __u32 rtime = ns->ln_run_time_phase1 +
1083                               cfs_duration_sec(duration + HALF_SEC);
1084
1085                 if (duration != 0)
1086                         do_div(new_checked, duration);
1087                 if (rtime != 0)
1088                         do_div(speed, rtime);
1089                 seq_printf(m, "checked_phase1: "LPU64"\n"
1090                               "checked_phase2: "LPU64"\n"
1091                               "updated_phase1: "LPU64"\n"
1092                               "updated_phase2: "LPU64"\n"
1093                               "failed_phase1: "LPU64"\n"
1094                               "failed_phase2: "LPU64"\n"
1095                               "directories: "LPU64"\n"
1096                               "multi_linked_files: "LPU64"\n"
1097                               "dirent_repaired: "LPU64"\n"
1098                               "linkea_repaired: "LPU64"\n"
1099                               "nlinks_repaired: "LPU64"\n"
1100                               "lost_found: "LPU64"\n"
1101                               "success_count: %u\n"
1102                               "run_time_phase1: %u seconds\n"
1103                               "run_time_phase2: %u seconds\n"
1104                               "average_speed_phase1: "LPU64" items/sec\n"
1105                               "average_speed_phase2: N/A\n"
1106                               "real_time_speed_phase1: "LPU64" items/sec\n"
1107                               "real_time_speed_phase2: N/A\n",
1108                               checked,
1109                               ns->ln_objs_checked_phase2,
1110                               ns->ln_items_repaired,
1111                               ns->ln_objs_repaired_phase2,
1112                               ns->ln_items_failed,
1113                               ns->ln_objs_failed_phase2,
1114                               ns->ln_dirs_checked,
1115                               ns->ln_mlinked_checked,
1116                               ns->ln_dirent_repaired,
1117                               ns->ln_linkea_repaired,
1118                               ns->ln_objs_nlink_repaired,
1119                               ns->ln_objs_lost_found,
1120                               ns->ln_success_count,
1121                               rtime,
1122                               ns->ln_run_time_phase2,
1123                               speed,
1124                               new_checked);
1125
1126                 LASSERT(lfsck->li_di_oit != NULL);
1127
1128                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1129
1130                 /* The low layer otable-based iteration position may NOT
1131                  * exactly match the namespace-based directory traversal
1132                  * cookie. Generally, it is not a serious issue. But the
1133                  * caller should NOT make assumption on that. */
1134                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1135                 if (!lfsck->li_current_oit_processed)
1136                         pos.lp_oit_cookie--;
1137
1138                 spin_lock(&lfsck->li_lock);
1139                 if (lfsck->li_di_dir != NULL) {
1140                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1141                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1142                                 fid_zero(&pos.lp_dir_parent);
1143                                 pos.lp_dir_cookie = 0;
1144                         } else {
1145                                 pos.lp_dir_parent =
1146                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1147                         }
1148                 } else {
1149                         fid_zero(&pos.lp_dir_parent);
1150                         pos.lp_dir_cookie = 0;
1151                 }
1152                 spin_unlock(&lfsck->li_lock);
1153                 lfsck_pos_dump(m, &pos, "current_position");
1154         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1155                 cfs_duration_t duration = cfs_time_current() -
1156                                           lfsck->li_time_last_checkpoint;
1157                 __u64 checked = ns->ln_objs_checked_phase2 +
1158                                 com->lc_new_checked;
1159                 __u64 speed1 = ns->ln_items_checked;
1160                 __u64 speed2 = checked;
1161                 __u64 new_checked = com->lc_new_checked * HZ;
1162                 __u32 rtime = ns->ln_run_time_phase2 +
1163                               cfs_duration_sec(duration + HALF_SEC);
1164
1165                 if (duration != 0)
1166                         do_div(new_checked, duration);
1167                 if (ns->ln_run_time_phase1 != 0)
1168                         do_div(speed1, ns->ln_run_time_phase1);
1169                 if (rtime != 0)
1170                         do_div(speed2, rtime);
1171                 seq_printf(m, "checked_phase1: "LPU64"\n"
1172                               "checked_phase2: "LPU64"\n"
1173                               "updated_phase1: "LPU64"\n"
1174                               "updated_phase2: "LPU64"\n"
1175                               "failed_phase1: "LPU64"\n"
1176                               "failed_phase2: "LPU64"\n"
1177                               "directories: "LPU64"\n"
1178                               "multi_linked_files: "LPU64"\n"
1179                               "dirent_repaired: "LPU64"\n"
1180                               "linkea_repaired: "LPU64"\n"
1181                               "nlinks_repaired: "LPU64"\n"
1182                               "lost_found: "LPU64"\n"
1183                               "success_count: %u\n"
1184                               "run_time_phase1: %u seconds\n"
1185                               "run_time_phase2: %u seconds\n"
1186                               "average_speed_phase1: "LPU64" items/sec\n"
1187                               "average_speed_phase2: "LPU64" objs/sec\n"
1188                               "real_time_speed_phase1: N/A\n"
1189                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1190                               "current_position: "DFID"\n",
1191                               ns->ln_items_checked,
1192                               checked,
1193                               ns->ln_items_repaired,
1194                               ns->ln_objs_repaired_phase2,
1195                               ns->ln_items_failed,
1196                               ns->ln_objs_failed_phase2,
1197                               ns->ln_dirs_checked,
1198                               ns->ln_mlinked_checked,
1199                               ns->ln_dirent_repaired,
1200                               ns->ln_linkea_repaired,
1201                               ns->ln_objs_nlink_repaired,
1202                               ns->ln_objs_lost_found,
1203                               ns->ln_success_count,
1204                               ns->ln_run_time_phase1,
1205                               rtime,
1206                               speed1,
1207                               speed2,
1208                               new_checked,
1209                               PFID(&ns->ln_fid_latest_scanned_phase2));
1210         } else {
1211                 __u64 speed1 = ns->ln_items_checked;
1212                 __u64 speed2 = ns->ln_objs_checked_phase2;
1213
1214                 if (ns->ln_run_time_phase1 != 0)
1215                         do_div(speed1, ns->ln_run_time_phase1);
1216                 if (ns->ln_run_time_phase2 != 0)
1217                         do_div(speed2, ns->ln_run_time_phase2);
1218                 seq_printf(m, "checked_phase1: "LPU64"\n"
1219                               "checked_phase2: "LPU64"\n"
1220                               "updated_phase1: "LPU64"\n"
1221                               "updated_phase2: "LPU64"\n"
1222                               "failed_phase1: "LPU64"\n"
1223                               "failed_phase2: "LPU64"\n"
1224                               "directories: "LPU64"\n"
1225                               "multi_linked_files: "LPU64"\n"
1226                               "dirent_repaired: "LPU64"\n"
1227                               "linkea_repaired: "LPU64"\n"
1228                               "nlinks_repaired: "LPU64"\n"
1229                               "lost_found: "LPU64"\n"
1230                               "success_count: %u\n"
1231                               "run_time_phase1: %u seconds\n"
1232                               "run_time_phase2: %u seconds\n"
1233                               "average_speed_phase1: "LPU64" items/sec\n"
1234                               "average_speed_phase2: "LPU64" objs/sec\n"
1235                               "real_time_speed_phase1: N/A\n"
1236                               "real_time_speed_phase2: N/A\n"
1237                               "current_position: N/A\n",
1238                               ns->ln_items_checked,
1239                               ns->ln_objs_checked_phase2,
1240                               ns->ln_items_repaired,
1241                               ns->ln_objs_repaired_phase2,
1242                               ns->ln_items_failed,
1243                               ns->ln_objs_failed_phase2,
1244                               ns->ln_dirs_checked,
1245                               ns->ln_mlinked_checked,
1246                               ns->ln_dirent_repaired,
1247                               ns->ln_linkea_repaired,
1248                               ns->ln_objs_nlink_repaired,
1249                               ns->ln_objs_lost_found,
1250                               ns->ln_success_count,
1251                               ns->ln_run_time_phase1,
1252                               ns->ln_run_time_phase2,
1253                               speed1,
1254                               speed2);
1255         }
1256 out:
1257         up_read(&com->lc_sem);
1258         return 0;
1259 }
1260
1261 static int lfsck_namespace_double_scan(const struct lu_env *env,
1262                                        struct lfsck_component *com)
1263 {
1264         struct lfsck_namespace *ns = com->lc_file_ram;
1265
1266         return lfsck_double_scan_generic(env, com, ns->ln_status);
1267 }
1268
1269 static void lfsck_namespace_data_release(const struct lu_env *env,
1270                                          struct lfsck_component *com)
1271 {
1272         struct lfsck_assistant_data     *lad    = com->lc_data;
1273         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
1274         struct lfsck_tgt_desc           *ltd;
1275         struct lfsck_tgt_desc           *next;
1276
1277         LASSERT(lad != NULL);
1278         LASSERT(thread_is_init(&lad->lad_thread) ||
1279                 thread_is_stopped(&lad->lad_thread));
1280         LASSERT(list_empty(&lad->lad_req_list));
1281
1282         com->lc_data = NULL;
1283
1284         spin_lock(&ltds->ltd_lock);
1285         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
1286                                  ltd_namespace_phase_list) {
1287                 list_del_init(&ltd->ltd_namespace_phase_list);
1288         }
1289         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
1290                                  ltd_namespace_phase_list) {
1291                 list_del_init(&ltd->ltd_namespace_phase_list);
1292         }
1293         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
1294                                  ltd_namespace_list) {
1295                 list_del_init(&ltd->ltd_namespace_list);
1296         }
1297         spin_unlock(&ltds->ltd_lock);
1298
1299         OBD_FREE_PTR(lad);
1300 }
1301
1302 static int lfsck_namespace_in_notify(const struct lu_env *env,
1303                                      struct lfsck_component *com,
1304                                      struct lfsck_request *lr)
1305 {
1306         struct lfsck_instance           *lfsck = com->lc_lfsck;
1307         struct lfsck_namespace          *ns    = com->lc_file_ram;
1308         struct lfsck_assistant_data     *lad   = com->lc_data;
1309         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
1310         struct lfsck_tgt_desc           *ltd;
1311         bool                             fail  = false;
1312         ENTRY;
1313
1314         if (lr->lr_event != LE_PHASE1_DONE &&
1315             lr->lr_event != LE_PHASE2_DONE &&
1316             lr->lr_event != LE_PEER_EXIT)
1317                 RETURN(-EINVAL);
1318
1319         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
1320                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
1321                lr->lr_index, lr->lr_status);
1322
1323         spin_lock(&ltds->ltd_lock);
1324         ltd = LTD_TGT(ltds, lr->lr_index);
1325         if (ltd == NULL) {
1326                 spin_unlock(&ltds->ltd_lock);
1327
1328                 RETURN(-ENXIO);
1329         }
1330
1331         list_del_init(&ltd->ltd_namespace_phase_list);
1332         switch (lr->lr_event) {
1333         case LE_PHASE1_DONE:
1334                 if (lr->lr_status <= 0) {
1335                         ltd->ltd_namespace_done = 1;
1336                         list_del_init(&ltd->ltd_namespace_list);
1337                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
1338                                "phase1 for namespace LFSCK: rc = %d.\n",
1339                                lfsck_lfsck2name(lfsck),
1340                                ltd->ltd_index, lr->lr_status);
1341                         ns->ln_flags |= LF_INCOMPLETE;
1342                         fail = true;
1343                         break;
1344                 }
1345
1346                 if (list_empty(&ltd->ltd_namespace_list))
1347                         list_add_tail(&ltd->ltd_namespace_list,
1348                                       &lad->lad_mdt_list);
1349                 list_add_tail(&ltd->ltd_namespace_phase_list,
1350                               &lad->lad_mdt_phase2_list);
1351                 break;
1352         case LE_PHASE2_DONE:
1353                 ltd->ltd_namespace_done = 1;
1354                 list_del_init(&ltd->ltd_namespace_list);
1355                 break;
1356         case LE_PEER_EXIT:
1357                 fail = true;
1358                 ltd->ltd_namespace_done = 1;
1359                 list_del_init(&ltd->ltd_namespace_list);
1360                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
1361                         CDEBUG(D_LFSCK,
1362                                "%s: the peer MDT %x exit namespace LFSCK\n",
1363                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
1364                         ns->ln_flags |= LF_INCOMPLETE;
1365                 }
1366                 break;
1367         default:
1368                 break;
1369         }
1370         spin_unlock(&ltds->ltd_lock);
1371
1372         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
1373                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1374
1375                 memset(stop, 0, sizeof(*stop));
1376                 stop->ls_status = lr->lr_status;
1377                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1378                 lfsck_stop(env, lfsck->li_bottom, stop);
1379         } else if (lfsck_phase2_next_ready(lad)) {
1380                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
1381         }
1382
1383         RETURN(0);
1384 }
1385
1386 static int lfsck_namespace_query(const struct lu_env *env,
1387                                  struct lfsck_component *com)
1388 {
1389         struct lfsck_namespace *ns = com->lc_file_ram;
1390
1391         return ns->ln_status;
1392 }
1393
1394 static struct lfsck_operations lfsck_namespace_ops = {
1395         .lfsck_reset            = lfsck_namespace_reset,
1396         .lfsck_fail             = lfsck_namespace_fail,
1397         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1398         .lfsck_prep             = lfsck_namespace_prep,
1399         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1400         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1401         .lfsck_post             = lfsck_namespace_post,
1402         .lfsck_dump             = lfsck_namespace_dump,
1403         .lfsck_double_scan      = lfsck_namespace_double_scan,
1404         .lfsck_data_release     = lfsck_namespace_data_release,
1405         .lfsck_quit             = lfsck_quit_generic,
1406         .lfsck_in_notify        = lfsck_namespace_in_notify,
1407         .lfsck_query            = lfsck_namespace_query,
1408 };
1409
1410 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
1411                                                 struct lfsck_component *com,
1412                                                 struct lfsck_assistant_req *lar)
1413 {
1414         struct lfsck_thread_info   *info     = lfsck_env_info(env);
1415         struct lu_attr             *la       = &info->lti_la;
1416         struct lfsck_instance      *lfsck    = com->lc_lfsck;
1417         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
1418         struct lfsck_namespace     *ns       = com->lc_file_ram;
1419         struct linkea_data          ldata    = { 0 };
1420         const struct lu_name       *cname;
1421         struct thandle             *handle   = NULL;
1422         struct lfsck_namespace_req *lnr      =
1423                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
1424         struct dt_object           *dir      = lnr->lnr_obj;
1425         struct dt_object           *obj      = NULL;
1426         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
1427         bool                        repaired = false;
1428         bool                        locked   = false;
1429         bool                        remove;
1430         bool                        newdata;
1431         bool                        log      = false;
1432         int                         count    = 0;
1433         int                         rc;
1434         ENTRY;
1435
1436         if (lnr->lnr_attr & LUDA_UPGRADE) {
1437                 ns->ln_flags |= LF_UPGRADE;
1438                 ns->ln_dirent_repaired++;
1439                 repaired = true;
1440         } else if (lnr->lnr_attr & LUDA_REPAIR) {
1441                 ns->ln_flags |= LF_INCONSISTENT;
1442                 ns->ln_dirent_repaired++;
1443                 repaired = true;
1444         }
1445
1446         if (lnr->lnr_name[0] == '.' &&
1447             (lnr->lnr_namelen == 1 ||
1448              (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
1449              fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
1450                 GOTO(out, rc = 0);
1451
1452         obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
1453         if (IS_ERR(obj))
1454                 GOTO(out, rc = PTR_ERR(obj));
1455
1456         if (dt_object_exists(obj) == 0) {
1457                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1458                 if (rc != 0)
1459                         GOTO(out, rc);
1460
1461                 /* XXX: dangling name entry, will handle it in other patch. */
1462                 GOTO(out, rc);
1463         }
1464
1465         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
1466         if (!(bk->lb_param & LPF_DRYRUN) &&
1467             (com->lc_journal || repaired)) {
1468
1469 again:
1470                 LASSERT(!locked);
1471
1472                 com->lc_journal = 1;
1473                 handle = dt_trans_create(env, lfsck->li_next);
1474                 if (IS_ERR(handle))
1475                         GOTO(out, rc = PTR_ERR(handle));
1476
1477                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
1478                 if (rc != 0)
1479                         GOTO(stop, rc);
1480
1481                 rc = dt_trans_start(env, lfsck->li_next, handle);
1482                 if (rc != 0)
1483                         GOTO(stop, rc);
1484
1485                 dt_write_lock(env, obj, MOR_TGT_CHILD);
1486                 locked = true;
1487         }
1488
1489         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1490         if (rc != 0)
1491                 GOTO(stop, rc);
1492
1493         rc = lfsck_links_read(env, obj, &ldata);
1494         if (rc == 0) {
1495                 count = ldata.ld_leh->leh_reccount;
1496                 rc = linkea_links_find(&ldata, cname, pfid);
1497                 if ((rc == 0) &&
1498                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
1499                         goto record;
1500
1501                 ns->ln_flags |= LF_INCONSISTENT;
1502                 /* For dir, if there are more than one linkea entries, or the
1503                  * linkea entry does not match the name entry, then remove all
1504                  * and add the correct one. */
1505                 if (S_ISDIR(lfsck_object_type(obj))) {
1506                         remove = true;
1507                         newdata = true;
1508                 } else {
1509                         remove = false;
1510                         newdata = false;
1511                 }
1512                 goto nodata;
1513         } else if (unlikely(rc == -EINVAL)) {
1514                 count = 1;
1515                 ns->ln_flags |= LF_INCONSISTENT;
1516                 /* The magic crashed, we are not sure whether there are more
1517                  * corrupt data in the linkea, so remove all linkea entries. */
1518                 remove = true;
1519                 newdata = true;
1520                 goto nodata;
1521         } else if (rc == -ENODATA) {
1522                 count = 1;
1523                 ns->ln_flags |= LF_UPGRADE;
1524                 remove = false;
1525                 newdata = true;
1526
1527 nodata:
1528                 if (bk->lb_param & LPF_DRYRUN) {
1529                         ns->ln_linkea_repaired++;
1530                         repaired = true;
1531                         log = true;
1532                         goto record;
1533                 }
1534
1535                 if (!com->lc_journal)
1536                         goto again;
1537
1538                 if (remove) {
1539                         LASSERT(newdata);
1540
1541                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
1542                                           BYPASS_CAPA);
1543                         if (rc != 0)
1544                                 GOTO(stop, rc);
1545                 }
1546
1547                 if (newdata) {
1548                         rc = linkea_data_new(&ldata,
1549                                         &lfsck_env_info(env)->lti_linkea_buf);
1550                         if (rc != 0)
1551                                 GOTO(stop, rc);
1552                 }
1553
1554                 rc = linkea_add_buf(&ldata, cname, pfid);
1555                 if (rc != 0)
1556                         GOTO(stop, rc);
1557
1558                 rc = lfsck_links_write(env, obj, &ldata, handle);
1559                 if (rc != 0)
1560                         GOTO(stop, rc);
1561
1562                 count = ldata.ld_leh->leh_reccount;
1563                 ns->ln_linkea_repaired++;
1564                 repaired = true;
1565                 log = true;
1566         } else {
1567                 GOTO(stop, rc);
1568         }
1569
1570 record:
1571         LASSERT(count > 0);
1572
1573         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1574         if (rc != 0)
1575                 GOTO(stop, rc);
1576
1577         if ((count == 1) &&
1578             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
1579                 /* Usually, it is for single linked object or dir, do nothing.*/
1580                 GOTO(stop, rc);
1581
1582         /* Following modification will be in another transaction.  */
1583         if (handle != NULL) {
1584                 LASSERT(dt_write_locked(env, obj));
1585
1586                 dt_write_unlock(env, obj);
1587                 locked = false;
1588
1589                 dt_trans_stop(env, lfsck->li_next, handle);
1590                 handle = NULL;
1591         }
1592
1593         ns->ln_mlinked_checked++;
1594         rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
1595                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1596
1597         GOTO(out, rc);
1598
1599 stop:
1600         if (locked)
1601                 dt_write_unlock(env, obj);
1602
1603         if (handle != NULL)
1604                 dt_trans_stop(env, lfsck->li_next, handle);
1605
1606 out:
1607         down_write(&com->lc_sem);
1608         if (rc < 0) {
1609                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
1610                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
1611                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
1612                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
1613                        lnr->lnr_namelen, lnr->lnr_name, rc);
1614
1615                 lfsck_namespace_record_failure(env, lfsck, ns);
1616                 if (!(bk->lb_param & LPF_FAILOUT))
1617                         rc = 0;
1618         } else {
1619                 if (log)
1620                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
1621                                "repaired the entry: "DFID", parent "DFID
1622                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
1623                                PFID(&lnr->lnr_fid),
1624                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
1625                                lnr->lnr_namelen, lnr->lnr_name);
1626
1627                 if (repaired) {
1628                         ns->ln_items_repaired++;
1629                         if (bk->lb_param & LPF_DRYRUN &&
1630                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1631                                 lfsck_pos_fill(env, lfsck,
1632                                                &ns->ln_pos_first_inconsistent,
1633                                                false);
1634                 } else {
1635                         com->lc_journal = 0;
1636                 }
1637                 rc = 0;
1638         }
1639         up_write(&com->lc_sem);
1640
1641         if (obj != NULL && !IS_ERR(obj))
1642                 lfsck_object_put(env, obj);
1643         return rc;
1644 }
1645
1646 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
1647                                                 struct lfsck_component *com)
1648 {
1649         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1650         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1651         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1652         struct lfsck_namespace  *ns     = com->lc_file_ram;
1653         struct dt_object        *obj    = com->lc_obj;
1654         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1655         struct dt_object        *target;
1656         struct dt_it            *di;
1657         struct dt_key           *key;
1658         struct lu_fid            fid;
1659         int                      rc;
1660         __u8                     flags = 0;
1661         ENTRY;
1662
1663         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
1664                lfsck_lfsck2name(lfsck));
1665
1666         com->lc_new_checked = 0;
1667         com->lc_new_scanned = 0;
1668         com->lc_time_last_checkpoint = cfs_time_current();
1669         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1670                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1671
1672         di = iops->init(env, obj, 0, BYPASS_CAPA);
1673         if (IS_ERR(di))
1674                 RETURN(PTR_ERR(di));
1675
1676         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1677         rc = iops->get(env, di, (const struct dt_key *)&fid);
1678         if (rc < 0)
1679                 GOTO(fini, rc);
1680
1681         /* Skip the start one, which either has been processed or non-exist. */
1682         rc = iops->next(env, di);
1683         if (rc != 0)
1684                 GOTO(put, rc);
1685
1686         do {
1687                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1688                     cfs_fail_val > 0) {
1689                         struct l_wait_info lwi;
1690
1691                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1692                                           NULL, NULL);
1693                         l_wait_event(thread->t_ctl_waitq,
1694                                      !thread_is_running(thread),
1695                                      &lwi);
1696                 }
1697
1698                 key = iops->key(env, di);
1699                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1700                 target = lfsck_object_find(env, lfsck, &fid);
1701                 down_write(&com->lc_sem);
1702                 if (IS_ERR(target)) {
1703                         rc = PTR_ERR(target);
1704                         goto checkpoint;
1705                 }
1706
1707                 /* XXX: Currently, skip remote object, the consistency for
1708                  *      remote object will be processed in LFSCK phase III. */
1709                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1710                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1711                         if (rc == 0)
1712                                 rc = lfsck_namespace_double_scan_one(env, com,
1713                                                                 target, flags);
1714                 }
1715
1716                 lfsck_object_put(env, target);
1717
1718 checkpoint:
1719                 com->lc_new_checked++;
1720                 com->lc_new_scanned++;
1721                 ns->ln_fid_latest_scanned_phase2 = fid;
1722                 if (rc > 0)
1723                         ns->ln_objs_repaired_phase2++;
1724                 else if (rc < 0)
1725                         ns->ln_objs_failed_phase2++;
1726                 up_write(&com->lc_sem);
1727
1728                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1729                         lfsck_namespace_delete(env, com, &fid);
1730                 } else if (rc < 0) {
1731                         flags |= LLF_REPAIR_FAILED;
1732                         lfsck_namespace_update(env, com, &fid, flags, true);
1733                 }
1734
1735                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1736                         GOTO(put, rc);
1737
1738                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1739                                               cfs_time_current())) &&
1740                     com->lc_new_checked != 0) {
1741                         down_write(&com->lc_sem);
1742                         ns->ln_run_time_phase2 +=
1743                                 cfs_duration_sec(cfs_time_current() +
1744                                 HALF_SEC - com->lc_time_last_checkpoint);
1745                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1746                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1747                         com->lc_new_checked = 0;
1748                         rc = lfsck_namespace_store(env, com, false);
1749                         up_write(&com->lc_sem);
1750                         if (rc != 0)
1751                                 GOTO(put, rc);
1752
1753                         com->lc_time_last_checkpoint = cfs_time_current();
1754                         com->lc_time_next_checkpoint =
1755                                 com->lc_time_last_checkpoint +
1756                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1757                 }
1758
1759                 lfsck_control_speed_by_self(com);
1760                 if (unlikely(!thread_is_running(thread)))
1761                         GOTO(put, rc = 0);
1762
1763                 rc = iops->next(env, di);
1764         } while (rc == 0);
1765
1766         GOTO(put, rc);
1767
1768 put:
1769         iops->put(env, di);
1770
1771 fini:
1772         iops->fini(env, di);
1773         return rc;
1774 }
1775
1776 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
1777                                                struct lfsck_component *com,
1778                                                struct lfsck_position *pos)
1779 {
1780         struct lfsck_assistant_data     *lad = com->lc_data;
1781         struct lfsck_namespace_req      *lnr;
1782
1783         if (list_empty(&lad->lad_req_list))
1784                 return;
1785
1786         lnr = list_entry(lad->lad_req_list.next,
1787                          struct lfsck_namespace_req,
1788                          lnr_lar.lar_list);
1789         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
1790         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
1791         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
1792 }
1793
1794 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
1795                                               struct lfsck_component *com,
1796                                               int rc)
1797 {
1798         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1799         struct lfsck_namespace  *ns     = com->lc_file_ram;
1800
1801         down_write(&com->lc_sem);
1802         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1803                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1804         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1805         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1806         com->lc_new_checked = 0;
1807
1808         if (rc > 0) {
1809                 com->lc_journal = 0;
1810                 if (ns->ln_flags & LF_INCOMPLETE)
1811                         ns->ln_status = LS_PARTIAL;
1812                 else
1813                         ns->ln_status = LS_COMPLETED;
1814                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1815                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1816                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1817                 ns->ln_success_count++;
1818         } else if (rc == 0) {
1819                 ns->ln_status = lfsck->li_status;
1820                 if (ns->ln_status == 0)
1821                         ns->ln_status = LS_STOPPED;
1822         } else {
1823                 ns->ln_status = LS_FAILED;
1824         }
1825
1826         rc = lfsck_namespace_store(env, com, false);
1827         up_write(&com->lc_sem);
1828
1829         return rc;
1830 }
1831
1832 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
1833         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
1834         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
1835         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
1836         .la_double_scan_result  = lfsck_namespace_double_scan_result,
1837         .la_req_fini            = lfsck_namespace_assistant_req_fini,
1838 };
1839
1840 /**
1841  * Verify the specified linkEA entry for the given directory object.
1842  * If the object has no such linkEA entry or it has more other linkEA
1843  * entries, then re-generate the linkEA with the given information.
1844  *
1845  * \param[in] env       pointer to the thread context
1846  * \param[in] dev       pointer to the dt_device
1847  * \param[in] obj       pointer to the dt_object to be handled
1848  * \param[in] cname     the name for the child in the parent directory
1849  * \param[in] pfid      the parent directory's FID for the linkEA
1850  *
1851  * \retval              0 for success
1852  * \retval              negative error number on failure
1853  */
1854 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
1855                         struct dt_object *obj, const struct lu_name *cname,
1856                         const struct lu_fid *pfid)
1857 {
1858         struct linkea_data       ldata  = { 0 };
1859         struct lu_buf            linkea_buf;
1860         struct thandle          *th;
1861         int                      rc;
1862         int                      fl     = LU_XATTR_CREATE;
1863         bool                     dirty  = false;
1864         ENTRY;
1865
1866         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1867
1868         rc = lfsck_links_read(env, obj, &ldata);
1869         if (rc == -ENODATA) {
1870                 dirty = true;
1871         } else if (rc == 0) {
1872                 fl = LU_XATTR_REPLACE;
1873                 if (ldata.ld_leh->leh_reccount != 1) {
1874                         dirty = true;
1875                 } else {
1876                         rc = linkea_links_find(&ldata, cname, pfid);
1877                         if (rc != 0)
1878                                 dirty = true;
1879                 }
1880         }
1881
1882         if (!dirty)
1883                 RETURN(rc);
1884
1885         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
1886         if (rc != 0)
1887                 RETURN(rc);
1888
1889         rc = linkea_add_buf(&ldata, cname, pfid);
1890         if (rc != 0)
1891                 RETURN(rc);
1892
1893         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1894                        ldata.ld_leh->leh_len);
1895         th = dt_trans_create(env, dev);
1896         if (IS_ERR(th))
1897                 RETURN(PTR_ERR(th));
1898
1899         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1900                                   XATTR_NAME_LINK, fl, th);
1901         if (rc != 0)
1902                 GOTO(stop, rc = PTR_ERR(th));
1903
1904         rc = dt_trans_start_local(env, dev, th);
1905         if (rc != 0)
1906                 GOTO(stop, rc);
1907
1908         dt_write_lock(env, obj, 0);
1909         rc = dt_xattr_set(env, obj, &linkea_buf,
1910                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
1911         dt_write_unlock(env, obj);
1912
1913         GOTO(stop, rc);
1914
1915 stop:
1916         dt_trans_stop(env, dev, th);
1917         return rc;
1918 }
1919
1920 /**
1921  * Get the name and parent directory's FID from the first linkEA entry.
1922  *
1923  * \param[in] env       pointer to the thread context
1924  * \param[in] obj       pointer to the object which get linkEA from
1925  * \param[out] name     pointer to the buffer to hold the name
1926  *                      in the first linkEA entry
1927  * \param[out] pfid     pointer to the buffer to hold the parent
1928  *                      directory's FID in the first linkEA entry
1929  *
1930  * \retval              0 for success
1931  * \retval              negative error number on failure
1932  */
1933 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
1934                           char *name, struct lu_fid *pfid)
1935 {
1936         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
1937         struct linkea_data        ldata = { 0 };
1938         int                       rc;
1939
1940         rc = lfsck_links_read(env, obj, &ldata);
1941         if (rc != 0)
1942                 return rc;
1943
1944         linkea_first_entry(&ldata);
1945         if (ldata.ld_lee == NULL)
1946                 return -ENODATA;
1947
1948         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
1949         /* To guarantee the 'name' is terminated with '0'. */
1950         memcpy(name, cname->ln_name, cname->ln_namelen);
1951         name[cname->ln_namelen] = 0;
1952
1953         return 0;
1954 }
1955
1956 /**
1957  * Remove the name entry from the parent directory.
1958  *
1959  * No need to care about the object referenced by the name entry,
1960  * either the name entry is invalid or redundant, or the referenced
1961  * object has been processed has been or will be handled by others.
1962  *
1963  * \param[in] env       pointer to the thread context
1964  * \param[in] lfsck     pointer to the lfsck instance
1965  * \param[in] parent    pointer to the lost+found object
1966  * \param[in] name      the name for the name entry to be removed
1967  * \param[in] type      the type for the name entry to be removed
1968  *
1969  * \retval              0 for success
1970  * \retval              negative error number on failure
1971  */
1972 int lfsck_remove_name_entry(const struct lu_env *env,
1973                             struct lfsck_instance *lfsck,
1974                             struct dt_object *parent,
1975                             const char *name, __u32 type)
1976 {
1977         struct dt_device        *dev    = lfsck->li_next;
1978         struct thandle          *th;
1979         struct lustre_handle     lh     = { 0 };
1980         int                      rc;
1981         ENTRY;
1982
1983         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1984                               MDS_INODELOCK_UPDATE, LCK_EX);
1985         if (rc != 0)
1986                 RETURN(rc);
1987
1988         th = dt_trans_create(env, dev);
1989         if (IS_ERR(th))
1990                 GOTO(unlock, rc = PTR_ERR(th));
1991
1992         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
1993         if (rc != 0)
1994                 GOTO(stop, rc);
1995
1996         if (S_ISDIR(type)) {
1997                 rc = dt_declare_ref_del(env, parent, th);
1998                 if (rc != 0)
1999                         GOTO(stop, rc);
2000         }
2001
2002         rc = dt_trans_start(env, dev, th);
2003         if (rc != 0)
2004                 GOTO(stop, rc);
2005
2006         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2007                        BYPASS_CAPA);
2008         if (rc != 0)
2009                 GOTO(stop, rc);
2010
2011         if (S_ISDIR(type)) {
2012                 dt_write_lock(env, parent, 0);
2013                 rc = dt_ref_del(env, parent, th);
2014                 dt_write_unlock(env, parent);
2015         }
2016
2017         GOTO(stop, rc);
2018
2019 stop:
2020         dt_trans_stop(env, dev, th);
2021
2022 unlock:
2023         lfsck_ibits_unlock(&lh, LCK_EX);
2024
2025         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
2026                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2027                PFID(lfsck_dto2fid(parent)), name, type, rc);
2028
2029         return rc;
2030 }
2031
2032 /**
2033  * Update the object's name entry with the given FID.
2034  *
2035  * \param[in] env       pointer to the thread context
2036  * \param[in] lfsck     pointer to the lfsck instance
2037  * \param[in] parent    pointer to the parent directory that holds
2038  *                      the name entry
2039  * \param[in] name      the name for the entry to be updated
2040  * \param[in] pfid      the new PFID for the name entry
2041  * \param[in] type      the type for the name entry to be updated
2042  *
2043  * \retval              0 for success
2044  * \retval              negative error number on failure
2045  */
2046 int lfsck_update_name_entry(const struct lu_env *env,
2047                             struct lfsck_instance *lfsck,
2048                             struct dt_object *parent, const char *name,
2049                             const struct lu_fid *pfid, __u32 type)
2050 {
2051         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
2052         struct dt_device        *dev    = lfsck->li_next;
2053         struct lustre_handle     lh     = { 0 };
2054         struct thandle          *th;
2055         int                      rc;
2056         bool                     exists = true;
2057         ENTRY;
2058
2059         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2060                               MDS_INODELOCK_UPDATE, LCK_EX);
2061         if (rc != 0)
2062                 RETURN(rc);
2063
2064         th = dt_trans_create(env, dev);
2065         if (IS_ERR(th))
2066                 GOTO(unlock, rc = PTR_ERR(th));
2067
2068         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2069         if (rc != 0)
2070                 GOTO(stop, rc);
2071
2072         rec->rec_type = type;
2073         rec->rec_fid = pfid;
2074         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
2075                                (const struct dt_key *)name, th);
2076         if (rc != 0)
2077                 GOTO(stop, rc);
2078
2079         rc = dt_declare_ref_add(env, parent, th);
2080         if (rc != 0)
2081                 GOTO(stop, rc);
2082
2083         rc = dt_trans_start(env, dev, th);
2084         if (rc != 0)
2085                 GOTO(stop, rc);
2086
2087         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2088                        BYPASS_CAPA);
2089         if (rc == -ENOENT) {
2090                 exists = false;
2091                 rc = 0;
2092         }
2093
2094         if (rc != 0)
2095                 GOTO(stop, rc);
2096
2097         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
2098                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2099         if (rc == 0 && S_ISDIR(type) && !exists) {
2100                 dt_write_lock(env, parent, 0);
2101                 rc = dt_ref_add(env, parent, th);
2102                 dt_write_unlock(env, parent);
2103         }
2104
2105         GOTO(stop, rc);
2106
2107 stop:
2108         dt_trans_stop(env, dev, th);
2109
2110 unlock:
2111         lfsck_ibits_unlock(&lh, LCK_EX);
2112
2113         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
2114                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2115                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
2116
2117         return rc;
2118 }
2119
2120 int lfsck_namespace_setup(const struct lu_env *env,
2121                           struct lfsck_instance *lfsck)
2122 {
2123         struct lfsck_component  *com;
2124         struct lfsck_namespace  *ns;
2125         struct dt_object        *root = NULL;
2126         struct dt_object        *obj;
2127         int                      rc;
2128         ENTRY;
2129
2130         LASSERT(lfsck->li_master);
2131
2132         OBD_ALLOC_PTR(com);
2133         if (com == NULL)
2134                 RETURN(-ENOMEM);
2135
2136         INIT_LIST_HEAD(&com->lc_link);
2137         INIT_LIST_HEAD(&com->lc_link_dir);
2138         init_rwsem(&com->lc_sem);
2139         atomic_set(&com->lc_ref, 1);
2140         com->lc_lfsck = lfsck;
2141         com->lc_type = LFSCK_TYPE_NAMESPACE;
2142         com->lc_ops = &lfsck_namespace_ops;
2143         com->lc_data = lfsck_assistant_data_init(
2144                         &lfsck_namespace_assistant_ops,
2145                         "lfsck_namespace");
2146         if (com->lc_data == NULL)
2147                 GOTO(out, rc = -ENOMEM);
2148
2149         com->lc_file_size = sizeof(struct lfsck_namespace);
2150         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2151         if (com->lc_file_ram == NULL)
2152                 GOTO(out, rc = -ENOMEM);
2153
2154         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2155         if (com->lc_file_disk == NULL)
2156                 GOTO(out, rc = -ENOMEM);
2157
2158         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2159         if (IS_ERR(root))
2160                 GOTO(out, rc = PTR_ERR(root));
2161
2162         if (unlikely(!dt_try_as_dir(env, root)))
2163                 GOTO(out, rc = -ENOTDIR);
2164
2165         obj = local_index_find_or_create(env, lfsck->li_los, root,
2166                                          lfsck_namespace_name,
2167                                          S_IFREG | S_IRUGO | S_IWUSR,
2168                                          &dt_lfsck_features);
2169         if (IS_ERR(obj))
2170                 GOTO(out, rc = PTR_ERR(obj));
2171
2172         com->lc_obj = obj;
2173         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
2174         if (rc != 0)
2175                 GOTO(out, rc);
2176
2177         rc = lfsck_namespace_load(env, com);
2178         if (rc > 0)
2179                 rc = lfsck_namespace_reset(env, com, true);
2180         else if (rc == -ENODATA)
2181                 rc = lfsck_namespace_init(env, com);
2182         if (rc != 0)
2183                 GOTO(out, rc);
2184
2185         ns = com->lc_file_ram;
2186         switch (ns->ln_status) {
2187         case LS_INIT:
2188         case LS_COMPLETED:
2189         case LS_FAILED:
2190         case LS_STOPPED:
2191                 spin_lock(&lfsck->li_lock);
2192                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2193                 spin_unlock(&lfsck->li_lock);
2194                 break;
2195         default:
2196                 CERROR("%s: unknown lfsck_namespace status %d\n",
2197                        lfsck_lfsck2name(lfsck), ns->ln_status);
2198                 /* fall through */
2199         case LS_SCANNING_PHASE1:
2200         case LS_SCANNING_PHASE2:
2201                 /* No need to store the status to disk right now.
2202                  * If the system crashed before the status stored,
2203                  * it will be loaded back when next time. */
2204                 ns->ln_status = LS_CRASHED;
2205                 /* fall through */
2206         case LS_PAUSED:
2207         case LS_CRASHED:
2208                 spin_lock(&lfsck->li_lock);
2209                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2210                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
2211                 spin_unlock(&lfsck->li_lock);
2212                 break;
2213         }
2214
2215         GOTO(out, rc = 0);
2216
2217 out:
2218         if (root != NULL && !IS_ERR(root))
2219                 lu_object_put(env, &root->do_lu);
2220         if (rc != 0) {
2221                 lfsck_component_cleanup(env, com);
2222                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
2223                        lfsck_lfsck2name(lfsck), rc);
2224         }
2225         return rc;
2226 }