Whamcloud - gitweb
92e77cba493b797731102755dfcb742746947333
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_linkea.h>
38 #include <lustre_fid.h>
39 #include <lustre_lib.h>
40 #include <lustre_net.h>
41 #include <lustre/lustre_user.h>
42
43 #include "lfsck_internal.h"
44
/* Magic value stored in lfsck_namespace::ln_magic when the record is
 * initialized or reset, and compared against when the record is loaded. */
45 #define LFSCK_NAMESPACE_MAGIC   0xA0629D03
46
/* Name under the local root directory of the object that is unlinked and
 * re-created when the namespace component is reset. */
47 static const char lfsck_namespace_name[] = "lfsck_namespace";
48
49 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
50                                       struct lfsck_namespace *src)
51 {
52         dst->ln_magic = le32_to_cpu(src->ln_magic);
53         dst->ln_status = le32_to_cpu(src->ln_status);
54         dst->ln_flags = le32_to_cpu(src->ln_flags);
55         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
56         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
57         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
58         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
59         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
60         dst->ln_time_last_checkpoint =
61                                 le64_to_cpu(src->ln_time_last_checkpoint);
62         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
63                                  &src->ln_pos_latest_start);
64         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
65                                  &src->ln_pos_last_checkpoint);
66         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
67                                  &src->ln_pos_first_inconsistent);
68         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
69         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
70         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
71         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
72         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
73         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
74         dst->ln_objs_repaired_phase2 =
75                                 le64_to_cpu(src->ln_objs_repaired_phase2);
76         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
77         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
78         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
79         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
80                       &src->ln_fid_latest_scanned_phase2);
81         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
82         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
83 }
84
85 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
86                                       struct lfsck_namespace *src)
87 {
88         dst->ln_magic = cpu_to_le32(src->ln_magic);
89         dst->ln_status = cpu_to_le32(src->ln_status);
90         dst->ln_flags = cpu_to_le32(src->ln_flags);
91         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
92         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
93         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
94         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
95         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
96         dst->ln_time_last_checkpoint =
97                                 cpu_to_le64(src->ln_time_last_checkpoint);
98         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
99                                  &src->ln_pos_latest_start);
100         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
101                                  &src->ln_pos_last_checkpoint);
102         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
103                                  &src->ln_pos_first_inconsistent);
104         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
105         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
106         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
107         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
108         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
109         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
110         dst->ln_objs_repaired_phase2 =
111                                 cpu_to_le64(src->ln_objs_repaired_phase2);
112         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
113         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
114         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
115         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
116                       &src->ln_fid_latest_scanned_phase2);
117         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
118         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
119 }
120
121 /**
122  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
123  * \retval 0: succeed.
124  * \retval -ve: failed cases.
125  */
126 static int lfsck_namespace_load(const struct lu_env *env,
127                                 struct lfsck_component *com)
128 {
129         int len = com->lc_file_size;
130         int rc;
131
132         rc = dt_xattr_get(env, com->lc_obj,
133                           lfsck_buf_get(env, com->lc_file_disk, len),
134                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
135         if (rc == len) {
136                 struct lfsck_namespace *ns = com->lc_file_ram;
137
138                 lfsck_namespace_le_to_cpu(ns,
139                                 (struct lfsck_namespace *)com->lc_file_disk);
140                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
141                         CWARN("%s: invalid lfsck_namespace magic %#x != %#x\n",
142                               lfsck_lfsck2name(com->lc_lfsck), ns->ln_magic,
143                               LFSCK_NAMESPACE_MAGIC);
144                         rc = 1;
145                 } else {
146                         rc = 0;
147                 }
148         } else if (rc != -ENODATA) {
149                 CERROR("%s: fail to load lfsck_namespace: expected = %d, "
150                        "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle)) {
171                 rc = PTR_ERR(handle);
172                 CERROR("%s: fail to create trans for storing lfsck_namespace: "
173                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
174                 RETURN(rc);
175         }
176
177         rc = dt_declare_xattr_set(env, obj,
178                                   lfsck_buf_get(env, com->lc_file_disk, len),
179                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
180         if (rc != 0) {
181                 CERROR("%s: fail to declare trans for storing lfsck_namespace: "
182                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
183                 GOTO(out, rc);
184         }
185
186         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
187         if (rc != 0) {
188                 CERROR("%s: fail to start trans for storing lfsck_namespace: "
189                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
190                 GOTO(out, rc);
191         }
192
193         rc = dt_xattr_set(env, obj,
194                           lfsck_buf_get(env, com->lc_file_disk, len),
195                           XATTR_NAME_LFSCK_NAMESPACE,
196                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
197                           handle, BYPASS_CAPA);
198         if (rc != 0)
199                 CERROR("%s: fail to store lfsck_namespace: len = %d, "
200                        "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
201
202         GOTO(out, rc);
203
204 out:
205         dt_trans_stop(env, lfsck->li_bottom, handle);
206         return rc;
207 }
208
209 static int lfsck_namespace_init(const struct lu_env *env,
210                                 struct lfsck_component *com)
211 {
212         struct lfsck_namespace *ns = com->lc_file_ram;
213         int rc;
214
215         memset(ns, 0, sizeof(*ns));
216         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
217         ns->ln_status = LS_INIT;
218         down_write(&com->lc_sem);
219         rc = lfsck_namespace_store(env, com, true);
220         up_write(&com->lc_sem);
221         return rc;
222 }
223
224 static int lfsck_namespace_lookup(const struct lu_env *env,
225                                   struct lfsck_component *com,
226                                   const struct lu_fid *fid, __u8 *flags)
227 {
228         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
229         int            rc;
230
231         fid_cpu_to_be(key, fid);
232         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
233                        (const struct dt_key *)key, BYPASS_CAPA);
234         return rc;
235 }
236
237 static int lfsck_namespace_delete(const struct lu_env *env,
238                                   struct lfsck_component *com,
239                                   const struct lu_fid *fid)
240 {
241         struct lfsck_instance   *lfsck  = com->lc_lfsck;
242         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
243         struct thandle          *handle;
244         struct dt_object        *obj    = com->lc_obj;
245         int                      rc;
246         ENTRY;
247
248         handle = dt_trans_create(env, lfsck->li_bottom);
249         if (IS_ERR(handle))
250                 RETURN(PTR_ERR(handle));
251
252         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
253         if (rc != 0)
254                 GOTO(out, rc);
255
256         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         fid_cpu_to_be(key, fid);
261         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
262                        BYPASS_CAPA);
263
264         GOTO(out, rc);
265
266 out:
267         dt_trans_stop(env, lfsck->li_bottom, handle);
268         return rc;
269 }
270
271 static int lfsck_namespace_update(const struct lu_env *env,
272                                   struct lfsck_component *com,
273                                   const struct lu_fid *fid,
274                                   __u8 flags, bool force)
275 {
276         struct lfsck_instance   *lfsck  = com->lc_lfsck;
277         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
278         struct thandle          *handle;
279         struct dt_object        *obj    = com->lc_obj;
280         int                      rc;
281         bool                     exist  = false;
282         __u8                     tf;
283         ENTRY;
284
285         rc = lfsck_namespace_lookup(env, com, fid, &tf);
286         if (rc != 0 && rc != -ENOENT)
287                 RETURN(rc);
288
289         if (rc == 0) {
290                 if (!force || flags == tf)
291                         RETURN(0);
292
293                 exist = true;
294                 handle = dt_trans_create(env, lfsck->li_bottom);
295                 if (IS_ERR(handle))
296                         RETURN(PTR_ERR(handle));
297
298                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
299                                        handle);
300                 if (rc != 0)
301                         GOTO(out, rc);
302         } else {
303                 handle = dt_trans_create(env, lfsck->li_bottom);
304                 if (IS_ERR(handle))
305                         RETURN(PTR_ERR(handle));
306         }
307
308         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
309                                (const struct dt_key *)fid, handle);
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         fid_cpu_to_be(key, fid);
318         if (exist) {
319                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
320                                BYPASS_CAPA);
321                 if (rc != 0) {
322                         CERROR("%s: fail to insert "DFID": rc = %d\n",
323                                lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
324                         GOTO(out, rc);
325                 }
326         }
327
328         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
329                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
330
331         GOTO(out, rc);
332
333 out:
334         dt_trans_stop(env, lfsck->li_bottom, handle);
335         return rc;
336 }
337
338 static int lfsck_namespace_check_exist(const struct lu_env *env,
339                                        struct lfsck_instance *lfsck,
340                                        struct dt_object *obj, const char *name)
341 {
342         struct dt_object *dir = lfsck->li_obj_dir;
343         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
344         int               rc;
345         ENTRY;
346
347         if (unlikely(lfsck_is_dead_obj(obj)))
348                 RETURN(LFSCK_NAMEENTRY_DEAD);
349
350         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
351                        (const struct dt_key *)name, BYPASS_CAPA);
352         if (rc == -ENOENT)
353                 RETURN(LFSCK_NAMEENTRY_REMOVED);
354
355         if (rc < 0)
356                 RETURN(rc);
357
358         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
359                 RETURN(LFSCK_NAMEENTRY_RECREATED);
360
361         RETURN(0);
362 }
363
364 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
365                                             struct dt_object *obj,
366                                             struct thandle *handle)
367 {
368         int rc;
369
370         /* For destroying all invalid linkEA entries. */
371         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
372         if (rc != 0)
373                 return rc;
374
375         /* For insert new linkEA entry. */
376         rc = dt_declare_xattr_set(env, obj,
377                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
378                         XATTR_NAME_LINK, 0, handle);
379         return rc;
380 }
381
382 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
383                             struct linkea_data *ldata)
384 {
385         int rc;
386
387         ldata->ld_buf =
388                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
389                                        PAGE_CACHE_SIZE);
390         if (ldata->ld_buf->lb_buf == NULL)
391                 return -ENOMEM;
392
393         if (!dt_object_exists(obj))
394                 return -ENODATA;
395
396         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
397         if (rc == -ERANGE) {
398                 /* Buf was too small, figure out what we need. */
399                 lu_buf_free(ldata->ld_buf);
400                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
401                                   BYPASS_CAPA);
402                 if (rc < 0)
403                         return rc;
404
405                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
406                 if (ldata->ld_buf->lb_buf == NULL)
407                         return -ENOMEM;
408
409                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
410                                   BYPASS_CAPA);
411         }
412         if (rc < 0)
413                 return rc;
414
415         linkea_init(ldata);
416
417         return 0;
418 }
419
420 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
421                              struct linkea_data *ldata, struct thandle *handle)
422 {
423         const struct lu_buf *buf = lfsck_buf_get_const(env,
424                                                        ldata->ld_buf->lb_buf,
425                                                        ldata->ld_leh->leh_len);
426
427         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
428                             BYPASS_CAPA);
429 }
430
431 /**
432  * \retval ve: removed entries
433  */
434 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
435                                      struct linkea_data *ldata,
436                                      struct lu_name *cname,
437                                      struct lu_fid *pfid)
438 {
439         struct link_ea_entry    *oldlee;
440         int                      oldlen;
441         int                      removed = 0;
442
443         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
444         oldlee = ldata->ld_lee;
445         oldlen = ldata->ld_reclen;
446         linkea_next_entry(ldata);
447         while (ldata->ld_lee != NULL) {
448                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
449                                    ldata->ld_lee->lee_reclen[1];
450                 if (unlikely(ldata->ld_reclen == oldlen &&
451                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
452                         linkea_del_buf(ldata, cname);
453                         removed++;
454                 } else {
455                         linkea_next_entry(ldata);
456                 }
457         }
458         ldata->ld_lee = oldlee;
459         ldata->ld_reclen = oldlen;
460         return removed;
461 }
462
463 /**
464  * \retval +ve  repaired
465  * \retval 0    no need to repair
466  * \retval -ve  error cases
467  */
468 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
469                                            struct lfsck_component *com,
470                                            struct dt_object *child, __u8 flags)
471 {
472         struct lfsck_thread_info *info    = lfsck_env_info(env);
473         struct lu_attr           *la      = &info->lti_la;
474         struct lu_name           *cname   = &info->lti_name;
475         struct lu_fid            *pfid    = &info->lti_fid;
476         struct lu_fid            *cfid    = &info->lti_fid2;
477         struct lfsck_instance   *lfsck    = com->lc_lfsck;
478         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
479         struct lfsck_namespace  *ns       = com->lc_file_ram;
480         struct linkea_data       ldata    = { 0 };
481         struct thandle          *handle   = NULL;
482         bool                     locked   = false;
483         bool                     update   = false;
484         int                      rc;
485         ENTRY;
486
487         if (com->lc_journal) {
488
489 again:
490                 LASSERT(!locked);
491
492                 update = false;
493                 com->lc_journal = 1;
494                 handle = dt_trans_create(env, lfsck->li_next);
495                 if (IS_ERR(handle))
496                         RETURN(rc = PTR_ERR(handle));
497
498                 rc = dt_declare_xattr_set(env, child,
499                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
500                         XATTR_NAME_LINK, 0, handle);
501                 if (rc != 0)
502                         GOTO(stop, rc);
503
504                 rc = dt_trans_start(env, lfsck->li_next, handle);
505                 if (rc != 0)
506                         GOTO(stop, rc);
507
508                 dt_write_lock(env, child, MOR_TGT_CHILD);
509                 locked = true;
510         }
511
512         if (unlikely(lfsck_is_dead_obj(child)))
513                 GOTO(stop, rc = 0);
514
515         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
516         if (rc == 0)
517                 rc = lfsck_links_read(env, child, &ldata);
518         if (rc != 0) {
519                 if ((bk->lb_param & LPF_DRYRUN) &&
520                     (rc == -EINVAL || rc == -ENODATA))
521                         rc = 1;
522
523                 GOTO(stop, rc);
524         }
525
526         linkea_first_entry(&ldata);
527         while (ldata.ld_lee != NULL) {
528                 struct dt_object *parent = NULL;
529
530                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
531                 if (rc > 0)
532                         update = true;
533
534                 if (!fid_is_sane(pfid))
535                         goto shrink;
536
537                 parent = lfsck_object_find(env, lfsck, pfid);
538                 if (parent == NULL)
539                         goto shrink;
540                 else if (IS_ERR(parent))
541                         GOTO(stop, rc = PTR_ERR(parent));
542
543                 if (!dt_object_exists(parent))
544                         goto shrink;
545
546                 /* XXX: Currently, skip remote object, the consistency for
547                  *      remote object will be processed in LFSCK phase III. */
548                 if (dt_object_remote(parent)) {
549                         lfsck_object_put(env, parent);
550                         linkea_next_entry(&ldata);
551                         continue;
552                 }
553
554                 if (unlikely(!dt_try_as_dir(env, parent)))
555                         goto shrink;
556
557                 /* To guarantee the 'name' is terminated with '0'. */
558                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
559                 info->lti_key[cname->ln_namelen] = 0;
560                 cname->ln_name = info->lti_key;
561                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
562                                (const struct dt_key *)cname->ln_name,
563                                BYPASS_CAPA);
564                 if (rc != 0 && rc != -ENOENT) {
565                         lfsck_object_put(env, parent);
566                         GOTO(stop, rc);
567                 }
568
569                 if (rc == 0) {
570                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
571                                 lfsck_object_put(env, parent);
572                                 linkea_next_entry(&ldata);
573                                 continue;
574                         }
575
576                         goto shrink;
577                 }
578
579                 /* If there is no name entry in the parent dir and the object
580                  * link count is less than the linkea entries count, then the
581                  * linkea entry should be removed. */
582                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
583                         goto shrink;
584
585                 /* XXX: For the case of there is a linkea entry, but without
586                  *      name entry pointing to the object and its hard links
587                  *      count is not less than the object name entries count,
588                  *      then seems we should add the 'missed' name entry back
589                  *      to namespace, but before LFSCK phase III finished, we
590                  *      do not know whether the object has some inconsistency
591                  *      on other MDTs. So now, do NOT add the name entry back
592                  *      to the namespace, but keep the linkEA entry. LU-2914 */
593                 lfsck_object_put(env, parent);
594                 linkea_next_entry(&ldata);
595                 continue;
596
597 shrink:
598                 if (parent != NULL)
599                         lfsck_object_put(env, parent);
600                 if (bk->lb_param & LPF_DRYRUN)
601                         RETURN(1);
602
603                 CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
604                        PFID(lfsck_dto2fid(child)), cname->ln_namelen, cname->ln_name,
605                        PFID(pfid));
606                 linkea_del_buf(&ldata, cname);
607                 update = true;
608         }
609
610         if (update) {
611                 if (!com->lc_journal) {
612                         com->lc_journal = 1;
613                         goto again;
614                 }
615
616                 rc = lfsck_links_write(env, child, &ldata, handle);
617         }
618
619         GOTO(stop, rc);
620
621 stop:
622         if (locked) {
623         /* XXX: For the case linkea entries count does not match the object hard
624          *      links count, we cannot update the later one simply. Before LFSCK
625          *      phase III finished, we cannot know whether there are some remote
626          *      name entries to be repaired or not. LU-2914 */
627                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
628                     ldata.ld_leh != NULL &&
629                     ldata.ld_leh->leh_reccount != la->la_nlink)
630                         CWARN("%s: the object "DFID" linkEA entry count %u "
631                               "may not match its hardlink count %u\n",
632                               lfsck_lfsck2name(lfsck), PFID(cfid),
633                               ldata.ld_leh->leh_reccount, la->la_nlink);
634
635                 dt_write_unlock(env, child);
636         }
637
638         if (handle != NULL)
639                 dt_trans_stop(env, lfsck->li_next, handle);
640
641         if (rc == 0 && update) {
642                 ns->ln_objs_nlink_repaired++;
643                 rc = 1;
644         }
645
646         return rc;
647 }
648
649 /* namespace APIs */
650
651 static int lfsck_namespace_reset(const struct lu_env *env,
652                                  struct lfsck_component *com, bool init)
653 {
654         struct lfsck_instance   *lfsck = com->lc_lfsck;
655         struct lfsck_namespace  *ns    = com->lc_file_ram;
656         struct dt_object        *root;
657         struct dt_object        *dto;
658         int                      rc;
659         ENTRY;
660
661         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
662         if (IS_ERR(root))
663                 RETURN(PTR_ERR(root));
664
665         if (unlikely(!dt_try_as_dir(env, root))) {
666                 lu_object_put(env, &root->do_lu);
667                 RETURN(-ENOTDIR);
668         }
669
670         down_write(&com->lc_sem);
671         if (init) {
672                 memset(ns, 0, sizeof(*ns));
673         } else {
674                 __u32 count = ns->ln_success_count;
675                 __u64 last_time = ns->ln_time_last_complete;
676
677                 memset(ns, 0, sizeof(*ns));
678                 ns->ln_success_count = count;
679                 ns->ln_time_last_complete = last_time;
680         }
681         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
682         ns->ln_status = LS_INIT;
683
684         rc = local_object_unlink(env, lfsck->li_bottom, root,
685                                  lfsck_namespace_name);
686         if (rc != 0)
687                 GOTO(out, rc);
688
689         lfsck_object_put(env, com->lc_obj);
690         com->lc_obj = NULL;
691         dto = local_index_find_or_create(env, lfsck->li_los, root,
692                                          lfsck_namespace_name,
693                                          S_IFREG | S_IRUGO | S_IWUSR,
694                                          &dt_lfsck_features);
695         if (IS_ERR(dto))
696                 GOTO(out, rc = PTR_ERR(dto));
697
698         com->lc_obj = dto;
699         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
700         if (rc != 0)
701                 GOTO(out, rc);
702
703         rc = lfsck_namespace_store(env, com, true);
704
705         GOTO(out, rc);
706
707 out:
708         up_write(&com->lc_sem);
709         lu_object_put(env, &root->do_lu);
710         return rc;
711 }
712
713 static void
714 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
715                      bool new_checked)
716 {
717         struct lfsck_namespace *ns = com->lc_file_ram;
718
719         down_write(&com->lc_sem);
720         if (new_checked)
721                 com->lc_new_checked++;
722         ns->ln_items_failed++;
723         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
724                 lfsck_pos_fill(env, com->lc_lfsck,
725                                &ns->ln_pos_first_inconsistent, false);
726         up_write(&com->lc_sem);
727 }
728
729 static int lfsck_namespace_checkpoint(const struct lu_env *env,
730                                       struct lfsck_component *com, bool init)
731 {
732         struct lfsck_instance   *lfsck = com->lc_lfsck;
733         struct lfsck_namespace  *ns    = com->lc_file_ram;
734         int                      rc;
735
736         if (com->lc_new_checked == 0 && !init)
737                 return 0;
738
739         down_write(&com->lc_sem);
740
741         if (init) {
742                 ns->ln_pos_latest_start = lfsck->li_pos_current;
743         } else {
744                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
745                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
746                                 HALF_SEC - lfsck->li_time_last_checkpoint);
747                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
748                 ns->ln_items_checked += com->lc_new_checked;
749                 com->lc_new_checked = 0;
750         }
751
752         rc = lfsck_namespace_store(env, com, false);
753
754         up_write(&com->lc_sem);
755         return rc;
756 }
757
758 static int lfsck_namespace_prep(const struct lu_env *env,
759                                 struct lfsck_component *com,
760                                 struct lfsck_start_param *lsp)
761 {
762         struct lfsck_instance   *lfsck  = com->lc_lfsck;
763         struct lfsck_namespace  *ns     = com->lc_file_ram;
764         struct lfsck_position   *pos    = &com->lc_pos_start;
765
766         if (ns->ln_status == LS_COMPLETED) {
767                 int rc;
768
769                 rc = lfsck_namespace_reset(env, com, false);
770                 if (rc != 0)
771                         return rc;
772         }
773
774         down_write(&com->lc_sem);
775
776         ns->ln_time_latest_start = cfs_time_current_sec();
777
778         spin_lock(&lfsck->li_lock);
779         if (ns->ln_flags & LF_SCANNED_ONCE) {
780                 if (!lfsck->li_drop_dryrun ||
781                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
782                         ns->ln_status = LS_SCANNING_PHASE2;
783                         cfs_list_del_init(&com->lc_link);
784                         cfs_list_add_tail(&com->lc_link,
785                                           &lfsck->li_list_double_scan);
786                         if (!cfs_list_empty(&com->lc_link_dir))
787                                 cfs_list_del_init(&com->lc_link_dir);
788                         lfsck_pos_set_zero(pos);
789                 } else {
790                         ns->ln_status = LS_SCANNING_PHASE1;
791                         ns->ln_run_time_phase1 = 0;
792                         ns->ln_run_time_phase2 = 0;
793                         ns->ln_items_checked = 0;
794                         ns->ln_items_repaired = 0;
795                         ns->ln_items_failed = 0;
796                         ns->ln_dirs_checked = 0;
797                         ns->ln_mlinked_checked = 0;
798                         ns->ln_objs_checked_phase2 = 0;
799                         ns->ln_objs_repaired_phase2 = 0;
800                         ns->ln_objs_failed_phase2 = 0;
801                         ns->ln_objs_nlink_repaired = 0;
802                         ns->ln_objs_lost_found = 0;
803                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
804                         if (cfs_list_empty(&com->lc_link_dir))
805                                 cfs_list_add_tail(&com->lc_link_dir,
806                                                   &lfsck->li_list_dir);
807                         *pos = ns->ln_pos_first_inconsistent;
808                 }
809         } else {
810                 ns->ln_status = LS_SCANNING_PHASE1;
811                 if (cfs_list_empty(&com->lc_link_dir))
812                         cfs_list_add_tail(&com->lc_link_dir,
813                                           &lfsck->li_list_dir);
814                 if (!lfsck->li_drop_dryrun ||
815                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
816                         *pos = ns->ln_pos_last_checkpoint;
817                         pos->lp_oit_cookie++;
818                 } else {
819                         *pos = ns->ln_pos_first_inconsistent;
820                 }
821         }
822         spin_unlock(&lfsck->li_lock);
823
824         up_write(&com->lc_sem);
825         return 0;
826 }
827
828 static int lfsck_namespace_exec_oit(const struct lu_env *env,
829                                     struct lfsck_component *com,
830                                     struct dt_object *obj)
831 {
832         down_write(&com->lc_sem);
833         com->lc_new_checked++;
834         if (S_ISDIR(lfsck_object_type(obj)))
835                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
836         up_write(&com->lc_sem);
837         return 0;
838 }
839
/*
 * Check (and, unless in dry-run mode, repair) the linkEA of one directory
 * entry during the phase-1 directory traversal.
 *
 * \a ent is the name entry being visited and \a obj is the object it
 * refers to; the parent FID is taken from lfsck->li_obj_dir.
 *
 * The whole function runs under com->lc_sem (write). Modifications to
 * \a obj are done inside a transaction with \a obj write-locked; the
 * follow-up lfsck_namespace_update() call is made after that transaction
 * has been stopped.
 *
 * Returns 0, except that a negative error is passed back to the caller
 * when LPF_FAILOUT is set in the bookmark parameters.
 */
840 static int lfsck_namespace_exec_dir(const struct lu_env *env,
841                                     struct lfsck_component *com,
842                                     struct dt_object *obj,
843                                     struct lu_dirent *ent)
844 {
845         struct lfsck_thread_info   *info     = lfsck_env_info(env);
846         struct lu_attr             *la       = &info->lti_la;
847         struct lfsck_instance      *lfsck    = com->lc_lfsck;
848         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
849         struct lfsck_namespace     *ns       = com->lc_file_ram;
850         struct linkea_data          ldata    = { 0 };
851         const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
852         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
853         const struct lu_name       *cname;
854         struct thandle             *handle   = NULL;
855         bool                        repaired = false;
856         bool                        locked   = false;
857         bool                        remove;
858         bool                        newdata;
859         int                         count    = 0;
860         int                         rc;
861         ENTRY;
862
863         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
864         down_write(&com->lc_sem);
865         com->lc_new_checked++;
866
        /* The entry attributes report that the name entry itself was
         * upgraded or repaired; account for that as a dirent repair. */
867         if (ent->lde_attrs & LUDA_UPGRADE) {
868                 ns->ln_flags |= LF_UPGRADE;
869                 ns->ln_dirent_repaired++;
870                 repaired = true;
871         } else if (ent->lde_attrs & LUDA_REPAIR) {
872                 ns->ln_flags |= LF_INCONSISTENT;
873                 ns->ln_dirent_repaired++;
874                 repaired = true;
875         }
876
        /* No linkEA check for ".", ".." and dot-names whose FID is the
         * .lustre one. */
877         if (ent->lde_name[0] == '.' &&
878             (ent->lde_namelen == 1 ||
879              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
880              fid_is_dot_lustre(&ent->lde_fid)))
881                 GOTO(out, rc = 0);
882
        /* Outside dry-run mode, open the transaction up front when the
         * journal flag is set or the name entry was already repaired. */
883         if (!(bk->lb_param & LPF_DRYRUN) &&
884             (com->lc_journal || repaired)) {
885
        /* Also entered from "nodata" below when a repair turns out to be
         * needed but no transaction was started on the first pass; the
         * existence check and linkEA read are then redone under the lock. */
886 again:
887                 LASSERT(!locked);
888
889                 com->lc_journal = 1;
890                 handle = dt_trans_create(env, lfsck->li_next);
891                 if (IS_ERR(handle))
892                         GOTO(out, rc = PTR_ERR(handle));
893
894                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
895                 if (rc != 0)
896                         GOTO(stop, rc);
897
898                 rc = dt_trans_start(env, lfsck->li_next, handle);
899                 if (rc != 0)
900                         GOTO(stop, rc);
901
902                 dt_write_lock(env, obj, MOR_TGT_CHILD);
903                 locked = true;
904         }
905
906         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
907         if (rc != 0)
908                 GOTO(stop, rc);
909
910         rc = lfsck_links_read(env, obj, &ldata);
911         if (rc == 0) {
912                 count = ldata.ld_leh->leh_reccount;
913                 rc = linkea_links_find(&ldata, cname, pfid);
                /* Entry found, and either it is the only one or the object
                 * is not a directory: the linkEA needs no repair. */
914                 if ((rc == 0) &&
915                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
916                         goto record;
917
918                 ns->ln_flags |= LF_INCONSISTENT;
919                 /* For dir, if there are more than one linkea entries, or the
920                  * linkea entry does not match the name entry, then remove all
921                  * and add the correct one. */
922                 if (S_ISDIR(lfsck_object_type(obj))) {
923                         remove = true;
924                         newdata = true;
925                 } else {
926                         remove = false;
927                         newdata = false;
928                 }
929                 goto nodata;
930         } else if (unlikely(rc == -EINVAL)) {
931                 count = 1;
932                 ns->ln_flags |= LF_INCONSISTENT;
933                 /* The magic crashed, we are not sure whether there are more
934                  * corrupt data in the linkea, so remove all linkea entries. */
935                 remove = true;
936                 newdata = true;
937                 goto nodata;
938         } else if (rc == -ENODATA) {
                /* No linkEA at all: flagged as an upgrade, not as an
                 * inconsistency. */
939                 count = 1;
940                 ns->ln_flags |= LF_UPGRADE;
941                 remove = false;
942                 newdata = true;
943
        /* Common repair path; "remove" and "newdata" were set by whichever
         * branch jumped or fell through to here. */
944 nodata:
945                 if (bk->lb_param & LPF_DRYRUN) {
                        /* Dry run: only count the would-be repair. */
946                         ns->ln_linkea_repaired++;
947                         repaired = true;
948                         goto record;
949                 }
950
951                 if (!com->lc_journal)
952                         goto again;
953
954                 if (remove) {
955                         LASSERT(newdata);
956
957                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
958                                           BYPASS_CAPA);
959                         if (rc != 0)
960                                 GOTO(stop, rc);
961                 }
962
963                 if (newdata) {
964                         rc = linkea_data_new(&ldata,
965                                         &lfsck_env_info(env)->lti_linkea_buf);
966                         if (rc != 0)
967                                 GOTO(stop, rc);
968                 }
969
970                 rc = linkea_add_buf(&ldata, cname, pfid);
971                 if (rc != 0)
972                         GOTO(stop, rc);
973
974                 rc = lfsck_links_write(env, obj, &ldata, handle);
975                 if (rc != 0)
976                         GOTO(stop, rc);
977
978                 count = ldata.ld_leh->leh_reccount;
979                 ns->ln_linkea_repaired++;
980                 repaired = true;
981         } else {
982                 GOTO(stop, rc);
983         }
984
985 record:
986         LASSERT(count > 0);
987
988         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
989         if (rc != 0)
990                 GOTO(stop, rc);
991
992         if ((count == 1) &&
993             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
994                 /* Usually, it is for single linked object or dir, do nothing.*/
995                 GOTO(stop, rc);
996
997         /* Following modification will be in another transaction.  */
998         if (handle != NULL) {
999                 LASSERT(dt_write_locked(env, obj));
1000
1001                 dt_write_unlock(env, obj);
1002                 locked = false;
1003
1004                 dt_trans_stop(env, lfsck->li_next, handle);
1005                 handle = NULL;
1006         }
1007
        /* Multiple-linked object: hand its FID to lfsck_namespace_update(),
         * flagged when the linkEA count and nlink attribute disagree. */
1008         ns->ln_mlinked_checked++;
1009         rc = lfsck_namespace_update(env, com, cfid,
1010                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1011
1012         GOTO(out, rc);
1013
1014 stop:
1015         if (locked)
1016                 dt_write_unlock(env, obj);
1017
1018         if (handle != NULL)
1019                 dt_trans_stop(env, lfsck->li_next, handle);
1020
1021 out:
1022         if (rc < 0) {
                /* Count the failure and remember the first failing position;
                 * the error is reported to the caller only with LPF_FAILOUT. */
1023                 ns->ln_items_failed++;
1024                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1025                         lfsck_pos_fill(env, lfsck,
1026                                        &ns->ln_pos_first_inconsistent, false);
1027                 if (!(bk->lb_param & LPF_FAILOUT))
1028                         rc = 0;
1029         } else {
1030                 if (repaired) {
1031                         ns->ln_items_repaired++;
1032                         if (bk->lb_param & LPF_DRYRUN &&
1033                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1034                                 lfsck_pos_fill(env, lfsck,
1035                                                &ns->ln_pos_first_inconsistent,
1036                                                false);
1037                 } else {
                        /* Nothing repaired for this entry: clear the journal
                         * flag so the next entry does not open a transaction
                         * up front. */
1038                         com->lc_journal = 0;
1039                 }
1040                 rc = 0;
1041         }
1042         up_write(&com->lc_sem);
1043         return rc;
1044 }
1045
1046 static int lfsck_namespace_post(const struct lu_env *env,
1047                                 struct lfsck_component *com,
1048                                 int result, bool init)
1049 {
1050         struct lfsck_instance   *lfsck = com->lc_lfsck;
1051         struct lfsck_namespace  *ns    = com->lc_file_ram;
1052         int                      rc;
1053
1054         down_write(&com->lc_sem);
1055
1056         spin_lock(&lfsck->li_lock);
1057         if (!init)
1058                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1059         if (result > 0) {
1060                 ns->ln_status = LS_SCANNING_PHASE2;
1061                 ns->ln_flags |= LF_SCANNED_ONCE;
1062                 ns->ln_flags &= ~LF_UPGRADE;
1063                 cfs_list_del_init(&com->lc_link);
1064                 cfs_list_del_init(&com->lc_link_dir);
1065                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1066         } else if (result == 0) {
1067                 ns->ln_status = lfsck->li_status;
1068                 if (ns->ln_status == 0)
1069                         ns->ln_status = LS_STOPPED;
1070                 if (ns->ln_status != LS_PAUSED) {
1071                         cfs_list_del_init(&com->lc_link);
1072                         cfs_list_del_init(&com->lc_link_dir);
1073                         cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1074                 }
1075         } else {
1076                 ns->ln_status = LS_FAILED;
1077                 cfs_list_del_init(&com->lc_link);
1078                 cfs_list_del_init(&com->lc_link_dir);
1079                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1080         }
1081         spin_unlock(&lfsck->li_lock);
1082
1083         if (!init) {
1084                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1085                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1086                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1087                 ns->ln_items_checked += com->lc_new_checked;
1088                 com->lc_new_checked = 0;
1089         }
1090
1091         rc = lfsck_namespace_store(env, com, false);
1092
1093         up_write(&com->lc_sem);
1094         return rc;
1095 }
1096
1097 static int
1098 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1099                      char *buf, int len)
1100 {
1101         struct lfsck_instance   *lfsck = com->lc_lfsck;
1102         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1103         struct lfsck_namespace  *ns    = com->lc_file_ram;
1104         int                      save  = len;
1105         int                      ret   = -ENOSPC;
1106         int                      rc;
1107
1108         down_read(&com->lc_sem);
1109         rc = snprintf(buf, len,
1110                       "name: lfsck_namespace\n"
1111                       "magic: %#x\n"
1112                       "version: %d\n"
1113                       "status: %s\n",
1114                       ns->ln_magic,
1115                       bk->lb_version,
1116                       lfsck_status2names(ns->ln_status));
1117         if (rc <= 0)
1118                 goto out;
1119
1120         buf += rc;
1121         len -= rc;
1122         rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
1123                              "flags");
1124         if (rc < 0)
1125                 goto out;
1126
1127         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
1128                              "param");
1129         if (rc < 0)
1130                 goto out;
1131
1132         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
1133                              "time_since_last_completed");
1134         if (rc < 0)
1135                 goto out;
1136
1137         rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
1138                              "time_since_latest_start");
1139         if (rc < 0)
1140                 goto out;
1141
1142         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
1143                              "time_since_last_checkpoint");
1144         if (rc < 0)
1145                 goto out;
1146
1147         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
1148                             "latest_start_position");
1149         if (rc < 0)
1150                 goto out;
1151
1152         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
1153                             "last_checkpoint_position");
1154         if (rc < 0)
1155                 goto out;
1156
1157         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
1158                             "first_failure_position");
1159         if (rc < 0)
1160                 goto out;
1161
1162         if (ns->ln_status == LS_SCANNING_PHASE1) {
1163                 struct lfsck_position pos;
1164                 const struct dt_it_ops *iops;
1165                 cfs_duration_t duration = cfs_time_current() -
1166                                           lfsck->li_time_last_checkpoint;
1167                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1168                 __u64 speed = checked;
1169                 __u64 new_checked = com->lc_new_checked * HZ;
1170                 __u32 rtime = ns->ln_run_time_phase1 +
1171                               cfs_duration_sec(duration + HALF_SEC);
1172
1173                 if (duration != 0)
1174                         do_div(new_checked, duration);
1175                 if (rtime != 0)
1176                         do_div(speed, rtime);
1177                 rc = snprintf(buf, len,
1178                               "checked_phase1: "LPU64"\n"
1179                               "checked_phase2: "LPU64"\n"
1180                               "updated_phase1: "LPU64"\n"
1181                               "updated_phase2: "LPU64"\n"
1182                               "failed_phase1: "LPU64"\n"
1183                               "failed_phase2: "LPU64"\n"
1184                               "dirs: "LPU64"\n"
1185                               "M-linked: "LPU64"\n"
1186                               "dirent_repaired: "LPU64"\n"
1187                               "linkea_repaired: "LPU64"\n"
1188                               "nlinks_repaired: "LPU64"\n"
1189                               "lost_found: "LPU64"\n"
1190                               "success_count: %u\n"
1191                               "run_time_phase1: %u seconds\n"
1192                               "run_time_phase2: %u seconds\n"
1193                               "average_speed_phase1: "LPU64" items/sec\n"
1194                               "average_speed_phase2: N/A\n"
1195                               "real-time_speed_phase1: "LPU64" items/sec\n"
1196                               "real-time_speed_phase2: N/A\n",
1197                               checked,
1198                               ns->ln_objs_checked_phase2,
1199                               ns->ln_items_repaired,
1200                               ns->ln_objs_repaired_phase2,
1201                               ns->ln_items_failed,
1202                               ns->ln_objs_failed_phase2,
1203                               ns->ln_dirs_checked,
1204                               ns->ln_mlinked_checked,
1205                               ns->ln_dirent_repaired,
1206                               ns->ln_linkea_repaired,
1207                               ns->ln_objs_nlink_repaired,
1208                               ns->ln_objs_lost_found,
1209                               ns->ln_success_count,
1210                               rtime,
1211                               ns->ln_run_time_phase2,
1212                               speed,
1213                               new_checked);
1214                 if (rc <= 0)
1215                         goto out;
1216
1217                 buf += rc;
1218                 len -= rc;
1219
1220                 LASSERT(lfsck->li_di_oit != NULL);
1221
1222                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1223
1224                 /* The low layer otable-based iteration position may NOT
1225                  * exactly match the namespace-based directory traversal
1226                  * cookie. Generally, it is not a serious issue. But the
1227                  * caller should NOT make assumption on that. */
1228                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1229                 if (!lfsck->li_current_oit_processed)
1230                         pos.lp_oit_cookie--;
1231
1232                 spin_lock(&lfsck->li_lock);
1233                 if (lfsck->li_di_dir != NULL) {
1234                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1235                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1236                                 fid_zero(&pos.lp_dir_parent);
1237                                 pos.lp_dir_cookie = 0;
1238                         } else {
1239                                 pos.lp_dir_parent =
1240                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1241                         }
1242                 } else {
1243                         fid_zero(&pos.lp_dir_parent);
1244                         pos.lp_dir_cookie = 0;
1245                 }
1246                 spin_unlock(&lfsck->li_lock);
1247                 rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
1248                 if (rc <= 0)
1249                         goto out;
1250         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1251                 cfs_duration_t duration = cfs_time_current() -
1252                                           lfsck->li_time_last_checkpoint;
1253                 __u64 checked = ns->ln_objs_checked_phase2 +
1254                                 com->lc_new_checked;
1255                 __u64 speed1 = ns->ln_items_checked;
1256                 __u64 speed2 = checked;
1257                 __u64 new_checked = com->lc_new_checked * HZ;
1258                 __u32 rtime = ns->ln_run_time_phase2 +
1259                               cfs_duration_sec(duration + HALF_SEC);
1260
1261                 if (duration != 0)
1262                         do_div(new_checked, duration);
1263                 if (ns->ln_run_time_phase1 != 0)
1264                         do_div(speed1, ns->ln_run_time_phase1);
1265                 if (rtime != 0)
1266                         do_div(speed2, rtime);
1267                 rc = snprintf(buf, len,
1268                               "checked_phase1: "LPU64"\n"
1269                               "checked_phase2: "LPU64"\n"
1270                               "updated_phase1: "LPU64"\n"
1271                               "updated_phase2: "LPU64"\n"
1272                               "failed_phase1: "LPU64"\n"
1273                               "failed_phase2: "LPU64"\n"
1274                               "dirs: "LPU64"\n"
1275                               "M-linked: "LPU64"\n"
1276                               "dirent_repaired: "LPU64"\n"
1277                               "linkea_repaired: "LPU64"\n"
1278                               "nlinks_repaired: "LPU64"\n"
1279                               "lost_found: "LPU64"\n"
1280                               "success_count: %u\n"
1281                               "run_time_phase1: %u seconds\n"
1282                               "run_time_phase2: %u seconds\n"
1283                               "average_speed_phase1: "LPU64" items/sec\n"
1284                               "average_speed_phase2: "LPU64" objs/sec\n"
1285                               "real-time_speed_phase1: N/A\n"
1286                               "real-time_speed_phase2: "LPU64" objs/sec\n"
1287                               "current_position: "DFID"\n",
1288                               ns->ln_items_checked,
1289                               checked,
1290                               ns->ln_items_repaired,
1291                               ns->ln_objs_repaired_phase2,
1292                               ns->ln_items_failed,
1293                               ns->ln_objs_failed_phase2,
1294                               ns->ln_dirs_checked,
1295                               ns->ln_mlinked_checked,
1296                               ns->ln_dirent_repaired,
1297                               ns->ln_linkea_repaired,
1298                               ns->ln_objs_nlink_repaired,
1299                               ns->ln_objs_lost_found,
1300                               ns->ln_success_count,
1301                               ns->ln_run_time_phase1,
1302                               rtime,
1303                               speed1,
1304                               speed2,
1305                               new_checked,
1306                               PFID(&ns->ln_fid_latest_scanned_phase2));
1307                 if (rc <= 0)
1308                         goto out;
1309
1310                 buf += rc;
1311                 len -= rc;
1312         } else {
1313                 __u64 speed1 = ns->ln_items_checked;
1314                 __u64 speed2 = ns->ln_objs_checked_phase2;
1315
1316                 if (ns->ln_run_time_phase1 != 0)
1317                         do_div(speed1, ns->ln_run_time_phase1);
1318                 if (ns->ln_run_time_phase2 != 0)
1319                         do_div(speed2, ns->ln_run_time_phase2);
1320                 rc = snprintf(buf, len,
1321                               "checked_phase1: "LPU64"\n"
1322                               "checked_phase2: "LPU64"\n"
1323                               "updated_phase1: "LPU64"\n"
1324                               "updated_phase2: "LPU64"\n"
1325                               "failed_phase1: "LPU64"\n"
1326                               "failed_phase2: "LPU64"\n"
1327                               "dirs: "LPU64"\n"
1328                               "M-linked: "LPU64"\n"
1329                               "dirent_repaired: "LPU64"\n"
1330                               "linkea_repaired: "LPU64"\n"
1331                               "nlinks_repaired: "LPU64"\n"
1332                               "lost_found: "LPU64"\n"
1333                               "success_count: %u\n"
1334                               "run_time_phase1: %u seconds\n"
1335                               "run_time_phase2: %u seconds\n"
1336                               "average_speed_phase1: "LPU64" items/sec\n"
1337                               "average_speed_phase2: "LPU64" objs/sec\n"
1338                               "real-time_speed_phase1: N/A\n"
1339                               "real-time_speed_phase2: N/A\n"
1340                               "current_position: N/A\n",
1341                               ns->ln_items_checked,
1342                               ns->ln_objs_checked_phase2,
1343                               ns->ln_items_repaired,
1344                               ns->ln_objs_repaired_phase2,
1345                               ns->ln_items_failed,
1346                               ns->ln_objs_failed_phase2,
1347                               ns->ln_dirs_checked,
1348                               ns->ln_mlinked_checked,
1349                               ns->ln_dirent_repaired,
1350                               ns->ln_linkea_repaired,
1351                               ns->ln_objs_nlink_repaired,
1352                               ns->ln_objs_lost_found,
1353                               ns->ln_success_count,
1354                               ns->ln_run_time_phase1,
1355                               ns->ln_run_time_phase2,
1356                               speed1,
1357                               speed2);
1358                 if (rc <= 0)
1359                         goto out;
1360
1361                 buf += rc;
1362                 len -= rc;
1363         }
1364         ret = save - len;
1365
1366 out:
1367         up_read(&com->lc_sem);
1368         return ret;
1369 }
1370
1371 static int lfsck_namespace_double_scan_main(void *args)
1372 {
1373         struct lfsck_thread_args *lta   = args;
1374         const struct lu_env     *env    = &lta->lta_env;
1375         struct lfsck_component  *com    = lta->lta_com;
1376         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1377         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1378         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1379         struct lfsck_namespace  *ns     = com->lc_file_ram;
1380         struct dt_object        *obj    = com->lc_obj;
1381         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1382         struct dt_object        *target;
1383         struct dt_it            *di;
1384         struct dt_key           *key;
1385         struct lu_fid            fid;
1386         int                      rc;
1387         __u8                     flags = 0;
1388         ENTRY;
1389
1390         com->lc_new_checked = 0;
1391         com->lc_new_scanned = 0;
1392         com->lc_time_last_checkpoint = cfs_time_current();
1393         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1394                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1395
1396         di = iops->init(env, obj, 0, BYPASS_CAPA);
1397         if (IS_ERR(di))
1398                 GOTO(out, rc = PTR_ERR(di));
1399
1400         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1401         rc = iops->get(env, di, (const struct dt_key *)&fid);
1402         if (rc < 0)
1403                 GOTO(fini, rc);
1404
1405         /* Skip the start one, which either has been processed or non-exist. */
1406         rc = iops->next(env, di);
1407         if (rc != 0)
1408                 GOTO(put, rc);
1409
1410         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1411                 GOTO(put, rc = 0);
1412
1413         do {
1414                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1415                     cfs_fail_val > 0) {
1416                         struct l_wait_info lwi;
1417
1418                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1419                                           NULL, NULL);
1420                         l_wait_event(thread->t_ctl_waitq,
1421                                      !thread_is_running(thread),
1422                                      &lwi);
1423                 }
1424
1425                 key = iops->key(env, di);
1426                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1427                 target = lfsck_object_find(env, lfsck, &fid);
1428                 down_write(&com->lc_sem);
1429                 if (target == NULL) {
1430                         rc = 0;
1431                         goto checkpoint;
1432                 } else if (IS_ERR(target)) {
1433                         rc = PTR_ERR(target);
1434                         goto checkpoint;
1435                 }
1436
1437                 /* XXX: Currently, skip remote object, the consistency for
1438                  *      remote object will be processed in LFSCK phase III. */
1439                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1440                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1441                         if (rc == 0)
1442                                 rc = lfsck_namespace_double_scan_one(env, com,
1443                                                                 target, flags);
1444                 }
1445
1446                 lfsck_object_put(env, target);
1447
1448 checkpoint:
1449                 com->lc_new_checked++;
1450                 com->lc_new_scanned++;
1451                 ns->ln_fid_latest_scanned_phase2 = fid;
1452                 if (rc > 0)
1453                         ns->ln_objs_repaired_phase2++;
1454                 else if (rc < 0)
1455                         ns->ln_objs_failed_phase2++;
1456                 up_write(&com->lc_sem);
1457
1458                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1459                         lfsck_namespace_delete(env, com, &fid);
1460                 } else if (rc < 0) {
1461                         flags |= LLF_REPAIR_FAILED;
1462                         lfsck_namespace_update(env, com, &fid, flags, true);
1463                 }
1464
1465                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1466                         GOTO(put, rc);
1467
1468                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1469                                               cfs_time_current())) &&
1470                     com->lc_new_checked != 0) {
1471                         down_write(&com->lc_sem);
1472                         ns->ln_run_time_phase2 +=
1473                                 cfs_duration_sec(cfs_time_current() +
1474                                 HALF_SEC - com->lc_time_last_checkpoint);
1475                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1476                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1477                         com->lc_new_checked = 0;
1478                         rc = lfsck_namespace_store(env, com, false);
1479                         up_write(&com->lc_sem);
1480                         if (rc != 0)
1481                                 GOTO(put, rc);
1482
1483                         com->lc_time_last_checkpoint = cfs_time_current();
1484                         com->lc_time_next_checkpoint =
1485                                 com->lc_time_last_checkpoint +
1486                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1487                 }
1488
1489                 lfsck_control_speed_by_self(com);
1490                 if (unlikely(!thread_is_running(thread)))
1491                         GOTO(put, rc = 0);
1492
1493                 rc = iops->next(env, di);
1494         } while (rc == 0);
1495
1496         GOTO(put, rc);
1497
1498 put:
1499         iops->put(env, di);
1500
1501 fini:
1502         iops->fini(env, di);
1503
1504 out:
1505         down_write(&com->lc_sem);
1506         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1507                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1508         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1509         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1510         com->lc_new_checked = 0;
1511
1512         if (rc > 0) {
1513                 com->lc_journal = 0;
1514                 ns->ln_status = LS_COMPLETED;
1515                 if (!(bk->lb_param & LPF_DRYRUN))
1516                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1517                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1518                 ns->ln_success_count++;
1519         } else if (rc == 0) {
1520                 ns->ln_status = lfsck->li_status;
1521                 if (ns->ln_status == 0)
1522                         ns->ln_status = LS_STOPPED;
1523         } else {
1524                 ns->ln_status = LS_FAILED;
1525         }
1526
1527         rc = lfsck_namespace_store(env, com, false);
1528         up_write(&com->lc_sem);
1529         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1530                 wake_up_all(&thread->t_ctl_waitq);
1531
1532         lfsck_thread_args_fini(lta);
1533
1534         return rc;
1535 }
1536
1537 static int lfsck_namespace_double_scan(const struct lu_env *env,
1538                                        struct lfsck_component *com)
1539 {
1540         struct lfsck_instance           *lfsck = com->lc_lfsck;
1541         struct lfsck_namespace          *ns    = com->lc_file_ram;
1542         struct lfsck_thread_args        *lta;
1543         long                             rc;
1544         ENTRY;
1545
1546         if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
1547                 RETURN(0);
1548
1549         lta = lfsck_thread_args_init(lfsck, com, NULL);
1550         if (IS_ERR(lta))
1551                 RETURN(PTR_ERR(lta));
1552
1553         atomic_inc(&lfsck->li_double_scan_count);
1554         rc = PTR_ERR(kthread_run(lfsck_namespace_double_scan_main, lta,
1555                                  "lfsck_namespace"));
1556         if (IS_ERR_VALUE(rc)) {
1557                 CERROR("%s: cannot start LFSCK namespace thread: rc = %ld\n",
1558                        lfsck_lfsck2name(lfsck), rc);
1559                 atomic_dec(&lfsck->li_double_scan_count);
1560                 lfsck_thread_args_fini(lta);
1561         } else {
1562                 rc = 0;
1563         }
1564
1565         RETURN(rc);
1566 }
1567
/*
 * Notification handler for the namespace component.
 *
 * Currently a no-op: the request is ignored and success (0) is always
 * returned.
 */
static int lfsck_namespace_in_notify(const struct lu_env *env,
                                     struct lfsck_component *com,
                                     struct lfsck_request *lr)
{
        return 0;
}
1574
1575 static int lfsck_namespace_query(const struct lu_env *env,
1576                                  struct lfsck_component *com)
1577 {
1578         struct lfsck_namespace *ns = com->lc_file_ram;
1579
1580         return ns->ln_status;
1581 }
1582
1583 static struct lfsck_operations lfsck_namespace_ops = {
1584         .lfsck_reset            = lfsck_namespace_reset,
1585         .lfsck_fail             = lfsck_namespace_fail,
1586         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1587         .lfsck_prep             = lfsck_namespace_prep,
1588         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1589         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1590         .lfsck_post             = lfsck_namespace_post,
1591         .lfsck_dump             = lfsck_namespace_dump,
1592         .lfsck_double_scan      = lfsck_namespace_double_scan,
1593         .lfsck_in_notify        = lfsck_namespace_in_notify,
1594         .lfsck_query            = lfsck_namespace_query,
1595 };
1596
1597 int lfsck_namespace_setup(const struct lu_env *env,
1598                           struct lfsck_instance *lfsck)
1599 {
1600         struct lfsck_component  *com;
1601         struct lfsck_namespace  *ns;
1602         struct dt_object        *root = NULL;
1603         struct dt_object        *obj;
1604         int                      rc;
1605         ENTRY;
1606
1607         LASSERT(lfsck->li_master);
1608
1609         OBD_ALLOC_PTR(com);
1610         if (com == NULL)
1611                 RETURN(-ENOMEM);
1612
1613         CFS_INIT_LIST_HEAD(&com->lc_link);
1614         CFS_INIT_LIST_HEAD(&com->lc_link_dir);
1615         init_rwsem(&com->lc_sem);
1616         atomic_set(&com->lc_ref, 1);
1617         com->lc_lfsck = lfsck;
1618         com->lc_type = LT_NAMESPACE;
1619         com->lc_ops = &lfsck_namespace_ops;
1620         com->lc_file_size = sizeof(struct lfsck_namespace);
1621         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1622         if (com->lc_file_ram == NULL)
1623                 GOTO(out, rc = -ENOMEM);
1624
1625         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1626         if (com->lc_file_disk == NULL)
1627                 GOTO(out, rc = -ENOMEM);
1628
1629         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1630         if (IS_ERR(root))
1631                 GOTO(out, rc = PTR_ERR(root));
1632
1633         if (unlikely(!dt_try_as_dir(env, root)))
1634                 GOTO(out, rc = -ENOTDIR);
1635
1636         obj = local_index_find_or_create(env, lfsck->li_los, root,
1637                                          lfsck_namespace_name,
1638                                          S_IFREG | S_IRUGO | S_IWUSR,
1639                                          &dt_lfsck_features);
1640         if (IS_ERR(obj))
1641                 GOTO(out, rc = PTR_ERR(obj));
1642
1643         com->lc_obj = obj;
1644         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1645         if (rc != 0)
1646                 GOTO(out, rc);
1647
1648         rc = lfsck_namespace_load(env, com);
1649         if (rc > 0)
1650                 rc = lfsck_namespace_reset(env, com, true);
1651         else if (rc == -ENODATA)
1652                 rc = lfsck_namespace_init(env, com);
1653         if (rc != 0)
1654                 GOTO(out, rc);
1655
1656         ns = com->lc_file_ram;
1657         switch (ns->ln_status) {
1658         case LS_INIT:
1659         case LS_COMPLETED:
1660         case LS_FAILED:
1661         case LS_STOPPED:
1662                 spin_lock(&lfsck->li_lock);
1663                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1664                 spin_unlock(&lfsck->li_lock);
1665                 break;
1666         default:
1667                 CERROR("%s: unknown lfsck_namespace status: rc = %u\n",
1668                        lfsck_lfsck2name(lfsck), ns->ln_status);
1669                 /* fall through */
1670         case LS_SCANNING_PHASE1:
1671         case LS_SCANNING_PHASE2:
1672                 /* No need to store the status to disk right now.
1673                  * If the system crashed before the status stored,
1674                  * it will be loaded back when next time. */
1675                 ns->ln_status = LS_CRASHED;
1676                 /* fall through */
1677         case LS_PAUSED:
1678         case LS_CRASHED:
1679                 spin_lock(&lfsck->li_lock);
1680                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1681                 cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1682                 spin_unlock(&lfsck->li_lock);
1683                 break;
1684         }
1685
1686         GOTO(out, rc = 0);
1687
1688 out:
1689         if (root != NULL && !IS_ERR(root))
1690                 lu_object_put(env, &root->do_lu);
1691         if (rc != 0)
1692                 lfsck_component_cleanup(env, com);
1693         return rc;
1694 }