Whamcloud - gitweb
586694fd757a272e2dff7734d7fa1f6600fc114d
[fs/lustre-release.git] / lustre / lfsck / lfsck_namespace.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  */
25 /*
26  * lustre/lfsck/lfsck_namespace.c
27  *
28  * Author: Fan, Yong <fan.yong@intel.com>
29  */
30
31 #define DEBUG_SUBSYSTEM S_LFSCK
32
33 #include <lustre/lustre_idl.h>
34 #include <lu_object.h>
35 #include <dt_object.h>
36 #include <md_object.h>
37 #include <lustre_linkea.h>
38 #include <lustre_fid.h>
39 #include <lustre_lib.h>
40 #include <lustre_net.h>
41 #include <lustre/lustre_user.h>
42
43 #include "lfsck_internal.h"
44
45 #define LFSCK_NAMESPACE_MAGIC   0xA0629D03
46
47 static const char lfsck_namespace_name[] = "lfsck_namespace";
48
49 static void lfsck_namespace_le_to_cpu(struct lfsck_namespace *dst,
50                                       struct lfsck_namespace *src)
51 {
52         dst->ln_magic = le32_to_cpu(src->ln_magic);
53         dst->ln_status = le32_to_cpu(src->ln_status);
54         dst->ln_flags = le32_to_cpu(src->ln_flags);
55         dst->ln_success_count = le32_to_cpu(src->ln_success_count);
56         dst->ln_run_time_phase1 = le32_to_cpu(src->ln_run_time_phase1);
57         dst->ln_run_time_phase2 = le32_to_cpu(src->ln_run_time_phase2);
58         dst->ln_time_last_complete = le64_to_cpu(src->ln_time_last_complete);
59         dst->ln_time_latest_start = le64_to_cpu(src->ln_time_latest_start);
60         dst->ln_time_last_checkpoint =
61                                 le64_to_cpu(src->ln_time_last_checkpoint);
62         lfsck_position_le_to_cpu(&dst->ln_pos_latest_start,
63                                  &src->ln_pos_latest_start);
64         lfsck_position_le_to_cpu(&dst->ln_pos_last_checkpoint,
65                                  &src->ln_pos_last_checkpoint);
66         lfsck_position_le_to_cpu(&dst->ln_pos_first_inconsistent,
67                                  &src->ln_pos_first_inconsistent);
68         dst->ln_items_checked = le64_to_cpu(src->ln_items_checked);
69         dst->ln_items_repaired = le64_to_cpu(src->ln_items_repaired);
70         dst->ln_items_failed = le64_to_cpu(src->ln_items_failed);
71         dst->ln_dirs_checked = le64_to_cpu(src->ln_dirs_checked);
72         dst->ln_mlinked_checked = le64_to_cpu(src->ln_mlinked_checked);
73         dst->ln_objs_checked_phase2 = le64_to_cpu(src->ln_objs_checked_phase2);
74         dst->ln_objs_repaired_phase2 =
75                                 le64_to_cpu(src->ln_objs_repaired_phase2);
76         dst->ln_objs_failed_phase2 = le64_to_cpu(src->ln_objs_failed_phase2);
77         dst->ln_objs_nlink_repaired = le64_to_cpu(src->ln_objs_nlink_repaired);
78         dst->ln_objs_lost_found = le64_to_cpu(src->ln_objs_lost_found);
79         fid_le_to_cpu(&dst->ln_fid_latest_scanned_phase2,
80                       &src->ln_fid_latest_scanned_phase2);
81         dst->ln_dirent_repaired = le64_to_cpu(src->ln_dirent_repaired);
82         dst->ln_linkea_repaired = le64_to_cpu(src->ln_linkea_repaired);
83 }
84
85 static void lfsck_namespace_cpu_to_le(struct lfsck_namespace *dst,
86                                       struct lfsck_namespace *src)
87 {
88         dst->ln_magic = cpu_to_le32(src->ln_magic);
89         dst->ln_status = cpu_to_le32(src->ln_status);
90         dst->ln_flags = cpu_to_le32(src->ln_flags);
91         dst->ln_success_count = cpu_to_le32(src->ln_success_count);
92         dst->ln_run_time_phase1 = cpu_to_le32(src->ln_run_time_phase1);
93         dst->ln_run_time_phase2 = cpu_to_le32(src->ln_run_time_phase2);
94         dst->ln_time_last_complete = cpu_to_le64(src->ln_time_last_complete);
95         dst->ln_time_latest_start = cpu_to_le64(src->ln_time_latest_start);
96         dst->ln_time_last_checkpoint =
97                                 cpu_to_le64(src->ln_time_last_checkpoint);
98         lfsck_position_cpu_to_le(&dst->ln_pos_latest_start,
99                                  &src->ln_pos_latest_start);
100         lfsck_position_cpu_to_le(&dst->ln_pos_last_checkpoint,
101                                  &src->ln_pos_last_checkpoint);
102         lfsck_position_cpu_to_le(&dst->ln_pos_first_inconsistent,
103                                  &src->ln_pos_first_inconsistent);
104         dst->ln_items_checked = cpu_to_le64(src->ln_items_checked);
105         dst->ln_items_repaired = cpu_to_le64(src->ln_items_repaired);
106         dst->ln_items_failed = cpu_to_le64(src->ln_items_failed);
107         dst->ln_dirs_checked = cpu_to_le64(src->ln_dirs_checked);
108         dst->ln_mlinked_checked = cpu_to_le64(src->ln_mlinked_checked);
109         dst->ln_objs_checked_phase2 = cpu_to_le64(src->ln_objs_checked_phase2);
110         dst->ln_objs_repaired_phase2 =
111                                 cpu_to_le64(src->ln_objs_repaired_phase2);
112         dst->ln_objs_failed_phase2 = cpu_to_le64(src->ln_objs_failed_phase2);
113         dst->ln_objs_nlink_repaired = cpu_to_le64(src->ln_objs_nlink_repaired);
114         dst->ln_objs_lost_found = cpu_to_le64(src->ln_objs_lost_found);
115         fid_cpu_to_le(&dst->ln_fid_latest_scanned_phase2,
116                       &src->ln_fid_latest_scanned_phase2);
117         dst->ln_dirent_repaired = cpu_to_le64(src->ln_dirent_repaired);
118         dst->ln_linkea_repaired = cpu_to_le64(src->ln_linkea_repaired);
119 }
120
121 /**
122  * \retval +ve: the lfsck_namespace is broken, the caller should reset it.
123  * \retval 0: succeed.
124  * \retval -ve: failed cases.
125  */
126 static int lfsck_namespace_load(const struct lu_env *env,
127                                 struct lfsck_component *com)
128 {
129         int len = com->lc_file_size;
130         int rc;
131
132         rc = dt_xattr_get(env, com->lc_obj,
133                           lfsck_buf_get(env, com->lc_file_disk, len),
134                           XATTR_NAME_LFSCK_NAMESPACE, BYPASS_CAPA);
135         if (rc == len) {
136                 struct lfsck_namespace *ns = com->lc_file_ram;
137
138                 lfsck_namespace_le_to_cpu(ns,
139                                 (struct lfsck_namespace *)com->lc_file_disk);
140                 if (ns->ln_magic != LFSCK_NAMESPACE_MAGIC) {
141                         CWARN("%s: invalid lfsck_namespace magic %#x != %#x\n",
142                               lfsck_lfsck2name(com->lc_lfsck), ns->ln_magic,
143                               LFSCK_NAMESPACE_MAGIC);
144                         rc = 1;
145                 } else {
146                         rc = 0;
147                 }
148         } else if (rc != -ENODATA) {
149                 CERROR("%s: fail to load lfsck_namespace: expected = %d, "
150                        "rc = %d\n", lfsck_lfsck2name(com->lc_lfsck), len, rc);
151                 if (rc >= 0)
152                         rc = 1;
153         }
154         return rc;
155 }
156
157 static int lfsck_namespace_store(const struct lu_env *env,
158                                  struct lfsck_component *com, bool init)
159 {
160         struct dt_object        *obj    = com->lc_obj;
161         struct lfsck_instance   *lfsck  = com->lc_lfsck;
162         struct thandle          *handle;
163         int                      len    = com->lc_file_size;
164         int                      rc;
165         ENTRY;
166
167         lfsck_namespace_cpu_to_le((struct lfsck_namespace *)com->lc_file_disk,
168                                   (struct lfsck_namespace *)com->lc_file_ram);
169         handle = dt_trans_create(env, lfsck->li_bottom);
170         if (IS_ERR(handle)) {
171                 rc = PTR_ERR(handle);
172                 CERROR("%s: fail to create trans for storing lfsck_namespace: "
173                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
174                 RETURN(rc);
175         }
176
177         rc = dt_declare_xattr_set(env, obj,
178                                   lfsck_buf_get(env, com->lc_file_disk, len),
179                                   XATTR_NAME_LFSCK_NAMESPACE, 0, handle);
180         if (rc != 0) {
181                 CERROR("%s: fail to declare trans for storing lfsck_namespace: "
182                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
183                 GOTO(out, rc);
184         }
185
186         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
187         if (rc != 0) {
188                 CERROR("%s: fail to start trans for storing lfsck_namespace: "
189                        "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
190                 GOTO(out, rc);
191         }
192
193         rc = dt_xattr_set(env, obj,
194                           lfsck_buf_get(env, com->lc_file_disk, len),
195                           XATTR_NAME_LFSCK_NAMESPACE,
196                           init ? LU_XATTR_CREATE : LU_XATTR_REPLACE,
197                           handle, BYPASS_CAPA);
198         if (rc != 0)
199                 CERROR("%s: fail to store lfsck_namespace: len = %d, "
200                        "rc = %d\n", lfsck_lfsck2name(lfsck), len, rc);
201
202         GOTO(out, rc);
203
204 out:
205         dt_trans_stop(env, lfsck->li_bottom, handle);
206         return rc;
207 }
208
209 static int lfsck_namespace_init(const struct lu_env *env,
210                                 struct lfsck_component *com)
211 {
212         struct lfsck_namespace *ns = com->lc_file_ram;
213         int rc;
214
215         memset(ns, 0, sizeof(*ns));
216         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
217         ns->ln_status = LS_INIT;
218         down_write(&com->lc_sem);
219         rc = lfsck_namespace_store(env, com, true);
220         up_write(&com->lc_sem);
221         return rc;
222 }
223
224 static int lfsck_namespace_lookup(const struct lu_env *env,
225                                   struct lfsck_component *com,
226                                   const struct lu_fid *fid, __u8 *flags)
227 {
228         struct lu_fid *key = &lfsck_env_info(env)->lti_fid;
229         int            rc;
230
231         fid_cpu_to_be(key, fid);
232         rc = dt_lookup(env, com->lc_obj, (struct dt_rec *)flags,
233                        (const struct dt_key *)key, BYPASS_CAPA);
234         return rc;
235 }
236
237 static int lfsck_namespace_delete(const struct lu_env *env,
238                                   struct lfsck_component *com,
239                                   const struct lu_fid *fid)
240 {
241         struct lfsck_instance   *lfsck  = com->lc_lfsck;
242         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
243         struct thandle          *handle;
244         struct dt_object        *obj    = com->lc_obj;
245         int                      rc;
246         ENTRY;
247
248         handle = dt_trans_create(env, lfsck->li_bottom);
249         if (IS_ERR(handle))
250                 RETURN(PTR_ERR(handle));
251
252         rc = dt_declare_delete(env, obj, (const struct dt_key *)fid, handle);
253         if (rc != 0)
254                 GOTO(out, rc);
255
256         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
257         if (rc != 0)
258                 GOTO(out, rc);
259
260         fid_cpu_to_be(key, fid);
261         rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
262                        BYPASS_CAPA);
263
264         GOTO(out, rc);
265
266 out:
267         dt_trans_stop(env, lfsck->li_bottom, handle);
268         return rc;
269 }
270
271 static int lfsck_namespace_update(const struct lu_env *env,
272                                   struct lfsck_component *com,
273                                   const struct lu_fid *fid,
274                                   __u8 flags, bool force)
275 {
276         struct lfsck_instance   *lfsck  = com->lc_lfsck;
277         struct lu_fid           *key    = &lfsck_env_info(env)->lti_fid;
278         struct thandle          *handle;
279         struct dt_object        *obj    = com->lc_obj;
280         int                      rc;
281         bool                     exist  = false;
282         __u8                     tf;
283         ENTRY;
284
285         rc = lfsck_namespace_lookup(env, com, fid, &tf);
286         if (rc != 0 && rc != -ENOENT)
287                 RETURN(rc);
288
289         if (rc == 0) {
290                 if (!force || flags == tf)
291                         RETURN(0);
292
293                 exist = true;
294                 handle = dt_trans_create(env, lfsck->li_bottom);
295                 if (IS_ERR(handle))
296                         RETURN(PTR_ERR(handle));
297
298                 rc = dt_declare_delete(env, obj, (const struct dt_key *)fid,
299                                        handle);
300                 if (rc != 0)
301                         GOTO(out, rc);
302         } else {
303                 handle = dt_trans_create(env, lfsck->li_bottom);
304                 if (IS_ERR(handle))
305                         RETURN(PTR_ERR(handle));
306         }
307
308         rc = dt_declare_insert(env, obj, (const struct dt_rec *)&flags,
309                                (const struct dt_key *)fid, handle);
310         if (rc != 0)
311                 GOTO(out, rc);
312
313         rc = dt_trans_start_local(env, lfsck->li_bottom, handle);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         fid_cpu_to_be(key, fid);
318         if (exist) {
319                 rc = dt_delete(env, obj, (const struct dt_key *)key, handle,
320                                BYPASS_CAPA);
321                 if (rc != 0) {
322                         CERROR("%s: fail to insert "DFID": rc = %d\n",
323                                lfsck_lfsck2name(com->lc_lfsck), PFID(fid), rc);
324                         GOTO(out, rc);
325                 }
326         }
327
328         rc = dt_insert(env, obj, (const struct dt_rec *)&flags,
329                        (const struct dt_key *)key, handle, BYPASS_CAPA, 1);
330
331         GOTO(out, rc);
332
333 out:
334         dt_trans_stop(env, lfsck->li_bottom, handle);
335         return rc;
336 }
337
338 static int lfsck_namespace_check_exist(const struct lu_env *env,
339                                        struct lfsck_instance *lfsck,
340                                        struct dt_object *obj, const char *name)
341 {
342         struct dt_object *dir = lfsck->li_obj_dir;
343         struct lu_fid    *fid = &lfsck_env_info(env)->lti_fid;
344         int               rc;
345         ENTRY;
346
347         if (unlikely(lfsck_is_dead_obj(obj)))
348                 RETURN(LFSCK_NAMEENTRY_DEAD);
349
350         rc = dt_lookup(env, dir, (struct dt_rec *)fid,
351                        (const struct dt_key *)name, BYPASS_CAPA);
352         if (rc == -ENOENT)
353                 RETURN(LFSCK_NAMEENTRY_REMOVED);
354
355         if (rc < 0)
356                 RETURN(rc);
357
358         if (!lu_fid_eq(fid, lfsck_dto2fid(obj)))
359                 RETURN(LFSCK_NAMEENTRY_RECREATED);
360
361         RETURN(0);
362 }
363
364 static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
365                                             struct dt_object *obj,
366                                             struct thandle *handle)
367 {
368         int rc;
369
370         /* For destroying all invalid linkEA entries. */
371         rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LINK, handle);
372         if (rc != 0)
373                 return rc;
374
375         /* For insert new linkEA entry. */
376         rc = dt_declare_xattr_set(env, obj,
377                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
378                         XATTR_NAME_LINK, 0, handle);
379         return rc;
380 }
381
382 static int lfsck_links_read(const struct lu_env *env, struct dt_object *obj,
383                             struct linkea_data *ldata)
384 {
385         int rc;
386
387         ldata->ld_buf =
388                 lu_buf_check_and_alloc(&lfsck_env_info(env)->lti_linkea_buf,
389                                        PAGE_CACHE_SIZE);
390         if (ldata->ld_buf->lb_buf == NULL)
391                 return -ENOMEM;
392
393         if (!dt_object_exists(obj))
394                 return -ENODATA;
395
396         rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK, BYPASS_CAPA);
397         if (rc == -ERANGE) {
398                 /* Buf was too small, figure out what we need. */
399                 lu_buf_free(ldata->ld_buf);
400                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
401                                   BYPASS_CAPA);
402                 if (rc < 0)
403                         return rc;
404
405                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
406                 if (ldata->ld_buf->lb_buf == NULL)
407                         return -ENOMEM;
408
409                 rc = dt_xattr_get(env, obj, ldata->ld_buf, XATTR_NAME_LINK,
410                                   BYPASS_CAPA);
411         }
412         if (rc < 0)
413                 return rc;
414
415         linkea_init(ldata);
416
417         return 0;
418 }
419
420 static int lfsck_links_write(const struct lu_env *env, struct dt_object *obj,
421                              struct linkea_data *ldata, struct thandle *handle)
422 {
423         const struct lu_buf *buf = lfsck_buf_get_const(env,
424                                                        ldata->ld_buf->lb_buf,
425                                                        ldata->ld_leh->leh_len);
426
427         return dt_xattr_set(env, obj, buf, XATTR_NAME_LINK, 0, handle,
428                             BYPASS_CAPA);
429 }
430
431 /**
432  * \retval ve: removed entries
433  */
434 static int lfsck_linkea_entry_unpack(struct lfsck_instance *lfsck,
435                                      struct linkea_data *ldata,
436                                      struct lu_name *cname,
437                                      struct lu_fid *pfid)
438 {
439         struct link_ea_entry    *oldlee;
440         int                      oldlen;
441         int                      removed = 0;
442
443         linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, cname, pfid);
444         oldlee = ldata->ld_lee;
445         oldlen = ldata->ld_reclen;
446         linkea_next_entry(ldata);
447         while (ldata->ld_lee != NULL) {
448                 ldata->ld_reclen = (ldata->ld_lee->lee_reclen[0] << 8) |
449                                    ldata->ld_lee->lee_reclen[1];
450                 if (unlikely(ldata->ld_reclen == oldlen &&
451                              memcmp(ldata->ld_lee, oldlee, oldlen) == 0)) {
452                         linkea_del_buf(ldata, cname);
453                         removed++;
454                 } else {
455                         linkea_next_entry(ldata);
456                 }
457         }
458         ldata->ld_lee = oldlee;
459         ldata->ld_reclen = oldlen;
460         return removed;
461 }
462
463 /**
464  * \retval +ve  repaired
465  * \retval 0    no need to repair
466  * \retval -ve  error cases
467  */
468 static int lfsck_namespace_double_scan_one(const struct lu_env *env,
469                                            struct lfsck_component *com,
470                                            struct dt_object *child, __u8 flags)
471 {
472         struct lfsck_thread_info *info    = lfsck_env_info(env);
473         struct lu_attr           *la      = &info->lti_la;
474         struct lu_name           *cname   = &info->lti_name;
475         struct lu_fid            *pfid    = &info->lti_fid;
476         struct lu_fid            *cfid    = &info->lti_fid2;
477         struct lfsck_instance   *lfsck    = com->lc_lfsck;
478         struct lfsck_bookmark   *bk       = &lfsck->li_bookmark_ram;
479         struct lfsck_namespace  *ns       = com->lc_file_ram;
480         struct linkea_data       ldata    = { 0 };
481         struct thandle          *handle   = NULL;
482         bool                     locked   = false;
483         bool                     update   = false;
484         int                      rc;
485         ENTRY;
486
487         if (com->lc_journal) {
488
489 again:
490                 LASSERT(!locked);
491
492                 update = false;
493                 com->lc_journal = 1;
494                 handle = dt_trans_create(env, lfsck->li_next);
495                 if (IS_ERR(handle))
496                         RETURN(rc = PTR_ERR(handle));
497
498                 rc = dt_declare_xattr_set(env, child,
499                         lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
500                         XATTR_NAME_LINK, 0, handle);
501                 if (rc != 0)
502                         GOTO(stop, rc);
503
504                 rc = dt_trans_start(env, lfsck->li_next, handle);
505                 if (rc != 0)
506                         GOTO(stop, rc);
507
508                 dt_write_lock(env, child, MOR_TGT_CHILD);
509                 locked = true;
510         }
511
512         if (unlikely(lfsck_is_dead_obj(child)))
513                 GOTO(stop, rc = 0);
514
515         rc = dt_attr_get(env, child, la, BYPASS_CAPA);
516         if (rc == 0)
517                 rc = lfsck_links_read(env, child, &ldata);
518         if (rc != 0) {
519                 if ((bk->lb_param & LPF_DRYRUN) &&
520                     (rc == -EINVAL || rc == -ENODATA))
521                         rc = 1;
522
523                 GOTO(stop, rc);
524         }
525
526         linkea_first_entry(&ldata);
527         while (ldata.ld_lee != NULL) {
528                 struct dt_object *parent = NULL;
529
530                 rc = lfsck_linkea_entry_unpack(lfsck, &ldata, cname, pfid);
531                 if (rc > 0)
532                         update = true;
533
534                 if (!fid_is_sane(pfid))
535                         goto shrink;
536
537                 parent = lfsck_object_find(env, lfsck, pfid);
538                 if (parent == NULL)
539                         goto shrink;
540                 else if (IS_ERR(parent))
541                         GOTO(stop, rc = PTR_ERR(parent));
542
543                 if (!dt_object_exists(parent))
544                         goto shrink;
545
546                 /* XXX: Currently, skip remote object, the consistency for
547                  *      remote object will be processed in LFSCK phase III. */
548                 if (dt_object_remote(parent)) {
549                         lfsck_object_put(env, parent);
550                         linkea_next_entry(&ldata);
551                         continue;
552                 }
553
554                 if (unlikely(!dt_try_as_dir(env, parent)))
555                         goto shrink;
556
557                 /* To guarantee the 'name' is terminated with '0'. */
558                 memcpy(info->lti_key, cname->ln_name, cname->ln_namelen);
559                 info->lti_key[cname->ln_namelen] = 0;
560                 cname->ln_name = info->lti_key;
561                 rc = dt_lookup(env, parent, (struct dt_rec *)cfid,
562                                (const struct dt_key *)cname->ln_name,
563                                BYPASS_CAPA);
564                 if (rc != 0 && rc != -ENOENT) {
565                         lfsck_object_put(env, parent);
566                         GOTO(stop, rc);
567                 }
568
569                 if (rc == 0) {
570                         if (lu_fid_eq(cfid, lfsck_dto2fid(child))) {
571                                 lfsck_object_put(env, parent);
572                                 linkea_next_entry(&ldata);
573                                 continue;
574                         }
575
576                         goto shrink;
577                 }
578
579                 /* If there is no name entry in the parent dir and the object
580                  * link count is less than the linkea entries count, then the
581                  * linkea entry should be removed. */
582                 if (ldata.ld_leh->leh_reccount > la->la_nlink)
583                         goto shrink;
584
585                 /* XXX: For the case of there is a linkea entry, but without
586                  *      name entry pointing to the object and its hard links
587                  *      count is not less than the object name entries count,
588                  *      then seems we should add the 'missed' name entry back
589                  *      to namespace, but before LFSCK phase III finished, we
590                  *      do not know whether the object has some inconsistency
591                  *      on other MDTs. So now, do NOT add the name entry back
592                  *      to the namespace, but keep the linkEA entry. LU-2914 */
593                 lfsck_object_put(env, parent);
594                 linkea_next_entry(&ldata);
595                 continue;
596
597 shrink:
598                 if (parent != NULL)
599                         lfsck_object_put(env, parent);
600                 if (bk->lb_param & LPF_DRYRUN)
601                         RETURN(1);
602
603                 CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
604                        PFID(lfsck_dto2fid(child)), cname->ln_namelen, cname->ln_name,
605                        PFID(pfid));
606                 linkea_del_buf(&ldata, cname);
607                 update = true;
608         }
609
610         if (update) {
611                 if (!com->lc_journal) {
612                         com->lc_journal = 1;
613                         goto again;
614                 }
615
616                 rc = lfsck_links_write(env, child, &ldata, handle);
617         }
618
619         GOTO(stop, rc);
620
621 stop:
622         if (locked) {
623         /* XXX: For the case linkea entries count does not match the object hard
624          *      links count, we cannot update the later one simply. Before LFSCK
625          *      phase III finished, we cannot know whether there are some remote
626          *      name entries to be repaired or not. LU-2914 */
627                 if (rc == 0 && !lfsck_is_dead_obj(child) &&
628                     ldata.ld_leh != NULL &&
629                     ldata.ld_leh->leh_reccount != la->la_nlink)
630                         CWARN("%s: the object "DFID" linkEA entry count %u "
631                               "may not match its hardlink count %u\n",
632                               lfsck_lfsck2name(lfsck), PFID(cfid),
633                               ldata.ld_leh->leh_reccount, la->la_nlink);
634
635                 dt_write_unlock(env, child);
636         }
637
638         if (handle != NULL)
639                 dt_trans_stop(env, lfsck->li_next, handle);
640
641         if (rc == 0 && update) {
642                 ns->ln_objs_nlink_repaired++;
643                 rc = 1;
644         }
645
646         return rc;
647 }
648
649 /* namespace APIs */
650
651 static int lfsck_namespace_reset(const struct lu_env *env,
652                                  struct lfsck_component *com, bool init)
653 {
654         struct lfsck_instance   *lfsck = com->lc_lfsck;
655         struct lfsck_namespace  *ns    = com->lc_file_ram;
656         struct dt_object        *root;
657         struct dt_object        *dto;
658         int                      rc;
659         ENTRY;
660
661         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
662         if (IS_ERR(root))
663                 RETURN(PTR_ERR(root));
664
665         if (unlikely(!dt_try_as_dir(env, root))) {
666                 lu_object_put(env, &root->do_lu);
667                 RETURN(-ENOTDIR);
668         }
669
670         down_write(&com->lc_sem);
671         if (init) {
672                 memset(ns, 0, sizeof(*ns));
673         } else {
674                 __u32 count = ns->ln_success_count;
675                 __u64 last_time = ns->ln_time_last_complete;
676
677                 memset(ns, 0, sizeof(*ns));
678                 ns->ln_success_count = count;
679                 ns->ln_time_last_complete = last_time;
680         }
681         ns->ln_magic = LFSCK_NAMESPACE_MAGIC;
682         ns->ln_status = LS_INIT;
683
684         rc = local_object_unlink(env, lfsck->li_bottom, root,
685                                  lfsck_namespace_name);
686         if (rc != 0)
687                 GOTO(out, rc);
688
689         lfsck_object_put(env, com->lc_obj);
690         com->lc_obj = NULL;
691         dto = local_index_find_or_create(env, lfsck->li_los, root,
692                                          lfsck_namespace_name,
693                                          S_IFREG | S_IRUGO | S_IWUSR,
694                                          &dt_lfsck_features);
695         if (IS_ERR(dto))
696                 GOTO(out, rc = PTR_ERR(dto));
697
698         com->lc_obj = dto;
699         rc = dto->do_ops->do_index_try(env, dto, &dt_lfsck_features);
700         if (rc != 0)
701                 GOTO(out, rc);
702
703         rc = lfsck_namespace_store(env, com, true);
704
705         GOTO(out, rc);
706
707 out:
708         up_write(&com->lc_sem);
709         lu_object_put(env, &root->do_lu);
710         return rc;
711 }
712
713 static void
714 lfsck_namespace_fail(const struct lu_env *env, struct lfsck_component *com,
715                      bool new_checked)
716 {
717         struct lfsck_namespace *ns = com->lc_file_ram;
718
719         down_write(&com->lc_sem);
720         if (new_checked)
721                 com->lc_new_checked++;
722         ns->ln_items_failed++;
723         if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
724                 lfsck_pos_fill(env, com->lc_lfsck,
725                                &ns->ln_pos_first_inconsistent, false);
726         up_write(&com->lc_sem);
727 }
728
729 static int lfsck_namespace_checkpoint(const struct lu_env *env,
730                                       struct lfsck_component *com, bool init)
731 {
732         struct lfsck_instance   *lfsck = com->lc_lfsck;
733         struct lfsck_namespace  *ns    = com->lc_file_ram;
734         int                      rc;
735
736         if (com->lc_new_checked == 0 && !init)
737                 return 0;
738
739         down_write(&com->lc_sem);
740
741         if (init) {
742                 ns->ln_pos_latest_start = lfsck->li_pos_current;
743         } else {
744                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
745                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
746                                 HALF_SEC - lfsck->li_time_last_checkpoint);
747                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
748                 ns->ln_items_checked += com->lc_new_checked;
749                 com->lc_new_checked = 0;
750         }
751
752         rc = lfsck_namespace_store(env, com, false);
753
754         up_write(&com->lc_sem);
755         return rc;
756 }
757
758 static int lfsck_namespace_prep(const struct lu_env *env,
759                                 struct lfsck_component *com,
760                                 struct lfsck_start_param *lsp)
761 {
762         struct lfsck_instance   *lfsck  = com->lc_lfsck;
763         struct lfsck_namespace  *ns     = com->lc_file_ram;
764         struct lfsck_position   *pos    = &com->lc_pos_start;
765
766         if (ns->ln_status == LS_COMPLETED) {
767                 int rc;
768
769                 rc = lfsck_namespace_reset(env, com, false);
770                 if (rc == 0)
771                         rc = lfsck_set_param(env, lfsck, lsp->lsp_start, true);
772
773                 if (rc != 0)
774                         return rc;
775         }
776
777         down_write(&com->lc_sem);
778
779         ns->ln_time_latest_start = cfs_time_current_sec();
780
781         spin_lock(&lfsck->li_lock);
782         if (ns->ln_flags & LF_SCANNED_ONCE) {
783                 if (!lfsck->li_drop_dryrun ||
784                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
785                         ns->ln_status = LS_SCANNING_PHASE2;
786                         cfs_list_del_init(&com->lc_link);
787                         cfs_list_add_tail(&com->lc_link,
788                                           &lfsck->li_list_double_scan);
789                         if (!cfs_list_empty(&com->lc_link_dir))
790                                 cfs_list_del_init(&com->lc_link_dir);
791                         lfsck_pos_set_zero(pos);
792                 } else {
793                         ns->ln_status = LS_SCANNING_PHASE1;
794                         ns->ln_run_time_phase1 = 0;
795                         ns->ln_run_time_phase2 = 0;
796                         ns->ln_items_checked = 0;
797                         ns->ln_items_repaired = 0;
798                         ns->ln_items_failed = 0;
799                         ns->ln_dirs_checked = 0;
800                         ns->ln_mlinked_checked = 0;
801                         ns->ln_objs_checked_phase2 = 0;
802                         ns->ln_objs_repaired_phase2 = 0;
803                         ns->ln_objs_failed_phase2 = 0;
804                         ns->ln_objs_nlink_repaired = 0;
805                         ns->ln_objs_lost_found = 0;
806                         fid_zero(&ns->ln_fid_latest_scanned_phase2);
807                         if (cfs_list_empty(&com->lc_link_dir))
808                                 cfs_list_add_tail(&com->lc_link_dir,
809                                                   &lfsck->li_list_dir);
810                         *pos = ns->ln_pos_first_inconsistent;
811                 }
812         } else {
813                 ns->ln_status = LS_SCANNING_PHASE1;
814                 if (cfs_list_empty(&com->lc_link_dir))
815                         cfs_list_add_tail(&com->lc_link_dir,
816                                           &lfsck->li_list_dir);
817                 if (!lfsck->li_drop_dryrun ||
818                     lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent)) {
819                         *pos = ns->ln_pos_last_checkpoint;
820                         pos->lp_oit_cookie++;
821                 } else {
822                         *pos = ns->ln_pos_first_inconsistent;
823                 }
824         }
825         spin_unlock(&lfsck->li_lock);
826
827         up_write(&com->lc_sem);
828         return 0;
829 }
830
831 static int lfsck_namespace_exec_oit(const struct lu_env *env,
832                                     struct lfsck_component *com,
833                                     struct dt_object *obj)
834 {
835         down_write(&com->lc_sem);
836         com->lc_new_checked++;
837         if (S_ISDIR(lfsck_object_type(obj)))
838                 ((struct lfsck_namespace *)com->lc_file_ram)->ln_dirs_checked++;
839         up_write(&com->lc_sem);
840         return 0;
841 }
842
843 static int lfsck_namespace_exec_dir(const struct lu_env *env,
844                                     struct lfsck_component *com,
845                                     struct dt_object *obj,
846                                     struct lu_dirent *ent)
847 {
848         struct lfsck_thread_info   *info     = lfsck_env_info(env);
849         struct lu_attr             *la       = &info->lti_la;
850         struct lfsck_instance      *lfsck    = com->lc_lfsck;
851         struct lfsck_bookmark      *bk       = &lfsck->li_bookmark_ram;
852         struct lfsck_namespace     *ns       = com->lc_file_ram;
853         struct linkea_data          ldata    = { 0 };
854         const struct lu_fid        *pfid     = lfsck_dto2fid(lfsck->li_obj_dir);
855         const struct lu_fid        *cfid     = lfsck_dto2fid(obj);
856         const struct lu_name       *cname;
857         struct thandle             *handle   = NULL;
858         bool                        repaired = false;
859         bool                        locked   = false;
860         bool                        remove;
861         bool                        newdata;
862         int                         count    = 0;
863         int                         rc;
864         ENTRY;
865
866         cname = lfsck_name_get_const(env, ent->lde_name, ent->lde_namelen);
867         down_write(&com->lc_sem);
868         com->lc_new_checked++;
869
870         if (ent->lde_attrs & LUDA_UPGRADE) {
871                 ns->ln_flags |= LF_UPGRADE;
872                 ns->ln_dirent_repaired++;
873                 repaired = true;
874         } else if (ent->lde_attrs & LUDA_REPAIR) {
875                 ns->ln_flags |= LF_INCONSISTENT;
876                 ns->ln_dirent_repaired++;
877                 repaired = true;
878         }
879
880         if (ent->lde_name[0] == '.' &&
881             (ent->lde_namelen == 1 ||
882              (ent->lde_namelen == 2 && ent->lde_name[1] == '.') ||
883              fid_seq_is_dot_lustre(fid_seq(&ent->lde_fid))))
884                 GOTO(out, rc = 0);
885
886         if (!(bk->lb_param & LPF_DRYRUN) &&
887             (com->lc_journal || repaired)) {
888
889 again:
890                 LASSERT(!locked);
891
892                 com->lc_journal = 1;
893                 handle = dt_trans_create(env, lfsck->li_next);
894                 if (IS_ERR(handle))
895                         GOTO(out, rc = PTR_ERR(handle));
896
897                 rc = lfsck_declare_namespace_exec_dir(env, obj, handle);
898                 if (rc != 0)
899                         GOTO(stop, rc);
900
901                 rc = dt_trans_start(env, lfsck->li_next, handle);
902                 if (rc != 0)
903                         GOTO(stop, rc);
904
905                 dt_write_lock(env, obj, MOR_TGT_CHILD);
906                 locked = true;
907         }
908
909         rc = lfsck_namespace_check_exist(env, lfsck, obj, ent->lde_name);
910         if (rc != 0)
911                 GOTO(stop, rc);
912
913         rc = lfsck_links_read(env, obj, &ldata);
914         if (rc == 0) {
915                 count = ldata.ld_leh->leh_reccount;
916                 rc = linkea_links_find(&ldata, cname, pfid);
917                 if ((rc == 0) &&
918                     (count == 1 || !S_ISDIR(lfsck_object_type(obj))))
919                         goto record;
920
921                 ns->ln_flags |= LF_INCONSISTENT;
922                 /* For dir, if there are more than one linkea entries, or the
923                  * linkea entry does not match the name entry, then remove all
924                  * and add the correct one. */
925                 if (S_ISDIR(lfsck_object_type(obj))) {
926                         remove = true;
927                         newdata = true;
928                 } else {
929                         remove = false;
930                         newdata = false;
931                 }
932                 goto nodata;
933         } else if (unlikely(rc == -EINVAL)) {
934                 count = 1;
935                 ns->ln_flags |= LF_INCONSISTENT;
936                 /* The magic crashed, we are not sure whether there are more
937                  * corrupt data in the linkea, so remove all linkea entries. */
938                 remove = true;
939                 newdata = true;
940                 goto nodata;
941         } else if (rc == -ENODATA) {
942                 count = 1;
943                 ns->ln_flags |= LF_UPGRADE;
944                 remove = false;
945                 newdata = true;
946
947 nodata:
948                 if (bk->lb_param & LPF_DRYRUN) {
949                         ns->ln_linkea_repaired++;
950                         repaired = true;
951                         goto record;
952                 }
953
954                 if (!com->lc_journal)
955                         goto again;
956
957                 if (remove) {
958                         LASSERT(newdata);
959
960                         rc = dt_xattr_del(env, obj, XATTR_NAME_LINK, handle,
961                                           BYPASS_CAPA);
962                         if (rc != 0)
963                                 GOTO(stop, rc);
964                 }
965
966                 if (newdata) {
967                         rc = linkea_data_new(&ldata,
968                                         &lfsck_env_info(env)->lti_linkea_buf);
969                         if (rc != 0)
970                                 GOTO(stop, rc);
971                 }
972
973                 rc = linkea_add_buf(&ldata, cname, pfid);
974                 if (rc != 0)
975                         GOTO(stop, rc);
976
977                 rc = lfsck_links_write(env, obj, &ldata, handle);
978                 if (rc != 0)
979                         GOTO(stop, rc);
980
981                 count = ldata.ld_leh->leh_reccount;
982                 ns->ln_linkea_repaired++;
983                 repaired = true;
984         } else {
985                 GOTO(stop, rc);
986         }
987
988 record:
989         LASSERT(count > 0);
990
991         rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
992         if (rc != 0)
993                 GOTO(stop, rc);
994
995         if ((count == 1) &&
996             (la->la_nlink == 1 || S_ISDIR(lfsck_object_type(obj))))
997                 /* Usually, it is for single linked object or dir, do nothing.*/
998                 GOTO(stop, rc);
999
1000         /* Following modification will be in another transaction.  */
1001         if (handle != NULL) {
1002                 LASSERT(dt_write_locked(env, obj));
1003
1004                 dt_write_unlock(env, obj);
1005                 locked = false;
1006
1007                 dt_trans_stop(env, lfsck->li_next, handle);
1008                 handle = NULL;
1009         }
1010
1011         ns->ln_mlinked_checked++;
1012         rc = lfsck_namespace_update(env, com, cfid,
1013                         count != la->la_nlink ? LLF_UNMATCH_NLINKS : 0, false);
1014
1015         GOTO(out, rc);
1016
1017 stop:
1018         if (locked)
1019                 dt_write_unlock(env, obj);
1020
1021         if (handle != NULL)
1022                 dt_trans_stop(env, lfsck->li_next, handle);
1023
1024 out:
1025         if (rc < 0) {
1026                 ns->ln_items_failed++;
1027                 if (lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1028                         lfsck_pos_fill(env, lfsck,
1029                                        &ns->ln_pos_first_inconsistent, false);
1030                 if (!(bk->lb_param & LPF_FAILOUT))
1031                         rc = 0;
1032         } else {
1033                 if (repaired) {
1034                         ns->ln_items_repaired++;
1035                         if (bk->lb_param & LPF_DRYRUN &&
1036                             lfsck_pos_is_zero(&ns->ln_pos_first_inconsistent))
1037                                 lfsck_pos_fill(env, lfsck,
1038                                                &ns->ln_pos_first_inconsistent,
1039                                                false);
1040                 } else {
1041                         com->lc_journal = 0;
1042                 }
1043                 rc = 0;
1044         }
1045         up_write(&com->lc_sem);
1046         return rc;
1047 }
1048
1049 static int lfsck_namespace_post(const struct lu_env *env,
1050                                 struct lfsck_component *com,
1051                                 int result, bool init)
1052 {
1053         struct lfsck_instance   *lfsck = com->lc_lfsck;
1054         struct lfsck_namespace  *ns    = com->lc_file_ram;
1055         int                      rc;
1056
1057         down_write(&com->lc_sem);
1058
1059         spin_lock(&lfsck->li_lock);
1060         if (!init)
1061                 ns->ln_pos_last_checkpoint = lfsck->li_pos_current;
1062         if (result > 0) {
1063                 ns->ln_status = LS_SCANNING_PHASE2;
1064                 ns->ln_flags |= LF_SCANNED_ONCE;
1065                 ns->ln_flags &= ~LF_UPGRADE;
1066                 cfs_list_del_init(&com->lc_link);
1067                 cfs_list_del_init(&com->lc_link_dir);
1068                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_double_scan);
1069         } else if (result == 0) {
1070                 ns->ln_status = lfsck->li_status;
1071                 if (ns->ln_status == 0)
1072                         ns->ln_status = LS_STOPPED;
1073                 if (ns->ln_status != LS_PAUSED) {
1074                         cfs_list_del_init(&com->lc_link);
1075                         cfs_list_del_init(&com->lc_link_dir);
1076                         cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1077                 }
1078         } else {
1079                 ns->ln_status = LS_FAILED;
1080                 cfs_list_del_init(&com->lc_link);
1081                 cfs_list_del_init(&com->lc_link_dir);
1082                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1083         }
1084         spin_unlock(&lfsck->li_lock);
1085
1086         if (!init) {
1087                 ns->ln_run_time_phase1 += cfs_duration_sec(cfs_time_current() +
1088                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1089                 ns->ln_time_last_checkpoint = cfs_time_current_sec();
1090                 ns->ln_items_checked += com->lc_new_checked;
1091                 com->lc_new_checked = 0;
1092         }
1093
1094         rc = lfsck_namespace_store(env, com, false);
1095
1096         up_write(&com->lc_sem);
1097         return rc;
1098 }
1099
1100 static int
1101 lfsck_namespace_dump(const struct lu_env *env, struct lfsck_component *com,
1102                      char *buf, int len)
1103 {
1104         struct lfsck_instance   *lfsck = com->lc_lfsck;
1105         struct lfsck_bookmark   *bk    = &lfsck->li_bookmark_ram;
1106         struct lfsck_namespace  *ns    = com->lc_file_ram;
1107         int                      save  = len;
1108         int                      ret   = -ENOSPC;
1109         int                      rc;
1110
1111         down_read(&com->lc_sem);
1112         rc = snprintf(buf, len,
1113                       "name: lfsck_namespace\n"
1114                       "magic: %#x\n"
1115                       "version: %d\n"
1116                       "status: %s\n",
1117                       ns->ln_magic,
1118                       bk->lb_version,
1119                       lfsck_status2names(ns->ln_status));
1120         if (rc <= 0)
1121                 goto out;
1122
1123         buf += rc;
1124         len -= rc;
1125         rc = lfsck_bits_dump(&buf, &len, ns->ln_flags, lfsck_flags_names,
1126                              "flags");
1127         if (rc < 0)
1128                 goto out;
1129
1130         rc = lfsck_bits_dump(&buf, &len, bk->lb_param, lfsck_param_names,
1131                              "param");
1132         if (rc < 0)
1133                 goto out;
1134
1135         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_complete,
1136                              "time_since_last_completed");
1137         if (rc < 0)
1138                 goto out;
1139
1140         rc = lfsck_time_dump(&buf, &len, ns->ln_time_latest_start,
1141                              "time_since_latest_start");
1142         if (rc < 0)
1143                 goto out;
1144
1145         rc = lfsck_time_dump(&buf, &len, ns->ln_time_last_checkpoint,
1146                              "time_since_last_checkpoint");
1147         if (rc < 0)
1148                 goto out;
1149
1150         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_latest_start,
1151                             "latest_start_position");
1152         if (rc < 0)
1153                 goto out;
1154
1155         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_last_checkpoint,
1156                             "last_checkpoint_position");
1157         if (rc < 0)
1158                 goto out;
1159
1160         rc = lfsck_pos_dump(&buf, &len, &ns->ln_pos_first_inconsistent,
1161                             "first_failure_position");
1162         if (rc < 0)
1163                 goto out;
1164
1165         if (ns->ln_status == LS_SCANNING_PHASE1) {
1166                 struct lfsck_position pos;
1167                 const struct dt_it_ops *iops;
1168                 cfs_duration_t duration = cfs_time_current() -
1169                                           lfsck->li_time_last_checkpoint;
1170                 __u64 checked = ns->ln_items_checked + com->lc_new_checked;
1171                 __u64 speed = checked;
1172                 __u64 new_checked = com->lc_new_checked * HZ;
1173                 __u32 rtime = ns->ln_run_time_phase1 +
1174                               cfs_duration_sec(duration + HALF_SEC);
1175
1176                 if (duration != 0)
1177                         do_div(new_checked, duration);
1178                 if (rtime != 0)
1179                         do_div(speed, rtime);
1180                 rc = snprintf(buf, len,
1181                               "checked_phase1: "LPU64"\n"
1182                               "checked_phase2: "LPU64"\n"
1183                               "updated_phase1: "LPU64"\n"
1184                               "updated_phase2: "LPU64"\n"
1185                               "failed_phase1: "LPU64"\n"
1186                               "failed_phase2: "LPU64"\n"
1187                               "dirs: "LPU64"\n"
1188                               "M-linked: "LPU64"\n"
1189                               "dirent_repaired: "LPU64"\n"
1190                               "linkea_repaired: "LPU64"\n"
1191                               "nlinks_repaired: "LPU64"\n"
1192                               "lost_found: "LPU64"\n"
1193                               "success_count: %u\n"
1194                               "run_time_phase1: %u seconds\n"
1195                               "run_time_phase2: %u seconds\n"
1196                               "average_speed_phase1: "LPU64" items/sec\n"
1197                               "average_speed_phase2: N/A\n"
1198                               "real-time_speed_phase1: "LPU64" items/sec\n"
1199                               "real-time_speed_phase2: N/A\n",
1200                               checked,
1201                               ns->ln_objs_checked_phase2,
1202                               ns->ln_items_repaired,
1203                               ns->ln_objs_repaired_phase2,
1204                               ns->ln_items_failed,
1205                               ns->ln_objs_failed_phase2,
1206                               ns->ln_dirs_checked,
1207                               ns->ln_mlinked_checked,
1208                               ns->ln_dirent_repaired,
1209                               ns->ln_linkea_repaired,
1210                               ns->ln_objs_nlink_repaired,
1211                               ns->ln_objs_lost_found,
1212                               ns->ln_success_count,
1213                               rtime,
1214                               ns->ln_run_time_phase2,
1215                               speed,
1216                               new_checked);
1217                 if (rc <= 0)
1218                         goto out;
1219
1220                 buf += rc;
1221                 len -= rc;
1222
1223                 LASSERT(lfsck->li_di_oit != NULL);
1224
1225                 iops = &lfsck->li_obj_oit->do_index_ops->dio_it;
1226
1227                 /* The low layer otable-based iteration position may NOT
1228                  * exactly match the namespace-based directory traversal
1229                  * cookie. Generally, it is not a serious issue. But the
1230                  * caller should NOT make assumption on that. */
1231                 pos.lp_oit_cookie = iops->store(env, lfsck->li_di_oit);
1232                 if (!lfsck->li_current_oit_processed)
1233                         pos.lp_oit_cookie--;
1234
1235                 spin_lock(&lfsck->li_lock);
1236                 if (lfsck->li_di_dir != NULL) {
1237                         pos.lp_dir_cookie = lfsck->li_cookie_dir;
1238                         if (pos.lp_dir_cookie >= MDS_DIR_END_OFF) {
1239                                 fid_zero(&pos.lp_dir_parent);
1240                                 pos.lp_dir_cookie = 0;
1241                         } else {
1242                                 pos.lp_dir_parent =
1243                                         *lfsck_dto2fid(lfsck->li_obj_dir);
1244                         }
1245                 } else {
1246                         fid_zero(&pos.lp_dir_parent);
1247                         pos.lp_dir_cookie = 0;
1248                 }
1249                 spin_unlock(&lfsck->li_lock);
1250                 rc = lfsck_pos_dump(&buf, &len, &pos, "current_position");
1251                 if (rc <= 0)
1252                         goto out;
1253         } else if (ns->ln_status == LS_SCANNING_PHASE2) {
1254                 cfs_duration_t duration = cfs_time_current() -
1255                                           lfsck->li_time_last_checkpoint;
1256                 __u64 checked = ns->ln_objs_checked_phase2 +
1257                                 com->lc_new_checked;
1258                 __u64 speed1 = ns->ln_items_checked;
1259                 __u64 speed2 = checked;
1260                 __u64 new_checked = com->lc_new_checked * HZ;
1261                 __u32 rtime = ns->ln_run_time_phase2 +
1262                               cfs_duration_sec(duration + HALF_SEC);
1263
1264                 if (duration != 0)
1265                         do_div(new_checked, duration);
1266                 if (ns->ln_run_time_phase1 != 0)
1267                         do_div(speed1, ns->ln_run_time_phase1);
1268                 if (rtime != 0)
1269                         do_div(speed2, rtime);
1270                 rc = snprintf(buf, len,
1271                               "checked_phase1: "LPU64"\n"
1272                               "checked_phase2: "LPU64"\n"
1273                               "updated_phase1: "LPU64"\n"
1274                               "updated_phase2: "LPU64"\n"
1275                               "failed_phase1: "LPU64"\n"
1276                               "failed_phase2: "LPU64"\n"
1277                               "dirs: "LPU64"\n"
1278                               "M-linked: "LPU64"\n"
1279                               "dirent_repaired: "LPU64"\n"
1280                               "linkea_repaired: "LPU64"\n"
1281                               "nlinks_repaired: "LPU64"\n"
1282                               "lost_found: "LPU64"\n"
1283                               "success_count: %u\n"
1284                               "run_time_phase1: %u seconds\n"
1285                               "run_time_phase2: %u seconds\n"
1286                               "average_speed_phase1: "LPU64" items/sec\n"
1287                               "average_speed_phase2: "LPU64" objs/sec\n"
1288                               "real-time_speed_phase1: N/A\n"
1289                               "real-time_speed_phase2: "LPU64" objs/sec\n"
1290                               "current_position: "DFID"\n",
1291                               ns->ln_items_checked,
1292                               checked,
1293                               ns->ln_items_repaired,
1294                               ns->ln_objs_repaired_phase2,
1295                               ns->ln_items_failed,
1296                               ns->ln_objs_failed_phase2,
1297                               ns->ln_dirs_checked,
1298                               ns->ln_mlinked_checked,
1299                               ns->ln_dirent_repaired,
1300                               ns->ln_linkea_repaired,
1301                               ns->ln_objs_nlink_repaired,
1302                               ns->ln_objs_lost_found,
1303                               ns->ln_success_count,
1304                               ns->ln_run_time_phase1,
1305                               rtime,
1306                               speed1,
1307                               speed2,
1308                               new_checked,
1309                               PFID(&ns->ln_fid_latest_scanned_phase2));
1310                 if (rc <= 0)
1311                         goto out;
1312
1313                 buf += rc;
1314                 len -= rc;
1315         } else {
1316                 __u64 speed1 = ns->ln_items_checked;
1317                 __u64 speed2 = ns->ln_objs_checked_phase2;
1318
1319                 if (ns->ln_run_time_phase1 != 0)
1320                         do_div(speed1, ns->ln_run_time_phase1);
1321                 if (ns->ln_run_time_phase2 != 0)
1322                         do_div(speed2, ns->ln_run_time_phase2);
1323                 rc = snprintf(buf, len,
1324                               "checked_phase1: "LPU64"\n"
1325                               "checked_phase2: "LPU64"\n"
1326                               "updated_phase1: "LPU64"\n"
1327                               "updated_phase2: "LPU64"\n"
1328                               "failed_phase1: "LPU64"\n"
1329                               "failed_phase2: "LPU64"\n"
1330                               "dirs: "LPU64"\n"
1331                               "M-linked: "LPU64"\n"
1332                               "dirent_repaired: "LPU64"\n"
1333                               "linkea_repaired: "LPU64"\n"
1334                               "nlinks_repaired: "LPU64"\n"
1335                               "lost_found: "LPU64"\n"
1336                               "success_count: %u\n"
1337                               "run_time_phase1: %u seconds\n"
1338                               "run_time_phase2: %u seconds\n"
1339                               "average_speed_phase1: "LPU64" items/sec\n"
1340                               "average_speed_phase2: "LPU64" objs/sec\n"
1341                               "real-time_speed_phase1: N/A\n"
1342                               "real-time_speed_phase2: N/A\n"
1343                               "current_position: N/A\n",
1344                               ns->ln_items_checked,
1345                               ns->ln_objs_checked_phase2,
1346                               ns->ln_items_repaired,
1347                               ns->ln_objs_repaired_phase2,
1348                               ns->ln_items_failed,
1349                               ns->ln_objs_failed_phase2,
1350                               ns->ln_dirs_checked,
1351                               ns->ln_mlinked_checked,
1352                               ns->ln_dirent_repaired,
1353                               ns->ln_linkea_repaired,
1354                               ns->ln_objs_nlink_repaired,
1355                               ns->ln_objs_lost_found,
1356                               ns->ln_success_count,
1357                               ns->ln_run_time_phase1,
1358                               ns->ln_run_time_phase2,
1359                               speed1,
1360                               speed2);
1361                 if (rc <= 0)
1362                         goto out;
1363
1364                 buf += rc;
1365                 len -= rc;
1366         }
1367         ret = save - len;
1368
1369 out:
1370         up_read(&com->lc_sem);
1371         return ret;
1372 }
1373
1374 static int lfsck_namespace_double_scan_main(void *args)
1375 {
1376         struct lfsck_thread_args *lta   = args;
1377         const struct lu_env     *env    = &lta->lta_env;
1378         struct lfsck_component  *com    = lta->lta_com;
1379         struct lfsck_instance   *lfsck  = com->lc_lfsck;
1380         struct ptlrpc_thread    *thread = &lfsck->li_thread;
1381         struct lfsck_bookmark   *bk     = &lfsck->li_bookmark_ram;
1382         struct lfsck_namespace  *ns     = com->lc_file_ram;
1383         struct dt_object        *obj    = com->lc_obj;
1384         const struct dt_it_ops  *iops   = &obj->do_index_ops->dio_it;
1385         struct dt_object        *target;
1386         struct dt_it            *di;
1387         struct dt_key           *key;
1388         struct lu_fid            fid;
1389         int                      rc;
1390         __u8                     flags = 0;
1391         ENTRY;
1392
1393         com->lc_new_checked = 0;
1394         com->lc_new_scanned = 0;
1395         com->lc_time_last_checkpoint = cfs_time_current();
1396         com->lc_time_next_checkpoint = com->lc_time_last_checkpoint +
1397                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1398
1399         di = iops->init(env, obj, 0, BYPASS_CAPA);
1400         if (IS_ERR(di))
1401                 GOTO(out, rc = PTR_ERR(di));
1402
1403         fid_cpu_to_be(&fid, &ns->ln_fid_latest_scanned_phase2);
1404         rc = iops->get(env, di, (const struct dt_key *)&fid);
1405         if (rc < 0)
1406                 GOTO(fini, rc);
1407
1408         /* Skip the start one, which either has been processed or non-exist. */
1409         rc = iops->next(env, di);
1410         if (rc != 0)
1411                 GOTO(put, rc);
1412
1413         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_DOUBLESCAN))
1414                 GOTO(put, rc = 0);
1415
1416         do {
1417                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
1418                     cfs_fail_val > 0) {
1419                         struct l_wait_info lwi;
1420
1421                         lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
1422                                           NULL, NULL);
1423                         l_wait_event(thread->t_ctl_waitq,
1424                                      !thread_is_running(thread),
1425                                      &lwi);
1426                 }
1427
1428                 key = iops->key(env, di);
1429                 fid_be_to_cpu(&fid, (const struct lu_fid *)key);
1430                 target = lfsck_object_find(env, lfsck, &fid);
1431                 down_write(&com->lc_sem);
1432                 if (target == NULL) {
1433                         rc = 0;
1434                         goto checkpoint;
1435                 } else if (IS_ERR(target)) {
1436                         rc = PTR_ERR(target);
1437                         goto checkpoint;
1438                 }
1439
1440                 /* XXX: Currently, skip remote object, the consistency for
1441                  *      remote object will be processed in LFSCK phase III. */
1442                 if (dt_object_exists(target) && !dt_object_remote(target)) {
1443                         rc = iops->rec(env, di, (struct dt_rec *)&flags, 0);
1444                         if (rc == 0)
1445                                 rc = lfsck_namespace_double_scan_one(env, com,
1446                                                                 target, flags);
1447                 }
1448
1449                 lfsck_object_put(env, target);
1450
1451 checkpoint:
1452                 com->lc_new_checked++;
1453                 com->lc_new_scanned++;
1454                 ns->ln_fid_latest_scanned_phase2 = fid;
1455                 if (rc > 0)
1456                         ns->ln_objs_repaired_phase2++;
1457                 else if (rc < 0)
1458                         ns->ln_objs_failed_phase2++;
1459                 up_write(&com->lc_sem);
1460
1461                 if ((rc == 0) || ((rc > 0) && !(bk->lb_param & LPF_DRYRUN))) {
1462                         lfsck_namespace_delete(env, com, &fid);
1463                 } else if (rc < 0) {
1464                         flags |= LLF_REPAIR_FAILED;
1465                         lfsck_namespace_update(env, com, &fid, flags, true);
1466                 }
1467
1468                 if (rc < 0 && bk->lb_param & LPF_FAILOUT)
1469                         GOTO(put, rc);
1470
1471                 if (unlikely(cfs_time_beforeq(com->lc_time_next_checkpoint,
1472                                               cfs_time_current())) &&
1473                     com->lc_new_checked != 0) {
1474                         down_write(&com->lc_sem);
1475                         ns->ln_run_time_phase2 +=
1476                                 cfs_duration_sec(cfs_time_current() +
1477                                 HALF_SEC - com->lc_time_last_checkpoint);
1478                         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1479                         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1480                         com->lc_new_checked = 0;
1481                         rc = lfsck_namespace_store(env, com, false);
1482                         up_write(&com->lc_sem);
1483                         if (rc != 0)
1484                                 GOTO(put, rc);
1485
1486                         com->lc_time_last_checkpoint = cfs_time_current();
1487                         com->lc_time_next_checkpoint =
1488                                 com->lc_time_last_checkpoint +
1489                                 cfs_time_seconds(LFSCK_CHECKPOINT_INTERVAL);
1490                 }
1491
1492                 lfsck_control_speed_by_self(com);
1493                 if (unlikely(!thread_is_running(thread)))
1494                         GOTO(put, rc = 0);
1495
1496                 rc = iops->next(env, di);
1497         } while (rc == 0);
1498
1499         GOTO(put, rc);
1500
1501 put:
1502         iops->put(env, di);
1503
1504 fini:
1505         iops->fini(env, di);
1506
1507 out:
1508         down_write(&com->lc_sem);
1509         ns->ln_run_time_phase2 += cfs_duration_sec(cfs_time_current() +
1510                                 HALF_SEC - lfsck->li_time_last_checkpoint);
1511         ns->ln_time_last_checkpoint = cfs_time_current_sec();
1512         ns->ln_objs_checked_phase2 += com->lc_new_checked;
1513         com->lc_new_checked = 0;
1514
1515         if (rc > 0) {
1516                 com->lc_journal = 0;
1517                 ns->ln_status = LS_COMPLETED;
1518                 if (!(bk->lb_param & LPF_DRYRUN))
1519                         ns->ln_flags &= ~(LF_SCANNED_ONCE | LF_INCONSISTENT);
1520                 ns->ln_time_last_complete = ns->ln_time_last_checkpoint;
1521                 ns->ln_success_count++;
1522         } else if (rc == 0) {
1523                 ns->ln_status = lfsck->li_status;
1524                 if (ns->ln_status == 0)
1525                         ns->ln_status = LS_STOPPED;
1526         } else {
1527                 ns->ln_status = LS_FAILED;
1528         }
1529
1530         rc = lfsck_namespace_store(env, com, false);
1531         up_write(&com->lc_sem);
1532         if (atomic_dec_and_test(&lfsck->li_double_scan_count))
1533                 wake_up_all(&thread->t_ctl_waitq);
1534
1535         lfsck_thread_args_fini(lta);
1536
1537         return rc;
1538 }
1539
1540 static int lfsck_namespace_double_scan(const struct lu_env *env,
1541                                        struct lfsck_component *com)
1542 {
1543         struct lfsck_instance           *lfsck = com->lc_lfsck;
1544         struct lfsck_namespace          *ns    = com->lc_file_ram;
1545         struct lfsck_thread_args        *lta;
1546         long                             rc;
1547         ENTRY;
1548
1549         if (unlikely(ns->ln_status != LS_SCANNING_PHASE2))
1550                 RETURN(0);
1551
1552         lta = lfsck_thread_args_init(lfsck, com, NULL);
1553         if (IS_ERR(lta))
1554                 RETURN(PTR_ERR(lta));
1555
1556         atomic_inc(&lfsck->li_double_scan_count);
1557         rc = PTR_ERR(kthread_run(lfsck_namespace_double_scan_main, lta,
1558                                  "lfsck_namespace"));
1559         if (IS_ERR_VALUE(rc)) {
1560                 CERROR("%s: cannot start LFSCK namespace thread: rc = %ld\n",
1561                        lfsck_lfsck2name(lfsck), rc);
1562                 atomic_dec(&lfsck->li_double_scan_count);
1563                 lfsck_thread_args_fini(lta);
1564         } else {
1565                 rc = 0;
1566         }
1567
1568         RETURN(rc);
1569 }
1570
/*
 * Notification hook of the namespace LFSCK component.
 *
 * The namespace component does not act on incoming requests: none of the
 * arguments is examined and the call always reports success (0).
 */
static int lfsck_namespace_in_notify(const struct lu_env *env,
                                     struct lfsck_component *com,
                                     struct lfsck_request *lr)
{
        /* Nothing to do for the namespace component. */
        return 0;
}
1577
1578 static int lfsck_namespace_query(const struct lu_env *env,
1579                                  struct lfsck_component *com)
1580 {
1581         struct lfsck_namespace *ns = com->lc_file_ram;
1582
1583         return ns->ln_status;
1584 }
1585
1586 static struct lfsck_operations lfsck_namespace_ops = {
1587         .lfsck_reset            = lfsck_namespace_reset,
1588         .lfsck_fail             = lfsck_namespace_fail,
1589         .lfsck_checkpoint       = lfsck_namespace_checkpoint,
1590         .lfsck_prep             = lfsck_namespace_prep,
1591         .lfsck_exec_oit         = lfsck_namespace_exec_oit,
1592         .lfsck_exec_dir         = lfsck_namespace_exec_dir,
1593         .lfsck_post             = lfsck_namespace_post,
1594         .lfsck_dump             = lfsck_namespace_dump,
1595         .lfsck_double_scan      = lfsck_namespace_double_scan,
1596         .lfsck_in_notify        = lfsck_namespace_in_notify,
1597         .lfsck_query            = lfsck_namespace_query,
1598 };
1599
1600 int lfsck_namespace_setup(const struct lu_env *env,
1601                           struct lfsck_instance *lfsck)
1602 {
1603         struct lfsck_component  *com;
1604         struct lfsck_namespace  *ns;
1605         struct dt_object        *root = NULL;
1606         struct dt_object        *obj;
1607         int                      rc;
1608         ENTRY;
1609
1610         LASSERT(lfsck->li_master);
1611
1612         OBD_ALLOC_PTR(com);
1613         if (com == NULL)
1614                 RETURN(-ENOMEM);
1615
1616         CFS_INIT_LIST_HEAD(&com->lc_link);
1617         CFS_INIT_LIST_HEAD(&com->lc_link_dir);
1618         init_rwsem(&com->lc_sem);
1619         atomic_set(&com->lc_ref, 1);
1620         com->lc_lfsck = lfsck;
1621         com->lc_type = LT_NAMESPACE;
1622         com->lc_ops = &lfsck_namespace_ops;
1623         com->lc_file_size = sizeof(struct lfsck_namespace);
1624         OBD_ALLOC(com->lc_file_ram, com->lc_file_size);
1625         if (com->lc_file_ram == NULL)
1626                 GOTO(out, rc = -ENOMEM);
1627
1628         OBD_ALLOC(com->lc_file_disk, com->lc_file_size);
1629         if (com->lc_file_disk == NULL)
1630                 GOTO(out, rc = -ENOMEM);
1631
1632         root = dt_locate(env, lfsck->li_bottom, &lfsck->li_local_root_fid);
1633         if (IS_ERR(root))
1634                 GOTO(out, rc = PTR_ERR(root));
1635
1636         if (unlikely(!dt_try_as_dir(env, root)))
1637                 GOTO(out, rc = -ENOTDIR);
1638
1639         obj = local_index_find_or_create(env, lfsck->li_los, root,
1640                                          lfsck_namespace_name,
1641                                          S_IFREG | S_IRUGO | S_IWUSR,
1642                                          &dt_lfsck_features);
1643         if (IS_ERR(obj))
1644                 GOTO(out, rc = PTR_ERR(obj));
1645
1646         com->lc_obj = obj;
1647         rc = obj->do_ops->do_index_try(env, obj, &dt_lfsck_features);
1648         if (rc != 0)
1649                 GOTO(out, rc);
1650
1651         rc = lfsck_namespace_load(env, com);
1652         if (rc > 0)
1653                 rc = lfsck_namespace_reset(env, com, true);
1654         else if (rc == -ENODATA)
1655                 rc = lfsck_namespace_init(env, com);
1656         if (rc != 0)
1657                 GOTO(out, rc);
1658
1659         ns = com->lc_file_ram;
1660         switch (ns->ln_status) {
1661         case LS_INIT:
1662         case LS_COMPLETED:
1663         case LS_FAILED:
1664         case LS_STOPPED:
1665                 spin_lock(&lfsck->li_lock);
1666                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_idle);
1667                 spin_unlock(&lfsck->li_lock);
1668                 break;
1669         default:
1670                 CERROR("%s: unknown lfsck_namespace status: rc = %u\n",
1671                        lfsck_lfsck2name(lfsck), ns->ln_status);
1672                 /* fall through */
1673         case LS_SCANNING_PHASE1:
1674         case LS_SCANNING_PHASE2:
1675                 /* No need to store the status to disk right now.
1676                  * If the system crashed before the status stored,
1677                  * it will be loaded back when next time. */
1678                 ns->ln_status = LS_CRASHED;
1679                 /* fall through */
1680         case LS_PAUSED:
1681         case LS_CRASHED:
1682                 spin_lock(&lfsck->li_lock);
1683                 cfs_list_add_tail(&com->lc_link, &lfsck->li_list_scan);
1684                 cfs_list_add_tail(&com->lc_link_dir, &lfsck->li_list_dir);
1685                 spin_unlock(&lfsck->li_lock);
1686                 break;
1687         }
1688
1689         GOTO(out, rc = 0);
1690
1691 out:
1692         if (root != NULL && !IS_ERR(root))
1693                 lu_object_put(env, &root->do_lu);
1694         if (rc != 0)
1695                 lfsck_component_cleanup(env, com);
1696         return rc;
1697 }