Whamcloud - gitweb
83e9223c3dc656c01aa7ca3844bca1157744e015
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/* Magic value kept in lfsck_namespace::ln_magic; a loaded record whose magic
 * differs is treated as broken by lfsck_namespace_load(). */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/* Positive results reported by lfsck_namespace_check_exist() when the name
 * entry being verified is no longer in its original state. */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,
        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,
        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};

/* Name of the local object (looked up under the local root) that backs the
 * namespace LFSCK component. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 struct lfsck_namespace_req {
55         struct lfsck_assistant_req       lnr_lar;
56         struct dt_object                *lnr_obj;
57         struct lu_fid                    lnr_fid;
58         __u64                            lnr_oit_cookie;
59         __u64                            lnr_dir_cookie;
60         __u32                            lnr_attr;
61         __u32                            lnr_size;
62         __u16                            lnr_type;
63         __u16                            lnr_namelen;
64         char                             lnr_name[0];
65 };
66
67 static struct lfsck_namespace_req *
68 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
69                                    struct lu_dirent *ent, __u16 type)
70 {
71         struct lfsck_namespace_req *lnr;
72         int                         size;
73
74         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
75         OBD_ALLOC(lnr, size);
76         if (lnr == NULL)
77                 return ERR_PTR(-ENOMEM);
78
79         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
80         lu_object_get(&lfsck->li_obj_dir->do_lu);
81         lnr->lnr_obj = lfsck->li_obj_dir;
82         lnr->lnr_fid = ent->lde_fid;
83         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
84         lnr->lnr_dir_cookie = ent->lde_hash;
85         lnr->lnr_attr = ent->lde_attrs;
86         lnr->lnr_size = size;
87         lnr->lnr_type = type;
88         lnr->lnr_namelen = ent->lde_namelen;
89         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
90
91         return lnr;
92 }
93
94 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
95                                                struct lfsck_assistant_req *lar)
96 {
97         struct lfsck_namespace_req *lnr =
98                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
99
100         lu_object_put(env, &lnr->lnr_obj->do_lu);
101         OBD_FREE(lnr, lnr->lnr_size);
102 }
103
104 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
105                                       struct lfsck_namespace *src)
106 {
107         dst->ln_magic = le32_to_cpu(src->ln_magic);
108         dst->ln_status = le32_to_cpu(src->ln_status);
109         dst->ln_flags = le32_to_cpu(src->ln_flags);
110         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
111         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
112         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
113         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
114         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
115         dst->ln_time_last_checkpoint =
116                                 le64_to_cpu(src->ln_time_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
118                                  &src->ln_pos_latest_start);
119         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
120                                  &src->ln_pos_last_checkpoint);
121         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
122                                  &src->ln_pos_first_inconsistent);
123         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
124         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
125         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
126         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
127         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
128         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
129         dst->ln_objs_repaired_phase2 =
130                                 le64_to_cpu(src->ln_objs_repaired_phase2);
131         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
132         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
133         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
134         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
135                       &src->ln_fid_latest_scanned_phase2);
136         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
137         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
138 }
139
140 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
141                                       struct lfsck_namespace *src)
142 {
143         dst->ln_magic = cpu_to_le32(src->ln_magic);
144         dst->ln_status = cpu_to_le32(src->ln_status);
145         dst->ln_flags = cpu_to_le32(src->ln_flags);
146         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
147         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
148         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
149         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
150         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
151         dst->ln_time_last_checkpoint =
152                                 cpu_to_le64(src->ln_time_last_checkpoint);
153         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
154                                  &src->ln_pos_latest_start);
155         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
156                                  &src->ln_pos_last_checkpoint);
157         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
158                                  &src->ln_pos_first_inconsistent);
159         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
160         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
161         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
162         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
163         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
164         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
165         dst->ln_objs_repaired_phase2 =
166                                 cpu_to_le64(src->ln_objs_repaired_phase2);
167         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
168         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
169         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
170         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
171                       &src->ln_fid_latest_scanned_phase2);
172         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
173         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
174 }
175
176 static void lfsck_namespace_record_failure(const struct lu_env *env,
177                                            struct lfsck_instance *lfsck,
178                                            struct lfsck_namespace *ns)
179 {
180         struct lfsck_position pos;
181
182         ns->ln_items_failed++;
183         lfsck_pos_fill(env, lfsck, &pos, false);
184         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
185             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
186                 ns->ln_pos_first_inconsistent = pos;
187
188                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
189                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
190                        lfsck_lfsck2name(lfsck),
191                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
192                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
193                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
194         }
195 }
196
197 /**
198  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
199  * \retval 0: succeed.
200  * \retval -ve: failed cases.
201  */
202 static int lfsck_namespace_load(const struct lu_env *env,
203                                 struct lfsck_component *com)
204 {
205         int len = com->lc_file_size;
206         int rc;
207
208         rc = dt_xattr_get(env, com->lc_obj,
209                           lfsck_buf_get(env, com->lc_file_disk, len),
210                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
211         if (rc == len) {
212                 struct lfsck_namespace *ns = com->lc_file_ram;
213
214                 lfsck_namespace_le_to_cpu(ns,
215                                 (struct lfsck_namespace *)com->lc_file_disk);
216                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
217                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
218                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
219                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
220                         rc = 1;
221                 } else {
222                         rc = 0;
223                 }
224         } else if (rc != -ENODATA) {
225                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
226                        "expected = %d: rc = %d\n",
227                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
228                 if (rc >= 0)
229                         rc = 1;
230         }
231         return rc;
232 }
233
234 static int lfsck_namespace_store(const struct lu_env *env,
235                                  struct lfsck_component *com, bool init)
236 {
237         struct dt_object        *obj    = com->lc_obj;
238         struct lfsck_instance   *lfsck  = com->lc_lfsck;
239         struct thandle          *handle;
240         int                      len    = com->lc_file_size;
241         int                      rc;
242         ENTRY;
243
244         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
245                                   (struct lfsck_namespace *)com->lc_file_ram);
246         handle = dt_trans_create(env, lfsck->li_bottom);
247         if (IS_ERR(handle))
248                 GOTO(log, rc = PTR_ERR(handle));
249
250         rc = dt_declare_xattr_set(env, obj,
251                                   lfsck_buf_get(env, com->lc_file_disk, len),
252                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
253         if (rc != 0)
254                 GOTO(out, rc);
255
256         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         rc = dt_xattr_set(env, obj,
261                           lfsck_buf_get(env, com->lc_file_disk, len),
262                           XATTR_NAME_LFSCK_NAMESPACE,
263                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
264                           handle, BYPASS_CAPA);
265
266         GOTO(out, rc);
267
268 out:
269         dt_trans_stop(env, lfsck->li_bottom, handle);
270
271 log:
272         if (rc != 0)
273                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
274                        lfsck_lfsck2name(lfsck), rc);
275         return rc;
276 }
277
278 static int lfsck_namespace_init(const struct lu_env *env,
279                                 struct lfsck_component *com)
280 {
281         struct lfsck_namespace *ns = com->lc_file_ram;
282         int rc;
283
284         memset(ns, 0, sizeof(*ns));
285         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
286         ns->ln_status = LS_INIT;
287         down_write(&com->lc_sem);
288         rc = lfsck_namespace_store(env, com, true);
289         up_write(&com->lc_sem);
290         return rc;
291 }
292
293 static int lfsck_namespace_lookup(const struct lu_env *env,
294                                   struct lfsck_component *com,
295                                   const struct lu_fid *fid, __u8 *flags)
296 {
297         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
298         int            rc;
299
300         fid_cpu_to_be(key, fid);
301         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
302                        (const struct dt_key *)key, BYPASS_CAPA);
303         return rc;
304 }
305
306 static int lfsck_namespace_delete(const struct lu_env *env,
307                                   struct lfsck_component *com,
308                                   const struct lu_fid *fid)
309 {
310         struct lfsck_instance   *lfsck  = com->lc_lfsck;
311         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
312         struct thandle          *handle;
313         struct dt_object        *obj    = com->lc_obj;
314         int                      rc;
315         ENTRY;
316
317         handle = dt_trans_create(env, lfsck->li_bottom);
318         if (IS_ERR(handle))
319                 RETURN(PTR_ERR(handle));
320
321         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
322         if (rc != 0)
323                 GOTO(out, rc);
324
325         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
326         if (rc != 0)
327                 GOTO(out, rc);
328
329         fid_cpu_to_be(key, fid);
330         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
331                        BYPASS_CAPA);
332
333         GOTO(out, rc);
334
335 out:
336         dt_trans_stop(env, lfsck->li_bottom, handle);
337         return rc;
338 }
339
340 static int lfsck_namespace_update(const struct lu_env *env,
341                                   struct lfsck_component *com,
342                                   const struct lu_fid *fid,
343                                   __u8 flags, bool force)
344 {
345         struct lfsck_instance   *lfsck  = com->lc_lfsck;
346         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
347         struct thandle          *handle;
348         struct dt_object        *obj    = com->lc_obj;
349         int                      rc;
350         bool                     exist  = false;
351         __u8                     tf;
352         ENTRY;
353
354         rc = lfsck_namespace_lookup(env, com, fid, &tf);
355         if (rc != 0 && rc != -ENOENT)
356                 RETURN(rc);
357
358         if (rc == 0) {
359                 if (!force || flags == tf)
360                         RETURN(0);
361
362                 exist = true;
363                 handle = dt_trans_create(env, lfsck->li_bottom);
364                 if (IS_ERR(handle))
365                         RETURN(PTR_ERR(handle));
366
367                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
368                                        handle);
369                 if (rc != 0)
370                         GOTO(out, rc);
371         } else {
372                 handle = dt_trans_create(env, lfsck->li_bottom);
373                 if (IS_ERR(handle))
374                         RETURN(PTR_ERR(handle));
375         }
376
377         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
378                                (const struct dt_key *)fid, handle);
379         if (rc != 0)
380                 GOTO(out, rc);
381
382         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
383         if (rc != 0)
384                 GOTO(out, rc);
385
386         fid_cpu_to_be(key, fid);
387         if (exist) {
388                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
389                                BYPASS_CAPA);
390                 if (rc != 0)
391                         GOTO(out, rc);
392         }
393
394         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
395                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
396
397         GOTO(out, rc);
398
399 out:
400         dt_trans_stop(env, lfsck->li_bottom, handle);
401         return rc;
402 }
403
404 static int lfsck_namespace_check_exist(const struct lu_env *env,
405                                        struct dt_object *dir,
406                                        struct dt_object *obj, const char *name)
407 {
408         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
409         int               rc;
410         ENTRY;
411
412         if (unlikely(lfsck_is_dead_obj(obj)))
413                 RETURN(LFSCK_NAMEENTRY_DEAD);
414
415         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
416                        (const struct dt_key *)name, BYPASS_CAPA);
417         if (rc == -ENOENT)
418                 RETURN(LFSCK_NAMEENTRY_REMOVED);
419
420         if (rc < 0)
421                 RETURN(rc);
422
423         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
424                 RETURN(LFSCK_NAMEENTRY_RECREATED);
425
426         RETURN(0);
427 }
428
429 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
430                                             struct dt_object *obj,
431                                             struct thandle *handle)
432 {
433         int rc;
434
435         /* For destroying all invalid linkEA entries. */
436         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
437         if (rc != 0)
438                 return rc;
439
440         /* For insert new linkEA entry. */
441         rc = dt_declare_xattr_set(env, obj,
442                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
443                         XATTR_NAME_LINK, 0, handle);
444         return rc;
445 }
446
447 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
448                             struct linkea_data *ldata)
449 {
450         int rc;
451
452         ldata->ld_buf =
453                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
454                                        PAGE_CACHE_SIZE);
455         if (ldata->ld_buf->lb_buf == NULL)
456                 return -ENOMEM;
457
458         if (!dt_object_exists(obj))
459                 return -ENODATA;
460
461         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
462         if (rc == -ERANGE) {
463                 /* Buf was too small, figure out what we need. */
464                 lu_buf_free(ldata->ld_buf);
465                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
466                                   BYPASS_CAPA);
467                 if (rc < 0)
468                         return rc;
469
470                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
471                 if (ldata->ld_buf->lb_buf == NULL)
472                         return -ENOMEM;
473
474                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
475                                   BYPASS_CAPA);
476         }
477         if (rc < 0)
478                 return rc;
479
480         linkea_init(ldata);
481
482         return 0;
483 }
484
485 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
486                              struct linkea_data *ldata, struct thandle *handle)
487 {
488         const struct lu_buf *buf = lfsck_buf_get_const(env,
489                                                        ldata->ld_buf->lb_buf,
490                                                        ldata->ld_leh->leh_len);
491
492         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
493                             BYPASS_CAPA);
494 }
495
496 /**
497  * \retval ve: removed entries
498  */
499 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
500                                      struct linkea_data *ldata,
501                                      struct lu_name *cname,
502                                      struct lu_fid *pfid)
503 {
504         struct link_ea_entry    *oldlee;
505         int                      oldlen;
506         int                      removed = 0;
507
508         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
509         oldlee = ldata->ld_lee;
510         oldlen = ldata->ld_reclen;
511         linkea_next_entry(ldata);
512         while (ldata->ld_lee != NULL) {
513                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
514                                    ldata->ld_lee->lee_reclen[1];
515                 if (unlikely(ldata->ld_reclen == oldlen &&
516                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
517                         linkea_del_buf(ldata, cname);
518                         removed++;
519                 } else {
520                         linkea_next_entry(ldata);
521                 }
522         }
523         ldata->ld_lee = oldlee;
524         ldata->ld_reclen = oldlen;
525         return removed;
526 }
527
528 /**
529  * \retval +ve  repaired
530  * \retval 0    no need to repair
531  * \retval -ve  error cases
532  */
533 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
534                                            struct lfsck_component *com,
535                                            struct dt_object *child, __u8 flags)
536 {
537         struct lfsck_thread_info *info    = lfsck_env_info(env);
538         struct lu_attr           *la      = &info->lti_la;
539         struct lu_name           *cname   = &info->lti_name;
540         struct lu_fid            *pfid    = &info->lti_fid;
541         struct lu_fid            *cfid    = &info->lti_fid2;
542         struct lfsck_instance   *lfsck    = com->lc_lfsck;
543         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
544         struct lfsck_namespace  *ns       = com->lc_file_ram;
545         struct linkea_data       ldata    = { 0 };
546         struct thandle          *handle   = NULL;
547         bool                     locked   = false;
548         bool                     update   = false;
549         int                      rc;
550         ENTRY;
551
552         if (com->lc_journal) {
553
554 again:
555                 LASSERT(!locked);
556
557                 update = false;
558                 com->lc_journal = 1;
559                 handle = dt_trans_create(env, lfsck->li_next);
560                 if (IS_ERR(handle))
561                         RETURN(rc = PTR_ERR(handle));
562
563                 rc = dt_declare_xattr_set(env, child,
564                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
565                         XATTR_NAME_LINK, 0, handle);
566                 if (rc != 0)
567                         GOTO(stop, rc);
568
569                 rc = dt_trans_start(env, lfsck->li_next, handle);
570                 if (rc != 0)
571                         GOTO(stop, rc);
572
573                 dt_write_lock(env, child, MOR_TGT_CHILD);
574                 locked = true;
575         }
576
577         if (unlikely(lfsck_is_dead_obj(child)))
578                 GOTO(stop, rc = 0);
579
580         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
581         if (rc == 0)
582                 rc = lfsck_links_read(env, child, &ldata);
583         if (rc != 0) {
584                 if ((bk->lb_param & LPF_DRYRUN) &&
585                     (rc == -EINVAL || rc == -ENODATA))
586                         rc = 1;
587
588                 GOTO(stop, rc);
589         }
590
591         linkea_first_entry(&ldata);
592         while (ldata.ld_lee != NULL) {
593                 struct dt_object *parent = NULL;
594
595                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
596                 if (rc > 0)
597                         update = true;
598
599                 if (!fid_is_sane(pfid))
600                         goto shrink;
601
602                 parent = lfsck_object_find(env, lfsck, pfid);
603                 if (IS_ERR(parent))
604                         GOTO(stop, rc = PTR_ERR(parent));
605
606                 if (!dt_object_exists(parent))
607                         goto shrink;
608
609                 /* XXX: Currently, skip remote object, the consistency for
610                  *      remote object will be processed in LFSCK phase III. */
611                 if (dt_object_remote(parent)) {
612                         lfsck_object_put(env, parent);
613                         linkea_next_entry(&ldata);
614                         continue;
615                 }
616
617                 if (unlikely(!dt_try_as_dir(env, parent)))
618                         goto shrink;
619
620                 /* To guarantee the 'name' is terminated with '0'. */
621                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
622                 info->lti_key[cname->ln_namelen] = 0;
623                 cname->ln_name = info->lti_key;
624                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
625                                (const struct dt_key *)cname->ln_name,
626                                BYPASS_CAPA);
627                 if (rc != 0 && rc != -ENOENT) {
628                         lfsck_object_put(env, parent);
629                         GOTO(stop, rc);
630                 }
631
632                 if (rc == 0) {
633                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
634                                 lfsck_object_put(env, parent);
635                                 linkea_next_entry(&ldata);
636                                 continue;
637                         }
638
639                         goto shrink;
640                 }
641
642                 /* If there is no name entry in the parent dir and the object
643                  * link count is less than the linkea entries count, then the
644                  * linkea entry should be removed. */
645                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
646                         goto shrink;
647
648                 /* XXX: For the case of there is a linkea entry, but without
649                  *      name entry pointing to the object and its hard links
650                  *      count is not less than the object name entries count,
651                  *      then seems we should add the 'missed' name entry back
652                  *      to namespace, but before LFSCK phase III finished, we
653                  *      do not know whether the object has some inconsistency
654                  *      on other MDTs. So now, do NOT add the name entry back
655                  *      to the namespace, but keep the linkEA entry. LU-2914 */
656                 lfsck_object_put(env, parent);
657                 linkea_next_entry(&ldata);
658                 continue;
659
660 shrink:
661                 if (parent != NULL)
662                         lfsck_object_put(env, parent);
663                 if (bk->lb_param & LPF_DRYRUN)
664                         RETURN(1);
665
666                 CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
667                       "for the object: "DFID", parent "DFID", name %.*s\n",
668                       lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(child)),
669                       PFID(pfid), cname->ln_namelen, cname->ln_name);
670
671                 linkea_del_buf(&ldata, cname);
672                 update = true;
673         }
674
675         if (update) {
676                 if (!com->lc_journal) {
677                         com->lc_journal = 1;
678                         goto again;
679                 }
680
681                 rc = lfsck_links_write(env, child, &ldata, handle);
682         }
683
684         GOTO(stop, rc);
685
686 stop:
687         if (locked) {
688         /* XXX: For the case linkea entries count does not match the object hard
689          *      links count, we cannot update the later one simply. Before LFSCK
690          *      phase III finished, we cannot know whether there are some remote
691          *      name entries to be repaired or not. LU-2914 */
692                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
693                     ldata.ld_leh != NULL &&
694                     ldata.ld_leh->leh_reccount != la->la_nlink)
695                         CDEBUG(D_LFSCK, "%s: the object "DFID" linkEA entry "
696                                "count %u may not match its hardlink count %u\n",
697                                lfsck_lfsck2name(lfsck), PFID(cfid),
698                                ldata.ld_leh->leh_reccount, la->la_nlink);
699
700                 dt_write_unlock(env, child);
701         }
702
703         if (handle != NULL)
704                 dt_trans_stop(env, lfsck->li_next, handle);
705
706         if (rc == 0 && update) {
707                 ns->ln_objs_nlink_repaired++;
708                 rc = 1;
709         }
710
711         return rc;
712 }
713
714 /* namespace APIs */
715
716 static int lfsck_namespace_reset(const struct lu_env *env,
717                                  struct lfsck_component *com, bool init)
718 {
719         struct lfsck_instance   *lfsck = com->lc_lfsck;
720         struct lfsck_namespace  *ns    = com->lc_file_ram;
721         struct dt_object        *root;
722         struct dt_object        *dto;
723         int                      rc;
724         ENTRY;
725
726         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
727         if (IS_ERR(root))
728                 GOTO(log, rc = PTR_ERR(root));
729
730         if (unlikely(!dt_try_as_dir(env, root)))
731                 GOTO(put, rc = -ENOTDIR);
732
733         down_write(&com->lc_sem);
734         if (init) {
735                 memset(ns, 0, sizeof(*ns));
736         } else {
737                 __u32 count = ns->ln_success_count;
738                 __u64 last_time = ns->ln_time_last_complete;
739
740                 memset(ns, 0, sizeof(*ns));
741                 ns->ln_success_count = count;
742                 ns->ln_time_last_complete = last_time;
743         }
744         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
745         ns->ln_status = LS_INIT;
746
747         rc = local_object_unlink(env, lfsck->li_bottom, root,
748                                  lfsck_namespace_name);
749         if (rc != 0)
750                 GOTO(out, rc);
751
752         lfsck_object_put(env, com->lc_obj);
753         com->lc_obj = NULL;
754         dto = local_index_find_or_create(env, lfsck->li_los, root,
755                                          lfsck_namespace_name,
756                                          S_IFREG | S_IRUGO | S_IWUSR,
757                                          &dt_lfsck_features);
758         if (IS_ERR(dto))
759                 GOTO(out, rc = PTR_ERR(dto));
760
761         com->lc_obj = dto;
762         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
763         if (rc != 0)
764                 GOTO(out, rc);
765
766         rc = lfsck_namespace_store(env, com, true);
767
768         GOTO(out, rc);
769
770 out:
771         up_write(&com->lc_sem);
772
773 put:
774         lu_object_put(env, &root->do_lu);
775 log:
776         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
777                lfsck_lfsck2name(lfsck), rc);
778         return rc;
779 }
780
781 static void
782 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
783                      bool new_checked)
784 {
785         struct lfsck_namespace *ns = com->lc_file_ram;
786
787         down_write(&com->lc_sem);
788         if (new_checked)
789                 com->lc_new_checked++;
790         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
791         up_write(&com->lc_sem);
792 }
793
794 static int lfsck_namespace_checkpoint(const struct lu_env *env,
795                                       struct lfsck_component *com, bool init)
796 {
797         struct lfsck_instance   *lfsck = com->lc_lfsck;
798         struct lfsck_namespace  *ns    = com->lc_file_ram;
799         int                      rc;
800
801         if (!init) {
802                 rc = lfsck_checkpoint_generic(env, com);
803                 if (rc != 0)
804                         goto log;
805         }
806
807         down_write(&com->lc_sem);
808         if (init) {
809                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
810         } else {
811                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
812                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
813                                 HALF_SEC - lfsck->li_time_last_checkpoint);
814                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
815                 ns->ln_items_checked += com->lc_new_checked;
816                 com->lc_new_checked = 0;
817         }
818
819         rc = lfsck_namespace_store(env, com, false);
820         up_write(&com->lc_sem);
821
822 log:
823         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
824                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
825                lfsck->li_pos_current.lp_oit_cookie,
826                PFID(&lfsck->li_pos_current.lp_dir_parent),
827                lfsck->li_pos_current.lp_dir_cookie, rc);
828
829         return rc > 0 ? 0 : rc;
830 }
831
832 static int lfsck_namespace_prep(const struct lu_env *env,
833                                 struct lfsck_component *com,
834                                 struct lfsck_start_param *lsp)
835 {
836         struct lfsck_instance   *lfsck  = com->lc_lfsck;
837         struct lfsck_namespace  *ns     = com->lc_file_ram;
838         struct lfsck_position   *pos    = &com->lc_pos_start;
839         int                      rc;
840
841         if (ns->ln_status == LS_COMPLETED) {
842                 rc = lfsck_namespace_reset(env, com, false);
843                 if (rc == 0)
844                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
845
846                 if (rc != 0) {
847                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
848                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
849
850                         return rc;
851                 }
852         }
853
854         down_write(&com->lc_sem);
855         ns->ln_time_latest_start = cfs_time_current_sec();
856         spin_lock(&lfsck->li_lock);
857
858         if (ns->ln_flags & LF_SCANNED_ONCE) {
859                 if (!lfsck->li_drop_dryrun ||
860                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
861                         ns->ln_status = LS_SCANNING_PHASE2;
862                         list_move_tail(&com->lc_link,
863                                        &lfsck->li_list_double_scan);
864                         if (!list_empty(&com->lc_link_dir))
865                                 list_del_init(&com->lc_link_dir);
866                         lfsck_pos_set_zero(pos);
867                 } else {
868                         ns->ln_status = LS_SCANNING_PHASE1;
869                         ns->ln_run_time_phase1 = 0;
870                         ns->ln_run_time_phase2 = 0;
871                         ns->ln_items_checked = 0;
872                         ns->ln_items_repaired = 0;
873                         ns->ln_items_failed = 0;
874                         ns->ln_dirs_checked = 0;
875                         ns->ln_mlinked_checked = 0;
876                         ns->ln_objs_checked_phase2 = 0;
877                         ns->ln_objs_repaired_phase2 = 0;
878                         ns->ln_objs_failed_phase2 = 0;
879                         ns->ln_objs_nlink_repaired = 0;
880                         ns->ln_objs_lost_found = 0;
881                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
882                         if (list_empty(&com->lc_link_dir))
883                                 list_add_tail(&com->lc_link_dir,
884                                               &lfsck->li_list_dir);
885                         *pos = ns->ln_pos_first_inconsistent;
886                 }
887         } else {
888                 ns->ln_status = LS_SCANNING_PHASE1;
889                 if (list_empty(&com->lc_link_dir))
890                         list_add_tail(&com->lc_link_dir,
891                                       &lfsck->li_list_dir);
892                 if (!lfsck->li_drop_dryrun ||
893                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
894                         *pos = ns->ln_pos_last_checkpoint;
895                         pos->lp_oit_cookie++;
896                 } else {
897                         *pos = ns->ln_pos_first_inconsistent;
898                 }
899         }
900
901         spin_unlock(&lfsck->li_lock);
902         up_write(&com->lc_sem);
903
904         rc = lfsck_start_assistant(env, com, lsp);
905
906         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
907                DFID", "LPX64"]: rc = %d\n",
908                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
909                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
910
911         return rc;
912 }
913
914 static int lfsck_namespace_exec_oit(const struct lu_env *env,
915                                     struct lfsck_component *com,
916                                     struct dt_object *obj)
917 {
918         down_write(&com->lc_sem);
919         com->lc_new_checked++;
920         if (S_ISDIR(lfsck_object_type(obj)))
921                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
922         up_write(&com->lc_sem);
923         return 0;
924 }
925
926 static int lfsck_namespace_exec_dir(const struct lu_env *env,
927                                     struct lfsck_component *com,
928                                     struct lu_dirent *ent, __u16 type)
929 {
930         struct lfsck_assistant_data     *lad    = com->lc_data;
931         struct lfsck_namespace_req      *lnr;
932         bool                             wakeup = false;
933
934         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
935         if (IS_ERR(lnr)) {
936                 struct lfsck_namespace *ns = com->lc_file_ram;
937
938                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
939                 return PTR_ERR(lnr);
940         }
941
942         spin_lock(&lad->lad_lock);
943         if (lad->lad_assistant_status < 0) {
944                 spin_unlock(&lad->lad_lock);
945                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
946                 return lad->lad_assistant_status;
947         }
948
949         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
950         if (lad->lad_prefetched == 0)
951                 wakeup = true;
952
953         lad->lad_prefetched++;
954         spin_unlock(&lad->lad_lock);
955         if (wakeup)
956                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
957
958         down_write(&com->lc_sem);
959         com->lc_new_checked++;
960         up_write(&com->lc_sem);
961
962         return 0;
963 }
964
965 static int lfsck_namespace_post(const struct lu_env *env,
966                                 struct lfsck_component *com,
967                                 int result, bool init)
968 {
969         struct lfsck_instance   *lfsck = com->lc_lfsck;
970         struct lfsck_namespace  *ns    = com->lc_file_ram;
971         int                      rc;
972         ENTRY;
973
974         lfsck_post_generic(env, com, &result);
975
976         down_write(&com->lc_sem);
977         spin_lock(&lfsck->li_lock);
978         if (!init)
979                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
980         if (result > 0) {
981                 ns->ln_status = LS_SCANNING_PHASE2;
982                 ns->ln_flags |= LF_SCANNED_ONCE;
983                 ns->ln_flags &= ~LF_UPGRADE;
984                 list_del_init(&com->lc_link_dir);
985                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
986         } else if (result == 0) {
987                 ns->ln_status = lfsck->li_status;
988                 if (ns->ln_status == 0)
989                         ns->ln_status = LS_STOPPED;
990                 if (ns->ln_status != LS_PAUSED) {
991                         list_del_init(&com->lc_link_dir);
992                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
993                 }
994         } else {
995                 ns->ln_status = LS_FAILED;
996                 list_del_init(&com->lc_link_dir);
997                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
998         }
999         spin_unlock(&lfsck->li_lock);
1000
1001         if (!init) {
1002                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1003                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1004                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1005                 ns->ln_items_checked += com->lc_new_checked;
1006                 com->lc_new_checked = 0;
1007         }
1008
1009         rc = lfsck_namespace_store(env, com, false);
1010         up_write(&com->lc_sem);
1011
1012         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1013                lfsck_lfsck2name(lfsck), rc);
1014
1015         RETURN(rc);
1016 }
1017
/*
 * Print the namespace LFSCK status and statistics into the seq_file \a m.
 *
 * The whole dump runs with com->lc_sem held for read. A fixed header
 * (name, magic, version, status) is followed by the flags, the parameters,
 * three timestamps and three positions; then one of three statistics
 * layouts is printed depending on ns->ln_status:
 *   LS_SCANNING_PHASE1: live phase1 speeds plus the current scan position;
 *   LS_SCANNING_PHASE2: live phase2 speeds plus the latest scanned FID;
 *   anything else:      averages only, all "real time" fields are N/A.
 *
 * A negative result from any of the lfsck_bits_dump() / lfsck_time_dump() /
 * lfsck_pos_dump() helpers stops further output, but the function itself
 * always returns 0.
 */
1018 static int
1019 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1020                      struct seq_file *m)
1021 {
1022         struct lfsck_instance   *lfsck = com->lc_lfsck;
1023         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1024         struct lfsck_namespace  *ns    = com->lc_file_ram;
1025         int                      rc;
1026
1027         down_read(&com->lc_sem);
1028         seq_printf(m, "name: lfsck_namespace\n"
1029                    "magic: %#x\n"
1030                    "version: %d\n"
1031                    "status: %s\n",
1032                    ns->ln_magic,
1033                    bk->lb_version,
1034                    lfsck_status2names(ns->ln_status));
1035
1036         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1037         if (rc < 0)
1038                 goto out;
1039
1040         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1041         if (rc < 0)
1042                 goto out;
1043
1044         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1045                              "time_since_last_completed");
1046         if (rc < 0)
1047                 goto out;
1048
1049         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1050                              "time_since_latest_start");
1051         if (rc < 0)
1052                 goto out;
1053
1054         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1055                              "time_since_last_checkpoint");
1056         if (rc < 0)
1057                 goto out;
1058
1059         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1060                             "latest_start_position");
1061         if (rc < 0)
1062                 goto out;
1063
1064         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1065                             "last_checkpoint_position");
1066         if (rc < 0)
1067                 goto out;
1068
1069         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1070                             "first_failure_position");
1071         if (rc < 0)
1072                 goto out;
1073
1074         if (ns->ln_status == LS_SCANNING_PHASE1) {
             /* Phase1 in progress: add the not yet checkpointed counters
              * and the time since the last checkpoint to the stored ones. */
1075                 struct lfsck_position pos;
1076                 const struct dt_it_ops *iops;
1077                 cfs_duration_t duration = cfs_time_current() -
1078                                           lfsck->li_time_last_checkpoint;
1079                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1080                 __u64 speed = checked;
             /* NOTE(review): the product is evaluated before it is widened
              * to __u64; confirm the type of lc_new_checked cannot make
              * "lc_new_checked * HZ" overflow. */
1081                 __u64 new_checked = com->lc_new_checked * HZ;
1082                 __u32 rtime = ns->ln_run_time_phase1 +
1083                               cfs_duration_sec(duration + HALF_SEC);
1084
             /* Both divisions are skipped for a zero divisor. */
1085                 if (duration != 0)
1086                         do_div(new_checked, duration);
1087                 if (rtime != 0)
1088                         do_div(speed, rtime);
1089                 seq_printf(m, "checked_phase1: "LPU64"\n"
1090                               "checked_phase2: "LPU64"\n"
1091                               "updated_phase1: "LPU64"\n"
1092                               "updated_phase2: "LPU64"\n"
1093                               "failed_phase1: "LPU64"\n"
1094                               "failed_phase2: "LPU64"\n"
1095                               "directories: "LPU64"\n"
1096                               "multi_linked_files: "LPU64"\n"
1097                               "dirent_repaired: "LPU64"\n"
1098                               "linkea_repaired: "LPU64"\n"
1099                               "nlinks_repaired: "LPU64"\n"
1100                               "lost_found: "LPU64"\n"
1101                               "success_count: %u\n"
1102                               "run_time_phase1: %u seconds\n"
1103                               "run_time_phase2: %u seconds\n"
1104                               "average_speed_phase1: "LPU64" items/sec\n"
1105                               "average_speed_phase2: N/A\n"
1106                               "real_time_speed_phase1: "LPU64" items/sec\n"
1107                               "real_time_speed_phase2: N/A\n",
1108                               checked,
1109                               ns->ln_objs_checked_phase2,
1110                               ns->ln_items_repaired,
1111                               ns->ln_objs_repaired_phase2,
1112                               ns->ln_items_failed,
1113                               ns->ln_objs_failed_phase2,
1114                               ns->ln_dirs_checked,
1115                               ns->ln_mlinked_checked,
1116                               ns->ln_dirent_repaired,
1117                               ns->ln_linkea_repaired,
1118                               ns->ln_objs_nlink_repaired,
1119                               ns->ln_objs_lost_found,
1120                               ns->ln_success_count,
1121                               rtime,
1122                               ns->ln_run_time_phase2,
1123                               speed,
1124                               new_checked);
1125
             /* NOTE(review): the assertion checks li_di_oit while the next
              * statement dereferences li_obj_oit; confirm both are always
              * valid together while the status is LS_SCANNING_PHASE1. */
1126                 LASSERT(lfsck->li_di_oit != NULL);
1127
1128                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1129
1130                 /* The low layer otable-based iteration position may NOT
1131                  * exactly match the namespace-based directory traversal
1132                  * cookie. Generally, it is not a serious issue. But the
1133                  * caller should NOT make assumption on that. */
1134                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1135                 if (!lfsck->li_current_oit_processed)
1136                         pos.lp_oit_cookie--;
1137
             /* The directory part of the position is read under li_lock;
              * it is zeroed when there is no directory iterator or when
              * the cookie has reached MDS_DIR_END_OFF. */
1138                 spin_lock(&lfsck->li_lock);
1139                 if (lfsck->li_di_dir != NULL) {
1140                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1141                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1142                                 fid_zero(&pos.lp_dir_parent);
1143                                 pos.lp_dir_cookie = 0;
1144                         } else {
1145                                 pos.lp_dir_parent =
1146                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1147                         }
1148                 } else {
1149                         fid_zero(&pos.lp_dir_parent);
1150                         pos.lp_dir_cookie = 0;
1151                 }
1152                 spin_unlock(&lfsck->li_lock);
             /* The result of this last dump is not checked. */
1153                 lfsck_pos_dump(m, &pos, "current_position");
1154         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
             /* Phase2 in progress: phase1 figures come from the stored
              * trace data, phase2 figures include the live counters. */
1155                 cfs_duration_t duration = cfs_time_current() -
1156                                           lfsck->li_time_last_checkpoint;
1157                 __u64 checked = ns->ln_objs_checked_phase2 +
1158                                 com->lc_new_checked;
1159                 __u64 speed1 = ns->ln_items_checked;
1160                 __u64 speed2 = checked;
1161                 __u64 new_checked = com->lc_new_checked * HZ;
1162                 __u32 rtime = ns->ln_run_time_phase2 +
1163                               cfs_duration_sec(duration + HALF_SEC);
1164
1165                 if (duration != 0)
1166                         do_div(new_checked, duration);
1167                 if (ns->ln_run_time_phase1 != 0)
1168                         do_div(speed1, ns->ln_run_time_phase1);
1169                 if (rtime != 0)
1170                         do_div(speed2, rtime);
1171                 seq_printf(m, "checked_phase1: "LPU64"\n"
1172                               "checked_phase2: "LPU64"\n"
1173                               "updated_phase1: "LPU64"\n"
1174                               "updated_phase2: "LPU64"\n"
1175                               "failed_phase1: "LPU64"\n"
1176                               "failed_phase2: "LPU64"\n"
1177                               "directories: "LPU64"\n"
1178                               "multi_linked_files: "LPU64"\n"
1179                               "dirent_repaired: "LPU64"\n"
1180                               "linkea_repaired: "LPU64"\n"
1181                               "nlinks_repaired: "LPU64"\n"
1182                               "lost_found: "LPU64"\n"
1183                               "success_count: %u\n"
1184                               "run_time_phase1: %u seconds\n"
1185                               "run_time_phase2: %u seconds\n"
1186                               "average_speed_phase1: "LPU64" items/sec\n"
1187                               "average_speed_phase2: "LPU64" objs/sec\n"
1188                               "real_time_speed_phase1: N/A\n"
1189                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1190                               "current_position: "DFID"\n",
1191                               ns->ln_items_checked,
1192                               checked,
1193                               ns->ln_items_repaired,
1194                               ns->ln_objs_repaired_phase2,
1195                               ns->ln_items_failed,
1196                               ns->ln_objs_failed_phase2,
1197                               ns->ln_dirs_checked,
1198                               ns->ln_mlinked_checked,
1199                               ns->ln_dirent_repaired,
1200                               ns->ln_linkea_repaired,
1201                               ns->ln_objs_nlink_repaired,
1202                               ns->ln_objs_lost_found,
1203                               ns->ln_success_count,
1204                               ns->ln_run_time_phase1,
1205                               rtime,
1206                               speed1,
1207                               speed2,
1208                               new_checked,
1209                               PFID(&ns->ln_fid_latest_scanned_phase2));
1210         } else {
             /* Not scanning: only the stored counters and averages. */
1211                 __u64 speed1 = ns->ln_items_checked;
1212                 __u64 speed2 = ns->ln_objs_checked_phase2;
1213
1214                 if (ns->ln_run_time_phase1 != 0)
1215                         do_div(speed1, ns->ln_run_time_phase1);
1216                 if (ns->ln_run_time_phase2 != 0)
1217                         do_div(speed2, ns->ln_run_time_phase2);
1218                 seq_printf(m, "checked_phase1: "LPU64"\n"
1219                               "checked_phase2: "LPU64"\n"
1220                               "updated_phase1: "LPU64"\n"
1221                               "updated_phase2: "LPU64"\n"
1222                               "failed_phase1: "LPU64"\n"
1223                               "failed_phase2: "LPU64"\n"
1224                               "directories: "LPU64"\n"
1225                               "multi_linked_files: "LPU64"\n"
1226                               "dirent_repaired: "LPU64"\n"
1227                               "linkea_repaired: "LPU64"\n"
1228                               "nlinks_repaired: "LPU64"\n"
1229                               "lost_found: "LPU64"\n"
1230                               "success_count: %u\n"
1231                               "run_time_phase1: %u seconds\n"
1232                               "run_time_phase2: %u seconds\n"
1233                               "average_speed_phase1: "LPU64" items/sec\n"
1234                               "average_speed_phase2: "LPU64" objs/sec\n"
1235                               "real_time_speed_phase1: N/A\n"
1236                               "real_time_speed_phase2: N/A\n"
1237                               "current_position: N/A\n",
1238                               ns->ln_items_checked,
1239                               ns->ln_objs_checked_phase2,
1240                               ns->ln_items_repaired,
1241                               ns->ln_objs_repaired_phase2,
1242                               ns->ln_items_failed,
1243                               ns->ln_objs_failed_phase2,
1244                               ns->ln_dirs_checked,
1245                               ns->ln_mlinked_checked,
1246                               ns->ln_dirent_repaired,
1247                               ns->ln_linkea_repaired,
1248                               ns->ln_objs_nlink_repaired,
1249                               ns->ln_objs_lost_found,
1250                               ns->ln_success_count,
1251                               ns->ln_run_time_phase1,
1252                               ns->ln_run_time_phase2,
1253                               speed1,
1254                               speed2);
1255         }
1256 out:
1257         up_read(&com->lc_sem);
         /* rc is deliberately not propagated: the dump always reports 0. */
1258         return 0;
1259 }
1260
1261 static int lfsck_namespace_double_scan(const struct lu_env *env,
1262                                        struct lfsck_component *com)
1263 {
1264         struct lfsck_namespace *ns = com->lc_file_ram;
1265
1266         return lfsck_double_scan_generic(env, com, ns->ln_status);
1267 }
1268
1269 static void lfsck_namespace_data_release(const struct lu_env *env,
1270                                          struct lfsck_component *com)
1271 {
1272         struct lfsck_assistant_data     *lad    = com->lc_data;
1273         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
1274         struct lfsck_tgt_desc           *ltd;
1275         struct lfsck_tgt_desc           *next;
1276
1277         LASSERT(lad != NULL);
1278         LASSERT(thread_is_init(&lad->lad_thread) ||
1279                 thread_is_stopped(&lad->lad_thread));
1280         LASSERT(list_empty(&lad->lad_req_list));
1281
1282         com->lc_data = NULL;
1283
1284         spin_lock(&ltds->ltd_lock);
1285         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
1286                                  ltd_namespace_phase_list) {
1287                 list_del_init(&ltd->ltd_namespace_phase_list);
1288         }
1289         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
1290                                  ltd_namespace_phase_list) {
1291                 list_del_init(&ltd->ltd_namespace_phase_list);
1292         }
1293         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
1294                                  ltd_namespace_list) {
1295                 list_del_init(&ltd->ltd_namespace_list);
1296         }
1297         spin_unlock(&ltds->ltd_lock);
1298
1299         CFS_FREE_BITMAP(lad->lad_bitmap);
1300
1301         OBD_FREE_PTR(lad);
1302 }
1303
1304 static int lfsck_namespace_in_notify(const struct lu_env *env,
1305                                      struct lfsck_component *com,
1306                                      struct lfsck_request *lr)
1307 {
1308         struct lfsck_instance           *lfsck = com->lc_lfsck;
1309         struct lfsck_namespace          *ns    = com->lc_file_ram;
1310         struct lfsck_assistant_data     *lad   = com->lc_data;
1311         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
1312         struct lfsck_tgt_desc           *ltd;
1313         bool                             fail  = false;
1314         ENTRY;
1315
1316         if (lr->lr_event != LE_PHASE1_DONE &&
1317             lr->lr_event != LE_PHASE2_DONE &&
1318             lr->lr_event != LE_PEER_EXIT)
1319                 RETURN(-EINVAL);
1320
1321         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
1322                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
1323                lr->lr_index, lr->lr_status);
1324
1325         spin_lock(&ltds->ltd_lock);
1326         ltd = LTD_TGT(ltds, lr->lr_index);
1327         if (ltd == NULL) {
1328                 spin_unlock(&ltds->ltd_lock);
1329
1330                 RETURN(-ENXIO);
1331         }
1332
1333         list_del_init(&ltd->ltd_namespace_phase_list);
1334         switch (lr->lr_event) {
1335         case LE_PHASE1_DONE:
1336                 if (lr->lr_status <= 0) {
1337                         ltd->ltd_namespace_done = 1;
1338                         list_del_init(&ltd->ltd_namespace_list);
1339                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
1340                                "phase1 for namespace LFSCK: rc = %d.\n",
1341                                lfsck_lfsck2name(lfsck),
1342                                ltd->ltd_index, lr->lr_status);
1343                         ns->ln_flags |= LF_INCOMPLETE;
1344                         fail = true;
1345                         break;
1346                 }
1347
1348                 if (list_empty(&ltd->ltd_namespace_list))
1349                         list_add_tail(&ltd->ltd_namespace_list,
1350                                       &lad->lad_mdt_list);
1351                 list_add_tail(&ltd->ltd_namespace_phase_list,
1352                               &lad->lad_mdt_phase2_list);
1353                 break;
1354         case LE_PHASE2_DONE:
1355                 ltd->ltd_namespace_done = 1;
1356                 list_del_init(&ltd->ltd_namespace_list);
1357                 break;
1358         case LE_PEER_EXIT:
1359                 fail = true;
1360                 ltd->ltd_namespace_done = 1;
1361                 list_del_init(&ltd->ltd_namespace_list);
1362                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
1363                         CDEBUG(D_LFSCK,
1364                                "%s: the peer MDT %x exit namespace LFSCK\n",
1365                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
1366                         ns->ln_flags |= LF_INCOMPLETE;
1367                 }
1368                 break;
1369         default:
1370                 break;
1371         }
1372         spin_unlock(&ltds->ltd_lock);
1373
1374         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
1375                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1376
1377                 memset(stop, 0, sizeof(*stop));
1378                 stop->ls_status = lr->lr_status;
1379                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1380                 lfsck_stop(env, lfsck->li_bottom, stop);
1381         } else if (lfsck_phase2_next_ready(lad)) {
1382                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
1383         }
1384
1385         RETURN(0);
1386 }
1387
1388 static int lfsck_namespace_query(const struct lu_env *env,
1389                                  struct lfsck_component *com)
1390 {
1391         struct lfsck_namespace *ns = com->lc_file_ram;
1392
1393         return ns->ln_status;
1394 }
1395
/*
 * Operations table of the namespace LFSCK component. Every slot points at
 * the namespace-specific handler defined in this file, except lfsck_quit,
 * which uses the shared lfsck_quit_generic.
 */
1396 static struct lfsck_operations lfsck_namespace_ops = {
1397         .lfsck_reset            = lfsck_namespace_reset,
1398         .lfsck_fail             = lfsck_namespace_fail,
1399         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1400         .lfsck_prep             = lfsck_namespace_prep,
1401         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1402         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1403         .lfsck_post             = lfsck_namespace_post,
1404         .lfsck_dump             = lfsck_namespace_dump,
1405         .lfsck_double_scan      = lfsck_namespace_double_scan,
1406         .lfsck_data_release     = lfsck_namespace_data_release,
1407         .lfsck_quit             = lfsck_quit_generic,
1408         .lfsck_in_notify        = lfsck_namespace_in_notify,
1409         .lfsck_query            = lfsck_namespace_query,
1410 };
1411
1412 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
1413                                                 struct lfsck_component *com,
1414                                                 struct lfsck_assistant_req *lar)
1415 {
1416         struct lfsck_thread_info   *info     = lfsck_env_info(env);
1417         struct lu_attr             *la       = &info->lti_la;
1418         struct lfsck_instance      *lfsck    = com->lc_lfsck;
1419         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
1420         struct lfsck_namespace     *ns       = com->lc_file_ram;
1421         struct linkea_data          ldata    = { 0 };
1422         const struct lu_name       *cname;
1423         struct thandle             *handle   = NULL;
1424         struct lfsck_namespace_req *lnr      =
1425                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
1426         struct dt_object           *dir      = lnr->lnr_obj;
1427         struct dt_object           *obj      = NULL;
1428         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
1429         bool                        repaired = false;
1430         bool                        locked   = false;
1431         bool                        remove;
1432         bool                        newdata;
1433         bool                        log      = false;
1434         int                         count    = 0;
1435         int                         rc;
1436         ENTRY;
1437
1438         if (lnr->lnr_attr & LUDA_UPGRADE) {
1439                 ns->ln_flags |= LF_UPGRADE;
1440                 ns->ln_dirent_repaired++;
1441                 repaired = true;
1442         } else if (lnr->lnr_attr & LUDA_REPAIR) {
1443                 ns->ln_flags |= LF_INCONSISTENT;
1444                 ns->ln_dirent_repaired++;
1445                 repaired = true;
1446         }
1447
1448         if (lnr->lnr_name[0] == '.' &&
1449             (lnr->lnr_namelen == 1 ||
1450              (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
1451              fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
1452                 GOTO(out, rc = 0);
1453
1454         obj = lfsck_object_find(env, lfsck, &lnr->lnr_fid);
1455         if (IS_ERR(obj))
1456                 GOTO(out, rc = PTR_ERR(obj));
1457
1458         if (dt_object_exists(obj) == 0) {
1459                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1460                 if (rc != 0)
1461                         GOTO(out, rc);
1462
1463                 /* XXX: dangling name entry, will handle it in other patch. */
1464                 GOTO(out, rc);
1465         }
1466
1467         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
1468         if (!(bk->lb_param & LPF_DRYRUN) &&
1469             (com->lc_journal || repaired)) {
1470
1471 again:
1472                 LASSERT(!locked);
1473
1474                 com->lc_journal = 1;
1475                 handle = dt_trans_create(env, lfsck->li_next);
1476                 if (IS_ERR(handle))
1477                         GOTO(out, rc = PTR_ERR(handle));
1478
1479                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
1480                 if (rc != 0)
1481                         GOTO(stop, rc);
1482
1483                 rc = dt_trans_start(env, lfsck->li_next, handle);
1484                 if (rc != 0)
1485                         GOTO(stop, rc);
1486
1487                 dt_write_lock(env, obj, MOR_TGT_CHILD);
1488                 locked = true;
1489         }
1490
1491         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1492         if (rc != 0)
1493                 GOTO(stop, rc);
1494
1495         rc = lfsck_links_read(env, obj, &ldata);
1496         if (rc == 0) {
1497                 count = ldata.ld_leh->leh_reccount;
1498                 rc = linkea_links_find(&ldata, cname, pfid);
1499                 if ((rc == 0) &&
1500                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
1501                         goto record;
1502
1503                 ns->ln_flags |= LF_INCONSISTENT;
1504                 /* For dir, if there are more than one linkea entries, or the
1505                  * linkea entry does not match the name entry, then remove all
1506                  * and add the correct one. */
1507                 if (S_ISDIR(lfsck_object_type(obj))) {
1508                         remove = true;
1509                         newdata = true;
1510                 } else {
1511                         remove = false;
1512                         newdata = false;
1513                 }
1514                 goto nodata;
1515         } else if (unlikely(rc == -EINVAL)) {
1516                 count = 1;
1517                 ns->ln_flags |= LF_INCONSISTENT;
1518                 /* The magic crashed, we are not sure whether there are more
1519                  * corrupt data in the linkea, so remove all linkea entries. */
1520                 remove = true;
1521                 newdata = true;
1522                 goto nodata;
1523         } else if (rc == -ENODATA) {
1524                 count = 1;
1525                 ns->ln_flags |= LF_UPGRADE;
1526                 remove = false;
1527                 newdata = true;
1528
1529 nodata:
1530                 if (bk->lb_param & LPF_DRYRUN) {
1531                         ns->ln_linkea_repaired++;
1532                         repaired = true;
1533                         log = true;
1534                         goto record;
1535                 }
1536
1537                 if (!com->lc_journal)
1538                         goto again;
1539
1540                 if (remove) {
1541                         LASSERT(newdata);
1542
1543                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
1544                                           BYPASS_CAPA);
1545                         if (rc != 0)
1546                                 GOTO(stop, rc);
1547                 }
1548
1549                 if (newdata) {
1550                         rc = linkea_data_new(&ldata,
1551                                         &lfsck_env_info(env)->lti_linkea_buf);
1552                         if (rc != 0)
1553                                 GOTO(stop, rc);
1554                 }
1555
1556                 rc = linkea_add_buf(&ldata, cname, pfid);
1557                 if (rc != 0)
1558                         GOTO(stop, rc);
1559
1560                 rc = lfsck_links_write(env, obj, &ldata, handle);
1561                 if (rc != 0)
1562                         GOTO(stop, rc);
1563
1564                 count = ldata.ld_leh->leh_reccount;
1565                 ns->ln_linkea_repaired++;
1566                 repaired = true;
1567                 log = true;
1568         } else {
1569                 GOTO(stop, rc);
1570         }
1571
1572 record:
1573         LASSERT(count > 0);
1574
1575         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1576         if (rc != 0)
1577                 GOTO(stop, rc);
1578
1579         if ((count == 1) &&
1580             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
1581                 /* Usually, it is for single linked object or dir, do nothing.*/
1582                 GOTO(stop, rc);
1583
1584         /* Following modification will be in another transaction.  */
1585         if (handle != NULL) {
1586                 LASSERT(dt_write_locked(env, obj));
1587
1588                 dt_write_unlock(env, obj);
1589                 locked = false;
1590
1591                 dt_trans_stop(env, lfsck->li_next, handle);
1592                 handle = NULL;
1593         }
1594
1595         ns->ln_mlinked_checked++;
1596         rc = lfsck_namespace_update(env, com, &lnr->lnr_fid,
1597                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1598
1599         GOTO(out, rc);
1600
1601 stop:
1602         if (locked)
1603                 dt_write_unlock(env, obj);
1604
1605         if (handle != NULL)
1606                 dt_trans_stop(env, lfsck->li_next, handle);
1607
1608 out:
1609         down_write(&com->lc_sem);
1610         if (rc < 0) {
1611                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
1612                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
1613                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
1614                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
1615                        lnr->lnr_namelen, lnr->lnr_name, rc);
1616
1617                 lfsck_namespace_record_failure(env, lfsck, ns);
1618                 if (!(bk->lb_param & LPF_FAILOUT))
1619                         rc = 0;
1620         } else {
1621                 if (log)
1622                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
1623                                "repaired the entry: "DFID", parent "DFID
1624                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
1625                                PFID(&lnr->lnr_fid),
1626                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
1627                                lnr->lnr_namelen, lnr->lnr_name);
1628
1629                 if (repaired) {
1630                         ns->ln_items_repaired++;
1631                         if (bk->lb_param & LPF_DRYRUN &&
1632                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1633                                 lfsck_pos_fill(env, lfsck,
1634                                                &ns->ln_pos_first_inconsistent,
1635                                                false);
1636                 } else {
1637                         com->lc_journal = 0;
1638                 }
1639                 rc = 0;
1640         }
1641         up_write(&com->lc_sem);
1642
1643         if (obj != NULL && !IS_ERR(obj))
1644                 lfsck_object_put(env, obj);
1645         return rc;
1646 }
1647
1648 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
1649                                                 struct lfsck_component *com)
1650 {
1651         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1652         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1653         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1654         struct lfsck_namespace  *ns     = com->lc_file_ram;
1655         struct dt_object        *obj    = com->lc_obj;
1656         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1657         struct dt_object        *target;
1658         struct dt_it            *di;
1659         struct dt_key           *key;
1660         struct lu_fid            fid;
1661         int                      rc;
1662         __u8                     flags = 0;
1663         ENTRY;
1664
1665         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
1666                lfsck_lfsck2name(lfsck));
1667
1668         com->lc_new_checked = 0;
1669         com->lc_new_scanned = 0;
1670         com->lc_time_last_checkpoint = cfs_time_current();
1671         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1672                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1673
1674         di = iops->init(env, obj, 0, BYPASS_CAPA);
1675         if (IS_ERR(di))
1676                 RETURN(PTR_ERR(di));
1677
1678         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1679         rc = iops->get(env, di, (const struct dt_key *)&fid);
1680         if (rc < 0)
1681                 GOTO(fini, rc);
1682
1683         /* Skip the start one, which either has been processed or non-exist. */
1684         rc = iops->next(env, di);
1685         if (rc != 0)
1686                 GOTO(put, rc);
1687
1688         do {
1689                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1690                     cfs_fail_val > 0) {
1691                         struct l_wait_info lwi;
1692
1693                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1694                                           NULL, NULL);
1695                         l_wait_event(thread->t_ctl_waitq,
1696                                      !thread_is_running(thread),
1697                                      &lwi);
1698                 }
1699
1700                 key = iops->key(env, di);
1701                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1702                 target = lfsck_object_find(env, lfsck, &fid);
1703                 down_write(&com->lc_sem);
1704                 if (IS_ERR(target)) {
1705                         rc = PTR_ERR(target);
1706                         goto checkpoint;
1707                 }
1708
1709                 /* XXX: Currently, skip remote object, the consistency for
1710                  *      remote object will be processed in LFSCK phase III. */
1711                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1712                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1713                         if (rc == 0)
1714                                 rc = lfsck_namespace_double_scan_one(env, com,
1715                                                                 target, flags);
1716                 }
1717
1718                 lfsck_object_put(env, target);
1719
1720 checkpoint:
1721                 com->lc_new_checked++;
1722                 com->lc_new_scanned++;
1723                 ns->ln_fid_latest_scanned_phase2 = fid;
1724                 if (rc > 0)
1725                         ns->ln_objs_repaired_phase2++;
1726                 else if (rc < 0)
1727                         ns->ln_objs_failed_phase2++;
1728                 up_write(&com->lc_sem);
1729
1730                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1731                         lfsck_namespace_delete(env, com, &fid);
1732                 } else if (rc < 0) {
1733                         flags |= LLF_REPAIR_FAILED;
1734                         lfsck_namespace_update(env, com, &fid, flags, true);
1735                 }
1736
1737                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1738                         GOTO(put, rc);
1739
1740                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1741                                               cfs_time_current())) &&
1742                     com->lc_new_checked != 0) {
1743                         down_write(&com->lc_sem);
1744                         ns->ln_run_time_phase2 +=
1745                                 cfs_duration_sec(cfs_time_current() +
1746                                 HALF_SEC - com->lc_time_last_checkpoint);
1747                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1748                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1749                         com->lc_new_checked = 0;
1750                         rc = lfsck_namespace_store(env, com, false);
1751                         up_write(&com->lc_sem);
1752                         if (rc != 0)
1753                                 GOTO(put, rc);
1754
1755                         com->lc_time_last_checkpoint = cfs_time_current();
1756                         com->lc_time_next_checkpoint =
1757                                 com->lc_time_last_checkpoint +
1758                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1759                 }
1760
1761                 lfsck_control_speed_by_self(com);
1762                 if (unlikely(!thread_is_running(thread)))
1763                         GOTO(put, rc = 0);
1764
1765                 rc = iops->next(env, di);
1766         } while (rc == 0);
1767
1768         GOTO(put, rc);
1769
1770 put:
1771         iops->put(env, di);
1772
1773 fini:
1774         iops->fini(env, di);
1775         return rc;
1776 }
1777
1778 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
1779                                                struct lfsck_component *com,
1780                                                struct lfsck_position *pos)
1781 {
1782         struct lfsck_assistant_data     *lad = com->lc_data;
1783         struct lfsck_namespace_req      *lnr;
1784
1785         if (list_empty(&lad->lad_req_list))
1786                 return;
1787
1788         lnr = list_entry(lad->lad_req_list.next,
1789                          struct lfsck_namespace_req,
1790                          lnr_lar.lar_list);
1791         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
1792         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
1793         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
1794 }
1795
1796 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
1797                                               struct lfsck_component *com,
1798                                               int rc)
1799 {
1800         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1801         struct lfsck_namespace  *ns     = com->lc_file_ram;
1802
1803         down_write(&com->lc_sem);
1804         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1805                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1806         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1807         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1808         com->lc_new_checked = 0;
1809
1810         if (rc > 0) {
1811                 com->lc_journal = 0;
1812                 if (ns->ln_flags & LF_INCOMPLETE)
1813                         ns->ln_status = LS_PARTIAL;
1814                 else
1815                         ns->ln_status = LS_COMPLETED;
1816                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
1817                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1818                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1819                 ns->ln_success_count++;
1820         } else if (rc == 0) {
1821                 ns->ln_status = lfsck->li_status;
1822                 if (ns->ln_status == 0)
1823                         ns->ln_status = LS_STOPPED;
1824         } else {
1825                 ns->ln_status = LS_FAILED;
1826         }
1827
1828         rc = lfsck_namespace_store(env, com, false);
1829         up_write(&com->lc_sem);
1830
1831         return rc;
1832 }
1833
/**
 * Placeholder for the "sync failures" assistant callback.
 *
 * Not implemented yet: the function takes no action.
 *
 * \param[in] env	pointer to the thread context (unused)
 * \param[in] com	pointer to the namespace LFSCK component (unused)
 * \param[in] lr	pointer to the LFSCK request (unused)
 */
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
						    struct lfsck_component *com,
						    struct lfsck_request *lr)
{
	/* XXX: TBD */
}
1840
1841 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
1842         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
1843         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
1844         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
1845         .la_double_scan_result  = lfsck_namespace_double_scan_result,
1846         .la_req_fini            = lfsck_namespace_assistant_req_fini,
1847         .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
1848 };
1849
1850 /**
1851  * Verify the specified linkEA entry for the given directory object.
1852  * If the object has no such linkEA entry or it has more other linkEA
1853  * entries, then re-generate the linkEA with the given information.
1854  *
1855  * \param[in] env       pointer to the thread context
1856  * \param[in] dev       pointer to the dt_device
1857  * \param[in] obj       pointer to the dt_object to be handled
1858  * \param[in] cname     the name for the child in the parent directory
1859  * \param[in] pfid      the parent directory's FID for the linkEA
1860  *
1861  * \retval              0 for success
1862  * \retval              negative error number on failure
1863  */
1864 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
1865                         struct dt_object *obj, const struct lu_name *cname,
1866                         const struct lu_fid *pfid)
1867 {
1868         struct linkea_data       ldata  = { 0 };
1869         struct lu_buf            linkea_buf;
1870         struct thandle          *th;
1871         int                      rc;
1872         int                      fl     = LU_XATTR_CREATE;
1873         bool                     dirty  = false;
1874         ENTRY;
1875
1876         LASSERT(S_ISDIR(lfsck_object_type(obj)));
1877
1878         rc = lfsck_links_read(env, obj, &ldata);
1879         if (rc == -ENODATA) {
1880                 dirty = true;
1881         } else if (rc == 0) {
1882                 fl = LU_XATTR_REPLACE;
1883                 if (ldata.ld_leh->leh_reccount != 1) {
1884                         dirty = true;
1885                 } else {
1886                         rc = linkea_links_find(&ldata, cname, pfid);
1887                         if (rc != 0)
1888                                 dirty = true;
1889                 }
1890         }
1891
1892         if (!dirty)
1893                 RETURN(rc);
1894
1895         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
1896         if (rc != 0)
1897                 RETURN(rc);
1898
1899         rc = linkea_add_buf(&ldata, cname, pfid);
1900         if (rc != 0)
1901                 RETURN(rc);
1902
1903         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
1904                        ldata.ld_leh->leh_len);
1905         th = dt_trans_create(env, dev);
1906         if (IS_ERR(th))
1907                 RETURN(PTR_ERR(th));
1908
1909         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
1910                                   XATTR_NAME_LINK, fl, th);
1911         if (rc != 0)
1912                 GOTO(stop, rc = PTR_ERR(th));
1913
1914         rc = dt_trans_start_local(env, dev, th);
1915         if (rc != 0)
1916                 GOTO(stop, rc);
1917
1918         dt_write_lock(env, obj, 0);
1919         rc = dt_xattr_set(env, obj, &linkea_buf,
1920                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
1921         dt_write_unlock(env, obj);
1922
1923         GOTO(stop, rc);
1924
1925 stop:
1926         dt_trans_stop(env, dev, th);
1927         return rc;
1928 }
1929
1930 /**
1931  * Get the name and parent directory's FID from the first linkEA entry.
1932  *
1933  * \param[in] env       pointer to the thread context
1934  * \param[in] obj       pointer to the object which get linkEA from
1935  * \param[out] name     pointer to the buffer to hold the name
1936  *                      in the first linkEA entry
1937  * \param[out] pfid     pointer to the buffer to hold the parent
1938  *                      directory's FID in the first linkEA entry
1939  *
1940  * \retval              0 for success
1941  * \retval              negative error number on failure
1942  */
1943 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
1944                           char *name, struct lu_fid *pfid)
1945 {
1946         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
1947         struct linkea_data        ldata = { 0 };
1948         int                       rc;
1949
1950         rc = lfsck_links_read(env, obj, &ldata);
1951         if (rc != 0)
1952                 return rc;
1953
1954         linkea_first_entry(&ldata);
1955         if (ldata.ld_lee == NULL)
1956                 return -ENODATA;
1957
1958         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
1959         /* To guarantee the 'name' is terminated with '0'. */
1960         memcpy(name, cname->ln_name, cname->ln_namelen);
1961         name[cname->ln_namelen] = 0;
1962
1963         return 0;
1964 }
1965
1966 /**
1967  * Remove the name entry from the parent directory.
1968  *
1969  * No need to care about the object referenced by the name entry,
1970  * either the name entry is invalid or redundant, or the referenced
1971  * object has been processed has been or will be handled by others.
1972  *
1973  * \param[in] env       pointer to the thread context
1974  * \param[in] lfsck     pointer to the lfsck instance
1975  * \param[in] parent    pointer to the lost+found object
1976  * \param[in] name      the name for the name entry to be removed
1977  * \param[in] type      the type for the name entry to be removed
1978  *
1979  * \retval              0 for success
1980  * \retval              negative error number on failure
1981  */
1982 int lfsck_remove_name_entry(const struct lu_env *env,
1983                             struct lfsck_instance *lfsck,
1984                             struct dt_object *parent,
1985                             const char *name, __u32 type)
1986 {
1987         struct dt_device        *dev    = lfsck->li_next;
1988         struct thandle          *th;
1989         struct lustre_handle     lh     = { 0 };
1990         int                      rc;
1991         ENTRY;
1992
1993         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
1994                               MDS_INODELOCK_UPDATE, LCK_EX);
1995         if (rc != 0)
1996                 RETURN(rc);
1997
1998         th = dt_trans_create(env, dev);
1999         if (IS_ERR(th))
2000                 GOTO(unlock, rc = PTR_ERR(th));
2001
2002         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2003         if (rc != 0)
2004                 GOTO(stop, rc);
2005
2006         if (S_ISDIR(type)) {
2007                 rc = dt_declare_ref_del(env, parent, th);
2008                 if (rc != 0)
2009                         GOTO(stop, rc);
2010         }
2011
2012         rc = dt_trans_start(env, dev, th);
2013         if (rc != 0)
2014                 GOTO(stop, rc);
2015
2016         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2017                        BYPASS_CAPA);
2018         if (rc != 0)
2019                 GOTO(stop, rc);
2020
2021         if (S_ISDIR(type)) {
2022                 dt_write_lock(env, parent, 0);
2023                 rc = dt_ref_del(env, parent, th);
2024                 dt_write_unlock(env, parent);
2025         }
2026
2027         GOTO(stop, rc);
2028
2029 stop:
2030         dt_trans_stop(env, dev, th);
2031
2032 unlock:
2033         lfsck_ibits_unlock(&lh, LCK_EX);
2034
2035         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
2036                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2037                PFID(lfsck_dto2fid(parent)), name, type, rc);
2038
2039         return rc;
2040 }
2041
2042 /**
2043  * Update the object's name entry with the given FID.
2044  *
2045  * \param[in] env       pointer to the thread context
2046  * \param[in] lfsck     pointer to the lfsck instance
2047  * \param[in] parent    pointer to the parent directory that holds
2048  *                      the name entry
2049  * \param[in] name      the name for the entry to be updated
2050  * \param[in] pfid      the new PFID for the name entry
2051  * \param[in] type      the type for the name entry to be updated
2052  *
2053  * \retval              0 for success
2054  * \retval              negative error number on failure
2055  */
2056 int lfsck_update_name_entry(const struct lu_env *env,
2057                             struct lfsck_instance *lfsck,
2058                             struct dt_object *parent, const char *name,
2059                             const struct lu_fid *pfid, __u32 type)
2060 {
2061         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
2062         struct dt_device        *dev    = lfsck->li_next;
2063         struct lustre_handle     lh     = { 0 };
2064         struct thandle          *th;
2065         int                      rc;
2066         bool                     exists = true;
2067         ENTRY;
2068
2069         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2070                               MDS_INODELOCK_UPDATE, LCK_EX);
2071         if (rc != 0)
2072                 RETURN(rc);
2073
2074         th = dt_trans_create(env, dev);
2075         if (IS_ERR(th))
2076                 GOTO(unlock, rc = PTR_ERR(th));
2077
2078         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2079         if (rc != 0)
2080                 GOTO(stop, rc);
2081
2082         rec->rec_type = type;
2083         rec->rec_fid = pfid;
2084         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
2085                                (const struct dt_key *)name, th);
2086         if (rc != 0)
2087                 GOTO(stop, rc);
2088
2089         rc = dt_declare_ref_add(env, parent, th);
2090         if (rc != 0)
2091                 GOTO(stop, rc);
2092
2093         rc = dt_trans_start(env, dev, th);
2094         if (rc != 0)
2095                 GOTO(stop, rc);
2096
2097         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2098                        BYPASS_CAPA);
2099         if (rc == -ENOENT) {
2100                 exists = false;
2101                 rc = 0;
2102         }
2103
2104         if (rc != 0)
2105                 GOTO(stop, rc);
2106
2107         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
2108                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2109         if (rc == 0 && S_ISDIR(type) && !exists) {
2110                 dt_write_lock(env, parent, 0);
2111                 rc = dt_ref_add(env, parent, th);
2112                 dt_write_unlock(env, parent);
2113         }
2114
2115         GOTO(stop, rc);
2116
2117 stop:
2118         dt_trans_stop(env, dev, th);
2119
2120 unlock:
2121         lfsck_ibits_unlock(&lh, LCK_EX);
2122
2123         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
2124                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2125                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
2126
2127         return rc;
2128 }
2129
2130 int lfsck_namespace_setup(const struct lu_env *env,
2131                           struct lfsck_instance *lfsck)
2132 {
2133         struct lfsck_component  *com;
2134         struct lfsck_namespace  *ns;
2135         struct dt_object        *root = NULL;
2136         struct dt_object        *obj;
2137         int                      rc;
2138         ENTRY;
2139
2140         LASSERT(lfsck->li_master);
2141
2142         OBD_ALLOC_PTR(com);
2143         if (com == NULL)
2144                 RETURN(-ENOMEM);
2145
2146         INIT_LIST_HEAD(&com->lc_link);
2147         INIT_LIST_HEAD(&com->lc_link_dir);
2148         init_rwsem(&com->lc_sem);
2149         atomic_set(&com->lc_ref, 1);
2150         com->lc_lfsck = lfsck;
2151         com->lc_type = LFSCK_TYPE_NAMESPACE;
2152         com->lc_ops = &lfsck_namespace_ops;
2153         com->lc_data = lfsck_assistant_data_init(
2154                         &lfsck_namespace_assistant_ops,
2155                         "lfsck_namespace");
2156         if (com->lc_data == NULL)
2157                 GOTO(out, rc = -ENOMEM);
2158
2159         com->lc_file_size = sizeof(struct lfsck_namespace);
2160         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2161         if (com->lc_file_ram == NULL)
2162                 GOTO(out, rc = -ENOMEM);
2163
2164         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2165         if (com->lc_file_disk == NULL)
2166                 GOTO(out, rc = -ENOMEM);
2167
2168         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2169         if (IS_ERR(root))
2170                 GOTO(out, rc = PTR_ERR(root));
2171
2172         if (unlikely(!dt_try_as_dir(env, root)))
2173                 GOTO(out, rc = -ENOTDIR);
2174
2175         obj = local_index_find_or_create(env, lfsck->li_los, root,
2176                                          lfsck_namespace_name,
2177                                          S_IFREG | S_IRUGO | S_IWUSR,
2178                                          &dt_lfsck_features);
2179         if (IS_ERR(obj))
2180                 GOTO(out, rc = PTR_ERR(obj));
2181
2182         com->lc_obj = obj;
2183         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
2184         if (rc != 0)
2185                 GOTO(out, rc);
2186
2187         rc = lfsck_namespace_load(env, com);
2188         if (rc > 0)
2189                 rc = lfsck_namespace_reset(env, com, true);
2190         else if (rc == -ENODATA)
2191                 rc = lfsck_namespace_init(env, com);
2192         if (rc != 0)
2193                 GOTO(out, rc);
2194
2195         ns = com->lc_file_ram;
2196         switch (ns->ln_status) {
2197         case LS_INIT:
2198         case LS_COMPLETED:
2199         case LS_FAILED:
2200         case LS_STOPPED:
2201                 spin_lock(&lfsck->li_lock);
2202                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2203                 spin_unlock(&lfsck->li_lock);
2204                 break;
2205         default:
2206                 CERROR("%s: unknown lfsck_namespace status %d\n",
2207                        lfsck_lfsck2name(lfsck), ns->ln_status);
2208                 /* fall through */
2209         case LS_SCANNING_PHASE1:
2210         case LS_SCANNING_PHASE2:
2211                 /* No need to store the status to disk right now.
2212                  * If the system crashed before the status stored,
2213                  * it will be loaded back when next time. */
2214                 ns->ln_status = LS_CRASHED;
2215                 /* fall through */
2216         case LS_PAUSED:
2217         case LS_CRASHED:
2218                 spin_lock(&lfsck->li_lock);
2219                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2220                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
2221                 spin_unlock(&lfsck->li_lock);
2222                 break;
2223         }
2224
2225         GOTO(out, rc = 0);
2226
2227 out:
2228         if (root != NULL && !IS_ERR(root))
2229                 lu_object_put(env, &root->do_lu);
2230         if (rc != 0) {
2231                 lfsck_component_cleanup(env, com);
2232                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
2233                        lfsck_lfsck2name(lfsck), rc);
2234         }
2235         return rc;
2236 }