Whamcloud - gitweb
LU-1346 libcfs: replace cfs_ memory wrappers
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34 #define DEBUG_SUBSYSTEM S_LFSCK
35
36 #include <lustre/lustre_idl.h>
37 #include <lu_object.h>
38 #include <dt_object.h>
39 #include <md_object.h>
40 #include <lustre_linkea.h>
41 #include <lustre_fid.h>
42 #include <lustre_lib.h>
43 #include <lustre_net.h>
44 #include <lustre/lustre_user.h>
45
46 #include "lfsck_internal.h"
47
/* Magic value expected in ln_magic of a valid lfsck_namespace record. */
#define LFSCK_NAMESPACE_MAGIC   0xA0629D03

/* Name of the local object that lfsck_namespace_reset() unlinks and
 * re-creates under the LFSCK local root. */
static const char lfsck_namespace_name[] = "lfsck_namespace";
51
52 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *des,
53                                       struct lfsck_namespace *src)
54 {
55         des->ln_magic = le32_to_cpu(src->ln_magic);
56         des->ln_status = le32_to_cpu(src->ln_status);
57         des->ln_flags = le32_to_cpu(src->ln_flags);
58         des->ln_success_count = le32_to_cpu(src->ln_success_count);
59         des->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
60         des->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
61         des->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
62         des->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
63         des->ln_time_last_checkpoint =
64                                 le64_to_cpu(src->ln_time_last_checkpoint);
65         lfsck_position_le_to_cpu(&des->ln_pos_latest_start,
66                                  &src->ln_pos_latest_start);
67         lfsck_position_le_to_cpu(&des->ln_pos_last_checkpoint,
68                                  &src->ln_pos_last_checkpoint);
69         lfsck_position_le_to_cpu(&des->ln_pos_first_inconsistent,
70                                  &src->ln_pos_first_inconsistent);
71         des->ln_items_checked = le64_to_cpu(src->ln_items_checked);
72         des->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
73         des->ln_items_failed = le64_to_cpu(src->ln_items_failed);
74         des->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
75         des->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
76         des->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
77         des->ln_objs_repaired_phase2 =
78                                 le64_to_cpu(src->ln_objs_repaired_phase2);
79         des->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
80         des->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
81         des->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
82         fid_le_to_cpu(&des->ln_fid_latest_scanned_phase2,
83                       &src->ln_fid_latest_scanned_phase2);
84 }
85
86 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *des,
87                                       struct lfsck_namespace *src)
88 {
89         des->ln_magic = cpu_to_le32(src->ln_magic);
90         des->ln_status = cpu_to_le32(src->ln_status);
91         des->ln_flags = cpu_to_le32(src->ln_flags);
92         des->ln_success_count = cpu_to_le32(src->ln_success_count);
93         des->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
94         des->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
95         des->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
96         des->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
97         des->ln_time_last_checkpoint =
98                                 cpu_to_le64(src->ln_time_last_checkpoint);
99         lfsck_position_cpu_to_le(&des->ln_pos_latest_start,
100                                  &src->ln_pos_latest_start);
101         lfsck_position_cpu_to_le(&des->ln_pos_last_checkpoint,
102                                  &src->ln_pos_last_checkpoint);
103         lfsck_position_cpu_to_le(&des->ln_pos_first_inconsistent,
104                                  &src->ln_pos_first_inconsistent);
105         des->ln_items_checked = cpu_to_le64(src->ln_items_checked);
106         des->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
107         des->ln_items_failed = cpu_to_le64(src->ln_items_failed);
108         des->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
109         des->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
110         des->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
111         des->ln_objs_repaired_phase2 =
112                                 cpu_to_le64(src->ln_objs_repaired_phase2);
113         des->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
114         des->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
115         des->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
116         fid_cpu_to_le(&des->ln_fid_latest_scanned_phase2,
117                       &src->ln_fid_latest_scanned_phase2);
118 }
119
120 /**
121  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
122  * \retval 0: succeed.
123  * \retval -ve: failed cases.
124  */
125 static int lfsck_namespace_load(const struct lu_env *env,
126                                 struct lfsck_component *com)
127 {
128         int len = com->lc_file_size;
129         int rc;
130
131         rc = dt_xattr_get(env, com->lc_obj,
132                           lfsck_buf_get(env, com->lc_file_disk, len),
133                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
134         if (rc == len) {
135                 struct lfsck_namespace *ns = com->lc_file_ram;
136
137                 lfsck_namespace_le_to_cpu(ns,
138                                 (struct lfsck_namespace *)com->lc_file_disk);
139                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
140                         CWARN("%.16s: invalid lfsck_namespace magic "
141                               "0x%x != 0x%x\n",
142                               lfsck_lfsck2name(com->lc_lfsck),
143                               ns->ln_magic, LFSCK_NAMESPACE_MAGIC);
144                         rc = 1;
145                 } else {
146                         rc = 0;
147                 }
148         } else if (rc != -ENODATA) {
149                 CERROR("%.16s: fail to load lfsck_namespace, expected = %d, "
150                        "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle)) {
171                 rc = PTR_ERR(handle);
172                 CERROR("%.16s: fail to create trans for storing "
173                        "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
174                 RETURN(rc);
175         }
176
177         rc = dt_declare_xattr_set(env, obj,
178                                   lfsck_buf_get(env, com->lc_file_disk, len),
179                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
180         if (rc != 0) {
181                 CERROR("%.16s: fail to declare trans for storing "
182                        "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
183                 GOTO(out, rc);
184         }
185
186         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
187         if (rc != 0) {
188                 CERROR("%.16s: fail to start trans for storing "
189                        "lfsck_namespace: %d\n,", lfsck_lfsck2name(lfsck), rc);
190                 GOTO(out, rc);
191         }
192
193         rc = dt_xattr_set(env, obj,
194                           lfsck_buf_get(env, com->lc_file_disk, len),
195                           XATTR_NAME_LFSCK_NAMESPACE,
196                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
197                           handle, BYPASS_CAPA);
198         if (rc != 0)
199                 CERROR("%.16s: fail to store lfsck_namespace, len = %d, "
200                        "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
201
202         GOTO(out, rc);
203
204 out:
205         dt_trans_stop(env, lfsck->li_bottom, handle);
206         return rc;
207 }
208
209 static int lfsck_namespace_init(const struct lu_env *env,
210                                 struct lfsck_component *com)
211 {
212         struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
213         int rc;
214
215         memset(ns, 0, sizeof(*ns));
216         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
217         ns->ln_status = LS_INIT;
218         down_write(&com->lc_sem);
219         rc = lfsck_namespace_store(env, com, true);
220         up_write(&com->lc_sem);
221         return rc;
222 }
223
224 static int lfsck_namespace_lookup(const struct lu_env *env,
225                                   struct lfsck_component *com,
226                                   const struct lu_fid *fid, __u8 *flags)
227 {
228         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
229         int            rc;
230
231         fid_cpu_to_be(key, fid);
232         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
233                        (const struct dt_key *)key, BYPASS_CAPA);
234         return rc;
235 }
236
237 static int lfsck_namespace_delete(const struct lu_env *env,
238                                   struct lfsck_component *com,
239                                   const struct lu_fid *fid)
240 {
241         struct lfsck_instance   *lfsck  = com->lc_lfsck;
242         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
243         struct thandle          *handle;
244         struct dt_object        *obj    = com->lc_obj;
245         int                      rc;
246         ENTRY;
247
248         handle = dt_trans_create(env, lfsck->li_bottom);
249         if (IS_ERR(handle))
250                 RETURN(PTR_ERR(handle));
251
252         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
253         if (rc != 0)
254                 GOTO(out, rc);
255
256         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         fid_cpu_to_be(key, fid);
261         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
262                        BYPASS_CAPA);
263
264         GOTO(out, rc);
265
266 out:
267         dt_trans_stop(env, lfsck->li_bottom, handle);
268         return rc;
269 }
270
271 static int lfsck_namespace_update(const struct lu_env *env,
272                                   struct lfsck_component *com,
273                                   const struct lu_fid *fid,
274                                   __u8 flags, bool force)
275 {
276         struct lfsck_instance   *lfsck  = com->lc_lfsck;
277         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
278         struct thandle          *handle;
279         struct dt_object        *obj    = com->lc_obj;
280         int                      rc;
281         bool                     exist  = false;
282         __u8                     tf;
283         ENTRY;
284
285         rc = lfsck_namespace_lookup(env, com, fid, &tf);
286         if (rc != 0 && rc != -ENOENT)
287                 RETURN(rc);
288
289         if (rc == 0) {
290                 if (!force || flags == tf)
291                         RETURN(0);
292
293                 exist = true;
294                 handle = dt_trans_create(env, lfsck->li_bottom);
295                 if (IS_ERR(handle))
296                         RETURN(PTR_ERR(handle));
297
298                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
299                                        handle);
300                 if (rc != 0)
301                         GOTO(out, rc);
302         } else {
303                 handle = dt_trans_create(env, lfsck->li_bottom);
304                 if (IS_ERR(handle))
305                         RETURN(PTR_ERR(handle));
306         }
307
308         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
309                                (const struct dt_key *)fid, handle);
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         fid_cpu_to_be(key, fid);
318         if (exist) {
319                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
320                                BYPASS_CAPA);
321                 if (rc != 0) {
322                         CERROR("%s: fail to insert "DFID", rc = %d\n",
323                                lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
324                         GOTO(out, rc);
325                 }
326         }
327
328         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
329                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
330
331         GOTO(out, rc);
332
333 out:
334         dt_trans_stop(env, lfsck->li_bottom, handle);
335         return rc;
336 }
337
338 static int lfsck_namespace_check_exist(const struct lu_env *env,
339                                        struct lfsck_instance *lfsck,
340                                        struct dt_object *obj, const char *name)
341 {
342         struct dt_object *dir = lfsck->li_obj_dir;
343         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
344         int               rc;
345         ENTRY;
346
347         if (unlikely(lfsck_is_dead_obj(obj)))
348                 RETURN(LFSCK_NAMEENTRY_DEAD);
349
350         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
351                        (const struct dt_key *)name, BYPASS_CAPA);
352         if (rc == -ENOENT)
353                 RETURN(LFSCK_NAMEENTRY_REMOVED);
354
355         if (rc < 0)
356                 RETURN(rc);
357
358         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
359                 RETURN(LFSCK_NAMEENTRY_RECREATED);
360
361         RETURN(0);
362 }
363
364 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
365                                             struct dt_object *obj,
366                                             struct thandle *handle)
367 {
368         int rc;
369
370         /* For destroying all invalid linkEA entries. */
371         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
372         if (rc != 0)
373                 return rc;
374
375         /* For insert new linkEA entry. */
376         rc = dt_declare_xattr_set(env, obj,
377                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
378                         XATTR_NAME_LINK, 0, handle);
379         return rc;
380 }
381
382 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
383                             struct linkea_data *ldata)
384 {
385         int rc;
386
387         ldata->ld_buf =
388                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
389                                        PAGE_CACHE_SIZE);
390         if (ldata->ld_buf->lb_buf == NULL)
391                 return -ENOMEM;
392
393         if (!dt_object_exists(obj))
394                 return -ENODATA;
395
396         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
397         if (rc == -ERANGE) {
398                 /* Buf was too small, figure out what we need. */
399                 lu_buf_free(ldata->ld_buf);
400                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
401                                   BYPASS_CAPA);
402                 if (rc < 0)
403                         return rc;
404
405                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
406                 if (ldata->ld_buf->lb_buf == NULL)
407                         return -ENOMEM;
408
409                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
410                                   BYPASS_CAPA);
411         }
412         if (rc < 0)
413                 return rc;
414
415         linkea_init(ldata);
416
417         return 0;
418 }
419
420 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
421                              struct linkea_data *ldata, struct thandle *handle)
422 {
423         const struct lu_buf *buf = lfsck_buf_get_const(env,
424                                                        ldata->ld_buf->lb_buf,
425                                                        ldata->ld_leh->leh_len);
426
427         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
428                             BYPASS_CAPA);
429 }
430
431 /**
432  * \retval ve: removed entries
433  */
434 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
435                                      struct linkea_data *ldata,
436                                      struct lu_name *cname,
437                                      struct lu_fid *pfid)
438 {
439         struct link_ea_entry    *oldlee;
440         int                      oldlen;
441         int                      removed = 0;
442
443         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
444         oldlee = ldata->ld_lee;
445         oldlen = ldata->ld_reclen;
446         linkea_next_entry(ldata);
447         while (ldata->ld_lee != NULL) {
448                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
449                                    ldata->ld_lee->lee_reclen[1];
450                 if (unlikely(ldata->ld_reclen == oldlen &&
451                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
452                         linkea_del_buf(ldata, cname);
453                         removed++;
454                 } else {
455                         linkea_next_entry(ldata);
456                 }
457         }
458         ldata->ld_lee = oldlee;
459         ldata->ld_reclen = oldlen;
460         return removed;
461 }
462
463 /**
464  * \retval +ve  repaired
465  * \retval 0    no need to repair
466  * \retval -ve  error cases
467  */
468 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
469                                            struct lfsck_component *com,
470                                            struct dt_object *child, __u8 flags)
471 {
472         struct lfsck_thread_info *info    = lfsck_env_info(env);
473         struct lu_attr           *la      = &info->lti_la;
474         struct lu_name           *cname   = &info->lti_name;
475         struct lu_fid            *pfid    = &info->lti_fid;
476         struct lu_fid            *cfid    = &info->lti_fid2;
477         struct lfsck_instance   *lfsck    = com->lc_lfsck;
478         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
479         struct lfsck_namespace  *ns       =
480                                 (struct lfsck_namespace *)com->lc_file_ram;
481         struct linkea_data       ldata    = { 0 };
482         struct thandle          *handle   = NULL;
483         bool                     locked   = false;
484         bool                     update   = false;
485         int                      rc;
486         ENTRY;
487
488         if (com->lc_journal) {
489
490 again:
491                 LASSERT(!locked);
492
493                 update = false;
494                 com->lc_journal = 1;
495                 handle = dt_trans_create(env, lfsck->li_next);
496                 if (IS_ERR(handle))
497                         RETURN(rc = PTR_ERR(handle));
498
499                 rc = dt_declare_xattr_set(env, child,
500                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
501                         XATTR_NAME_LINK, 0, handle);
502                 if (rc != 0)
503                         GOTO(stop, rc);
504
505                 rc = dt_trans_start(env, lfsck->li_next, handle);
506                 if (rc != 0)
507                         GOTO(stop, rc);
508
509                 dt_write_lock(env, child, MOR_TGT_CHILD);
510                 locked = true;
511         }
512
513         if (unlikely(lfsck_is_dead_obj(child)))
514                 GOTO(stop, rc = 0);
515
516         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
517         if (rc == 0)
518                 rc = lfsck_links_read(env, child, &ldata);
519         if (rc != 0) {
520                 if ((bk->lb_param & LPF_DRYRUN) &&
521                     (rc == -EINVAL || rc == -ENODATA))
522                         rc = 1;
523
524                 GOTO(stop, rc);
525         }
526
527         linkea_first_entry(&ldata);
528         while (ldata.ld_lee != NULL) {
529                 struct dt_object *parent = NULL;
530
531                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
532                 if (rc > 0)
533                         update = true;
534
535                 if (!fid_is_sane(pfid))
536                         goto shrink;
537
538                 parent = lfsck_object_find(env, lfsck, pfid);
539                 if (parent == NULL)
540                         goto shrink;
541                 else if (IS_ERR(parent))
542                         GOTO(stop, rc = PTR_ERR(parent));
543
544                 if (!dt_object_exists(parent))
545                         goto shrink;
546
547                 /* XXX: Currently, skip remote object, the consistency for
548                  *      remote object will be processed in LFSCK phase III. */
549                 if (dt_object_remote(parent)) {
550                         lfsck_object_put(env, parent);
551                         linkea_next_entry(&ldata);
552                         continue;
553                 }
554
555                 if (unlikely(!dt_try_as_dir(env, parent)))
556                         goto shrink;
557
558                 /* To guarantee the 'name' is terminated with '0'. */
559                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
560                 info->lti_key[cname->ln_namelen] = 0;
561                 cname->ln_name = info->lti_key;
562                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
563                                (const struct dt_key *)cname->ln_name,
564                                BYPASS_CAPA);
565                 if (rc != 0 && rc != -ENOENT) {
566                         lfsck_object_put(env, parent);
567                         GOTO(stop, rc);
568                 }
569
570                 if (rc == 0) {
571                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
572                                 lfsck_object_put(env, parent);
573                                 linkea_next_entry(&ldata);
574                                 continue;
575                         }
576
577                         goto shrink;
578                 }
579
580                 /* If there is no name entry in the parent dir and the object
581                  * link count is less than the linkea entries count, then the
582                  * linkea entry should be removed. */
583                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
584                         goto shrink;
585
586                 /* XXX: For the case of there is a linkea entry, but without
587                  *      name entry pointing to the object and its hard links
588                  *      count is not less than the object name entries count,
589                  *      then seems we should add the 'missed' name entry back
590                  *      to namespace, but before LFSCK phase III finished, we
591                  *      do not know whether the object has some inconsistency
592                  *      on other MDTs. So now, do NOT add the name entry back
593                  *      to the namespace, but keep the linkEA entry. LU-2914 */
594                 lfsck_object_put(env, parent);
595                 linkea_next_entry(&ldata);
596                 continue;
597
598 shrink:
599                 if (parent != NULL)
600                         lfsck_object_put(env, parent);
601                 if (bk->lb_param & LPF_DRYRUN)
602                         RETURN(1);
603
604                 CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
605                        PFID(lfsck_dto2fid(child)), cname->ln_namelen, cname->ln_name,
606                        PFID(pfid));
607                 linkea_del_buf(&ldata, cname);
608                 update = true;
609         }
610
611         if (update) {
612                 if (!com->lc_journal) {
613                         com->lc_journal = 1;
614                         goto again;
615                 }
616
617                 rc = lfsck_links_write(env, child, &ldata, handle);
618         }
619
620         GOTO(stop, rc);
621
622 stop:
623         if (locked) {
624         /* XXX: For the case linkea entries count does not match the object hard
625          *      links count, we cannot update the later one simply. Before LFSCK
626          *      phase III finished, we cannot know whether there are some remote
627          *      name entries to be repaired or not. LU-2914 */
628                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
629                     ldata.ld_leh != NULL &&
630                     ldata.ld_leh->leh_reccount != la->la_nlink)
631                         CWARN("%.16s: the object "DFID" linkEA entry count %u "
632                               "may not match its hardlink count %u\n",
633                               lfsck_lfsck2name(lfsck), PFID(cfid),
634                               ldata.ld_leh->leh_reccount, la->la_nlink);
635
636                 dt_write_unlock(env, child);
637         }
638
639         if (handle != NULL)
640                 dt_trans_stop(env, lfsck->li_next, handle);
641
642         if (rc == 0 && update) {
643                 ns->ln_objs_nlink_repaired++;
644                 rc = 1;
645         }
646
647         return rc;
648 }
649
650 /* namespace APIs */
651
652 static int lfsck_namespace_reset(const struct lu_env *env,
653                                  struct lfsck_component *com, bool init)
654 {
655         struct lfsck_instance   *lfsck = com->lc_lfsck;
656         struct lfsck_namespace  *ns    =
657                                 (struct lfsck_namespace *)com->lc_file_ram;
658         struct dt_object        *dto;
659         int                      rc;
660         ENTRY;
661
662         down_write(&com->lc_sem);
663         if (init) {
664                 memset(ns, 0, sizeof(*ns));
665         } else {
666                 __u32 count = ns->ln_success_count;
667                 __u64 last_time = ns->ln_time_last_complete;
668
669                 memset(ns, 0, sizeof(*ns));
670                 ns->ln_success_count = count;
671                 ns->ln_time_last_complete = last_time;
672         }
673         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
674         ns->ln_status = LS_INIT;
675
676         rc = local_object_unlink(env, lfsck->li_bottom, lfsck->li_local_root,
677                                  lfsck_namespace_name);
678         if (rc != 0)
679                 GOTO(out, rc);
680
681         dto = local_index_find_or_create(env, lfsck->li_los, lfsck->li_local_root,
682                                          lfsck_namespace_name,
683                                          S_IFREG | S_IRUGO | S_IWUSR,
684                                          &dt_lfsck_features);
685         if (IS_ERR(dto))
686                 GOTO(out, rc = PTR_ERR(dto));
687
688         com->lc_obj = dto;
689         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
690         if (rc != 0)
691                 GOTO(out, rc);
692
693         rc = lfsck_namespace_store(env, com, true);
694
695         GOTO(out, rc);
696
697 out:
698         up_write(&com->lc_sem);
699         return rc;
700 }
701
702 static void
703 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
704                      bool new_checked)
705 {
706         struct lfsck_namespace *ns = (struct lfsck_namespace *)com->lc_file_ram;
707
708         down_write(&com->lc_sem);
709         if (new_checked)
710                 com->lc_new_checked++;
711         ns->ln_items_failed++;
712         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
713                 lfsck_pos_fill(env, com->lc_lfsck,
714                                &ns->ln_pos_first_inconsistent, false);
715         up_write(&com->lc_sem);
716 }
717
718 static int lfsck_namespace_checkpoint(const struct lu_env *env,
719                                       struct lfsck_component *com, bool init)
720 {
721         struct lfsck_instance   *lfsck = com->lc_lfsck;
722         struct lfsck_namespace  *ns    =
723                                 (struct lfsck_namespace *)com->lc_file_ram;
724         int                      rc;
725
726         if (com->lc_new_checked == 0 && !init)
727                 return 0;
728
729         down_write(&com->lc_sem);
730
731         if (init) {
732                 ns->ln_pos_latest_start = lfsck->li_pos_current;
733         } else {
734                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
735                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
736                                 HALF_SEC - lfsck->li_time_last_checkpoint);
737                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
738                 ns->ln_items_checked += com->lc_new_checked;
739                 com->lc_new_checked = 0;
740         }
741
742         rc = lfsck_namespace_store(env, com, false);
743
744         up_write(&com->lc_sem);
745         return rc;
746 }
747
748 static int lfsck_namespace_prep(const struct lu_env *env,
749                                 struct lfsck_component *com)
750 {
751         struct lfsck_instance   *lfsck  = com->lc_lfsck;
752         struct lfsck_namespace  *ns     =
753                                 (struct lfsck_namespace *)com->lc_file_ram;
754         struct lfsck_position   *pos    = &com->lc_pos_start;
755
756         if (ns->ln_status == LS_COMPLETED) {
757                 int rc;
758
759                 rc = lfsck_namespace_reset(env, com, false);
760                 if (rc != 0)
761                         return rc;
762         }
763
764         down_write(&com->lc_sem);
765
766         ns->ln_time_latest_start = cfs_time_current_sec();
767
768         spin_lock(&lfsck->li_lock);
769         if (ns->ln_flags & LF_SCANNED_ONCE) {
770                 if (!lfsck->li_drop_dryrun ||
771                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
772                         ns->ln_status = LS_SCANNING_PHASE2;
773                         cfs_list_del_init(&com->lc_link);
774                         cfs_list_add_tail(&com->lc_link,
775                                           &lfsck->li_list_double_scan);
776                         if (!cfs_list_empty(&com->lc_link_dir))
777                                 cfs_list_del_init(&com->lc_link_dir);
778                         lfsck_pos_set_zero(pos);
779                 } else {
780                         ns->ln_status = LS_SCANNING_PHASE1;
781                         ns->ln_run_time_phase1 = 0;
782                         ns->ln_run_time_phase2 = 0;
783                         ns->ln_items_checked = 0;
784                         ns->ln_items_repaired = 0;
785                         ns->ln_items_failed = 0;
786                         ns->ln_dirs_checked = 0;
787                         ns->ln_mlinked_checked = 0;
788                         ns->ln_objs_checked_phase2 = 0;
789                         ns->ln_objs_repaired_phase2 = 0;
790                         ns->ln_objs_failed_phase2 = 0;
791                         ns->ln_objs_nlink_repaired = 0;
792                         ns->ln_objs_lost_found = 0;
793                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
794                         if (cfs_list_empty(&com->lc_link_dir))
795                                 cfs_list_add_tail(&com->lc_link_dir,
796                                                   &lfsck->li_list_dir);
797                         *pos = ns->ln_pos_first_inconsistent;
798                 }
799         } else {
800                 ns->ln_status = LS_SCANNING_PHASE1;
801                 if (cfs_list_empty(&com->lc_link_dir))
802                         cfs_list_add_tail(&com->lc_link_dir,
803                                           &lfsck->li_list_dir);
804                 if (!lfsck->li_drop_dryrun ||
805                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
806                         *pos = ns->ln_pos_last_checkpoint;
807                         pos->lp_oit_cookie++;
808                 } else {
809                         *pos = ns->ln_pos_first_inconsistent;
810                 }
811         }
812         spin_unlock(&lfsck->li_lock);
813
814         up_write(&com->lc_sem);
815         return 0;
816 }
817
818 static int lfsck_namespace_exec_oit(const struct lu_env *env,
819                                     struct lfsck_component *com,
820                                     struct dt_object *obj)
821 {
822         down_write(&com->lc_sem);
823         com->lc_new_checked++;
824         if (S_ISDIR(lfsck_object_type(obj)))
825                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
826         up_write(&com->lc_sem);
827         return 0;
828 }
829
830 static int lfsck_namespace_exec_dir(const struct lu_env *env,
831                                     struct lfsck_component *com,
832                                     struct dt_object *obj,
833                                     struct lu_dirent *ent)
834 {
835         struct lfsck_thread_info   *info     = lfsck_env_info(env);
836         struct lu_attr             *la       = &info->lti_la;
837         struct lfsck_instance      *lfsck    = com->lc_lfsck;
838         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
839         struct lfsck_namespace     *ns       =
840                                 (struct lfsck_namespace *)com->lc_file_ram;
841         struct linkea_data          ldata    = { 0 };
842         const struct lu_fid        *pfid     =
843                                 lu_object_fid(&lfsck->li_obj_dir->do_lu);
844         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
845         const struct lu_name       *cname;
846         struct thandle             *handle   = NULL;
847         bool                        repaired = false;
848         bool                        locked   = false;
849         bool                        remove;
850         bool                        newdata;
851         int                         count    = 0;
852         int                         rc;
853         ENTRY;
854
855         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
856         down_write(&com->lc_sem);
857         com->lc_new_checked++;
858
859         if (ent->lde_attrs & LUDA_UPGRADE) {
860                 ns->ln_flags |= LF_UPGRADE;
861                 repaired = true;
862         } else if (ent->lde_attrs & LUDA_REPAIR) {
863                 ns->ln_flags |= LF_INCONSISTENT;
864                 repaired = true;
865         }
866
867         if (ent->lde_name[0] == '.' &&
868             (ent->lde_namelen == 1 ||
869              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
870              fid_is_dot_lustre(&ent->lde_fid)))
871                 GOTO(out, rc = 0);
872
873         if (!(bk->lb_param & LPF_DRYRUN) &&
874             (com->lc_journal || repaired)) {
875
876 again:
877                 LASSERT(!locked);
878
879                 com->lc_journal = 1;
880                 handle = dt_trans_create(env, lfsck->li_next);
881                 if (IS_ERR(handle))
882                         GOTO(out, rc = PTR_ERR(handle));
883
884                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
885                 if (rc != 0)
886                         GOTO(stop, rc);
887
888                 rc = dt_trans_start(env, lfsck->li_next, handle);
889                 if (rc != 0)
890                         GOTO(stop, rc);
891
892                 dt_write_lock(env, obj, MOR_TGT_CHILD);
893                 locked = true;
894         }
895
896         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
897         if (rc != 0)
898                 GOTO(stop, rc);
899
900         rc = lfsck_links_read(env, obj, &ldata);
901         if (rc == 0) {
902                 count = ldata.ld_leh->leh_reccount;
903                 rc = linkea_links_find(&ldata, cname, pfid);
904                 if ((rc == 0) &&
905                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
906                         goto record;
907
908                 ns->ln_flags |= LF_INCONSISTENT;
909                 /* For dir, if there are more than one linkea entries, or the
910                  * linkea entry does not match the name entry, then remove all
911                  * and add the correct one. */
912                 if (S_ISDIR(lfsck_object_type(obj))) {
913                         remove = true;
914                         newdata = true;
915                 } else {
916                         remove = false;
917                         newdata = false;
918                 }
919                 goto nodata;
920         } else if (unlikely(rc == -EINVAL)) {
921                 count = 1;
922                 ns->ln_flags |= LF_INCONSISTENT;
923                 /* The magic crashed, we are not sure whether there are more
924                  * corrupt data in the linkea, so remove all linkea entries. */
925                 remove = true;
926                 newdata = true;
927                 goto nodata;
928         } else if (rc == -ENODATA) {
929                 count = 1;
930                 ns->ln_flags |= LF_UPGRADE;
931                 remove = false;
932                 newdata = true;
933
934 nodata:
935                 if (bk->lb_param & LPF_DRYRUN) {
936                         repaired = true;
937                         goto record;
938                 }
939
940                 if (!com->lc_journal)
941                         goto again;
942
943                 if (remove) {
944                         LASSERT(newdata);
945
946                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
947                                           BYPASS_CAPA);
948                         if (rc != 0)
949                                 GOTO(stop, rc);
950                 }
951
952                 if (newdata) {
953                         rc = linkea_data_new(&ldata,
954                                         &lfsck_env_info(env)->lti_linkea_buf);
955                         if (rc != 0)
956                                 GOTO(stop, rc);
957                 }
958
959                 rc = linkea_add_buf(&ldata, cname, pfid);
960                 if (rc != 0)
961                         GOTO(stop, rc);
962
963                 rc = lfsck_links_write(env, obj, &ldata, handle);
964                 if (rc != 0)
965                         GOTO(stop, rc);
966
967                 count = ldata.ld_leh->leh_reccount;
968                 repaired = true;
969         } else {
970                 GOTO(stop, rc);
971         }
972
973 record:
974         LASSERT(count > 0);
975
976         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
977         if (rc != 0)
978                 GOTO(stop, rc);
979
980         if ((count == 1) &&
981             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
982                 /* Usually, it is for single linked object or dir, do nothing.*/
983                 GOTO(stop, rc);
984
985         /* Following modification will be in another transaction.  */
986         if (handle != NULL) {
987                 LASSERT(dt_write_locked(env, obj));
988
989                 dt_write_unlock(env, obj);
990                 locked = false;
991
992                 dt_trans_stop(env, lfsck->li_next, handle);
993                 handle = NULL;
994         }
995
996         ns->ln_mlinked_checked++;
997         rc = lfsck_namespace_update(env, com, cfid,
998                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
999
1000         GOTO(out, rc);
1001
1002 stop:
1003         if (locked)
1004                 dt_write_unlock(env, obj);
1005
1006         if (handle != NULL)
1007                 dt_trans_stop(env, lfsck->li_next, handle);
1008
1009 out:
1010         if (rc < 0) {
1011                 ns->ln_items_failed++;
1012                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1013                         lfsck_pos_fill(env, lfsck,
1014                                        &ns->ln_pos_first_inconsistent, false);
1015                 if (!(bk->lb_param & LPF_FAILOUT))
1016                         rc = 0;
1017         } else {
1018                 if (repaired)
1019                         ns->ln_items_repaired++;
1020                 else
1021                         com->lc_journal = 0;
1022                 rc = 0;
1023         }
1024         up_write(&com->lc_sem);
1025         return rc;
1026 }
1027
1028 static int lfsck_namespace_post(const struct lu_env *env,
1029                                 struct lfsck_component *com,
1030                                 int result, bool init)
1031 {
1032         struct lfsck_instance   *lfsck = com->lc_lfsck;
1033         struct lfsck_namespace  *ns    =
1034                                 (struct lfsck_namespace *)com->lc_file_ram;
1035         int                      rc;
1036
1037         down_write(&com->lc_sem);
1038
1039         spin_lock(&lfsck->li_lock);
1040         if (!init)
1041                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1042         if (result > 0) {
1043                 ns->ln_status = LS_SCANNING_PHASE2;
1044                 ns->ln_flags |= LF_SCANNED_ONCE;
1045                 ns->ln_flags &= ~LF_UPGRADE;
1046                 cfs_list_del_init(&com->lc_link);
1047                 cfs_list_del_init(&com->lc_link_dir);
1048                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1049         } else if (result == 0) {
1050                 if (lfsck->li_paused) {
1051                         ns->ln_status = LS_PAUSED;
1052                 } else {
1053                         ns->ln_status = LS_STOPPED;
1054                         cfs_list_del_init(&com->lc_link);
1055                         cfs_list_del_init(&com->lc_link_dir);
1056                         cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1057                 }
1058         } else {
1059                 ns->ln_status = LS_FAILED;
1060                 cfs_list_del_init(&com->lc_link);
1061                 cfs_list_del_init(&com->lc_link_dir);
1062                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1063         }
1064         spin_unlock(&lfsck->li_lock);
1065
1066         if (!init) {
1067                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1068                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1069                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1070                 ns->ln_items_checked += com->lc_new_checked;
1071                 com->lc_new_checked = 0;
1072         }
1073
1074         rc = lfsck_namespace_store(env, com, false);
1075
1076         up_write(&com->lc_sem);
1077         return rc;
1078 }
1079
1080 static int
1081 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1082                      char *buf, int len)
1083 {
1084         struct lfsck_instance   *lfsck = com->lc_lfsck;
1085         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1086         struct lfsck_namespace  *ns    =
1087                                 (struct lfsck_namespace *)com->lc_file_ram;
1088         int                      save  = len;
1089         int                      ret   = -ENOSPC;
1090         int                      rc;
1091
1092         down_read(&com->lc_sem);
1093         rc = snprintf(buf, len,
1094                       "name: lfsck_namespace\n"
1095                       "magic: 0x%x\n"
1096                       "version: %d\n"
1097                       "status: %s\n",
1098                       ns->ln_magic,
1099                       bk->lb_version,
1100                       lfsck_status_names[ns->ln_status]);
1101         if (rc <= 0)
1102                 goto out;
1103
1104         buf += rc;
1105         len -= rc;
1106         rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
1107                              "flags");
1108         if (rc < 0)
1109                 goto out;
1110
1111         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
1112                              "param");
1113         if (rc < 0)
1114                 goto out;
1115
1116         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
1117                              "time_since_last_completed");
1118         if (rc < 0)
1119                 goto out;
1120
1121         rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
1122                              "time_since_latest_start");
1123         if (rc < 0)
1124                 goto out;
1125
1126         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
1127                              "time_since_last_checkpoint");
1128         if (rc < 0)
1129                 goto out;
1130
1131         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
1132                             "latest_start_position");
1133         if (rc < 0)
1134                 goto out;
1135
1136         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
1137                             "last_checkpoint_position");
1138         if (rc < 0)
1139                 goto out;
1140
1141         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
1142                             "first_failure_position");
1143         if (rc < 0)
1144                 goto out;
1145
1146         if (ns->ln_status == LS_SCANNING_PHASE1) {
1147                 struct lfsck_position pos;
1148                 const struct dt_it_ops *iops;
1149                 cfs_duration_t duration = cfs_time_current() -
1150                                           lfsck->li_time_last_checkpoint;
1151                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1152                 __u64 speed = checked;
1153                 __u64 new_checked = com->lc_new_checked * CFS_HZ;
1154                 __u32 rtime = ns->ln_run_time_phase1 +
1155                               cfs_duration_sec(duration + HALF_SEC);
1156
1157                 if (duration != 0)
1158                         do_div(new_checked, duration);
1159                 if (rtime != 0)
1160                         do_div(speed, rtime);
1161                 rc = snprintf(buf, len,
1162                               "checked_phase1: "LPU64"\n"
1163                               "checked_phase2: "LPU64"\n"
1164                               "updated_phase1: "LPU64"\n"
1165                               "updated_phase2: "LPU64"\n"
1166                               "failed_phase1: "LPU64"\n"
1167                               "failed_phase2: "LPU64"\n"
1168                               "dirs: "LPU64"\n"
1169                               "M-linked: "LPU64"\n"
1170                               "nlinks_repaired: "LPU64"\n"
1171                               "lost_found: "LPU64"\n"
1172                               "success_count: %u\n"
1173                               "run_time_phase1: %u seconds\n"
1174                               "run_time_phase2: %u seconds\n"
1175                               "average_speed_phase1: "LPU64" items/sec\n"
1176                               "average_speed_phase2: N/A\n"
1177                               "real-time_speed_phase1: "LPU64" items/sec\n"
1178                               "real-time_speed_phase2: N/A\n",
1179                               checked,
1180                               ns->ln_objs_checked_phase2,
1181                               ns->ln_items_repaired,
1182                               ns->ln_objs_repaired_phase2,
1183                               ns->ln_items_failed,
1184                               ns->ln_objs_failed_phase2,
1185                               ns->ln_dirs_checked,
1186                               ns->ln_mlinked_checked,
1187                               ns->ln_objs_nlink_repaired,
1188                               ns->ln_objs_lost_found,
1189                               ns->ln_success_count,
1190                               rtime,
1191                               ns->ln_run_time_phase2,
1192                               speed,
1193                               new_checked);
1194                 if (rc <= 0)
1195                         goto out;
1196
1197                 buf += rc;
1198                 len -= rc;
1199
1200                 LASSERT(lfsck->li_di_oit != NULL);
1201
1202                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1203
1204                 /* The low layer otable-based iteration position may NOT
1205                  * exactly match the namespace-based directory traversal
1206                  * cookie. Generally, it is not a serious issue. But the
1207                  * caller should NOT make assumption on that. */
1208                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1209                 if (!lfsck->li_current_oit_processed)
1210                         pos.lp_oit_cookie--;
1211
1212                 spin_lock(&lfsck->li_lock);
1213                 if (lfsck->li_di_dir != NULL) {
1214                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1215                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1216                                 fid_zero(&pos.lp_dir_parent);
1217                                 pos.lp_dir_cookie = 0;
1218                         } else {
1219                                 pos.lp_dir_parent =
1220                                 *lu_object_fid(&lfsck->li_obj_dir->do_lu);
1221                         }
1222                 } else {
1223                         fid_zero(&pos.lp_dir_parent);
1224                         pos.lp_dir_cookie = 0;
1225                 }
1226                 spin_unlock(&lfsck->li_lock);
1227                 rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
1228                 if (rc <= 0)
1229                         goto out;
1230         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1231                 cfs_duration_t duration = cfs_time_current() -
1232                                           lfsck->li_time_last_checkpoint;
1233                 __u64 checked = ns->ln_objs_checked_phase2 +
1234                                 com->lc_new_checked;
1235                 __u64 speed1 = ns->ln_items_checked;
1236                 __u64 speed2 = checked;
1237                 __u64 new_checked = com->lc_new_checked * CFS_HZ;
1238                 __u32 rtime = ns->ln_run_time_phase2 +
1239                               cfs_duration_sec(duration + HALF_SEC);
1240
1241                 if (duration != 0)
1242                         do_div(new_checked, duration);
1243                 if (ns->ln_run_time_phase1 != 0)
1244                         do_div(speed1, ns->ln_run_time_phase1);
1245                 if (rtime != 0)
1246                         do_div(speed2, rtime);
1247                 rc = snprintf(buf, len,
1248                               "checked_phase1: "LPU64"\n"
1249                               "checked_phase2: "LPU64"\n"
1250                               "updated_phase1: "LPU64"\n"
1251                               "updated_phase2: "LPU64"\n"
1252                               "failed_phase1: "LPU64"\n"
1253                               "failed_phase2: "LPU64"\n"
1254                               "dirs: "LPU64"\n"
1255                               "M-linked: "LPU64"\n"
1256                               "nlinks_repaired: "LPU64"\n"
1257                               "lost_found: "LPU64"\n"
1258                               "success_count: %u\n"
1259                               "run_time_phase1: %u seconds\n"
1260                               "run_time_phase2: %u seconds\n"
1261                               "average_speed_phase1: "LPU64" items/sec\n"
1262                               "average_speed_phase2: "LPU64" objs/sec\n"
1263                               "real-time_speed_phase1: N/A\n"
1264                               "real-time_speed_phase2: "LPU64" objs/sec\n"
1265                               "current_position: "DFID"\n",
1266                               ns->ln_items_checked,
1267                               checked,
1268                               ns->ln_items_repaired,
1269                               ns->ln_objs_repaired_phase2,
1270                               ns->ln_items_failed,
1271                               ns->ln_objs_failed_phase2,
1272                               ns->ln_dirs_checked,
1273                               ns->ln_mlinked_checked,
1274                               ns->ln_objs_nlink_repaired,
1275                               ns->ln_objs_lost_found,
1276                               ns->ln_success_count,
1277                               ns->ln_run_time_phase1,
1278                               rtime,
1279                               speed1,
1280                               speed2,
1281                               new_checked,
1282                               PFID(&ns->ln_fid_latest_scanned_phase2));
1283                 if (rc <= 0)
1284                         goto out;
1285
1286                 buf += rc;
1287                 len -= rc;
1288         } else {
1289                 __u64 speed1 = ns->ln_items_checked;
1290                 __u64 speed2 = ns->ln_objs_checked_phase2;
1291
1292                 if (ns->ln_run_time_phase1 != 0)
1293                         do_div(speed1, ns->ln_run_time_phase1);
1294                 if (ns->ln_run_time_phase2 != 0)
1295                         do_div(speed2, ns->ln_run_time_phase2);
1296                 rc = snprintf(buf, len,
1297                               "checked_phase1: "LPU64"\n"
1298                               "checked_phase2: "LPU64"\n"
1299                               "updated_phase1: "LPU64"\n"
1300                               "updated_phase2: "LPU64"\n"
1301                               "failed_phase1: "LPU64"\n"
1302                               "failed_phase2: "LPU64"\n"
1303                               "dirs: "LPU64"\n"
1304                               "M-linked: "LPU64"\n"
1305                               "nlinks_repaired: "LPU64"\n"
1306                               "lost_found: "LPU64"\n"
1307                               "success_count: %u\n"
1308                               "run_time_phase1: %u seconds\n"
1309                               "run_time_phase2: %u seconds\n"
1310                               "average_speed_phase1: "LPU64" items/sec\n"
1311                               "average_speed_phase2: "LPU64" objs/sec\n"
1312                               "real-time_speed_phase1: N/A\n"
1313                               "real-time_speed_phase2: N/A\n"
1314                               "current_position: N/A\n",
1315                               ns->ln_items_checked,
1316                               ns->ln_objs_checked_phase2,
1317                               ns->ln_items_repaired,
1318                               ns->ln_objs_repaired_phase2,
1319                               ns->ln_items_failed,
1320                               ns->ln_objs_failed_phase2,
1321                               ns->ln_dirs_checked,
1322                               ns->ln_mlinked_checked,
1323                               ns->ln_objs_nlink_repaired,
1324                               ns->ln_objs_lost_found,
1325                               ns->ln_success_count,
1326                               ns->ln_run_time_phase1,
1327                               ns->ln_run_time_phase2,
1328                               speed1,
1329                               speed2);
1330                 if (rc <= 0)
1331                         goto out;
1332
1333                 buf += rc;
1334                 len -= rc;
1335         }
1336         ret = save - len;
1337
1338 out:
1339         up_read(&com->lc_sem);
1340         return ret;
1341 }
1342
1343 static int lfsck_namespace_double_scan(const struct lu_env *env,
1344                                        struct lfsck_component *com)
1345 {
1346         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1347         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1348         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1349         struct lfsck_namespace  *ns     =
1350                                 (struct lfsck_namespace *)com->lc_file_ram;
1351         struct dt_object        *obj    = com->lc_obj;
1352         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1353         struct dt_object        *target;
1354         struct dt_it            *di;
1355         struct dt_key           *key;
1356         struct lu_fid            fid;
1357         int                      rc;
1358         __u8                     flags;
1359         ENTRY;
1360
1361         lfsck->li_new_scanned = 0;
1362         lfsck->li_time_last_checkpoint = cfs_time_current();
1363         lfsck->li_time_next_checkpoint = lfsck->li_time_last_checkpoint +
1364                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1365
1366         di = iops->init(env, obj, 0, BYPASS_CAPA);
1367         if (IS_ERR(di))
1368                 RETURN(PTR_ERR(di));
1369
1370         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1371         rc = iops->get(env, di, (const struct dt_key *)&fid);
1372         if (rc < 0)
1373                 GOTO(fini, rc);
1374
1375         /* Skip the start one, which either has been processed or non-exist. */
1376         rc = iops->next(env, di);
1377         if (rc != 0)
1378                 GOTO(put, rc);
1379
1380         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1381                 GOTO(put, rc = 0);
1382
1383         do {
1384                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1385                     cfs_fail_val > 0) {
1386                         struct l_wait_info lwi;
1387
1388                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1389                                           NULL, NULL);
1390                         l_wait_event(thread->t_ctl_waitq,
1391                                      !thread_is_running(thread),
1392                                      &lwi);
1393                 }
1394
1395                 key = iops->key(env, di);
1396                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1397                 target = lfsck_object_find(env, lfsck, &fid);
1398                 down_write(&com->lc_sem);
1399                 if (target == NULL) {
1400                         rc = 0;
1401                         goto checkpoint;
1402                 } else if (IS_ERR(target)) {
1403                         rc = PTR_ERR(target);
1404                         goto checkpoint;
1405                 }
1406
1407                 /* XXX: Currently, skip remote object, the consistency for
1408                  *      remote object will be processed in LFSCK phase III. */
1409                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1410                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1411                         if (rc == 0)
1412                                 rc = lfsck_namespace_double_scan_one(env, com,
1413                                                                 target, flags);
1414                 }
1415
1416                 lfsck_object_put(env, target);
1417
1418 checkpoint:
1419                 lfsck->li_new_scanned++;
1420                 com->lc_new_checked++;
1421                 ns->ln_fid_latest_scanned_phase2 = fid;
1422                 if (rc > 0)
1423                         ns->ln_objs_repaired_phase2++;
1424                 else if (rc < 0)
1425                         ns->ln_objs_failed_phase2++;
1426                 up_write(&com->lc_sem);
1427
1428                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1429                         lfsck_namespace_delete(env, com, &fid);
1430                 } else if (rc < 0) {
1431                         flags |= LLF_REPAIR_FAILED;
1432                         lfsck_namespace_update(env, com, &fid, flags, true);
1433                 }
1434
1435                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1436                         GOTO(put, rc);
1437
1438                 if (unlikely(cfs_time_beforeq(lfsck->li_time_next_checkpoint,
1439                                               cfs_time_current())) &&
1440                     com->lc_new_checked != 0) {
1441                         down_write(&com->lc_sem);
1442                         ns->ln_run_time_phase2 +=
1443                                 cfs_duration_sec(cfs_time_current() +
1444                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1445                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1446                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1447                         com->lc_new_checked = 0;
1448                         rc = lfsck_namespace_store(env, com, false);
1449                         up_write(&com->lc_sem);
1450                         if (rc != 0)
1451                                 GOTO(put, rc);
1452
1453                         lfsck->li_time_last_checkpoint = cfs_time_current();
1454                         lfsck->li_time_next_checkpoint =
1455                                 lfsck->li_time_last_checkpoint +
1456                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1457                 }
1458
1459                 lfsck_control_speed(lfsck);
1460                 if (unlikely(!thread_is_running(thread)))
1461                         GOTO(put, rc = 0);
1462
1463                 rc = iops->next(env, di);
1464         } while (rc == 0);
1465
1466         GOTO(put, rc);
1467
1468 put:
1469         iops->put(env, di);
1470
1471 fini:
1472         iops->fini(env, di);
1473         down_write(&com->lc_sem);
1474
1475         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1476                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1477         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1478         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1479         com->lc_new_checked = 0;
1480
1481         if (rc > 0) {
1482                 com->lc_journal = 0;
1483                 ns->ln_status = LS_COMPLETED;
1484                 if (!(bk->lb_param & LPF_DRYRUN))
1485                         ns->ln_flags &=
1486                         ~(LF_SCANNED_ONCE | LF_INCONSISTENT | LF_UPGRADE);
1487                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1488                 ns->ln_success_count++;
1489         } else if (rc == 0) {
1490                 if (lfsck->li_paused)
1491                         ns->ln_status = LS_PAUSED;
1492                 else
1493                         ns->ln_status = LS_STOPPED;
1494         } else {
1495                 ns->ln_status = LS_FAILED;
1496         }
1497
1498         if (ns->ln_status != LS_PAUSED) {
1499                 spin_lock(&lfsck->li_lock);
1500                 cfs_list_del_init(&com->lc_link);
1501                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1502                 spin_unlock(&lfsck->li_lock);
1503         }
1504
1505         rc = lfsck_namespace_store(env, com, false);
1506
1507         up_write(&com->lc_sem);
1508         return rc;
1509 }
1510
1511 static struct lfsck_operations lfsck_namespace_ops = {
1512         .lfsck_reset            = lfsck_namespace_reset,
1513         .lfsck_fail             = lfsck_namespace_fail,
1514         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1515         .lfsck_prep             = lfsck_namespace_prep,
1516         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1517         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1518         .lfsck_post             = lfsck_namespace_post,
1519         .lfsck_dump             = lfsck_namespace_dump,
1520         .lfsck_double_scan      = lfsck_namespace_double_scan,
1521 };
1522
1523 int lfsck_namespace_setup(const struct lu_env *env,
1524                           struct lfsck_instance *lfsck)
1525 {
1526         struct lfsck_component  *com;
1527         struct lfsck_namespace  *ns;
1528         struct dt_object        *obj;
1529         int                      rc;
1530         ENTRY;
1531
1532         LASSERT(lfsck->li_master);
1533
1534         OBD_ALLOC_PTR(com);
1535         if (com == NULL)
1536                 RETURN(-ENOMEM);
1537
1538         CFS_INIT_LIST_HEAD(&com->lc_link);
1539         CFS_INIT_LIST_HEAD(&com->lc_link_dir);
1540         init_rwsem(&com->lc_sem);
1541         atomic_set(&com->lc_ref, 1);
1542         com->lc_lfsck = lfsck;
1543         com->lc_type = LT_NAMESPACE;
1544         com->lc_ops = &lfsck_namespace_ops;
1545         com->lc_file_size = sizeof(struct lfsck_namespace);
1546         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1547         if (com->lc_file_ram == NULL)
1548                 GOTO(out, rc = -ENOMEM);
1549
1550         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1551         if (com->lc_file_disk == NULL)
1552                 GOTO(out, rc = -ENOMEM);
1553
1554         obj = local_index_find_or_create(env, lfsck->li_los,
1555                                          lfsck->li_local_root,
1556                                          lfsck_namespace_name,
1557                                          S_IFREG | S_IRUGO | S_IWUSR,
1558                                          &dt_lfsck_features);
1559         if (IS_ERR(obj))
1560                 GOTO(out, rc = PTR_ERR(obj));
1561
1562         com->lc_obj = obj;
1563         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1564         if (rc != 0)
1565                 GOTO(out, rc);
1566
1567         rc = lfsck_namespace_load(env, com);
1568         if (rc > 0)
1569                 rc = lfsck_namespace_reset(env, com, true);
1570         else if (rc == -ENODATA)
1571                 rc = lfsck_namespace_init(env, com);
1572         if (rc != 0)
1573                 GOTO(out, rc);
1574
1575         ns = (struct lfsck_namespace *)com->lc_file_ram;
1576         switch (ns->ln_status) {
1577         case LS_INIT:
1578         case LS_COMPLETED:
1579         case LS_FAILED:
1580         case LS_STOPPED:
1581                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1582                 break;
1583         default:
1584                 CERROR("%s: unknown lfsck_namespace status: %u\n",
1585                        lfsck_lfsck2name(lfsck), ns->ln_status);
1586                 /* fall through */
1587         case LS_SCANNING_PHASE1:
1588         case LS_SCANNING_PHASE2:
1589                 /* No need to store the status to disk right now.
1590                  * If the system crashed before the status stored,
1591                  * it will be loaded back when next time. */
1592                 ns->ln_status = LS_CRASHED;
1593                 /* fall through */
1594         case LS_PAUSED:
1595         case LS_CRASHED:
1596                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1597                 cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1598                 break;
1599         }
1600
1601         GOTO(out, rc = 0);
1602
1603 out:
1604         if (rc != 0)
1605                 lfsck_component_cleanup(env, com);
1606         return rc;
1607 }