Whamcloud - gitweb
LU-4788 lfsck: enable verification for remote object
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_fid.h>
38 #include <lustre_lib.h>
39 #include <lustre_net.h>
40 #include <lustre/lustre_user.h>
41
42 #include "lfsck_internal.h"
43
/*
 * Magic value stored in lfsck_namespace::ln_magic when the record is
 * initialized and checked again when the record is loaded.
 */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/*
 * Positive results returned by lfsck_namespace_check_exist() when the
 * name entry being verified is not in the expected state.
 */
enum lfsck_nameentry_check {
        /* The object has been unlinked. */
        LFSCK_NAMEENTRY_DEAD            = 1,
        /* The entry has been removed. */
        LFSCK_NAMEENTRY_REMOVED         = 2,
        /* The entry has been recreated. */
        LFSCK_NAMEENTRY_RECREATED       = 3,
};

/* Name string for the namespace LFSCK (users are outside this section). */
static const char lfsck_namespace_name[] = "lfsck_namespace";
53
54 struct lfsck_namespace_req {
55         struct lfsck_assistant_req       lnr_lar;
56         struct dt_object                *lnr_obj;
57         struct lu_fid                    lnr_fid;
58         __u64                            lnr_oit_cookie;
59         __u64                            lnr_dir_cookie;
60         __u32                            lnr_attr;
61         __u32                            lnr_size;
62         __u16                            lnr_type;
63         __u16                            lnr_namelen;
64         char                             lnr_name[0];
65 };
66
67 static struct lfsck_namespace_req *
68 lfsck_namespace_assistant_req_init(struct lfsck_instance *lfsck,
69                                    struct lu_dirent *ent, __u16 type)
70 {
71         struct lfsck_namespace_req *lnr;
72         int                         size;
73
74         size = sizeof(*lnr) + (ent->lde_namelen & ~3) + 4;
75         OBD_ALLOC(lnr, size);
76         if (lnr == NULL)
77                 return ERR_PTR(-ENOMEM);
78
79         INIT_LIST_HEAD(&lnr->lnr_lar.lar_list);
80         lu_object_get(&lfsck->li_obj_dir->do_lu);
81         lnr->lnr_obj = lfsck->li_obj_dir;
82         lnr->lnr_fid = ent->lde_fid;
83         lnr->lnr_oit_cookie = lfsck->li_pos_current.lp_oit_cookie;
84         lnr->lnr_dir_cookie = ent->lde_hash;
85         lnr->lnr_attr = ent->lde_attrs;
86         lnr->lnr_size = size;
87         lnr->lnr_type = type;
88         lnr->lnr_namelen = ent->lde_namelen;
89         memcpy(lnr->lnr_name, ent->lde_name, ent->lde_namelen);
90
91         return lnr;
92 }
93
94 static void lfsck_namespace_assistant_req_fini(const struct lu_env *env,
95                                                struct lfsck_assistant_req *lar)
96 {
97         struct lfsck_namespace_req *lnr =
98                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
99
100         lu_object_put(env, &lnr->lnr_obj->do_lu);
101         OBD_FREE(lnr, lnr->lnr_size);
102 }
103
104 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
105                                       struct lfsck_namespace *src)
106 {
107         dst->ln_magic = le32_to_cpu(src->ln_magic);
108         dst->ln_status = le32_to_cpu(src->ln_status);
109         dst->ln_flags = le32_to_cpu(src->ln_flags);
110         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
111         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
112         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
113         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
114         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
115         dst->ln_time_last_checkpoint =
116                                 le64_to_cpu(src->ln_time_last_checkpoint);
117         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
118                                  &src->ln_pos_latest_start);
119         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
120                                  &src->ln_pos_last_checkpoint);
121         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
122                                  &src->ln_pos_first_inconsistent);
123         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
124         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
125         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
126         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
127         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
128         dst->ln_objs_repaired_phase2 =
129                                 le64_to_cpu(src->ln_objs_repaired_phase2);
130         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
131         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
132         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
133         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
134                       &src->ln_fid_latest_scanned_phase2);
135         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
136         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
137         dst->ln_mul_linked_checked = le64_to_cpu(src->ln_mul_linked_checked);
138         dst->ln_mul_linked_repaired = le64_to_cpu(src->ln_mul_linked_repaired);
139 }
140
141 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
142                                       struct lfsck_namespace *src)
143 {
144         dst->ln_magic = cpu_to_le32(src->ln_magic);
145         dst->ln_status = cpu_to_le32(src->ln_status);
146         dst->ln_flags = cpu_to_le32(src->ln_flags);
147         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
148         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
149         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
150         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
151         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
152         dst->ln_time_last_checkpoint =
153                                 cpu_to_le64(src->ln_time_last_checkpoint);
154         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
155                                  &src->ln_pos_latest_start);
156         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
157                                  &src->ln_pos_last_checkpoint);
158         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
159                                  &src->ln_pos_first_inconsistent);
160         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
161         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
162         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
163         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
164         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
165         dst->ln_objs_repaired_phase2 =
166                                 cpu_to_le64(src->ln_objs_repaired_phase2);
167         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
168         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
169         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
170         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
171                       &src->ln_fid_latest_scanned_phase2);
172         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
173         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
174         dst->ln_mul_linked_checked = cpu_to_le64(src->ln_mul_linked_checked);
175         dst->ln_mul_linked_repaired = cpu_to_le64(src->ln_mul_linked_repaired);
176 }
177
178 static void lfsck_namespace_record_failure(const struct lu_env *env,
179                                            struct lfsck_instance *lfsck,
180                                            struct lfsck_namespace *ns)
181 {
182         struct lfsck_position pos;
183
184         ns->ln_items_failed++;
185         lfsck_pos_fill(env, lfsck, &pos, false);
186         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent) ||
187             lfsck_pos_is_eq(&pos, &ns->ln_pos_first_inconsistent) < 0) {
188                 ns->ln_pos_first_inconsistent = pos;
189
190                 CDEBUG(D_LFSCK, "%s: namespace LFSCK hit first non-repaired "
191                        "inconsistency at the pos ["LPU64", "DFID", "LPX64"]\n",
192                        lfsck_lfsck2name(lfsck),
193                        ns->ln_pos_first_inconsistent.lp_oit_cookie,
194                        PFID(&ns->ln_pos_first_inconsistent.lp_dir_parent),
195                        ns->ln_pos_first_inconsistent.lp_dir_cookie);
196         }
197 }
198
199 /**
200  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
201  * \retval 0: succeed.
202  * \retval -ve: failed cases.
203  */
204 static int lfsck_namespace_load(const struct lu_env *env,
205                                 struct lfsck_component *com)
206 {
207         int len = com->lc_file_size;
208         int rc;
209
210         rc = dt_xattr_get(env, com->lc_obj,
211                           lfsck_buf_get(env, com->lc_file_disk, len),
212                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
213         if (rc == len) {
214                 struct lfsck_namespace *ns = com->lc_file_ram;
215
216                 lfsck_namespace_le_to_cpu(ns,
217                                 (struct lfsck_namespace *)com->lc_file_disk);
218                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
219                         CDEBUG(D_LFSCK, "%s: invalid lfsck_namespace magic "
220                                "%#x != %#x\n", lfsck_lfsck2name(com->lc_lfsck),
221                                ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
222                         rc = 1;
223                 } else {
224                         rc = 0;
225                 }
226         } else if (rc != -ENODATA) {
227                 CDEBUG(D_LFSCK, "%s: fail to load lfsck_namespace, "
228                        "expected = %d: rc = %d\n",
229                        lfsck_lfsck2name(com->lc_lfsck), len, rc);
230                 if (rc >= 0)
231                         rc = 1;
232         }
233         return rc;
234 }
235
236 static int lfsck_namespace_store(const struct lu_env *env,
237                                  struct lfsck_component *com, bool init)
238 {
239         struct dt_object        *obj    = com->lc_obj;
240         struct lfsck_instance   *lfsck  = com->lc_lfsck;
241         struct thandle          *handle;
242         int                      len    = com->lc_file_size;
243         int                      rc;
244         ENTRY;
245
246         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
247                                   (struct lfsck_namespace *)com->lc_file_ram);
248         handle = dt_trans_create(env, lfsck->li_bottom);
249         if (IS_ERR(handle))
250                 GOTO(log, rc = PTR_ERR(handle));
251
252         rc = dt_declare_xattr_set(env, obj,
253                                   lfsck_buf_get(env, com->lc_file_disk, len),
254                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
255         if (rc != 0)
256                 GOTO(out, rc);
257
258         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
259         if (rc != 0)
260                 GOTO(out, rc);
261
262         rc = dt_xattr_set(env, obj,
263                           lfsck_buf_get(env, com->lc_file_disk, len),
264                           XATTR_NAME_LFSCK_NAMESPACE,
265                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
266                           handle, BYPASS_CAPA);
267
268         GOTO(out, rc);
269
270 out:
271         dt_trans_stop(env, lfsck->li_bottom, handle);
272
273 log:
274         if (rc != 0)
275                 CDEBUG(D_LFSCK, "%s: fail to store lfsck_namespace: rc = %d\n",
276                        lfsck_lfsck2name(lfsck), rc);
277         return rc;
278 }
279
280 static int lfsck_namespace_init(const struct lu_env *env,
281                                 struct lfsck_component *com)
282 {
283         struct lfsck_namespace *ns = com->lc_file_ram;
284         int rc;
285
286         memset(ns, 0, sizeof(*ns));
287         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
288         ns->ln_status = LS_INIT;
289         down_write(&com->lc_sem);
290         rc = lfsck_namespace_store(env, com, true);
291         up_write(&com->lc_sem);
292         return rc;
293 }
294
295 /**
296  * Update the namespace LFSCK tracing file for the given @fid
297  *
298  * \param[in] env       pointer to the thread context
299  * \param[in] com       pointer to the lfsck component
300  * \param[in] fid       the fid which flags to be updated in the lfsck
301  *                      tracing file
302  * \param[in] add       true if add new flags, otherwise remove flags
303  *
304  * \retval              0 for succeed or nothing to be done
305  * \retval              negative error number on failure
306  */
307 int lfsck_namespace_trace_update(const struct lu_env *env,
308                                  struct lfsck_component *com,
309                                  const struct lu_fid *fid,
310                                  const __u8 flags, bool add)
311 {
312         struct lfsck_instance   *lfsck  = com->lc_lfsck;
313         struct dt_object        *obj    = com->lc_obj;
314         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid3;
315         struct dt_device        *dev    = lfsck->li_bottom;
316         struct thandle          *th     = NULL;
317         int                      rc     = 0;
318         __u8                     old    = 0;
319         __u8                     new    = 0;
320         ENTRY;
321
322         LASSERT(flags != 0);
323
324         down_write(&com->lc_sem);
325         fid_cpu_to_be(key, fid);
326         rc = dt_lookup(env, obj, (struct dt_rec *)&old,
327                        (const struct dt_key *)key, BYPASS_CAPA);
328         if (rc == -ENOENT) {
329                 if (!add)
330                         GOTO(unlock, rc = 0);
331
332                 old = 0;
333                 new = flags;
334         } else if (rc == 0) {
335                 if (add) {
336                         if ((old & flags) == flags)
337                                 GOTO(unlock, rc = 0);
338
339                         new = old | flags;
340                 } else {
341                         if ((old & flags) == 0)
342                                 GOTO(unlock, rc = 0);
343
344                         new = old & ~flags;
345                 }
346         } else {
347                 GOTO(log, rc);
348         }
349
350         th = dt_trans_create(env, dev);
351         if (IS_ERR(th))
352                 GOTO(log, rc = PTR_ERR(th));
353
354         if (old != 0) {
355                 rc = dt_declare_delete(env, obj,
356                                        (const struct dt_key *)key, th);
357                 if (rc != 0)
358                         GOTO(log, rc);
359         }
360
361         if (new != 0) {
362                 rc = dt_declare_insert(env, obj,
363                                        (const struct dt_rec *)&new,
364                                        (const struct dt_key *)key, th);
365                 if (rc != 0)
366                         GOTO(log, rc);
367         }
368
369         rc = dt_trans_start_local(env, dev, th);
370         if (rc != 0)
371                 GOTO(log, rc);
372
373         if (old != 0) {
374                 rc = dt_delete(env, obj, (const struct dt_key *)key,
375                                th, BYPASS_CAPA);
376                 if (rc != 0)
377                         GOTO(log, rc);
378         }
379
380         if (new != 0) {
381                 rc = dt_insert(env, obj, (const struct dt_rec *)&new,
382                                (const struct dt_key *)key, th, BYPASS_CAPA, 1);
383                 if (rc != 0)
384                         GOTO(log, rc);
385         }
386
387         GOTO(log, rc);
388
389 log:
390         if (th != NULL && !IS_ERR(th))
391                 dt_trans_stop(env, dev, th);
392
393         CDEBUG(D_LFSCK, "%s: namespace LFSCK %s flags for "DFID" in the "
394                "tracing file, flags %x, old %x, new %x: rc = %d\n",
395                lfsck_lfsck2name(lfsck), add ? "add" : "del", PFID(fid),
396                (__u32)flags, (__u32)old, (__u32)new, rc);
397
398 unlock:
399         up_write(&com->lc_sem);
400
401         return rc;
402 }
403
404 static int lfsck_namespace_check_exist(const struct lu_env *env,
405                                        struct dt_object *dir,
406                                        struct dt_object *obj, const char *name)
407 {
408         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
409         int               rc;
410         ENTRY;
411
412         if (unlikely(lfsck_is_dead_obj(obj)))
413                 RETURN(LFSCK_NAMEENTRY_DEAD);
414
415         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
416                        (const struct dt_key *)name, BYPASS_CAPA);
417         if (rc == -ENOENT)
418                 RETURN(LFSCK_NAMEENTRY_REMOVED);
419
420         if (rc < 0)
421                 RETURN(rc);
422
423         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
424                 RETURN(LFSCK_NAMEENTRY_RECREATED);
425
426         RETURN(0);
427 }
428
429 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
430                                             struct dt_object *obj,
431                                             struct thandle *handle)
432 {
433         int rc;
434
435         /* For destroying all invalid linkEA entries. */
436         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
437         if (rc != 0)
438                 return rc;
439
440         /* For insert new linkEA entry. */
441         rc = dt_declare_xattr_set(env, obj,
442                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
443                         XATTR_NAME_LINK, 0, handle);
444         return rc;
445 }
446
447 int __lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
448                        struct linkea_data *ldata)
449 {
450         int rc;
451
452         if (ldata->ld_buf->lb_buf == NULL)
453                 return -ENOMEM;
454
455         if (!dt_object_exists(obj))
456                 return -ENOENT;
457
458         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
459         if (rc == -ERANGE) {
460                 /* Buf was too small, figure out what we need. */
461                 rc = dt_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LINK,
462                                   BYPASS_CAPA);
463                 if (rc <= 0)
464                         return rc;
465
466                 lu_buf_realloc(ldata->ld_buf, rc);
467                 if (ldata->ld_buf->lb_buf == NULL)
468                         return -ENOMEM;
469
470                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
471                                   BYPASS_CAPA);
472         }
473
474         if (rc > 0)
475                 rc = linkea_init(ldata);
476
477         return rc;
478 }
479
480 /**
481  * Remove linkEA for the given object.
482  *
483  * The caller should take the ldlm lock before the calling.
484  *
485  * \param[in] env       pointer to the thread context
486  * \param[in] com       pointer to the lfsck component
487  * \param[in] obj       pointer to the dt_object to be handled
488  *
489  * \retval              0 for repaired cases
490  * \retval              negative error number on failure
491  */
492 static int lfsck_namespace_links_remove(const struct lu_env *env,
493                                         struct lfsck_component *com,
494                                         struct dt_object *obj)
495 {
496         struct lfsck_instance           *lfsck  = com->lc_lfsck;
497         struct dt_device                *dev    = lfsck->li_bottom;
498         struct thandle                  *th     = NULL;
499         int                              rc     = 0;
500         ENTRY;
501
502         LASSERT(dt_object_remote(obj) == 0);
503
504         th = dt_trans_create(env, dev);
505         if (IS_ERR(th))
506                 GOTO(log, rc = PTR_ERR(th));
507
508         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, th);
509         if (rc != 0)
510                 GOTO(stop, rc);
511
512         rc = dt_trans_start_local(env, dev, th);
513         if (rc != 0)
514                 GOTO(stop, rc);
515
516         dt_write_lock(env, obj, 0);
517         if (unlikely(lfsck_is_dead_obj(obj)))
518                 GOTO(unlock, rc = -ENOENT);
519
520         if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
521                 GOTO(unlock, rc = 0);
522
523         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, th, BYPASS_CAPA);
524
525         GOTO(unlock, rc);
526
527 unlock:
528         dt_write_unlock(env, obj);
529
530 stop:
531         dt_trans_stop(env, dev, th);
532
533 log:
534         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove invalid linkEA "
535                "for the object "DFID": rc = %d\n",
536                lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
537
538         if (rc == 0) {
539                 struct lfsck_namespace *ns = com->lc_file_ram;
540
541                 ns->ln_flags |= LF_INCONSISTENT;
542         }
543
544         return rc;
545 }
546
547 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
548                              struct linkea_data *ldata, struct thandle *handle)
549 {
550         const struct lu_buf *buf = lfsck_buf_get_const(env,
551                                                        ldata->ld_buf->lb_buf,
552                                                        ldata->ld_leh->leh_len);
553
554         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
555                             BYPASS_CAPA);
556 }
557
558 static void lfsck_namespace_unpack_linkea_entry(struct linkea_data *ldata,
559                                                 struct lu_name *cname,
560                                                 struct lu_fid *pfid,
561                                                 char *buf)
562 {
563         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
564         /* To guarantee the 'name' is terminated with '0'. */
565         memcpy(buf, cname->ln_name, cname->ln_namelen);
566         buf[cname->ln_namelen] = 0;
567         cname->ln_name = buf;
568 }
569
570 static int lfsck_namespace_filter_linkea_entry(struct linkea_data *ldata,
571                                                struct lu_name *cname,
572                                                struct lu_fid *pfid,
573                                                bool remove)
574 {
575         struct link_ea_entry    *oldlee;
576         int                      oldlen;
577         int                      repeated = 0;
578
579         oldlee = ldata->ld_lee;
580         oldlen = ldata->ld_reclen;
581         linkea_next_entry(ldata);
582         while (ldata->ld_lee != NULL) {
583                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
584                                    ldata->ld_lee->lee_reclen[1];
585                 if (unlikely(ldata->ld_reclen == oldlen &&
586                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
587                         repeated++;
588                         if (!remove)
589                                 break;
590
591                         linkea_del_buf(ldata, cname);
592                 } else {
593                         linkea_next_entry(ldata);
594                 }
595         }
596         ldata->ld_lee = oldlee;
597         ldata->ld_reclen = oldlen;
598
599         return repeated;
600 }
601
/*
 * Placeholder: not implemented yet.  All arguments are ignored and 0 is
 * always returned.
 */
static int lfsck_namespace_insert_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan,
                                         const char *infix, const char *type,
                                         int *count)
{
        /* XXX: to be done */
        return 0;
}
611
/*
 * Placeholder: not implemented yet.  All arguments are ignored and 0 is
 * always returned.
 */
static int lfsck_namespace_insert_normal(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *parent,
                                         struct dt_object *child,
                                         const char *name)
{
        /* XXX: to be done */
        return 0;
}
621
/*
 * Placeholder: not implemented yet.  All arguments are ignored and 0 is
 * always returned.
 */
static int lfsck_namespace_create_orphan(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *orphan)
{
        /* XXX: to be done */
        return 0;
}
629
630 /**
631  * Remove the specified entry from the linkEA.
632  *
633  * Locate the linkEA entry with the given @cname and @pfid, then
634  * remove this entry or the other entries those are repeated with
635  * this entry.
636  *
637  * \param[in] env       pointer to the thread context
638  * \param[in] com       pointer to the lfsck component
639  * \param[in] obj       pointer to the dt_object to be handled
640  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
641  * \param[in] cname     the name for the child in the parent directory
642  * \param[in] pfid      the parent directory's FID for the linkEA
643  * \param[in] next      if true, then remove the first found linkEA
644  *                      entry, and move the ldata->ld_lee to next entry
645  *
646  * \retval              positive number for repaired cases
647  * \retval              0 if nothing to be repaired
648  * \retval              negative error number on failure
649  */
650 static int lfsck_namespace_shrink_linkea(const struct lu_env *env,
651                                          struct lfsck_component *com,
652                                          struct dt_object *obj,
653                                          struct linkea_data *ldata,
654                                          struct lu_name *cname,
655                                          struct lu_fid *pfid,
656                                          bool next)
657 {
658         struct lfsck_instance           *lfsck     = com->lc_lfsck;
659         struct dt_device                *dev       = lfsck->li_bottom;
660         struct lfsck_bookmark           *bk        = &lfsck->li_bookmark_ram;
661         struct thandle                  *th        = NULL;
662         struct lustre_handle             lh        = { 0 };
663         struct linkea_data               ldata_new = { 0 };
664         struct lu_buf                    linkea_buf;
665         int                              rc        = 0;
666         ENTRY;
667
668         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
669                               MDS_INODELOCK_UPDATE |
670                               MDS_INODELOCK_XATTR, LCK_EX);
671         if (rc != 0)
672                 GOTO(log, rc);
673
674         if (next)
675                 linkea_del_buf(ldata, cname);
676         else
677                 lfsck_namespace_filter_linkea_entry(ldata, cname, pfid,
678                                                     true);
679         lfsck_buf_init(&linkea_buf, ldata->ld_buf->lb_buf,
680                        ldata->ld_leh->leh_len);
681
682 again:
683         th = dt_trans_create(env, dev);
684         if (IS_ERR(th))
685                 GOTO(unlock1, rc = PTR_ERR(th));
686
687         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
688                                   XATTR_NAME_LINK, 0, th);
689         if (rc != 0)
690                 GOTO(stop, rc);
691
692         rc = dt_trans_start_local(env, dev, th);
693         if (rc != 0)
694                 GOTO(stop, rc);
695
696         dt_write_lock(env, obj, 0);
697         if (unlikely(lfsck_is_dead_obj(obj)))
698                 GOTO(unlock2, rc = -ENOENT);
699
700         rc = lfsck_links_read2(env, obj, &ldata_new);
701         if (rc != 0)
702                 GOTO(unlock2, rc);
703
704         /* The specified linkEA entry has been removed by race. */
705         rc = linkea_links_find(&ldata_new, cname, pfid);
706         if (rc != 0)
707                 GOTO(unlock2, rc = 0);
708
709         if (bk->lb_param & LPF_DRYRUN)
710                 GOTO(unlock2, rc = 1);
711
712         if (next)
713                 linkea_del_buf(&ldata_new, cname);
714         else
715                 lfsck_namespace_filter_linkea_entry(&ldata_new, cname, pfid,
716                                                     true);
717
718         if (linkea_buf.lb_len < ldata_new.ld_leh->leh_len) {
719                 dt_write_unlock(env, obj);
720                 dt_trans_stop(env, dev, th);
721                 lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
722                                ldata_new.ld_leh->leh_len);
723                 goto again;
724         }
725
726         lfsck_buf_init(&linkea_buf, ldata_new.ld_buf->lb_buf,
727                        ldata_new.ld_leh->leh_len);
728         rc = dt_xattr_set(env, obj, &linkea_buf,
729                           XATTR_NAME_LINK, 0, th, BYPASS_CAPA);
730
731         GOTO(unlock2, rc = (rc == 0 ? 1 : rc));
732
733 unlock2:
734         dt_write_unlock(env, obj);
735
736 stop:
737         dt_trans_stop(env, dev, th);
738
739 unlock1:
740         lfsck_ibits_unlock(&lh, LCK_EX);
741
742 log:
743         CDEBUG(D_LFSCK, "%s: namespace LFSCK remove %s linkEA entry "
744                "for the object: "DFID", parent "DFID", name %.*s\n",
745                lfsck_lfsck2name(lfsck), next ? "invalid" : "redundant",
746                PFID(lfsck_dto2fid(obj)), PFID(pfid), cname->ln_namelen,
747                cname->ln_name);
748
749         if (rc != 0) {
750                 struct lfsck_namespace *ns = com->lc_file_ram;
751
752                 ns->ln_flags |= LF_INCONSISTENT;
753         }
754
755         return rc;
756 }
757
758 /**
759  * Conditionally remove the specified entry from the linkEA.
760  *
761  * Take the parent lock firstly, then check whether the specified
762  * name entry exists or not: if yes, do nothing; otherwise, call
763  * lfsck_namespace_shrink_linkea() to remove the linkea entry.
764  *
765  * \param[in] env       pointer to the thread context
766  * \param[in] com       pointer to the lfsck component
767  * \param[in] parent    pointer to the parent directory
768  * \param[in] child     pointer to the child object that holds the linkEA
769  * \param[in,out]ldata  pointer to the buffer that holds the linkEA
770  * \param[in] cname     the name for the child in the parent directory
771  * \param[in] pfid      the parent directory's FID for the linkEA
772  *
773  * \retval              positive number for repaired cases
774  * \retval              0 if nothing to be repaired
775  * \retval              negative error number on failure
776  */
777 static int lfsck_namespace_shrink_linkea_cond(const struct lu_env *env,
778                                               struct lfsck_component *com,
779                                               struct dt_object *parent,
780                                               struct dt_object *child,
781                                               struct linkea_data *ldata,
782                                               struct lu_name *cname,
783                                               struct lu_fid *pfid)
784 {
785         struct lu_fid           *cfid   = &lfsck_env_info(env)->lti_fid3;
786         struct lustre_handle     lh     = { 0 };
787         int                      rc;
788         ENTRY;
789
790         rc = lfsck_ibits_lock(env, com->lc_lfsck, parent, &lh,
791                               MDS_INODELOCK_UPDATE, LCK_EX);
792         if (rc != 0)
793                 RETURN(rc);
794
795         dt_read_lock(env, parent, 0);
796         if (unlikely(lfsck_is_dead_obj(parent))) {
797                 dt_read_unlock(env, parent);
798                 lfsck_ibits_unlock(&lh, LCK_EX);
799                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
800                                                    cname, pfid, true);
801
802                 RETURN(rc);
803         }
804
805         rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
806                        (const struct dt_key *)cname->ln_name,
807                        BYPASS_CAPA);
808         dt_read_unlock(env, parent);
809
810         /* It is safe to release the ldlm lock, because when the logic come
811          * here, we have got all the needed information above whether the
812          * linkEA entry is valid or not. It is not important that others
813          * may add new linkEA entry after the ldlm lock released. If other
814          * has removed the specified linkEA entry by race, then it is OK,
815          * because the subsequent lfsck_namespace_shrink_linkea() can handle
816          * such case. */
817         lfsck_ibits_unlock(&lh, LCK_EX);
818         if (rc == -ENOENT) {
819                 rc = lfsck_namespace_shrink_linkea(env, com, child, ldata,
820                                                    cname, pfid, true);
821
822                 RETURN(rc);
823         }
824
825         if (rc != 0)
826                 RETURN(rc);
827
828         /* The LFSCK just found some internal status of cross-MDTs
829          * create operation. That is normal. */
830         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
831                 linkea_next_entry(ldata);
832
833                 RETURN(0);
834         }
835
836         rc = lfsck_namespace_shrink_linkea(env, com, child, ldata, cname,
837                                            pfid, true);
838
839         RETURN(rc);
840 }
841
842 /**
843  * Double scan the MDT-object for namespace LFSCK.
844  *
845  * If the MDT-object contains invalid or repeated linkEA entries, then drop
846  * those entries from the linkEA; if the linkEA becomes empty or the object
847  * has no linkEA, then it is an orphan and will be added into the directory
848  * .lustre/lost+found/MDTxxxx/; if the remote parent is lost, then recreate
849  * the remote parent; if the name entry corresponding to some linkEA entry
850  * is lost, then add the name entry back to the namespace.
851  *
852  * \param[in] env       pointer to the thread context
853  * \param[in] com       pointer to the lfsck component
854  * \param[in] child     pointer to the dt_object to be handled
855  * \param[in] flags     some hints to indicate how the @child should be handled
856  *
857  * \retval              positive number for repaired cases
858  * \retval              0 if nothing to be repaired
859  * \retval              negative error number on failure
860  */
861 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
862                                            struct lfsck_component *com,
863                                            struct dt_object *child, __u8 flags)
864 {
865         struct lfsck_thread_info *info     = lfsck_env_info(env);
866         struct lu_attr           *la       = &info->lti_la;
867         struct lu_name           *cname    = &info->lti_name;
868         struct lu_fid            *pfid     = &info->lti_fid;
869         struct lu_fid            *cfid     = &info->lti_fid2;
870         struct lfsck_instance    *lfsck    = com->lc_lfsck;
871         struct lfsck_namespace   *ns       = com->lc_file_ram;
872         struct dt_object         *parent   = NULL;
873         struct linkea_data        ldata    = { 0 };
874         bool                      repaired = false;
875         int                       count    = 0;
876         int                       rc;
877         ENTRY;
878
879         dt_read_lock(env, child, 0);
880         if (unlikely(lfsck_is_dead_obj(child))) {
881                 dt_read_unlock(env, child);
882
883                 RETURN(0);
884         }
885
886         rc = lfsck_links_read(env, child, &ldata);
887         dt_read_unlock(env, child);
888         if (rc != 0)
889                 GOTO(out, rc);
890
891         linkea_first_entry(&ldata);
892         while (ldata.ld_lee != NULL) {
893                 lfsck_namespace_unpack_linkea_entry(&ldata, cname, pfid,
894                                                     info->lti_key);
895                 rc = lfsck_namespace_filter_linkea_entry(&ldata, cname, pfid,
896                                                          false);
897                 /* Found repeated linkEA entries */
898                 if (rc > 0) {
899                         rc = lfsck_namespace_shrink_linkea(env, com, child,
900                                                 &ldata, cname, pfid, false);
901                         if (rc < 0)
902                                 GOTO(out, rc);
903
904                         if (rc == 0)
905                                 continue;
906
907                         repaired = true;
908
909                         /* fall through */
910                 }
911
912                 /* Invalid PFID in the linkEA entry. */
913                 if (!fid_is_sane(pfid)) {
914                         rc = lfsck_namespace_shrink_linkea(env, com, child,
915                                                 &ldata, cname, pfid, true);
916                         if (rc < 0)
917                                 GOTO(out, rc);
918
919                         if (rc > 0)
920                                 repaired = true;
921
922                         continue;
923                 }
924
925                 parent = lfsck_object_find(env, lfsck, pfid);
926                 if (IS_ERR(parent))
927                         GOTO(out, rc = PTR_ERR(parent));
928
929                 if (!dt_object_exists(parent)) {
930                         if (ldata.ld_leh->leh_reccount > 1) {
931                                 /* If it is NOT the last linkEA entry, then
932                                  * there is still other chance to make the
933                                  * child to be visible via other parent, then
934                                  * remove this linkEA entry. */
935                                 rc = lfsck_namespace_shrink_linkea(env, com,
936                                         child, &ldata, cname, pfid, true);
937                         } else {
938                                 /* Create the lost parent as an orphan. */
939                                 rc = lfsck_namespace_create_orphan(env, com,
940                                                                    parent);
941                                 if (rc < 0) {
942                                         lfsck_object_put(env, parent);
943
944                                         GOTO(out, rc);
945                                 }
946
947                                 if (rc > 0)
948                                         repaired = true;
949
950                                 /* Add the missed name entry to the parent. */
951                                 rc = lfsck_namespace_insert_normal(env, com,
952                                                 parent, child, cname->ln_name);
953                                 linkea_next_entry(&ldata);
954                         }
955
956                         lfsck_object_put(env, parent);
957                         if (rc < 0)
958                                 GOTO(out, rc);
959
960                         if (rc > 0)
961                                 repaired = true;
962
963                         continue;
964                 }
965
966                 /* The linkEA entry with bad parent will be removed. */
967                 if (unlikely(!dt_try_as_dir(env, parent))) {
968                         lfsck_object_put(env, parent);
969                         rc = lfsck_namespace_shrink_linkea(env, com, child,
970                                                 &ldata, cname, pfid, true);
971                         if (rc < 0)
972                                 GOTO(out, rc);
973
974                         if (rc > 0)
975                                 repaired = true;
976
977                         continue;
978                 }
979
980                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
981                                (const struct dt_key *)cname->ln_name,
982                                BYPASS_CAPA);
983                 if (rc != 0 && rc != -ENOENT) {
984                         lfsck_object_put(env, parent);
985
986                         GOTO(out, rc);
987                 }
988
989                 if (rc == 0) {
990                         lfsck_object_put(env, parent);
991                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
992                                 /* It is the most common case that we
993                                  * find the name entry corresponding
994                                  * to the linkEA entry. */
995                                 linkea_next_entry(&ldata);
996                         } else {
997                                 /* XXX: The name entry references another
998                                  *      MDT-object that may be created by
999                                  *      the LFSCK for repairing dangling
1000                                  *      name entry. There will be another
1001                                  *      patch for further processing. */
1002                                 rc = lfsck_namespace_shrink_linkea(env, com,
1003                                         child, &ldata, cname, pfid, true);
1004                                 if (rc < 0)
1005                                         GOTO(out, rc);
1006
1007                                 if (rc > 0)
1008                                         repaired = true;
1009                         }
1010
1011                         continue;
1012                 }
1013
1014                 rc = dt_attr_get(env, child, la, BYPASS_CAPA);
1015                 if (rc != 0)
1016                         GOTO(out, rc);
1017
1018                 /* If there is no name entry in the parent dir and the object
1019                  * link count is less than the linkea entries count, then the
1020                  * linkea entry should be removed. */
1021                 if (ldata.ld_leh->leh_reccount > la->la_nlink) {
1022                         rc = lfsck_namespace_shrink_linkea_cond(env, com,
1023                                         parent, child, &ldata, cname, pfid);
1024                         lfsck_object_put(env, parent);
1025                         if (rc < 0)
1026                                 GOTO(out, rc);
1027
1028                         if (rc > 0)
1029                                 repaired = true;
1030
1031                         continue;
1032                 }
1033
1034                 /* Add the missed name entry back to the namespace. */
1035                 rc = lfsck_namespace_insert_normal(env, com, parent, child,
1036                                                    cname->ln_name);
1037                 lfsck_object_put(env, parent);
1038                 if (rc < 0)
1039                         GOTO(out, rc);
1040
1041                 if (rc > 0)
1042                         repaired = true;
1043
1044                 linkea_next_entry(&ldata);
1045         }
1046
1047         GOTO(out, rc = 0);
1048
1049 out:
1050         if (rc < 0 && rc != -ENODATA)
1051                 return rc;
1052
1053         if (rc == 0) {
1054                 LASSERT(ldata.ld_leh != NULL);
1055
1056                 count = ldata.ld_leh->leh_reccount;
1057         }
1058
1059         if (count == 0) {
1060                 /* If the child becomes orphan, then insert it into
1061                  * the global .lustre/lost+found/MDTxxxx directory. */
1062                 rc = lfsck_namespace_insert_orphan(env, com, child, "", "O",
1063                                                    &count);
1064                 if (rc < 0)
1065                         return rc;
1066
1067                 if (rc > 0)
1068                         repaired = true;
1069         }
1070
1071         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
1072         if (rc != 0)
1073                 return rc;
1074
1075         if (la->la_nlink != count) {
1076                 /* XXX: there will be other patch(es) for MDT-object
1077                  *      hard links verification. */
1078         }
1079
1080         if (repaired) {
1081                 if (la->la_nlink > 1) {
1082                         down_write(&com->lc_sem);
1083                         ns->ln_mul_linked_repaired++;
1084                         up_write(&com->lc_sem);
1085                 }
1086
1087                 if (rc == 0)
1088                         rc = 1;
1089         }
1090
1091         return rc;
1092 }
1093
1094 static void lfsck_namespace_dump_statistics(struct seq_file *m,
1095                                             struct lfsck_namespace *ns,
1096                                             __u64 checked_phase1,
1097                                             __u64 checked_phase2,
1098                                             __u32 time_phase1,
1099                                             __u32 time_phase2)
1100 {
1101         seq_printf(m, "checked_phase1: "LPU64"\n"
1102                       "checked_phase2: "LPU64"\n"
1103                       "updated_phase1: "LPU64"\n"
1104                       "updated_phase2: "LPU64"\n"
1105                       "failed_phase1: "LPU64"\n"
1106                       "failed_phase2: "LPU64"\n"
1107                       "directories: "LPU64"\n"
1108                       "dirent_repaired: "LPU64"\n"
1109                       "linkea_repaired: "LPU64"\n"
1110                       "nlinks_repaired: "LPU64"\n"
1111                       "lost_found: "LPU64"\n"
1112                       "multiple_linked_checked: "LPU64"\n"
1113                       "multiple_linked_repaired: "LPU64"\n"
1114                       "success_count: %u\n"
1115                       "run_time_phase1: %u seconds\n"
1116                       "run_time_phase2: %u seconds\n",
1117                       checked_phase1,
1118                       checked_phase2,
1119                       ns->ln_items_repaired,
1120                       ns->ln_objs_repaired_phase2,
1121                       ns->ln_items_failed,
1122                       ns->ln_objs_failed_phase2,
1123                       ns->ln_dirs_checked,
1124                       ns->ln_dirent_repaired,
1125                       ns->ln_linkea_repaired,
1126                       ns->ln_objs_nlink_repaired,
1127                       ns->ln_objs_lost_found,
1128                       ns->ln_mul_linked_checked,
1129                       ns->ln_mul_linked_repaired,
1130                       ns->ln_success_count,
1131                       time_phase1,
1132                       time_phase2);
1133 }
1134
1135 /* namespace APIs */
1136
1137 static int lfsck_namespace_reset(const struct lu_env *env,
1138                                  struct lfsck_component *com, bool init)
1139 {
1140         struct lfsck_instance   *lfsck = com->lc_lfsck;
1141         struct lfsck_namespace  *ns    = com->lc_file_ram;
1142         struct dt_object        *root;
1143         struct dt_object        *dto;
1144         int                      rc;
1145         ENTRY;
1146
1147         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1148         if (IS_ERR(root))
1149                 GOTO(log, rc = PTR_ERR(root));
1150
1151         if (unlikely(!dt_try_as_dir(env, root)))
1152                 GOTO(put, rc = -ENOTDIR);
1153
1154         down_write(&com->lc_sem);
1155         if (init) {
1156                 memset(ns, 0, sizeof(*ns));
1157         } else {
1158                 __u32 count = ns->ln_success_count;
1159                 __u64 last_time = ns->ln_time_last_complete;
1160
1161                 memset(ns, 0, sizeof(*ns));
1162                 ns->ln_success_count = count;
1163                 ns->ln_time_last_complete = last_time;
1164         }
1165         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
1166         ns->ln_status = LS_INIT;
1167
1168         rc = local_object_unlink(env, lfsck->li_bottom, root,
1169                                  lfsck_namespace_name);
1170         if (rc != 0)
1171                 GOTO(out, rc);
1172
1173         lfsck_object_put(env, com->lc_obj);
1174         com->lc_obj = NULL;
1175         dto = local_index_find_or_create(env, lfsck->li_los, root,
1176                                          lfsck_namespace_name,
1177                                          S_IFREG | S_IRUGO | S_IWUSR,
1178                                          &dt_lfsck_features);
1179         if (IS_ERR(dto))
1180                 GOTO(out, rc = PTR_ERR(dto));
1181
1182         com->lc_obj = dto;
1183         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
1184         if (rc != 0)
1185                 GOTO(out, rc);
1186
1187         rc = lfsck_namespace_store(env, com, true);
1188
1189         GOTO(out, rc);
1190
1191 out:
1192         up_write(&com->lc_sem);
1193
1194 put:
1195         lu_object_put(env, &root->do_lu);
1196 log:
1197         CDEBUG(D_LFSCK, "%s: namespace LFSCK reset: rc = %d\n",
1198                lfsck_lfsck2name(lfsck), rc);
1199         return rc;
1200 }
1201
1202 static void
1203 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
1204                      bool new_checked)
1205 {
1206         struct lfsck_namespace *ns = com->lc_file_ram;
1207
1208         down_write(&com->lc_sem);
1209         if (new_checked)
1210                 com->lc_new_checked++;
1211         lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
1212         up_write(&com->lc_sem);
1213 }
1214
1215 static int lfsck_namespace_checkpoint(const struct lu_env *env,
1216                                       struct lfsck_component *com, bool init)
1217 {
1218         struct lfsck_instance   *lfsck = com->lc_lfsck;
1219         struct lfsck_namespace  *ns    = com->lc_file_ram;
1220         int                      rc;
1221
1222         if (!init) {
1223                 rc = lfsck_checkpoint_generic(env, com);
1224                 if (rc != 0)
1225                         goto log;
1226         }
1227
1228         down_write(&com->lc_sem);
1229         if (init) {
1230                 ns->ln_pos_latest_start = lfsck->li_pos_checkpoint;
1231         } else {
1232                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
1233                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1234                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1235                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1236                 ns->ln_items_checked += com->lc_new_checked;
1237                 com->lc_new_checked = 0;
1238         }
1239
1240         rc = lfsck_namespace_store(env, com, false);
1241         up_write(&com->lc_sem);
1242
1243 log:
1244         CDEBUG(D_LFSCK, "%s: namespace LFSCK checkpoint at the pos ["LPU64
1245                ", "DFID", "LPX64"]: rc = %d\n", lfsck_lfsck2name(lfsck),
1246                lfsck->li_pos_current.lp_oit_cookie,
1247                PFID(&lfsck->li_pos_current.lp_dir_parent),
1248                lfsck->li_pos_current.lp_dir_cookie, rc);
1249
1250         return rc > 0 ? 0 : rc;
1251 }
1252
1253 static int lfsck_namespace_prep(const struct lu_env *env,
1254                                 struct lfsck_component *com,
1255                                 struct lfsck_start_param *lsp)
1256 {
1257         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1258         struct lfsck_namespace  *ns     = com->lc_file_ram;
1259         struct lfsck_position   *pos    = &com->lc_pos_start;
1260         int                      rc;
1261
1262         if (ns->ln_status == LS_COMPLETED) {
1263                 rc = lfsck_namespace_reset(env, com, false);
1264                 if (rc == 0)
1265                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
1266
1267                 if (rc != 0) {
1268                         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep failed: "
1269                                "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
1270
1271                         return rc;
1272                 }
1273         }
1274
1275         down_write(&com->lc_sem);
1276         ns->ln_time_latest_start = cfs_time_current_sec();
1277         spin_lock(&lfsck->li_lock);
1278
1279         if (ns->ln_flags & LF_SCANNED_ONCE) {
1280                 if (!lfsck->li_drop_dryrun ||
1281                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
1282                         ns->ln_status = LS_SCANNING_PHASE2;
1283                         list_move_tail(&com->lc_link,
1284                                        &lfsck->li_list_double_scan);
1285                         if (!list_empty(&com->lc_link_dir))
1286                                 list_del_init(&com->lc_link_dir);
1287                         lfsck_pos_set_zero(pos);
1288                 } else {
1289                         ns->ln_status = LS_SCANNING_PHASE1;
1290                         ns->ln_run_time_phase1 = 0;
1291                         ns->ln_run_time_phase2 = 0;
1292                         ns->ln_items_checked = 0;
1293                         ns->ln_items_repaired = 0;
1294                         ns->ln_items_failed = 0;
1295                         ns->ln_dirs_checked = 0;
1296                         ns->ln_objs_checked_phase2 = 0;
1297                         ns->ln_objs_repaired_phase2 = 0;
1298                         ns->ln_objs_failed_phase2 = 0;
1299                         ns->ln_objs_nlink_repaired = 0;
1300                         ns->ln_objs_lost_found = 0;
1301                         ns->ln_dirent_repaired = 0;
1302                         ns->ln_linkea_repaired = 0;
1303                         ns->ln_mul_linked_checked = 0;
1304                         ns->ln_mul_linked_repaired = 0;
1305                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
1306                         if (list_empty(&com->lc_link_dir))
1307                                 list_add_tail(&com->lc_link_dir,
1308                                               &lfsck->li_list_dir);
1309                         *pos = ns->ln_pos_first_inconsistent;
1310                 }
1311         } else {
1312                 ns->ln_status = LS_SCANNING_PHASE1;
1313                 if (list_empty(&com->lc_link_dir))
1314                         list_add_tail(&com->lc_link_dir,
1315                                       &lfsck->li_list_dir);
1316                 if (!lfsck->li_drop_dryrun ||
1317                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
1318                         *pos = ns->ln_pos_last_checkpoint;
1319                         pos->lp_oit_cookie++;
1320                 } else {
1321                         *pos = ns->ln_pos_first_inconsistent;
1322                 }
1323         }
1324
1325         spin_unlock(&lfsck->li_lock);
1326         up_write(&com->lc_sem);
1327
1328         rc = lfsck_start_assistant(env, com, lsp);
1329
1330         CDEBUG(D_LFSCK, "%s: namespace LFSCK prep done, start pos ["LPU64", "
1331                DFID", "LPX64"]: rc = %d\n",
1332                lfsck_lfsck2name(lfsck), pos->lp_oit_cookie,
1333                PFID(&pos->lp_dir_parent), pos->lp_dir_cookie, rc);
1334
1335         return rc;
1336 }
1337
1338 static int lfsck_namespace_exec_oit(const struct lu_env *env,
1339                                     struct lfsck_component *com,
1340                                     struct dt_object *obj)
1341 {
1342         struct lfsck_thread_info *info  = lfsck_env_info(env);
1343         struct lfsck_namespace   *ns    = com->lc_file_ram;
1344         struct lfsck_instance    *lfsck = com->lc_lfsck;
1345         const struct lu_fid      *fid   = lfsck_dto2fid(obj);
1346         struct lu_attr           *la    = &info->lti_la;
1347         struct lu_fid            *pfid  = &info->lti_fid2;
1348         struct lu_name           *cname = &info->lti_name;
1349         struct lu_seq_range      *range = &info->lti_range;
1350         struct dt_device         *dev   = lfsck->li_bottom;
1351         struct seq_server_site   *ss    =
1352                                 lu_site2seq(dev->dd_lu_dev.ld_site);
1353         struct linkea_data        ldata = { 0 };
1354         __u32                     idx   = lfsck_dev_idx(dev);
1355         int                       rc;
1356         ENTRY;
1357
1358         rc = lfsck_links_read(env, obj, &ldata);
1359         if (rc == -ENOENT)
1360                 GOTO(out, rc = 0);
1361
1362         /* -EINVAL means crashed linkEA, should be verified. */
1363         if (rc == -EINVAL) {
1364                 rc = lfsck_namespace_trace_update(env, com, fid,
1365                                                   LNTF_CHECK_LINKEA, true);
1366                 if (rc == 0) {
1367                         struct lustre_handle lh = { 0 };
1368
1369                         rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1370                                               MDS_INODELOCK_UPDATE |
1371                                               MDS_INODELOCK_XATTR, LCK_EX);
1372                         if (rc == 0) {
1373                                 rc = lfsck_namespace_links_remove(env, com,
1374                                                                   obj);
1375                                 lfsck_ibits_unlock(&lh, LCK_EX);
1376                         }
1377                 }
1378
1379                 GOTO(out, rc = (rc == -ENOENT ? 0 : rc));
1380         }
1381
1382         /* zero-linkEA object may be orphan, but it also maybe because
1383          * of upgrading. Currently, we cannot record it for double scan.
1384          * Because it may cause the LFSCK tracing file to be too large. */
1385         if (rc == -ENODATA) {
1386                 if (S_ISDIR(lfsck_object_type(obj)))
1387                         GOTO(out, rc = 0);
1388
1389                 rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1390                 if (rc != 0)
1391                         GOTO(out, rc);
1392
1393                 if (la->la_nlink > 1)
1394                         rc = lfsck_namespace_trace_update(env, com, fid,
1395                                                 LNTF_CHECK_LINKEA, true);
1396
1397                 GOTO(out, rc);
1398         }
1399
1400         if (rc != 0)
1401                 GOTO(out, rc);
1402
1403         /* Record multiple-linked object. */
1404         if (ldata.ld_leh->leh_reccount > 1) {
1405                 rc = lfsck_namespace_trace_update(env, com, fid,
1406                                                   LNTF_CHECK_LINKEA, true);
1407
1408                 GOTO(out, rc);
1409         }
1410
1411         linkea_first_entry(&ldata);
1412         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
1413         if (!fid_is_sane(pfid)) {
1414                 rc = lfsck_namespace_trace_update(env, com, fid,
1415                                                   LNTF_CHECK_PARENT, true);
1416         } else {
1417                 fld_range_set_mdt(range);
1418                 rc = fld_local_lookup(env, ss->ss_server_fld,
1419                                       fid_seq(pfid), range);
1420                 if ((rc == -ENOENT) ||
1421                     (rc == 0 && range->lsr_index != idx)) {
1422                         rc = lfsck_namespace_trace_update(env, com, fid,
1423                                                 LNTF_CHECK_LINKEA, true);
1424                 } else {
1425                         if (S_ISDIR(lfsck_object_type(obj)))
1426                                 GOTO(out, rc = 0);
1427
1428                         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
1429                         if (rc != 0)
1430                                 GOTO(out, rc);
1431
1432                         if (la->la_nlink > 1)
1433                                 rc = lfsck_namespace_trace_update(env, com,
1434                                                 fid, LNTF_CHECK_LINKEA, true);
1435                 }
1436         }
1437
1438         GOTO(out, rc);
1439
1440 out:
1441         down_write(&com->lc_sem);
1442         com->lc_new_checked++;
1443         if (S_ISDIR(lfsck_object_type(obj)))
1444                 ns->ln_dirs_checked++;
1445         if (rc != 0)
1446                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
1447         up_write(&com->lc_sem);
1448
1449         return rc;
1450 }
1451
1452 static int lfsck_namespace_exec_dir(const struct lu_env *env,
1453                                     struct lfsck_component *com,
1454                                     struct lu_dirent *ent, __u16 type)
1455 {
1456         struct lfsck_assistant_data     *lad    = com->lc_data;
1457         struct lfsck_namespace_req      *lnr;
1458         bool                             wakeup = false;
1459
1460         lnr = lfsck_namespace_assistant_req_init(com->lc_lfsck, ent, type);
1461         if (IS_ERR(lnr)) {
1462                 struct lfsck_namespace *ns = com->lc_file_ram;
1463
1464                 lfsck_namespace_record_failure(env, com->lc_lfsck, ns);
1465                 return PTR_ERR(lnr);
1466         }
1467
1468         spin_lock(&lad->lad_lock);
1469         if (lad->lad_assistant_status < 0) {
1470                 spin_unlock(&lad->lad_lock);
1471                 lfsck_namespace_assistant_req_fini(env, &lnr->lnr_lar);
1472                 return lad->lad_assistant_status;
1473         }
1474
1475         list_add_tail(&lnr->lnr_lar.lar_list, &lad->lad_req_list);
1476         if (lad->lad_prefetched == 0)
1477                 wakeup = true;
1478
1479         lad->lad_prefetched++;
1480         spin_unlock(&lad->lad_lock);
1481         if (wakeup)
1482                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
1483
1484         down_write(&com->lc_sem);
1485         com->lc_new_checked++;
1486         up_write(&com->lc_sem);
1487
1488         return 0;
1489 }
1490
1491 static int lfsck_namespace_post(const struct lu_env *env,
1492                                 struct lfsck_component *com,
1493                                 int result, bool init)
1494 {
1495         struct lfsck_instance   *lfsck = com->lc_lfsck;
1496         struct lfsck_namespace  *ns    = com->lc_file_ram;
1497         int                      rc;
1498         ENTRY;
1499
1500         lfsck_post_generic(env, com, &result);
1501
1502         down_write(&com->lc_sem);
1503         spin_lock(&lfsck->li_lock);
1504         if (!init)
1505                 ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
1506         if (result > 0) {
1507                 ns->ln_status = LS_SCANNING_PHASE2;
1508                 ns->ln_flags |= LF_SCANNED_ONCE;
1509                 ns->ln_flags &= ~LF_UPGRADE;
1510                 list_del_init(&com->lc_link_dir);
1511                 list_move_tail(&com->lc_link, &lfsck->li_list_double_scan);
1512         } else if (result == 0) {
1513                 ns->ln_status = lfsck->li_status;
1514                 if (ns->ln_status == 0)
1515                         ns->ln_status = LS_STOPPED;
1516                 if (ns->ln_status != LS_PAUSED) {
1517                         list_del_init(&com->lc_link_dir);
1518                         list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1519                 }
1520         } else {
1521                 ns->ln_status = LS_FAILED;
1522                 list_del_init(&com->lc_link_dir);
1523                 list_move_tail(&com->lc_link, &lfsck->li_list_idle);
1524         }
1525         spin_unlock(&lfsck->li_lock);
1526
1527         if (!init) {
1528                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1529                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1530                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1531                 ns->ln_items_checked += com->lc_new_checked;
1532                 com->lc_new_checked = 0;
1533         }
1534
1535         rc = lfsck_namespace_store(env, com, false);
1536         up_write(&com->lc_sem);
1537
1538         CDEBUG(D_LFSCK, "%s: namespace LFSCK post done: rc = %d\n",
1539                lfsck_lfsck2name(lfsck), rc);
1540
1541         RETURN(rc);
1542 }
1543
1544 static int
1545 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1546                      struct seq_file *m)
1547 {
1548         struct lfsck_instance   *lfsck = com->lc_lfsck;
1549         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1550         struct lfsck_namespace  *ns    = com->lc_file_ram;
1551         int                      rc;
1552
1553         down_read(&com->lc_sem);
1554         seq_printf(m, "name: lfsck_namespace\n"
1555                    "magic: %#x\n"
1556                    "version: %d\n"
1557                    "status: %s\n",
1558                    ns->ln_magic,
1559                    bk->lb_version,
1560                    lfsck_status2names(ns->ln_status));
1561
1562         rc = lfsck_bits_dump(m, ns->ln_flags, lfsck_flags_names, "flags");
1563         if (rc < 0)
1564                 goto out;
1565
1566         rc = lfsck_bits_dump(m, bk->lb_param, lfsck_param_names, "param");
1567         if (rc < 0)
1568                 goto out;
1569
1570         rc = lfsck_time_dump(m, ns->ln_time_last_complete,
1571                              "time_since_last_completed");
1572         if (rc < 0)
1573                 goto out;
1574
1575         rc = lfsck_time_dump(m, ns->ln_time_latest_start,
1576                              "time_since_latest_start");
1577         if (rc < 0)
1578                 goto out;
1579
1580         rc = lfsck_time_dump(m, ns->ln_time_last_checkpoint,
1581                              "time_since_last_checkpoint");
1582         if (rc < 0)
1583                 goto out;
1584
1585         rc = lfsck_pos_dump(m, &ns->ln_pos_latest_start,
1586                             "latest_start_position");
1587         if (rc < 0)
1588                 goto out;
1589
1590         rc = lfsck_pos_dump(m, &ns->ln_pos_last_checkpoint,
1591                             "last_checkpoint_position");
1592         if (rc < 0)
1593                 goto out;
1594
1595         rc = lfsck_pos_dump(m, &ns->ln_pos_first_inconsistent,
1596                             "first_failure_position");
1597         if (rc < 0)
1598                 goto out;
1599
1600         if (ns->ln_status == LS_SCANNING_PHASE1) {
1601                 struct lfsck_position pos;
1602                 const struct dt_it_ops *iops;
1603                 cfs_duration_t duration = cfs_time_current() -
1604                                           lfsck->li_time_last_checkpoint;
1605                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1606                 __u64 speed = checked;
1607                 __u64 new_checked = com->lc_new_checked * HZ;
1608                 __u32 rtime = ns->ln_run_time_phase1 +
1609                               cfs_duration_sec(duration + HALF_SEC);
1610
1611                 if (duration != 0)
1612                         do_div(new_checked, duration);
1613                 if (rtime != 0)
1614                         do_div(speed, rtime);
1615                 lfsck_namespace_dump_statistics(m, ns, checked,
1616                                                 ns->ln_objs_checked_phase2,
1617                                                 rtime, ns->ln_run_time_phase2);
1618
1619                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
1620                               "average_speed_phase2: N/A\n"
1621                               "real_time_speed_phase1: "LPU64" items/sec\n"
1622                               "real_time_speed_phase2: N/A\n",
1623                               speed,
1624                               new_checked);
1625
1626                 LASSERT(lfsck->li_di_oit != NULL);
1627
1628                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1629
1630                 /* The low layer otable-based iteration position may NOT
1631                  * exactly match the namespace-based directory traversal
1632                  * cookie. Generally, it is not a serious issue. But the
1633                  * caller should NOT make assumption on that. */
1634                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1635                 if (!lfsck->li_current_oit_processed)
1636                         pos.lp_oit_cookie--;
1637
1638                 spin_lock(&lfsck->li_lock);
1639                 if (lfsck->li_di_dir != NULL) {
1640                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1641                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1642                                 fid_zero(&pos.lp_dir_parent);
1643                                 pos.lp_dir_cookie = 0;
1644                         } else {
1645                                 pos.lp_dir_parent =
1646                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1647                         }
1648                 } else {
1649                         fid_zero(&pos.lp_dir_parent);
1650                         pos.lp_dir_cookie = 0;
1651                 }
1652                 spin_unlock(&lfsck->li_lock);
1653                 lfsck_pos_dump(m, &pos, "current_position");
1654         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1655                 cfs_duration_t duration = cfs_time_current() -
1656                                           lfsck->li_time_last_checkpoint;
1657                 __u64 checked = ns->ln_objs_checked_phase2 +
1658                                 com->lc_new_checked;
1659                 __u64 speed1 = ns->ln_items_checked;
1660                 __u64 speed2 = checked;
1661                 __u64 new_checked = com->lc_new_checked * HZ;
1662                 __u32 rtime = ns->ln_run_time_phase2 +
1663                               cfs_duration_sec(duration + HALF_SEC);
1664
1665                 if (duration != 0)
1666                         do_div(new_checked, duration);
1667                 if (ns->ln_run_time_phase1 != 0)
1668                         do_div(speed1, ns->ln_run_time_phase1);
1669                 if (rtime != 0)
1670                         do_div(speed2, rtime);
1671                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
1672                                                 checked,
1673                                                 ns->ln_run_time_phase1, rtime);
1674
1675                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
1676                               "average_speed_phase2: "LPU64" objs/sec\n"
1677                               "real_time_speed_phase1: N/A\n"
1678                               "real_time_speed_phase2: "LPU64" objs/sec\n"
1679                               "current_position: "DFID"\n",
1680                               speed1,
1681                               speed2,
1682                               new_checked,
1683                               PFID(&ns->ln_fid_latest_scanned_phase2));
1684         } else {
1685                 __u64 speed1 = ns->ln_items_checked;
1686                 __u64 speed2 = ns->ln_objs_checked_phase2;
1687
1688                 if (ns->ln_run_time_phase1 != 0)
1689                         do_div(speed1, ns->ln_run_time_phase1);
1690                 if (ns->ln_run_time_phase2 != 0)
1691                         do_div(speed2, ns->ln_run_time_phase2);
1692                 lfsck_namespace_dump_statistics(m, ns, ns->ln_items_checked,
1693                                                 ns->ln_objs_checked_phase2,
1694                                                 ns->ln_run_time_phase1,
1695                                                 ns->ln_run_time_phase2);
1696
1697                 seq_printf(m, "average_speed_phase1: "LPU64" items/sec\n"
1698                               "average_speed_phase2: "LPU64" objs/sec\n"
1699                               "real_time_speed_phase1: N/A\n"
1700                               "real_time_speed_phase2: N/A\n"
1701                               "current_position: N/A\n",
1702                               speed1,
1703                               speed2);
1704         }
1705 out:
1706         up_read(&com->lc_sem);
1707         return 0;
1708 }
1709
1710 static int lfsck_namespace_double_scan(const struct lu_env *env,
1711                                        struct lfsck_component *com)
1712 {
1713         struct lfsck_namespace *ns = com->lc_file_ram;
1714
1715         return lfsck_double_scan_generic(env, com, ns->ln_status);
1716 }
1717
1718 static void lfsck_namespace_data_release(const struct lu_env *env,
1719                                          struct lfsck_component *com)
1720 {
1721         struct lfsck_assistant_data     *lad    = com->lc_data;
1722         struct lfsck_tgt_descs          *ltds   = &com->lc_lfsck->li_mdt_descs;
1723         struct lfsck_tgt_desc           *ltd;
1724         struct lfsck_tgt_desc           *next;
1725
1726         LASSERT(lad != NULL);
1727         LASSERT(thread_is_init(&lad->lad_thread) ||
1728                 thread_is_stopped(&lad->lad_thread));
1729         LASSERT(list_empty(&lad->lad_req_list));
1730
1731         com->lc_data = NULL;
1732
1733         spin_lock(&ltds->ltd_lock);
1734         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
1735                                  ltd_namespace_phase_list) {
1736                 list_del_init(&ltd->ltd_namespace_phase_list);
1737         }
1738         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase2_list,
1739                                  ltd_namespace_phase_list) {
1740                 list_del_init(&ltd->ltd_namespace_phase_list);
1741         }
1742         list_for_each_entry_safe(ltd, next, &lad->lad_mdt_list,
1743                                  ltd_namespace_list) {
1744                 list_del_init(&ltd->ltd_namespace_list);
1745         }
1746         spin_unlock(&ltds->ltd_lock);
1747
1748         CFS_FREE_BITMAP(lad->lad_bitmap);
1749
1750         OBD_FREE_PTR(lad);
1751 }
1752
1753 static int lfsck_namespace_in_notify(const struct lu_env *env,
1754                                      struct lfsck_component *com,
1755                                      struct lfsck_request *lr)
1756 {
1757         struct lfsck_instance           *lfsck = com->lc_lfsck;
1758         struct lfsck_namespace          *ns    = com->lc_file_ram;
1759         struct lfsck_assistant_data     *lad   = com->lc_data;
1760         struct lfsck_tgt_descs          *ltds  = &lfsck->li_mdt_descs;
1761         struct lfsck_tgt_desc           *ltd;
1762         bool                             fail  = false;
1763         ENTRY;
1764
1765         if (lr->lr_event != LE_PHASE1_DONE &&
1766             lr->lr_event != LE_PHASE2_DONE &&
1767             lr->lr_event != LE_PEER_EXIT)
1768                 RETURN(-EINVAL);
1769
1770         CDEBUG(D_LFSCK, "%s: namespace LFSCK handles notify %u from MDT %x, "
1771                "status %d\n", lfsck_lfsck2name(lfsck), lr->lr_event,
1772                lr->lr_index, lr->lr_status);
1773
1774         spin_lock(&ltds->ltd_lock);
1775         ltd = LTD_TGT(ltds, lr->lr_index);
1776         if (ltd == NULL) {
1777                 spin_unlock(&ltds->ltd_lock);
1778
1779                 RETURN(-ENXIO);
1780         }
1781
1782         list_del_init(&ltd->ltd_namespace_phase_list);
1783         switch (lr->lr_event) {
1784         case LE_PHASE1_DONE:
1785                 if (lr->lr_status <= 0) {
1786                         ltd->ltd_namespace_done = 1;
1787                         list_del_init(&ltd->ltd_namespace_list);
1788                         CDEBUG(D_LFSCK, "%s: MDT %x failed/stopped at "
1789                                "phase1 for namespace LFSCK: rc = %d.\n",
1790                                lfsck_lfsck2name(lfsck),
1791                                ltd->ltd_index, lr->lr_status);
1792                         ns->ln_flags |= LF_INCOMPLETE;
1793                         fail = true;
1794                         break;
1795                 }
1796
1797                 if (list_empty(&ltd->ltd_namespace_list))
1798                         list_add_tail(&ltd->ltd_namespace_list,
1799                                       &lad->lad_mdt_list);
1800                 list_add_tail(&ltd->ltd_namespace_phase_list,
1801                               &lad->lad_mdt_phase2_list);
1802                 break;
1803         case LE_PHASE2_DONE:
1804                 ltd->ltd_namespace_done = 1;
1805                 list_del_init(&ltd->ltd_namespace_list);
1806                 break;
1807         case LE_PEER_EXIT:
1808                 fail = true;
1809                 ltd->ltd_namespace_done = 1;
1810                 list_del_init(&ltd->ltd_namespace_list);
1811                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT)) {
1812                         CDEBUG(D_LFSCK,
1813                                "%s: the peer MDT %x exit namespace LFSCK\n",
1814                                lfsck_lfsck2name(lfsck), ltd->ltd_index);
1815                         ns->ln_flags |= LF_INCOMPLETE;
1816                 }
1817                 break;
1818         default:
1819                 break;
1820         }
1821         spin_unlock(&ltds->ltd_lock);
1822
1823         if (fail && lfsck->li_bookmark_ram.lb_param & LPF_FAILOUT) {
1824                 struct lfsck_stop *stop = &lfsck_env_info(env)->lti_stop;
1825
1826                 memset(stop, 0, sizeof(*stop));
1827                 stop->ls_status = lr->lr_status;
1828                 stop->ls_flags = lr->lr_param & ~LPF_BROADCAST;
1829                 lfsck_stop(env, lfsck->li_bottom, stop);
1830         } else if (lfsck_phase2_next_ready(lad)) {
1831                 wake_up_all(&lad->lad_thread.t_ctl_waitq);
1832         }
1833
1834         RETURN(0);
1835 }
1836
1837 static int lfsck_namespace_query(const struct lu_env *env,
1838                                  struct lfsck_component *com)
1839 {
1840         struct lfsck_namespace *ns = com->lc_file_ram;
1841
1842         return ns->ln_status;
1843 }
1844
1845 static struct lfsck_operations lfsck_namespace_ops = {
1846         .lfsck_reset            = lfsck_namespace_reset,
1847         .lfsck_fail             = lfsck_namespace_fail,
1848         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1849         .lfsck_prep             = lfsck_namespace_prep,
1850         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1851         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1852         .lfsck_post             = lfsck_namespace_post,
1853         .lfsck_dump             = lfsck_namespace_dump,
1854         .lfsck_double_scan      = lfsck_namespace_double_scan,
1855         .lfsck_data_release     = lfsck_namespace_data_release,
1856         .lfsck_quit             = lfsck_quit_generic,
1857         .lfsck_in_notify        = lfsck_namespace_in_notify,
1858         .lfsck_query            = lfsck_namespace_query,
1859 };
1860
1861 static int lfsck_namespace_assistant_handler_p1(const struct lu_env *env,
1862                                                 struct lfsck_component *com,
1863                                                 struct lfsck_assistant_req *lar)
1864 {
1865         struct lfsck_thread_info   *info     = lfsck_env_info(env);
1866         struct lu_attr             *la       = &info->lti_la;
1867         struct lfsck_instance      *lfsck    = com->lc_lfsck;
1868         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
1869         struct lfsck_namespace     *ns       = com->lc_file_ram;
1870         struct linkea_data          ldata    = { 0 };
1871         const struct lu_name       *cname;
1872         struct thandle             *handle   = NULL;
1873         struct lfsck_namespace_req *lnr      =
1874                         container_of0(lar, struct lfsck_namespace_req, lnr_lar);
1875         struct dt_object           *dir      = lnr->lnr_obj;
1876         struct dt_object           *obj      = NULL;
1877         const struct lu_fid        *pfid     = lfsck_dto2fid(dir);
1878         struct dt_device           *dev;
1879         struct lustre_handle        lh       = { 0 };
1880         bool                        repaired = false;
1881         bool                        dtlocked = false;
1882         bool                        remove;
1883         bool                        newdata;
1884         bool                        log      = false;
1885         int                         idx;
1886         int                         count    = 0;
1887         int                         rc;
1888         ENTRY;
1889
1890         if (lnr->lnr_attr & LUDA_UPGRADE) {
1891                 ns->ln_flags |= LF_UPGRADE;
1892                 ns->ln_dirent_repaired++;
1893                 repaired = true;
1894         } else if (lnr->lnr_attr & LUDA_REPAIR) {
1895                 ns->ln_flags |= LF_INCONSISTENT;
1896                 ns->ln_dirent_repaired++;
1897                 repaired = true;
1898         }
1899
1900         if (lnr->lnr_name[0] == '.' &&
1901             (lnr->lnr_namelen == 1 ||
1902              (lnr->lnr_namelen == 2 && lnr->lnr_name[1] == '.') ||
1903              fid_seq_is_dot(fid_seq(&lnr->lnr_fid))))
1904                 GOTO(out, rc = 0);
1905
1906         idx = lfsck_find_mdt_idx_by_fid(env, lfsck, &lnr->lnr_fid);
1907         if (idx < 0)
1908                 GOTO(out, rc = idx);
1909
1910         if (idx == lfsck_dev_idx(lfsck->li_bottom)) {
1911                 dev = lfsck->li_next;
1912         } else {
1913                 struct lfsck_tgt_desc *ltd;
1914
1915                 ltd = LTD_TGT(&lfsck->li_mdt_descs, idx);
1916                 if (unlikely(ltd == NULL)) {
1917                         CDEBUG(D_LFSCK, "%s: cannot talk with MDT %x which "
1918                                "did not join the namespace LFSCK\n",
1919                                lfsck_lfsck2name(lfsck), idx);
1920                         ns->ln_flags |= LF_INCOMPLETE;
1921
1922                         GOTO(out, rc = -ENODEV);
1923                 }
1924
1925                 dev = ltd->ltd_tgt;
1926         }
1927
1928         obj = lfsck_object_find_by_dev(env, dev, &lnr->lnr_fid);
1929         if (IS_ERR(obj))
1930                 GOTO(out, rc = PTR_ERR(obj));
1931
1932         if (dt_object_exists(obj) == 0) {
1933                 rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1934                 if (rc != 0)
1935                         GOTO(out, rc);
1936
1937                 /* XXX: dangling name entry, will handle it in other patch. */
1938                 GOTO(out, rc);
1939         }
1940
1941         cname = lfsck_name_get_const(env, lnr->lnr_name, lnr->lnr_namelen);
1942         if (!(bk->lb_param & LPF_DRYRUN) && repaired) {
1943
1944 again:
1945                 rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
1946                                       MDS_INODELOCK_UPDATE |
1947                                       MDS_INODELOCK_XATTR, LCK_EX);
1948                 if (rc != 0)
1949                         GOTO(out, rc);
1950
1951                 handle = dt_trans_create(env, dev);
1952                 if (IS_ERR(handle))
1953                         GOTO(out, rc = PTR_ERR(handle));
1954
1955                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
1956                 if (rc != 0)
1957                         GOTO(stop, rc);
1958
1959                 rc = dt_trans_start(env, dev, handle);
1960                 if (rc != 0)
1961                         GOTO(stop, rc);
1962
1963                 dt_write_lock(env, obj, 0);
1964                 dtlocked = true;
1965         }
1966
1967         rc = lfsck_namespace_check_exist(env, dir, obj, lnr->lnr_name);
1968         if (rc != 0)
1969                 GOTO(stop, rc);
1970
1971         rc = lfsck_links_read(env, obj, &ldata);
1972         if (rc == 0) {
1973                 count = ldata.ld_leh->leh_reccount;
1974                 rc = linkea_links_find(&ldata, cname, pfid);
1975                 if ((rc == 0) &&
1976                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
1977                         goto record;
1978
1979                 ns->ln_flags |= LF_INCONSISTENT;
1980                 /* For dir, if there are more than one linkea entries, or the
1981                  * linkea entry does not match the name entry, then remove all
1982                  * and add the correct one. */
1983                 if (S_ISDIR(lfsck_object_type(obj))) {
1984                         remove = true;
1985                         newdata = true;
1986                 } else {
1987                         remove = false;
1988                         newdata = false;
1989                 }
1990                 goto nodata;
1991         } else if (unlikely(rc == -EINVAL)) {
1992                 count = 1;
1993                 ns->ln_flags |= LF_INCONSISTENT;
1994                 /* The magic crashed, we are not sure whether there are more
1995                  * corrupt data in the linkea, so remove all linkea entries. */
1996                 remove = true;
1997                 newdata = true;
1998                 goto nodata;
1999         } else if (rc == -ENODATA) {
2000                 count = 1;
2001                 ns->ln_flags |= LF_UPGRADE;
2002                 remove = false;
2003                 newdata = true;
2004
2005 nodata:
2006                 if (bk->lb_param & LPF_DRYRUN) {
2007                         down_write(&com->lc_sem);
2008                         ns->ln_linkea_repaired++;
2009                         up_write(&com->lc_sem);
2010                         repaired = true;
2011                         log = true;
2012                         goto record;
2013                 }
2014
2015                 if (!lustre_handle_is_used(&lh))
2016                         goto again;
2017
2018                 if (remove) {
2019                         LASSERT(newdata);
2020
2021                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
2022                                           BYPASS_CAPA);
2023                         if (rc != 0)
2024                                 GOTO(stop, rc);
2025                 }
2026
2027                 if (newdata) {
2028                         rc = linkea_data_new(&ldata,
2029                                         &lfsck_env_info(env)->lti_linkea_buf);
2030                         if (rc != 0)
2031                                 GOTO(stop, rc);
2032                 }
2033
2034                 rc = linkea_add_buf(&ldata, cname, pfid);
2035                 if (rc != 0)
2036                         GOTO(stop, rc);
2037
2038                 rc = lfsck_links_write(env, obj, &ldata, handle);
2039                 if (rc != 0)
2040                         GOTO(stop, rc);
2041
2042                 count = ldata.ld_leh->leh_reccount;
2043                 down_write(&com->lc_sem);
2044                 ns->ln_linkea_repaired++;
2045                 up_write(&com->lc_sem);
2046                 repaired = true;
2047                 log = true;
2048         } else if (rc == -ENOENT) {
2049                 log = false;
2050                 repaired = false;
2051
2052                 GOTO(stop, rc = 0);
2053         } else {
2054                 GOTO(stop, rc);
2055         }
2056
2057 record:
2058         LASSERT(count > 0);
2059
2060         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
2061         if (rc != 0)
2062                 GOTO(stop, rc);
2063
2064         if ((count == 1) &&
2065             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
2066                 /* Usually, it is for single linked object or dir, do nothing.*/
2067                 GOTO(stop, rc);
2068
2069         /* Following modification will be in another transaction.  */
2070         if (handle != NULL) {
2071                 LASSERT(dt_write_locked(env, obj));
2072
2073                 dt_write_unlock(env, obj);
2074                 dtlocked = false;
2075
2076                 dt_trans_stop(env, dev, handle);
2077                 handle = NULL;
2078
2079                 lfsck_ibits_unlock(&lh, LCK_EX);
2080         }
2081
2082         down_write(&com->lc_sem);
2083         ns->ln_mul_linked_checked++;
2084         up_write(&com->lc_sem);
2085         rc = lfsck_namespace_trace_update(env, com, &lnr->lnr_fid,
2086                                           LNTF_CHECK_LINKEA, true);
2087
2088         GOTO(out, rc);
2089
2090 stop:
2091         if (dtlocked)
2092                 dt_write_unlock(env, obj);
2093
2094         if (handle != NULL && !IS_ERR(handle))
2095                 dt_trans_stop(env, dev, handle);
2096
2097 out:
2098         lfsck_ibits_unlock(&lh, LCK_EX);
2099         down_write(&com->lc_sem);
2100         if (rc < 0) {
2101                 CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant fail to handle "
2102                        "the entry: "DFID", parent "DFID", name %.*s: rc = %d\n",
2103                        lfsck_lfsck2name(lfsck), PFID(&lnr->lnr_fid),
2104                        PFID(lfsck_dto2fid(lnr->lnr_obj)),
2105                        lnr->lnr_namelen, lnr->lnr_name, rc);
2106
2107                 lfsck_namespace_record_failure(env, lfsck, ns);
2108                 if (!(bk->lb_param & LPF_FAILOUT))
2109                         rc = 0;
2110         } else {
2111                 if (log)
2112                         CDEBUG(D_LFSCK, "%s: namespace LFSCK assistant "
2113                                "repaired the entry: "DFID", parent "DFID
2114                                ", name %.*s\n", lfsck_lfsck2name(lfsck),
2115                                PFID(&lnr->lnr_fid),
2116                                PFID(lfsck_dto2fid(lnr->lnr_obj)),
2117                                lnr->lnr_namelen, lnr->lnr_name);
2118
2119                 if (repaired) {
2120                         ns->ln_items_repaired++;
2121                         if (bk->lb_param & LPF_DRYRUN &&
2122                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
2123                                 lfsck_pos_fill(env, lfsck,
2124                                                &ns->ln_pos_first_inconsistent,
2125                                                false);
2126                 }
2127                 rc = 0;
2128         }
2129         up_write(&com->lc_sem);
2130
2131         if (obj != NULL && !IS_ERR(obj))
2132                 lfsck_object_put(env, obj);
2133         return rc;
2134 }
2135
2136 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
2137                                                 struct lfsck_component *com)
2138 {
2139         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2140         struct ptlrpc_thread    *thread = &lfsck->li_thread;
2141         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
2142         struct lfsck_namespace  *ns     = com->lc_file_ram;
2143         struct dt_object        *obj    = com->lc_obj;
2144         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
2145         struct dt_object        *target;
2146         struct dt_it            *di;
2147         struct dt_key           *key;
2148         struct lu_fid            fid;
2149         int                      rc;
2150         __u8                     flags  = 0;
2151         ENTRY;
2152
2153         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
2154                lfsck_lfsck2name(lfsck));
2155
2156         com->lc_new_checked = 0;
2157         com->lc_new_scanned = 0;
2158         com->lc_time_last_checkpoint = cfs_time_current();
2159         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
2160                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2161
2162         di = iops->init(env, obj, 0, BYPASS_CAPA);
2163         if (IS_ERR(di))
2164                 RETURN(PTR_ERR(di));
2165
2166         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
2167         rc = iops->get(env, di, (const struct dt_key *)&fid);
2168         if (rc < 0)
2169                 GOTO(fini, rc);
2170
2171         /* Skip the start one, which either has been processed or non-exist. */
2172         rc = iops->next(env, di);
2173         if (rc != 0)
2174                 GOTO(put, rc);
2175
2176         do {
2177                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
2178                     cfs_fail_val > 0) {
2179                         struct l_wait_info lwi;
2180
2181                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
2182                                           NULL, NULL);
2183                         l_wait_event(thread->t_ctl_waitq,
2184                                      !thread_is_running(thread),
2185                                      &lwi);
2186
2187                         if (unlikely(!thread_is_running(thread)))
2188                                 GOTO(put, rc = 0);
2189                 }
2190
2191                 key = iops->key(env, di);
2192                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
2193                 if (!fid_is_sane(&fid)) {
2194                         rc = 0;
2195                         goto checkpoint;
2196                 }
2197
2198                 target = lfsck_object_find(env, lfsck, &fid);
2199                 if (IS_ERR(target)) {
2200                         rc = PTR_ERR(target);
2201                         goto checkpoint;
2202                 }
2203
2204                 if (dt_object_exists(target)) {
2205                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
2206                         if (rc == 0) {
2207                                 rc = lfsck_namespace_double_scan_one(env, com,
2208                                                                 target, flags);
2209                                 if (rc == -ENOENT)
2210                                         rc = 0;
2211                         }
2212                 }
2213
2214                 lfsck_object_put(env, target);
2215
2216 checkpoint:
2217                 down_write(&com->lc_sem);
2218                 com->lc_new_checked++;
2219                 com->lc_new_scanned++;
2220                 ns->ln_fid_latest_scanned_phase2 = fid;
2221                 if (rc > 0)
2222                         ns->ln_objs_repaired_phase2++;
2223                 else if (rc < 0)
2224                         ns->ln_objs_failed_phase2++;
2225                 up_write(&com->lc_sem);
2226
2227                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
2228                         GOTO(put, rc);
2229
2230                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
2231                                               cfs_time_current())) &&
2232                     com->lc_new_checked != 0) {
2233                         down_write(&com->lc_sem);
2234                         ns->ln_run_time_phase2 +=
2235                                 cfs_duration_sec(cfs_time_current() +
2236                                 HALF_SEC - com->lc_time_last_checkpoint);
2237                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
2238                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
2239                         com->lc_new_checked = 0;
2240                         rc = lfsck_namespace_store(env, com, false);
2241                         up_write(&com->lc_sem);
2242                         if (rc != 0)
2243                                 GOTO(put, rc);
2244
2245                         com->lc_time_last_checkpoint = cfs_time_current();
2246                         com->lc_time_next_checkpoint =
2247                                 com->lc_time_last_checkpoint +
2248                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
2249                 }
2250
2251                 lfsck_control_speed_by_self(com);
2252                 if (unlikely(!thread_is_running(thread)))
2253                         GOTO(put, rc = 0);
2254
2255                 rc = iops->next(env, di);
2256         } while (rc == 0);
2257
2258         GOTO(put, rc);
2259
2260 put:
2261         iops->put(env, di);
2262
2263 fini:
2264         iops->fini(env, di);
2265
2266         CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan stop: rc = %d\n",
2267                lfsck_lfsck2name(lfsck), rc);
2268
2269         return rc;
2270 }
2271
2272 static void lfsck_namespace_assistant_fill_pos(const struct lu_env *env,
2273                                                struct lfsck_component *com,
2274                                                struct lfsck_position *pos)
2275 {
2276         struct lfsck_assistant_data     *lad = com->lc_data;
2277         struct lfsck_namespace_req      *lnr;
2278
2279         if (list_empty(&lad->lad_req_list))
2280                 return;
2281
2282         lnr = list_entry(lad->lad_req_list.next,
2283                          struct lfsck_namespace_req,
2284                          lnr_lar.lar_list);
2285         pos->lp_oit_cookie = lnr->lnr_oit_cookie;
2286         pos->lp_dir_cookie = lnr->lnr_dir_cookie - 1;
2287         pos->lp_dir_parent = *lfsck_dto2fid(lnr->lnr_obj);
2288 }
2289
2290 static int lfsck_namespace_double_scan_result(const struct lu_env *env,
2291                                               struct lfsck_component *com,
2292                                               int rc)
2293 {
2294         struct lfsck_instance   *lfsck  = com->lc_lfsck;
2295         struct lfsck_namespace  *ns     = com->lc_file_ram;
2296
2297         down_write(&com->lc_sem);
2298         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
2299                                 HALF_SEC - lfsck->li_time_last_checkpoint);
2300         ns->ln_time_last_checkpoint = cfs_time_current_sec();
2301         ns->ln_objs_checked_phase2 += com->lc_new_checked;
2302         com->lc_new_checked = 0;
2303
2304         if (rc > 0) {
2305                 if (ns->ln_flags & LF_INCOMPLETE)
2306                         ns->ln_status = LS_PARTIAL;
2307                 else
2308                         ns->ln_status = LS_COMPLETED;
2309                 if (!(lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN))
2310                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
2311                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
2312                 ns->ln_success_count++;
2313         } else if (rc == 0) {
2314                 ns->ln_status = lfsck->li_status;
2315                 if (ns->ln_status == 0)
2316                         ns->ln_status = LS_STOPPED;
2317         } else {
2318                 ns->ln_status = LS_FAILED;
2319         }
2320
2321         rc = lfsck_namespace_store(env, com, false);
2322         up_write(&com->lc_sem);
2323
2324         return rc;
2325 }
2326
/**
 * The la_sync_failures hook of the namespace LFSCK assistant.
 *
 * Not implemented yet: the function currently does nothing.
 *
 * \param[in] env       pointer to the thread context (not used)
 * \param[in] com       pointer to the lfsck component (not used)
 * \param[in] lr        pointer to the lfsck request (not used)
 */
static void lfsck_namespace_assistant_sync_failures(const struct lu_env *env,
                                                    struct lfsck_component *com,
                                                    struct lfsck_request *lr)
{
        /* XXX: To be implemented. */
}
2333
2334 struct lfsck_assistant_operations lfsck_namespace_assistant_ops = {
2335         .la_handler_p1          = lfsck_namespace_assistant_handler_p1,
2336         .la_handler_p2          = lfsck_namespace_assistant_handler_p2,
2337         .la_fill_pos            = lfsck_namespace_assistant_fill_pos,
2338         .la_double_scan_result  = lfsck_namespace_double_scan_result,
2339         .la_req_fini            = lfsck_namespace_assistant_req_fini,
2340         .la_sync_failures       = lfsck_namespace_assistant_sync_failures,
2341 };
2342
2343 /**
2344  * Verify the specified linkEA entry for the given directory object.
2345  * If the object has no such linkEA entry or it has more other linkEA
2346  * entries, then re-generate the linkEA with the given information.
2347  *
2348  * \param[in] env       pointer to the thread context
2349  * \param[in] dev       pointer to the dt_device
2350  * \param[in] obj       pointer to the dt_object to be handled
2351  * \param[in] cname     the name for the child in the parent directory
2352  * \param[in] pfid      the parent directory's FID for the linkEA
2353  *
2354  * \retval              0 for success
2355  * \retval              negative error number on failure
2356  */
2357 int lfsck_verify_linkea(const struct lu_env *env, struct dt_device *dev,
2358                         struct dt_object *obj, const struct lu_name *cname,
2359                         const struct lu_fid *pfid)
2360 {
2361         struct linkea_data       ldata  = { 0 };
2362         struct lu_buf            linkea_buf;
2363         struct thandle          *th;
2364         int                      rc;
2365         int                      fl     = LU_XATTR_CREATE;
2366         bool                     dirty  = false;
2367         ENTRY;
2368
2369         LASSERT(S_ISDIR(lfsck_object_type(obj)));
2370
2371         rc = lfsck_links_read(env, obj, &ldata);
2372         if (rc == -ENODATA) {
2373                 dirty = true;
2374         } else if (rc == 0) {
2375                 fl = LU_XATTR_REPLACE;
2376                 if (ldata.ld_leh->leh_reccount != 1) {
2377                         dirty = true;
2378                 } else {
2379                         rc = linkea_links_find(&ldata, cname, pfid);
2380                         if (rc != 0)
2381                                 dirty = true;
2382                 }
2383         }
2384
2385         if (!dirty)
2386                 RETURN(rc);
2387
2388         rc = linkea_data_new(&ldata, &lfsck_env_info(env)->lti_linkea_buf);
2389         if (rc != 0)
2390                 RETURN(rc);
2391
2392         rc = linkea_add_buf(&ldata, cname, pfid);
2393         if (rc != 0)
2394                 RETURN(rc);
2395
2396         lfsck_buf_init(&linkea_buf, ldata.ld_buf->lb_buf,
2397                        ldata.ld_leh->leh_len);
2398         th = dt_trans_create(env, dev);
2399         if (IS_ERR(th))
2400                 RETURN(PTR_ERR(th));
2401
2402         rc = dt_declare_xattr_set(env, obj, &linkea_buf,
2403                                   XATTR_NAME_LINK, fl, th);
2404         if (rc != 0)
2405                 GOTO(stop, rc);
2406
2407         rc = dt_trans_start_local(env, dev, th);
2408         if (rc != 0)
2409                 GOTO(stop, rc);
2410
2411         dt_write_lock(env, obj, 0);
2412         rc = dt_xattr_set(env, obj, &linkea_buf,
2413                           XATTR_NAME_LINK, fl, th, BYPASS_CAPA);
2414         dt_write_unlock(env, obj);
2415
2416         GOTO(stop, rc);
2417
2418 stop:
2419         dt_trans_stop(env, dev, th);
2420         return rc;
2421 }
2422
2423 /**
2424  * Get the name and parent directory's FID from the first linkEA entry.
2425  *
2426  * \param[in] env       pointer to the thread context
2427  * \param[in] obj       pointer to the object which get linkEA from
2428  * \param[out] name     pointer to the buffer to hold the name
2429  *                      in the first linkEA entry
2430  * \param[out] pfid     pointer to the buffer to hold the parent
2431  *                      directory's FID in the first linkEA entry
2432  *
2433  * \retval              0 for success
2434  * \retval              negative error number on failure
2435  */
2436 int lfsck_links_get_first(const struct lu_env *env, struct dt_object *obj,
2437                           char *name, struct lu_fid *pfid)
2438 {
2439         struct lu_name           *cname = &lfsck_env_info(env)->lti_name;
2440         struct linkea_data        ldata = { 0 };
2441         int                       rc;
2442
2443         rc = lfsck_links_read(env, obj, &ldata);
2444         if (rc != 0)
2445                 return rc;
2446
2447         linkea_first_entry(&ldata);
2448         if (ldata.ld_lee == NULL)
2449                 return -ENODATA;
2450
2451         linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname, pfid);
2452         /* To guarantee the 'name' is terminated with '0'. */
2453         memcpy(name, cname->ln_name, cname->ln_namelen);
2454         name[cname->ln_namelen] = 0;
2455
2456         return 0;
2457 }
2458
2459 /**
2460  * Remove the name entry from the parent directory.
2461  *
2462  * No need to care about the object referenced by the name entry,
2463  * either the name entry is invalid or redundant, or the referenced
2464  * object has been processed has been or will be handled by others.
2465  *
2466  * \param[in] env       pointer to the thread context
2467  * \param[in] lfsck     pointer to the lfsck instance
2468  * \param[in] parent    pointer to the lost+found object
2469  * \param[in] name      the name for the name entry to be removed
2470  * \param[in] type      the type for the name entry to be removed
2471  *
2472  * \retval              0 for success
2473  * \retval              negative error number on failure
2474  */
2475 int lfsck_remove_name_entry(const struct lu_env *env,
2476                             struct lfsck_instance *lfsck,
2477                             struct dt_object *parent,
2478                             const char *name, __u32 type)
2479 {
2480         struct dt_device        *dev    = lfsck->li_next;
2481         struct thandle          *th;
2482         struct lustre_handle     lh     = { 0 };
2483         int                      rc;
2484         ENTRY;
2485
2486         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2487                               MDS_INODELOCK_UPDATE, LCK_EX);
2488         if (rc != 0)
2489                 RETURN(rc);
2490
2491         th = dt_trans_create(env, dev);
2492         if (IS_ERR(th))
2493                 GOTO(unlock, rc = PTR_ERR(th));
2494
2495         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2496         if (rc != 0)
2497                 GOTO(stop, rc);
2498
2499         if (S_ISDIR(type)) {
2500                 rc = dt_declare_ref_del(env, parent, th);
2501                 if (rc != 0)
2502                         GOTO(stop, rc);
2503         }
2504
2505         rc = dt_trans_start(env, dev, th);
2506         if (rc != 0)
2507                 GOTO(stop, rc);
2508
2509         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2510                        BYPASS_CAPA);
2511         if (rc != 0)
2512                 GOTO(stop, rc);
2513
2514         if (S_ISDIR(type)) {
2515                 dt_write_lock(env, parent, 0);
2516                 rc = dt_ref_del(env, parent, th);
2517                 dt_write_unlock(env, parent);
2518         }
2519
2520         GOTO(stop, rc);
2521
2522 stop:
2523         dt_trans_stop(env, dev, th);
2524
2525 unlock:
2526         lfsck_ibits_unlock(&lh, LCK_EX);
2527
2528         CDEBUG(D_LFSCK, "%s: remove name entry "DFID"/%s "
2529                "with type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2530                PFID(lfsck_dto2fid(parent)), name, type, rc);
2531
2532         return rc;
2533 }
2534
2535 /**
2536  * Update the object's name entry with the given FID.
2537  *
2538  * \param[in] env       pointer to the thread context
2539  * \param[in] lfsck     pointer to the lfsck instance
2540  * \param[in] parent    pointer to the parent directory that holds
2541  *                      the name entry
2542  * \param[in] name      the name for the entry to be updated
2543  * \param[in] pfid      the new PFID for the name entry
2544  * \param[in] type      the type for the name entry to be updated
2545  *
2546  * \retval              0 for success
2547  * \retval              negative error number on failure
2548  */
2549 int lfsck_update_name_entry(const struct lu_env *env,
2550                             struct lfsck_instance *lfsck,
2551                             struct dt_object *parent, const char *name,
2552                             const struct lu_fid *pfid, __u32 type)
2553 {
2554         struct dt_insert_rec    *rec    = &lfsck_env_info(env)->lti_dt_rec;
2555         struct dt_device        *dev    = lfsck->li_next;
2556         struct lustre_handle     lh     = { 0 };
2557         struct thandle          *th;
2558         int                      rc;
2559         bool                     exists = true;
2560         ENTRY;
2561
2562         rc = lfsck_ibits_lock(env, lfsck, parent, &lh,
2563                               MDS_INODELOCK_UPDATE, LCK_EX);
2564         if (rc != 0)
2565                 RETURN(rc);
2566
2567         th = dt_trans_create(env, dev);
2568         if (IS_ERR(th))
2569                 GOTO(unlock, rc = PTR_ERR(th));
2570
2571         rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th);
2572         if (rc != 0)
2573                 GOTO(stop, rc);
2574
2575         rec->rec_type = type;
2576         rec->rec_fid = pfid;
2577         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
2578                                (const struct dt_key *)name, th);
2579         if (rc != 0)
2580                 GOTO(stop, rc);
2581
2582         rc = dt_declare_ref_add(env, parent, th);
2583         if (rc != 0)
2584                 GOTO(stop, rc);
2585
2586         rc = dt_trans_start(env, dev, th);
2587         if (rc != 0)
2588                 GOTO(stop, rc);
2589
2590         rc = dt_delete(env, parent, (const struct dt_key *)name, th,
2591                        BYPASS_CAPA);
2592         if (rc == -ENOENT) {
2593                 exists = false;
2594                 rc = 0;
2595         }
2596
2597         if (rc != 0)
2598                 GOTO(stop, rc);
2599
2600         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
2601                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
2602         if (rc == 0 && S_ISDIR(type) && !exists) {
2603                 dt_write_lock(env, parent, 0);
2604                 rc = dt_ref_add(env, parent, th);
2605                 dt_write_unlock(env, parent);
2606         }
2607
2608         GOTO(stop, rc);
2609
2610 stop:
2611         dt_trans_stop(env, dev, th);
2612
2613 unlock:
2614         lfsck_ibits_unlock(&lh, LCK_EX);
2615
2616         CDEBUG(D_LFSCK, "%s: update name entry "DFID"/%s with the FID "DFID
2617                " and the type %o: rc = %d\n", lfsck_lfsck2name(lfsck),
2618                PFID(lfsck_dto2fid(parent)), name, PFID(pfid), type, rc);
2619
2620         return rc;
2621 }
2622
2623 int lfsck_namespace_setup(const struct lu_env *env,
2624                           struct lfsck_instance *lfsck)
2625 {
2626         struct lfsck_component  *com;
2627         struct lfsck_namespace  *ns;
2628         struct dt_object        *root = NULL;
2629         struct dt_object        *obj;
2630         int                      rc;
2631         ENTRY;
2632
2633         LASSERT(lfsck->li_master);
2634
2635         OBD_ALLOC_PTR(com);
2636         if (com == NULL)
2637                 RETURN(-ENOMEM);
2638
2639         INIT_LIST_HEAD(&com->lc_link);
2640         INIT_LIST_HEAD(&com->lc_link_dir);
2641         init_rwsem(&com->lc_sem);
2642         atomic_set(&com->lc_ref, 1);
2643         com->lc_lfsck = lfsck;
2644         com->lc_type = LFSCK_TYPE_NAMESPACE;
2645         com->lc_ops = &lfsck_namespace_ops;
2646         com->lc_data = lfsck_assistant_data_init(
2647                         &lfsck_namespace_assistant_ops,
2648                         "lfsck_namespace");
2649         if (com->lc_data == NULL)
2650                 GOTO(out, rc = -ENOMEM);
2651
2652         com->lc_file_size = sizeof(struct lfsck_namespace);
2653         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
2654         if (com->lc_file_ram == NULL)
2655                 GOTO(out, rc = -ENOMEM);
2656
2657         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
2658         if (com->lc_file_disk == NULL)
2659                 GOTO(out, rc = -ENOMEM);
2660
2661         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
2662         if (IS_ERR(root))
2663                 GOTO(out, rc = PTR_ERR(root));
2664
2665         if (unlikely(!dt_try_as_dir(env, root)))
2666                 GOTO(out, rc = -ENOTDIR);
2667
2668         obj = local_index_find_or_create(env, lfsck->li_los, root,
2669                                          lfsck_namespace_name,
2670                                          S_IFREG | S_IRUGO | S_IWUSR,
2671                                          &dt_lfsck_features);
2672         if (IS_ERR(obj))
2673                 GOTO(out, rc = PTR_ERR(obj));
2674
2675         com->lc_obj = obj;
2676         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
2677         if (rc != 0)
2678                 GOTO(out, rc);
2679
2680         rc = lfsck_namespace_load(env, com);
2681         if (rc > 0)
2682                 rc = lfsck_namespace_reset(env, com, true);
2683         else if (rc == -ENODATA)
2684                 rc = lfsck_namespace_init(env, com);
2685         if (rc != 0)
2686                 GOTO(out, rc);
2687
2688         ns = com->lc_file_ram;
2689         switch (ns->ln_status) {
2690         case LS_INIT:
2691         case LS_COMPLETED:
2692         case LS_FAILED:
2693         case LS_STOPPED:
2694                 spin_lock(&lfsck->li_lock);
2695                 list_add_tail(&com->lc_link, &lfsck->li_list_idle);
2696                 spin_unlock(&lfsck->li_lock);
2697                 break;
2698         default:
2699                 CERROR("%s: unknown lfsck_namespace status %d\n",
2700                        lfsck_lfsck2name(lfsck), ns->ln_status);
2701                 /* fall through */
2702         case LS_SCANNING_PHASE1:
2703         case LS_SCANNING_PHASE2:
2704                 /* No need to store the status to disk right now.
2705                  * If the system crashed before the status stored,
2706                  * it will be loaded back when next time. */
2707                 ns->ln_status = LS_CRASHED;
2708                 /* fall through */
2709         case LS_PAUSED:
2710         case LS_CRASHED:
2711                 spin_lock(&lfsck->li_lock);
2712                 list_add_tail(&com->lc_link, &lfsck->li_list_scan);
2713                 list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
2714                 spin_unlock(&lfsck->li_lock);
2715                 break;
2716         }
2717
2718         GOTO(out, rc = 0);
2719
2720 out:
2721         if (root != NULL && !IS_ERR(root))
2722                 lu_object_put(env, &root->do_lu);
2723         if (rc != 0) {
2724                 lfsck_component_cleanup(env, com);
2725                 CERROR("%s: fail to init namespace LFSCK component: rc = %d\n",
2726                        lfsck_lfsck2name(lfsck), rc);
2727         }
2728         return rc;
2729 }