Whamcloud - gitweb
LU-6142 lustre: use list_first/last_entry() for list heads
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding the Object Index files when an MDT is
31  * restored from a file-level backup.
32  *
33  * The otable based iterator scans ZFS objects to feed the upper layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
55 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH                 256
57
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
59 {
60         return it->ooi_prefetched < OTABLE_PREFETCH;
61 }
62
63 /**
64  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
65  *
66  * \retval   1, changed nothing
67  * \retval   0, changed successfully
68  * \retval -ve, on error
69  */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71                               struct osd_device *dev,
72                               const struct lu_fid *fid,
73                               uint64_t oid, enum dt_txn_op ops,
74                               bool force, const char *name)
75 {
76         struct osd_thread_info *info = osd_oti_get(env);
77         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78         char *buf = info->oti_str;
79         dmu_tx_t *tx = NULL;
80         dnode_t *dn = NULL;
81         uint64_t zapid;
82         int rc;
83         ENTRY;
84
85         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
86                 GOTO(log, rc = 0);
87
88         tx = dmu_tx_create(dev->od_os);
89         if (!tx)
90                 GOTO(log, rc = -ENOMEM);
91
92         zapid = osd_get_name_n_idx(env, dev, fid, buf,
93                                    sizeof(info->oti_str), &dn);
94         osd_tx_hold_zap(tx, zapid, dn,
95                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96         rc = -dmu_tx_assign(tx, TXG_WAIT);
97         if (rc) {
98                 dmu_tx_abort(tx);
99                 GOTO(log, rc);
100         }
101
102         switch (ops) {
103         case DTO_INDEX_UPDATE:
104                 zde->zde_pad = 0;
105                 zde->zde_dnode = oid;
106                 zde->zde_type = 0; /* The type in OI mapping is useless. */
107                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
108                                  zde, tx);
109                 if (unlikely(rc == -ENOENT)) {
110                         /* Some unlink thread may removed the OI mapping. */
111                         rc = 1;
112                 }
113                 break;
114         case DTO_INDEX_INSERT:
115                 zde->zde_pad = 0;
116                 zde->zde_dnode = oid;
117                 zde->zde_type = 0; /* The type in OI mapping is useless. */
118                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
119                                  zde, tx);
120                 if (unlikely(rc == -EEXIST))
121                         rc = 1;
122                 break;
123         case DTO_INDEX_DELETE:
124                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
125                 if (rc == -ENOENT) {
126                         /* It is normal that the unlink thread has removed the
127                          * OI mapping already. */
128                         rc = 1;
129                 }
130                 break;
131         default:
132                 LASSERTF(0, "Unexpected ops %d\n", ops);
133                 rc = -EINVAL;
134                 break;
135         }
136
137         dmu_tx_commit(tx);
138         GOTO(log, rc);
139
140 log:
141         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
142                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
143                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
144
145         return rc;
146 }
147
148 static int
149 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
150                        const struct lu_fid *fid, uint64_t oid, int val)
151 {
152         struct lustre_scrub *scrub = &dev->od_scrub;
153         struct scrub_file *sf = &scrub->os_file;
154         struct osd_inconsistent_item *oii = NULL;
155         nvlist_t *nvbuf = NULL;
156         dnode_t *dn = NULL;
157         uint64_t oid2;
158         int ops = DTO_INDEX_UPDATE;
159         int rc;
160         ENTRY;
161
162         down_write(&scrub->os_rwsem);
163         scrub->os_new_checked++;
164         if (val < 0)
165                 GOTO(out, rc = val);
166
167         if (scrub->os_in_prior)
168                 oii = list_first_entry(&scrub->os_inconsistent_items,
169                                        struct osd_inconsistent_item, oii_list);
170
171         if (oid < sf->sf_pos_latest_start && !oii)
172                 GOTO(out, rc = 0);
173
174         if (oii && oii->oii_insert) {
175                 ops = DTO_INDEX_INSERT;
176                 goto zget;
177         }
178
179         rc = osd_fid_lookup(env, dev, fid, &oid2);
180         if (rc) {
181                 if (rc != -ENOENT)
182                         GOTO(out, rc);
183
184                 ops = DTO_INDEX_INSERT;
185
186 zget:
187                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
188                 if (rc) {
189                         /* Someone removed the object by race. */
190                         if (rc == -ENOENT || rc == -EEXIST)
191                                 rc = 0;
192                         GOTO(out, rc);
193                 }
194
195                 spin_lock(&scrub->os_lock);
196                 scrub->os_full_speed = 1;
197                 spin_unlock(&scrub->os_lock);
198
199                 sf->sf_flags |= SF_INCONSISTENT;
200         } else if (oid == oid2) {
201                 GOTO(out, rc = 0);
202         } else {
203                 struct lustre_mdt_attrs *lma = NULL;
204                 int size;
205
206                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
207                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
208                         goto update;
209                 if (rc)
210                         GOTO(out, rc);
211
212                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
213                                                (uchar_t **)&lma, &size);
214                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
215                         goto update;
216                 if (rc)
217                         GOTO(out, rc);
218
219                 lustre_lma_swab(lma);
220                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
221                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
222                                "two objects: %llu and %llu (in OI)\n",
223                                osd_name(dev), PFID(fid), oid, oid2);
224
225                         GOTO(out, rc = -EEXIST);
226                 }
227
228 update:
229                 spin_lock(&scrub->os_lock);
230                 scrub->os_full_speed = 1;
231                 spin_unlock(&scrub->os_lock);
232                 sf->sf_flags |= SF_INCONSISTENT;
233         }
234
235         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
236         if (!rc) {
237                 if (scrub->os_in_prior)
238                         sf->sf_items_updated_prior++;
239                 else
240                         sf->sf_items_updated++;
241         }
242
243         GOTO(out, rc);
244
245 out:
246         if (dev->od_is_ost) {
247                 sa_handle_t *hdl;
248                 uint64_t nlink, mode;
249
250                 rc = -sa_handle_get(dev->od_os, oid, NULL, SA_HDL_PRIVATE,
251                                     &hdl);
252                 if (rc)
253                         GOTO(cleanup, rc);
254
255                 rc = -sa_lookup(hdl, SA_ZPL_MODE(dev), &mode, sizeof(mode));
256                 if (rc || !S_ISREG(mode)) {
257                         sa_handle_destroy(hdl);
258                         GOTO(cleanup, rc);
259                 }
260
261                 rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
262                 if (rc == 0 && nlink > 1)
263                         scrub->os_has_ml_file = 1;
264
265                 sa_handle_destroy(hdl);
266         }
267
268 cleanup:
269         if (nvbuf)
270                 nvlist_free(nvbuf);
271
272         if (rc < 0) {
273                 sf->sf_items_failed++;
274                 if (sf->sf_pos_first_inconsistent == 0 ||
275                     sf->sf_pos_first_inconsistent > oid)
276                         sf->sf_pos_first_inconsistent = oid;
277         } else {
278                 rc = 0;
279         }
280
281         /* There may be conflict unlink during the OI scrub,
282          * if happend, then remove the new added OI mapping. */
283         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
284                 osd_scrub_refresh_mapping(env, dev, fid, oid,
285                                           DTO_INDEX_DELETE, false, NULL);
286         up_write(&scrub->os_rwsem);
287
288         if (dn)
289                 osd_dnode_rele(dn);
290
291         if (oii) {
292                 spin_lock(&scrub->os_lock);
293                 if (likely(!list_empty(&oii->oii_list)))
294                         list_del(&oii->oii_list);
295                 spin_unlock(&scrub->os_lock);
296                 OBD_FREE_PTR(oii);
297         }
298
299         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
300 }
301
302 /* iteration engine */
303
304 static inline int
305 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
306 {
307         spin_lock(&scrub->os_lock);
308         if (osd_scrub_has_window(it) ||
309             !list_empty(&scrub->os_inconsistent_items) ||
310             it->ooi_waiting || kthread_should_stop())
311                 scrub->os_waiting = 0;
312         else
313                 scrub->os_waiting = 1;
314         spin_unlock(&scrub->os_lock);
315
316         return !scrub->os_waiting;
317 }
318
319 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
320                           struct lu_fid *fid, uint64_t *oid)
321 {
322         struct lustre_scrub *scrub = &dev->od_scrub;
323         struct osd_otable_it *it = dev->od_otable_it;
324         struct lustre_mdt_attrs *lma = NULL;
325         nvlist_t *nvbuf = NULL;
326         int size = 0;
327         int rc = 0;
328         ENTRY;
329
330         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
331                 wait_var_event_timeout(
332                         scrub,
333                         !list_empty(&scrub->os_inconsistent_items) ||
334                         kthread_should_stop(),
335                         cfs_time_seconds(cfs_fail_val));
336
337                 if (kthread_should_stop())
338                         RETURN(SCRUB_NEXT_EXIT);
339         }
340
341         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
342                 spin_lock(&scrub->os_lock);
343                 scrub->os_running = 0;
344                 spin_unlock(&scrub->os_lock);
345                 RETURN(SCRUB_NEXT_CRASH);
346         }
347
348         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
349                 RETURN(SCRUB_NEXT_FATAL);
350
351 again:
352         if (nvbuf) {
353                 nvlist_free(nvbuf);
354                 nvbuf = NULL;
355                 lma = NULL;
356         }
357
358         if (!list_empty(&scrub->os_inconsistent_items)) {
359                 spin_lock(&scrub->os_lock);
360                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
361                         struct osd_inconsistent_item *oii;
362
363                         oii = list_first_entry(&scrub->os_inconsistent_items,
364                                                struct osd_inconsistent_item,
365                                                oii_list);
366                         *fid = oii->oii_cache.oic_fid;
367                         *oid = oii->oii_cache.oic_dnode;
368                         scrub->os_in_prior = 1;
369                         spin_unlock(&scrub->os_lock);
370
371                         GOTO(out, rc = 0);
372                 }
373                 spin_unlock(&scrub->os_lock);
374         }
375
376         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
377                 wait_var_event(scrub, osd_scrub_wakeup(scrub, it));
378
379         if (kthread_should_stop())
380                 GOTO(out, rc = SCRUB_NEXT_EXIT);
381
382         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
383         if (rc)
384                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
385
386         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
387         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
388                 goto again;
389
390         if (rc)
391                 GOTO(out, rc);
392
393         LASSERT(nvbuf != NULL);
394         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
395                                        (uchar_t **)&lma, &size);
396         if (!rc) {
397                 lustre_lma_swab(lma);
398                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
399                            !(lma->lma_incompat & LMAI_AGENT))) {
400                         *fid = lma->lma_self_fid;
401                         *oid = scrub->os_pos_current;
402
403                         GOTO(out, rc = 0);
404                 }
405         }
406
407         if (!scrub->os_full_speed) {
408                 spin_lock(&scrub->os_lock);
409                 it->ooi_prefetched++;
410                 if (it->ooi_waiting) {
411                         it->ooi_waiting = 0;
412                         wake_up_var(scrub);
413                 }
414                 spin_unlock(&scrub->os_lock);
415         }
416
417         goto again;
418
419 out:
420         if (nvbuf)
421                 nvlist_free(nvbuf);
422
423         return rc;
424 }
425
426 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
427                           const struct lu_fid *fid, uint64_t oid, int rc)
428 {
429         struct lustre_scrub *scrub = &dev->od_scrub;
430         struct osd_otable_it *it = dev->od_otable_it;
431
432         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
433         if (!scrub->os_in_prior) {
434                 if (!scrub->os_full_speed) {
435                         spin_lock(&scrub->os_lock);
436                         it->ooi_prefetched++;
437                         if (it->ooi_waiting) {
438                                 it->ooi_waiting = 0;
439                                 wake_up_var(scrub);
440                         }
441                         spin_unlock(&scrub->os_lock);
442                 }
443         } else {
444                 spin_lock(&scrub->os_lock);
445                 scrub->os_in_prior = 0;
446                 spin_unlock(&scrub->os_lock);
447         }
448
449         if (rc)
450                 return rc;
451
452         rc = scrub_checkpoint(env, scrub);
453         if (rc) {
454                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
455                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
456                 /* Continue, as long as the scrub itself can go ahead. */
457         }
458
459         return 0;
460 }
461
462 static int osd_scan_ml_file_main(const struct lu_env *env,
463                                  struct osd_device *dev);
464
465 static int osd_scrub_main(void *args)
466 {
467         struct lu_env env;
468         struct osd_device *dev = (struct osd_device *)args;
469         struct lustre_scrub *scrub = &dev->od_scrub;
470         struct lu_fid *fid;
471         uint64_t oid;
472         int rc = 0, ret;
473         ENTRY;
474
475         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
476         if (rc) {
477                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
478                        scrub->os_name, rc);
479                 GOTO(noenv, rc);
480         }
481
482         rc = scrub_thread_prep(&env, scrub, dev->od_uuid, 1);
483         if (rc) {
484                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
485                        scrub->os_name, rc);
486                 GOTO(out, rc);
487         }
488
489         if (!scrub->os_full_speed) {
490                 struct osd_otable_it *it = dev->od_otable_it;
491
492                 wait_var_event(scrub,
493                                it->ooi_user_ready ||
494                                kthread_should_stop());
495
496                 if (kthread_should_stop())
497                         GOTO(post, rc = 0);
498
499                 scrub->os_pos_current = it->ooi_pos;
500         }
501
502         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
503                scrub->os_name, scrub->os_start_flags,
504                scrub->os_pos_current);
505
506         fid = &osd_oti_get(&env)->oti_fid;
507         while (!rc && !kthread_should_stop()) {
508                 rc = osd_scrub_next(&env, dev, fid, &oid);
509                 switch (rc) {
510                 case SCRUB_NEXT_EXIT:
511                         GOTO(post, rc = 0);
512                 case SCRUB_NEXT_CRASH:
513                         spin_lock(&scrub->os_lock);
514                         scrub->os_running = 0;
515                         spin_unlock(&scrub->os_lock);
516                         GOTO(out, rc = -EINVAL);
517                 case SCRUB_NEXT_FATAL:
518                         GOTO(post, rc = -EINVAL);
519                 case SCRUB_NEXT_BREAK:
520                         GOTO(post, rc = 1);
521                 }
522
523                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
524         }
525
526         GOTO(post, rc);
527
528 post:
529         if (scrub->os_has_ml_file) {
530                 ret = osd_scan_ml_file_main(&env, dev);
531                 if (ret != 0)
532                         rc = ret;
533         }
534
535         rc = scrub_thread_post(&env, &dev->od_scrub, rc);
536         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
537                scrub->os_name, scrub->os_pos_current, rc);
538
539 out:
540         while (!list_empty(&scrub->os_inconsistent_items)) {
541                 struct osd_inconsistent_item *oii;
542
543                 oii = list_first_entry(&scrub->os_inconsistent_items,
544                                        struct osd_inconsistent_item, oii_list);
545                 list_del_init(&oii->oii_list);
546                 OBD_FREE_PTR(oii);
547         }
548
549         lu_env_fini(&env);
550
551 noenv:
552         spin_lock(&scrub->os_lock);
553         scrub->os_running = 0;
554         spin_unlock(&scrub->os_lock);
555         if (xchg(&scrub->os_task, NULL) == NULL)
556                 /* scrub_stop is waiting, we need to synchronize */
557                 wait_var_event(scrub, kthread_should_stop());
558         wake_up_var(scrub);
559         return rc;
560 }
561
562 /* initial OI scrub */
563
564 struct osd_lf_map;
565
566 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
567                                const char *, uint64_t, uint64_t,
568                                enum osd_lf_flags, bool);
569 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
570                              const char *, uint64_t, uint64_t,
571                              enum osd_lf_flags, bool);
572 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
573                           const char *, uint64_t, uint64_t,
574                           enum osd_lf_flags, bool);
575
576 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
577                           uint64_t, handle_dirent_t, enum osd_lf_flags);
578 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
579                               uint64_t, handle_dirent_t, enum osd_lf_flags);
580 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
581                            uint64_t, handle_dirent_t, enum osd_lf_flags);
582
583 struct osd_lf_map {
584         char                    *olm_name;
585         struct lu_fid            olm_fid;
586         enum osd_lf_flags        olm_flags;
587         scan_dir_t               olm_scan_dir;
588         handle_dirent_t          olm_handle_dirent;
589 };
590
591 /* Add the new introduced local files in the list in the future. */
592 static const struct osd_lf_map osd_lf_maps[] = {
593         /* CONFIGS */
594         {
595                 .olm_name               = MOUNT_CONFIGS_DIR,
596                 .olm_fid                = {
597                         .f_seq  = FID_SEQ_LOCAL_FILE,
598                         .f_oid  = MGS_CONFIGS_OID,
599                 },
600                 .olm_flags              = OLF_SCAN_SUBITEMS,
601                 .olm_scan_dir           = osd_ios_general_sd,
602                 .olm_handle_dirent      = osd_ios_varfid_hd,
603         },
604
605         /* NIDTBL_VERSIONS */
606         {
607                 .olm_name               = MGS_NIDTBL_DIR,
608                 .olm_flags              = OLF_SCAN_SUBITEMS,
609                 .olm_scan_dir           = osd_ios_general_sd,
610                 .olm_handle_dirent      = osd_ios_varfid_hd,
611         },
612
613         /* PENDING */
614         {
615                 .olm_name               = MDT_ORPHAN_DIR,
616         },
617
618         /* ROOT */
619         {
620                 .olm_name               = "ROOT",
621                 .olm_fid                = {
622                         .f_seq  = FID_SEQ_ROOT,
623                         .f_oid  = FID_OID_ROOT,
624                 },
625                 .olm_flags              = OLF_SCAN_SUBITEMS,
626                 .olm_scan_dir           = osd_ios_ROOT_sd,
627         },
628
629         /* fld */
630         {
631                 .olm_name               = "fld",
632                 .olm_fid                = {
633                         .f_seq  = FID_SEQ_LOCAL_FILE,
634                         .f_oid  = FLD_INDEX_OID,
635                 },
636         },
637
638         /* changelog_catalog */
639         {
640                 .olm_name               = CHANGELOG_CATALOG,
641         },
642
643         /* changelog_users */
644         {
645                 .olm_name               = CHANGELOG_USERS,
646         },
647
648         /* quota_master */
649         {
650                 .olm_name               = QMT_DIR,
651                 .olm_flags              = OLF_SCAN_SUBITEMS,
652                 .olm_scan_dir           = osd_ios_general_sd,
653                 .olm_handle_dirent      = osd_ios_varfid_hd,
654         },
655
656         /* quota_slave */
657         {
658                 .olm_name               = QSD_DIR,
659                 .olm_flags              = OLF_SCAN_SUBITEMS,
660                 .olm_scan_dir           = osd_ios_general_sd,
661                 .olm_handle_dirent      = osd_ios_varfid_hd,
662         },
663
664         /* LFSCK */
665         {
666                 .olm_name               = LFSCK_DIR,
667                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
668                 .olm_scan_dir           = osd_ios_general_sd,
669                 .olm_handle_dirent      = osd_ios_varfid_hd,
670         },
671
672         /* lfsck_bookmark */
673         {
674                 .olm_name               = LFSCK_BOOKMARK,
675         },
676
677         /* lfsck_layout */
678         {
679                 .olm_name               = LFSCK_LAYOUT,
680         },
681
682         /* lfsck_namespace */
683         {
684                 .olm_name               = LFSCK_NAMESPACE,
685         },
686
687         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
688          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
689          * for more details. */
690
691         /* update_log */
692         {
693                 .olm_name               = "update_log",
694                 .olm_fid                = {
695                         .f_seq  = FID_SEQ_UPDATE_LOG,
696                 },
697                 .olm_flags              = OLF_IDX_IN_FID,
698         },
699
700         /* update_log_dir */
701         {
702                 .olm_name               = "update_log_dir",
703                 .olm_fid        = {
704                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
705                 },
706                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
707                 .olm_scan_dir           = osd_ios_general_sd,
708                 .olm_handle_dirent      = osd_ios_uld_hd,
709         },
710
711         /* hsm_actions */
712         {
713                 .olm_name               = HSM_ACTIONS,
714         },
715
716         /* nodemap */
717         {
718                 .olm_name               = LUSTRE_NODEMAP_NAME,
719         },
720
721         /* index_backup */
722         {
723                 .olm_name               = INDEX_BACKUP_DIR,
724                 .olm_fid                = {
725                         .f_seq  = FID_SEQ_LOCAL_FILE,
726                         .f_oid  = INDEX_BACKUP_OID,
727                 },
728                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
729                 .olm_scan_dir           = osd_ios_general_sd,
730                 .olm_handle_dirent      = osd_ios_varfid_hd,
731         },
732
733         {
734                 .olm_name               = NULL
735         }
736 };
737
738 /* Add the new introduced files under .lustre/ in the list in the future. */
739 static const struct osd_lf_map osd_dl_maps[] = {
740         /* .lustre/fid */
741         {
742                 .olm_name               = "fid",
743                 .olm_fid                = {
744                         .f_seq  = FID_SEQ_DOT_LUSTRE,
745                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
746                 },
747         },
748
749         /* .lustre/lost+found */
750         {
751                 .olm_name               = "lost+found",
752                 .olm_fid                = {
753                         .f_seq  = FID_SEQ_DOT_LUSTRE,
754                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
755                 },
756         },
757
758         {
759                 .olm_name               = NULL
760         }
761 };
762
763 struct osd_ios_item {
764         struct list_head        oii_list;
765         uint64_t                oii_parent;
766         enum osd_lf_flags       oii_flags;
767         scan_dir_t              oii_scan_dir;
768         handle_dirent_t         oii_handle_dirent;
769 };
770
771 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
772                             enum osd_lf_flags flags, scan_dir_t scan_dir,
773                             handle_dirent_t handle_dirent)
774 {
775         struct osd_ios_item *item;
776
777         OBD_ALLOC_PTR(item);
778         if (!item) {
779                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
780                       osd_name(dev), parent);
781                 return -ENOMEM;
782         }
783
784         INIT_LIST_HEAD(&item->oii_list);
785         item->oii_parent = parent;
786         item->oii_flags = flags;
787         item->oii_scan_dir = scan_dir;
788         item->oii_handle_dirent = handle_dirent;
789         list_add_tail(&item->oii_list, &dev->od_ios_list);
790
791         return 0;
792 }
793
794 static bool osd_index_need_recreate(const struct lu_env *env,
795                                     struct osd_device *dev, uint64_t oid)
796 {
797         struct osd_thread_info *info = osd_oti_get(env);
798         zap_attribute_t *za = &info->oti_za2;
799         zap_cursor_t *zc = &info->oti_zc2;
800         int rc;
801         ENTRY;
802
803         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
804         rc = -zap_cursor_retrieve(zc, za);
805         zap_cursor_fini(zc);
806         if (rc && rc != -ENOENT)
807                 RETURN(true);
808
809         RETURN(false);
810 }
811
812 static void osd_ios_index_register(const struct lu_env *env,
813                                    struct osd_device *osd,
814                                    const struct lu_fid *fid, uint64_t oid)
815 {
816         struct osd_thread_info *info = osd_oti_get(env);
817         zap_attribute_t *za = &info->oti_za2;
818         zap_cursor_t *zc = &info->oti_zc2;
819         struct zap_leaf_entry *le;
820         dnode_t *dn = NULL;
821         sa_handle_t *hdl;
822         __u64 mode = 0;
823         __u32 keysize = 0;
824         __u32 recsize = 0;
825         int rc;
826         ENTRY;
827
828         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
829         if (rc == -EEXIST || rc == -ENOENT)
830                 RETURN_EXIT;
831
832         if (rc < 0)
833                 GOTO(log, rc);
834
835         if (!osd_object_is_zap(dn))
836                 GOTO(log, rc = 1);
837
838         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
839         if (rc)
840                 GOTO(log, rc);
841
842         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
843         sa_handle_destroy(hdl);
844         if (rc)
845                 GOTO(log, rc);
846
847         if (!S_ISREG(mode))
848                 GOTO(log, rc = 1);
849
850         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
851         rc = -zap_cursor_retrieve(zc, za);
852         if (rc)
853                 /* Skip empty index object */
854                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
855
856         if (zc->zc_zap->zap_ismicro ||
857             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
858                 GOTO(fini, rc = 1);
859
860         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
861         keysize = le->le_name_numints * 8;
862         recsize = za->za_integer_length * za->za_num_integers;
863         if (likely(keysize && recsize))
864                 rc = osd_index_register(osd, fid, keysize, recsize);
865
866         GOTO(fini, rc);
867
868 fini:
869         zap_cursor_fini(zc);
870
871 log:
872         if (dn)
873                 osd_dnode_rele(dn);
874         if (rc < 0)
875                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
876                       osd_name(osd), PFID(fid), keysize, recsize, rc);
877         else if (!rc)
878                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
879                        osd_name(osd), PFID(fid), keysize, recsize);
880 }
881
882 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
883                               struct lustre_index_restore_unit *liru, void *buf,
884                               int bufsize)
885 {
886         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
887         struct lu_fid *tgt_fid = &liru->liru_cfid;
888         struct lu_fid bak_fid;
889         int rc;
890         ENTRY;
891
892         lustre_fid2lbx(buf, tgt_fid, bufsize);
893         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
894                          sizeof(*zde) / 8, (void *)zde);
895         if (rc)
896                 GOTO(log, rc);
897
898         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
899         if (rc)
900                 GOTO(log, rc);
901
902         /* The OI mapping for index may be invalid, since it will be
903          * re-created, not update the OI mapping, just cache it in RAM. */
904         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
905                                             liru->liru_clid);
906         if (!rc)
907                 rc = lustre_index_restore(env, &dev->od_dt_dev,
908                                 &liru->liru_pfid, tgt_fid, &bak_fid,
909                                 liru->liru_name, &dev->od_index_backup_list,
910                                 &dev->od_lock, buf, bufsize);
911         GOTO(log, rc);
912
913 log:
914         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
915                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
916 }
917
918 /**
919  * verify FID-in-LMA and OI entry for one object
920  *
921  * ios: Initial OI Scrub.
922  */
923 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
924                             const struct lu_fid *fid, uint64_t parent,
925                             uint64_t oid, const char *name,
926                             enum osd_lf_flags flags)
927 {
928         struct lustre_scrub *scrub = &dev->od_scrub;
929         struct scrub_file *sf = &scrub->os_file;
930         struct lustre_mdt_attrs *lma = NULL;
931         nvlist_t *nvbuf = NULL;
932         struct lu_fid tfid;
933         uint64_t oid2 = 0;
934         __u64 flag = 0;
935         int size = 0;
936         int op = 0;
937         int rc;
938         ENTRY;
939
940         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
941         if (unlikely(rc == -ENOENT || rc == -EEXIST))
942                 RETURN(0);
943
944         if (rc && rc != -ENODATA) {
945                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
946                       "rc = %d\n", osd_name(dev), oid, rc);
947
948                 RETURN(rc);
949         }
950
951         if (!rc) {
952                 LASSERT(nvbuf != NULL);
953                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
954                                                (uchar_t **)&lma, &size);
955                 if (rc || size == 0) {
956                         LASSERT(lma == NULL);
957                         rc = -ENODATA;
958                 } else {
959                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
960                         lustre_lma_swab(lma);
961                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
962                                 nvlist_free(nvbuf);
963                                 RETURN(0);
964                         }
965
966                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
967                             osd_index_need_recreate(env, dev, oid)) {
968                                 if (parent == dev->od_root) {
969                                         lu_local_obj_fid(&tfid,
970                                                          OSD_FS_ROOT_OID);
971                                 } else {
972                                         rc = osd_get_fid_by_oid(env, dev,
973                                                                 parent, &tfid);
974                                         if (rc) {
975                                                 nvlist_free(nvbuf);
976                                                 RETURN(rc);
977                                         }
978                                 }
979
980                                 rc = lustre_liru_new(
981                                                 &dev->od_index_restore_list,
982                                                 &tfid, &lma->lma_self_fid, oid,
983                                                 name, strlen(name));
984                                 nvlist_free(nvbuf);
985                                 RETURN(rc);
986                         }
987
988                         tfid = lma->lma_self_fid;
989                         if (!(flags & OLF_NOT_BACKUP))
990                                 osd_ios_index_register(env, dev, &tfid, oid);
991                 }
992                 nvlist_free(nvbuf);
993         }
994
995         if (rc == -ENODATA) {
996                 if (!fid) {
997                         /* Skip the object without FID-in-LMA */
998                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
999                                osd_name(dev), oid);
1000
1001                         RETURN(0);
1002                 }
1003
1004                 LASSERT(!fid_is_zero(fid));
1005
1006                 tfid = *fid;
1007                 if (flags & OLF_IDX_IN_FID) {
1008                         LASSERT(dev->od_index >= 0);
1009
1010                         tfid.f_oid = dev->od_index;
1011                 }
1012         }
1013
1014         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1015         if (rc) {
1016                 if (rc != -ENOENT) {
1017                         CWARN("%s: initial OI scrub failed to lookup fid for "
1018                               DFID"=>%llu: rc = %d\n",
1019                               osd_name(dev), PFID(&tfid), oid, rc);
1020
1021                         RETURN(rc);
1022                 }
1023
1024                 flag = SF_RECREATED;
1025                 op = DTO_INDEX_INSERT;
1026         } else {
1027                 if (oid == oid2)
1028                         RETURN(0);
1029
1030                 flag = SF_INCONSISTENT;
1031                 op = DTO_INDEX_UPDATE;
1032         }
1033
1034         if (!(sf->sf_flags & flag)) {
1035                 scrub_file_reset(scrub, dev->od_uuid, flag);
1036                 rc = scrub_file_store(env, scrub);
1037                 if (rc)
1038                         RETURN(rc);
1039         }
1040
1041         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1042
1043         RETURN(rc > 0 ? 0 : rc);
1044 }
1045
1046 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1047                              const char *name, uint64_t parent, uint64_t oid,
1048                              enum osd_lf_flags flags, bool is_dir)
1049 {
1050         int rc;
1051         ENTRY;
1052
1053         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1054         if (!rc && is_dir)
1055                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1056                                       osd_ios_varfid_hd);
1057
1058         RETURN(rc);
1059 }
1060
1061 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1062                           const char *name, uint64_t parent, uint64_t oid,
1063                           enum osd_lf_flags flags, bool is_dir)
1064 {
1065         struct lu_fid tfid;
1066         int rc;
1067         ENTRY;
1068
1069         /* skip any non-DFID format name */
1070         if (name[0] != '[')
1071                 RETURN(0);
1072
1073         /* skip the start '[' */
1074         sscanf(&name[1], SFID, RFID(&tfid));
1075         if (fid_is_sane(&tfid))
1076                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1077         else
1078                 rc = -EIO;
1079
1080         RETURN(rc);
1081 }
1082
1083 /*
1084  * General scanner for the directories execpt /ROOT during initial OI scrub.
1085  * It scans the name entries under the given directory one by one. For each
1086  * entry, verifies its OI mapping via the given @handle_dirent.
1087  */
1088 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1089                               uint64_t parent, handle_dirent_t handle_dirent,
1090                               enum osd_lf_flags flags)
1091 {
1092         struct osd_thread_info *info = osd_oti_get(env);
1093         struct luz_direntry *zde = &info->oti_zde;
1094         zap_attribute_t *za = &info->oti_za;
1095         zap_cursor_t *zc = &info->oti_zc;
1096         int rc;
1097         ENTRY;
1098
1099         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1100         rc = -zap_cursor_retrieve(zc, za);
1101         if (rc == -ENOENT)
1102                 zap_cursor_advance(zc);
1103         else if (rc)
1104                 GOTO(log, rc);
1105
1106         while (1) {
1107                 rc = -zap_cursor_retrieve(zc, za);
1108                 if (rc)
1109                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1110
1111                 /* skip the entry started with '.' */
1112                 if (likely(za->za_name[0] != '.')) {
1113                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1114                                         za->za_integer_length,
1115                                         sizeof(*zde) / za->za_integer_length,
1116                                         (void *)zde);
1117                         if (rc) {
1118                                 CWARN("%s: initial OI scrub failed to lookup "
1119                                       "%s under %llu: rc = %d\n",
1120                                       osd_name(dev), za->za_name, parent, rc);
1121                                 continue;
1122                         }
1123
1124                         rc = handle_dirent(env, dev, za->za_name, parent,
1125                                         zde->lzd_reg.zde_dnode, flags,
1126                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1127                                         true : false);
1128                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1129                                "%llu: rc = %d\n",
1130                                osd_name(dev), za->za_name, parent, rc);
1131                 }
1132
1133                 zap_cursor_advance(zc);
1134         }
1135
1136 log:
1137         if (rc)
1138                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1139                       "rc = %d\n", osd_name(dev), parent, rc);
1140         zap_cursor_fini(zc);
1141
1142         return rc;
1143 }
1144
1145 /*
1146  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1147  * be scanned during the initial OI scrub, instead, only the .lustre and the
1148  * sub-items under .lustre will be handled.
1149  */
1150 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1151                            uint64_t parent, handle_dirent_t handle_dirent,
1152                            enum osd_lf_flags flags)
1153 {
1154         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1155         const struct osd_lf_map *map;
1156         uint64_t oid;
1157         int rc;
1158         int rc1 = 0;
1159         ENTRY;
1160
1161         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1162                             sizeof(*zde) / 8, (void *)zde);
1163         if (rc == -ENOENT) {
1164                 /* The .lustre directory is lost. That is not fatal. It can
1165                  * be re-created in the subsequent MDT start processing. */
1166                 RETURN(0);
1167         }
1168
1169         if (rc) {
1170                 CWARN("%s: initial OI scrub failed to find .lustre: "
1171                       "rc = %d\n", osd_name(dev), rc);
1172
1173                 RETURN(rc);
1174         }
1175
1176         oid = zde->lzd_reg.zde_dnode;
1177         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1178                               dot_lustre_name, 0);
1179         if (rc)
1180                 RETURN(rc);
1181
1182         for (map = osd_dl_maps; map->olm_name; map++) {
1183                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1184                                     sizeof(*zde) / 8, (void *)zde);
1185                 if (rc) {
1186                         if (rc != -ENOENT)
1187                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1188                                       osd_name(dev), map->olm_name, rc);
1189                         else if (!fid_is_zero(&map->olm_fid))
1190                                 /* Try to remove the stale OI mapping. */
1191                                 osd_scrub_refresh_mapping(env, dev,
1192                                                 &map->olm_fid, 0,
1193                                                 DTO_INDEX_DELETE, true,
1194                                                 map->olm_name);
1195                         continue;
1196                 }
1197
1198                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1199                                       zde->lzd_reg.zde_dnode, map->olm_name,
1200                                       map->olm_flags);
1201                 if (rc)
1202                         rc1 = rc;
1203         }
1204
1205         RETURN(rc1);
1206 }
1207
1208 static void osd_initial_OI_scrub(const struct lu_env *env,
1209                                  struct osd_device *dev)
1210 {
1211         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1212         const struct osd_lf_map *map;
1213         int rc;
1214         ENTRY;
1215
1216         for (map = osd_lf_maps; map->olm_name; map++) {
1217                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1218                                     sizeof(*zde) / 8, (void *)zde);
1219                 if (rc) {
1220                         if (rc != -ENOENT)
1221                                 CWARN("%s: initial OI scrub failed "
1222                                       "to find the entry %s: rc = %d\n",
1223                                       osd_name(dev), map->olm_name, rc);
1224                         else if (!fid_is_zero(&map->olm_fid))
1225                                 /* Try to remove the stale OI mapping. */
1226                                 osd_scrub_refresh_mapping(env, dev,
1227                                                 &map->olm_fid, 0,
1228                                                 DTO_INDEX_DELETE, true,
1229                                                 map->olm_name);
1230                         continue;
1231                 }
1232
1233                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1234                                       zde->lzd_reg.zde_dnode, map->olm_name,
1235                                       map->olm_flags);
1236                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1237                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1238                                          map->olm_flags, map->olm_scan_dir,
1239                                          map->olm_handle_dirent);
1240         }
1241
1242         while (!list_empty(&dev->od_ios_list)) {
1243                 struct osd_ios_item *item;
1244
1245                 item = list_first_entry(&dev->od_ios_list,
1246                                         struct osd_ios_item, oii_list);
1247                 list_del_init(&item->oii_list);
1248                 item->oii_scan_dir(env, dev, item->oii_parent,
1249                                    item->oii_handle_dirent, item->oii_flags);
1250                 OBD_FREE_PTR(item);
1251         }
1252
1253         if (!list_empty(&dev->od_index_restore_list)) {
1254                 char *buf;
1255
1256                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1257                 if (!buf)
1258                         CERROR("%s: not enough RAM for rebuild index\n",
1259                                osd_name(dev));
1260
1261                 while (!list_empty(&dev->od_index_restore_list)) {
1262                         struct lustre_index_restore_unit *liru;
1263
1264                         liru = list_first_entry(&dev->od_index_restore_list,
1265                                                 struct lustre_index_restore_unit,
1266                                                 liru_link);
1267                         list_del(&liru->liru_link);
1268                         if (buf)
1269                                 osd_index_restore(env, dev, liru, buf,
1270                                                   INDEX_BACKUP_BUFSIZE);
1271                         OBD_FREE(liru, liru->liru_len);
1272                 }
1273
1274                 if (buf)
1275                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1276         }
1277
1278         EXIT;
1279 }
1280
1281 /* OI scrub start/stop */
1282
1283 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1284                     __u32 flags)
1285 {
1286         int rc;
1287         ENTRY;
1288
1289         if (dev->od_dt_dev.dd_rdonly)
1290                 RETURN(-EROFS);
1291
1292         /* od_otable_sem: prevent concurrent start/stop */
1293         down(&dev->od_otable_sem);
1294         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1295         up(&dev->od_otable_sem);
1296
1297         RETURN(rc == -EALREADY ? 0 : rc);
1298 }
1299
1300 void osd_scrub_stop(struct osd_device *dev)
1301 {
1302         struct lustre_scrub *scrub = &dev->od_scrub;
1303         ENTRY;
1304
1305         /* od_otable_sem: prevent concurrent start/stop */
1306         down(&dev->od_otable_sem);
1307         spin_lock(&scrub->os_lock);
1308         scrub->os_paused = 1;
1309         spin_unlock(&scrub->os_lock);
1310         scrub_stop(scrub);
1311         up(&dev->od_otable_sem);
1312
1313         EXIT;
1314 }
1315
1316 /* OI scrub setup/cleanup */
1317
1318 static const char osd_scrub_name[] = "OI_scrub";
1319
1320 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1321                     time64_t interval, bool resetoi)
1322 {
1323         struct osd_thread_info *info = osd_oti_get(env);
1324         struct lustre_scrub *scrub = &dev->od_scrub;
1325         struct scrub_file *sf = &scrub->os_file;
1326         struct lu_fid *fid = &info->oti_fid;
1327         struct dt_object *obj;
1328         uint64_t oid;
1329         int rc = 0;
1330         bool dirty = false;
1331         ENTRY;
1332
1333         memcpy(dev->od_uuid.b,
1334                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1335                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1336         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1337         init_rwsem(&scrub->os_rwsem);
1338         spin_lock_init(&scrub->os_lock);
1339         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1340         scrub->os_name = osd_name(dev);
1341         scrub->os_auto_scrub_interval = interval;
1342
1343         /* 'What the @fid is' is not imporatant, because the object
1344          * has no OI mapping, and only is visible inside the OSD.*/
1345         fid->f_seq = FID_SEQ_IGIF_MAX;
1346         if (dev->od_is_ost)
1347                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1348         else
1349                 fid->f_oid = dev->od_index + 1;
1350         fid->f_ver = 0;
1351         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1352                                     osd_scrub_name, &oid, fid, false);
1353         if (rc)
1354                 RETURN(rc);
1355
1356         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1357         if (rc)
1358                 RETURN(rc);
1359
1360         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1361         if (IS_ERR_OR_NULL(obj))
1362                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1363
1364         obj->do_body_ops = &osd_body_scrub_ops;
1365         scrub->os_obj = obj;
1366         rc = scrub_file_load(env, scrub);
1367         if (rc == -ENOENT || rc == -EFAULT) {
1368                 scrub_file_init(scrub, dev->od_uuid);
1369                 dirty = true;
1370         } else if (rc < 0) {
1371                 GOTO(cleanup_obj, rc);
1372         } else {
1373                 if (!guid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1374                         CDEBUG(D_LFSCK,
1375                                "%s: UUID has been changed from %pU to %pU\n",
1376                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1377                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1378                         dirty = true;
1379                 } else if (sf->sf_status == SS_SCANNING) {
1380                         sf->sf_status = SS_CRASHED;
1381                         dirty = true;
1382                 }
1383
1384                 if (unlikely((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0 ||
1385                              sf->sf_oi_count > OSD_OI_FID_NR_MAX)) {
1386                         LCONSOLE_WARN("%s: invalid OI count %u, reset to %u\n",
1387                                       osd_name(dev), sf->sf_oi_count,
1388                                       osd_oi_count);
1389                         sf->sf_oi_count = osd_oi_count;
1390                         dirty = true;
1391                 }
1392         }
1393
1394         if (sf->sf_pos_last_checkpoint != 0)
1395                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1396         else
1397                 scrub->os_pos_current = 1;
1398
1399         if (dirty) {
1400                 rc = scrub_file_store(env, scrub);
1401                 if (rc)
1402                         GOTO(cleanup_obj, rc);
1403         }
1404
1405         /* Initialize OI files. */
1406         rc = osd_oi_init(env, dev, resetoi);
1407         if (rc < 0)
1408                 GOTO(cleanup_obj, rc);
1409
1410         if (!dev->od_dt_dev.dd_rdonly)
1411                 osd_initial_OI_scrub(env, dev);
1412
1413         if (!dev->od_dt_dev.dd_rdonly &&
1414             scrub->os_auto_scrub_interval != AS_NEVER &&
1415             ((sf->sf_status == SS_PAUSED) ||
1416              (sf->sf_status == SS_CRASHED &&
1417               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1418                               SF_UPGRADE | SF_AUTO)) ||
1419              (sf->sf_status == SS_INIT &&
1420               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1421                               SF_UPGRADE))))
1422                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1423
1424         if (rc)
1425                 GOTO(cleanup_oi, rc);
1426
1427         RETURN(0);
1428
1429 cleanup_oi:
1430         osd_oi_fini(env, dev);
1431 cleanup_obj:
1432         dt_object_put_nocache(env, scrub->os_obj);
1433         scrub->os_obj = NULL;
1434
1435         return rc;
1436 }
1437
1438 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1439 {
1440         struct lustre_scrub *scrub = &dev->od_scrub;
1441
1442         LASSERT(!dev->od_otable_it);
1443
1444         if (scrub->os_obj) {
1445                 osd_scrub_stop(dev);
1446                 dt_object_put_nocache(env, scrub->os_obj);
1447                 scrub->os_obj = NULL;
1448         }
1449
1450         if (dev->od_oi_table)
1451                 osd_oi_fini(env, dev);
1452 }
1453
1454 /* object table based iteration APIs */
1455
1456 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1457                                        struct dt_object *dt, __u32 attr)
1458 {
1459         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1460         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1461         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1462         struct lustre_scrub *scrub = &dev->od_scrub;
1463         struct osd_otable_it *it;
1464         __u32 start = 0;
1465         int rc;
1466         ENTRY;
1467
1468         if (dev->od_dt_dev.dd_rdonly)
1469                 RETURN(ERR_PTR(-EROFS));
1470
1471         /* od_otable_sem: prevent concurrent init/fini */
1472         down(&dev->od_otable_sem);
1473         if (dev->od_otable_it)
1474                 GOTO(out, it = ERR_PTR(-EALREADY));
1475
1476         OBD_ALLOC_PTR(it);
1477         if (!it)
1478                 GOTO(out, it = ERR_PTR(-ENOMEM));
1479
1480         if (flags & DOIF_OUTUSED)
1481                 it->ooi_used_outside = 1;
1482
1483         if (flags & DOIF_RESET)
1484                 start |= SS_RESET;
1485
1486         if (valid & DOIV_ERROR_HANDLE) {
1487                 if (flags & DOIF_FAILOUT)
1488                         start |= SS_SET_FAILOUT;
1489                 else
1490                         start |= SS_CLEAR_FAILOUT;
1491         }
1492
1493         if (valid & DOIV_DRYRUN) {
1494                 if (flags & DOIF_DRYRUN)
1495                         start |= SS_SET_DRYRUN;
1496                 else
1497                         start |= SS_CLEAR_DRYRUN;
1498         }
1499
1500         /* XXX: dmu_object_next() does NOT find dnodes allocated
1501          *      in the current non-committed txg, so we force txg
1502          *      commit to find all existing dnodes ... */
1503         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1504
1505         dev->od_otable_it = it;
1506         it->ooi_dev = dev;
1507         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1508         if (rc == -EALREADY) {
1509                 it->ooi_pos = 1;
1510         } else if (rc < 0) {
1511                 dev->od_otable_it = NULL;
1512                 OBD_FREE_PTR(it);
1513                 it = ERR_PTR(rc);
1514         } else {
1515                 it->ooi_pos = scrub->os_pos_current;
1516         }
1517
1518         GOTO(out, it);
1519
1520 out:
1521         up(&dev->od_otable_sem);
1522         return (struct dt_it *)it;
1523 }
1524
1525 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1526 {
1527         struct osd_otable_it *it = (struct osd_otable_it *)di;
1528         struct osd_device *dev = it->ooi_dev;
1529
1530         /* od_otable_sem: prevent concurrent init/fini */
1531         down(&dev->od_otable_sem);
1532         scrub_stop(&dev->od_scrub);
1533         LASSERT(dev->od_otable_it == it);
1534
1535         dev->od_otable_it = NULL;
1536         up(&dev->od_otable_sem);
1537         OBD_FREE_PTR(it);
1538 }
1539
/*
 * Iterator "get" method: does nothing for the object-table iterator and
 * always reports 0. All arguments are ignored.
 */
static int osd_otable_it_get(const struct lu_env *env,
			     struct dt_it *di, const struct dt_key *key)
{
	const int rc = 0;

	return rc;
}
1545
/*
 * Iterator "put" method: intentionally empty for the object-table
 * iterator.
 */
static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
	/* nothing to release */
	return;
}
1549
1550 static void osd_otable_it_preload(const struct lu_env *env,
1551                                   struct osd_otable_it *it)
1552 {
1553         struct osd_device *dev = it->ooi_dev;
1554         int rc;
1555
1556         /* can go negative on the very first access to the iterator
1557          * or if some non-Lustre objects were found */
1558         if (unlikely(it->ooi_prefetched < 0))
1559                 it->ooi_prefetched = 0;
1560
1561         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1562                 return;
1563
1564         if (it->ooi_prefetched_dnode == 0)
1565                 it->ooi_prefetched_dnode = it->ooi_pos;
1566
1567         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1568                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1569                                       B_FALSE, 0);
1570                 if (rc)
1571                         break;
1572
1573                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1574                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1575                 it->ooi_prefetched++;
1576         }
1577 }
1578
1579 static inline int
1580 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1581 {
1582         spin_lock(&scrub->os_lock);
1583         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1584             !scrub->os_running)
1585                 it->ooi_waiting = 0;
1586         else
1587                 it->ooi_waiting = 1;
1588         spin_unlock(&scrub->os_lock);
1589
1590         return !it->ooi_waiting;
1591 }
1592
1593 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1594 {
1595         struct osd_otable_it *it = (struct osd_otable_it *)di;
1596         struct osd_device *dev = it->ooi_dev;
1597         struct lustre_scrub *scrub = &dev->od_scrub;
1598         struct lustre_mdt_attrs *lma = NULL;
1599         nvlist_t *nvbuf = NULL;
1600         int rc, size = 0;
1601         bool locked;
1602         ENTRY;
1603
1604         LASSERT(it->ooi_user_ready);
1605         fid_zero(&it->ooi_fid);
1606
1607         if (unlikely(it->ooi_all_cached))
1608                 RETURN(1);
1609
1610 again:
1611         if (nvbuf) {
1612                 nvlist_free(nvbuf);
1613                 nvbuf = NULL;
1614                 lma = NULL;
1615                 size = 0;
1616         }
1617
1618         if (it->ooi_pos >= scrub->os_pos_current)
1619                 wait_var_event(scrub,
1620                                osd_otable_it_wakeup(scrub, it));
1621
1622         if (!scrub->os_running && !it->ooi_used_outside)
1623                 GOTO(out, rc = 1);
1624
1625         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1626         if (rc) {
1627                 if (unlikely(rc == -ESRCH)) {
1628                         it->ooi_all_cached = 1;
1629                         rc = 1;
1630                 }
1631
1632                 GOTO(out, rc);
1633         }
1634
1635         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1636
1637         locked = false;
1638         if (!scrub->os_full_speed) {
1639                 spin_lock(&scrub->os_lock);
1640                 locked = true;
1641         }
1642         it->ooi_prefetched--;
1643         if (!scrub->os_full_speed) {
1644                 if (scrub->os_waiting) {
1645                         scrub->os_waiting = 0;
1646                         wake_up_var(scrub);
1647                 }
1648         }
1649         if (locked)
1650                 spin_unlock(&scrub->os_lock);
1651
1652         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1653                 goto again;
1654
1655         if (rc)
1656                 GOTO(out, rc);
1657
1658         LASSERT(nvbuf != NULL);
1659         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1660                                        (uchar_t **)&lma, &size);
1661         if (rc || size == 0)
1662                 /* It is either non-Lustre object or OSD internal object,
1663                  * ignore it, go ahead */
1664                 goto again;
1665
1666         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1667         lustre_lma_swab(lma);
1668         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1669                      lma->lma_incompat & LMAI_AGENT))
1670                 goto again;
1671
1672         it->ooi_fid = lma->lma_self_fid;
1673
1674         GOTO(out, rc = 0);
1675
1676 out:
1677         if (nvbuf)
1678                 nvlist_free(nvbuf);
1679
1680         if (!rc && scrub->os_full_speed)
1681                 osd_otable_it_preload(env, it);
1682
1683         return rc;
1684 }
1685
/**
 * Key accessor of the otable iterator.
 *
 * This callback never produces a key object; it unconditionally
 * returns NULL regardless of the iterator state.
 */
static struct dt_key *osd_otable_it_key(const struct lu_env *env,
					const struct dt_it *di)
{
	struct dt_key *no_key = NULL;

	return no_key;
}
1691
/**
 * Report the key size of the otable iterator.
 *
 * \retval the size in bytes of a __u64, independent of \a env and \a di
 */
static int osd_otable_it_key_size(const struct lu_env *env,
				  const struct dt_it *di)
{
	const int key_bytes = sizeof(__u64);

	return key_bytes;
}
1697
1698 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1699                              struct dt_rec *rec, __u32 attr)
1700 {
1701         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1702         struct lu_fid *fid = (struct lu_fid *)rec;
1703
1704         *fid = it->ooi_fid;
1705         return 0;
1706 }
1707
1708 static __u64 osd_otable_it_store(const struct lu_env *env,
1709                                  const struct dt_it *di)
1710 {
1711         struct osd_otable_it *it = (struct osd_otable_it *)di;
1712
1713         return it->ooi_pos;
1714 }
1715
1716 /**
1717  * Set the OSD layer iteration start position as the specified hash.
1718  */
1719 static int osd_otable_it_load(const struct lu_env *env,
1720                               const struct dt_it *di, __u64 hash)
1721 {
1722         struct osd_otable_it *it = (struct osd_otable_it *)di;
1723         struct osd_device *dev = it->ooi_dev;
1724         struct lustre_scrub *scrub = &dev->od_scrub;
1725         int rc;
1726         ENTRY;
1727
1728         /* Forbid to set iteration position after iteration started. */
1729         if (it->ooi_user_ready)
1730                 RETURN(-EPERM);
1731
1732         if (hash > OSD_OTABLE_MAX_HASH)
1733                 hash = OSD_OTABLE_MAX_HASH;
1734
1735         /* The hash is the last checkpoint position,
1736          * we will start from the next one. */
1737         it->ooi_pos = hash + 1;
1738         it->ooi_prefetched = 0;
1739         it->ooi_prefetched_dnode = 0;
1740         it->ooi_user_ready = 1;
1741         if (!scrub->os_full_speed)
1742                 wake_up_var(scrub);
1743
1744         /* Unplug OSD layer iteration by the first next() call. */
1745         rc = osd_otable_it_next(env, (struct dt_it *)it);
1746
1747         RETURN(rc);
1748 }
1749
/**
 * Combined key/record accessor of the otable iterator.
 *
 * Nothing is written to \a key_rec; the callback only reports success.
 *
 * \retval 0		always
 */
static int osd_otable_it_key_rec(const struct lu_env *env,
				 const struct dt_it *di, void *key_rec)
{
	int rc = 0;

	return rc;
}
1755
1756 const struct dt_index_operations osd_otable_ops = {
1757         .dio_it = {
1758                 .init     = osd_otable_it_init,
1759                 .fini     = osd_otable_it_fini,
1760                 .get      = osd_otable_it_get,
1761                 .put      = osd_otable_it_put,
1762                 .next     = osd_otable_it_next,
1763                 .key      = osd_otable_it_key,
1764                 .key_size = osd_otable_it_key_size,
1765                 .rec      = osd_otable_it_rec,
1766                 .store    = osd_otable_it_store,
1767                 .load     = osd_otable_it_load,
1768                 .key_rec  = osd_otable_it_key_rec,
1769         }
1770 };
1771
1772 /* high priority inconsistent items list APIs */
1773
1774 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1775                    const struct lu_fid *fid, uint64_t oid, bool insert)
1776 {
1777         struct lustre_scrub *scrub = &dev->od_scrub;
1778         struct osd_inconsistent_item *oii;
1779         bool wakeup = false;
1780         ENTRY;
1781
1782         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1783         OBD_ALLOC_PTR(oii);
1784         if (unlikely(!oii))
1785                 RETURN(-ENOMEM);
1786
1787         INIT_LIST_HEAD(&oii->oii_list);
1788         oii->oii_cache.oic_dev = dev;
1789         oii->oii_cache.oic_fid = *fid;
1790         oii->oii_cache.oic_dnode = oid;
1791         oii->oii_insert = insert;
1792
1793         spin_lock(&scrub->os_lock);
1794         if (!scrub->os_running) {
1795                 spin_unlock(&scrub->os_lock);
1796                 OBD_FREE_PTR(oii);
1797                 RETURN(-EAGAIN);
1798         }
1799
1800         if (list_empty(&scrub->os_inconsistent_items))
1801                 wakeup = true;
1802         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1803         spin_unlock(&scrub->os_lock);
1804
1805         if (wakeup)
1806                 wake_up_var(scrub);
1807
1808         RETURN(0);
1809 }
1810
1811 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1812                    uint64_t *oid)
1813 {
1814         struct lustre_scrub *scrub = &dev->od_scrub;
1815         struct osd_inconsistent_item *oii;
1816         int ret = -ENOENT;
1817         ENTRY;
1818
1819         spin_lock(&scrub->os_lock);
1820         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1821                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1822                         *oid = oii->oii_cache.oic_dnode;
1823                         ret = 0;
1824                         break;
1825                 }
1826         }
1827         spin_unlock(&scrub->os_lock);
1828
1829         RETURN(ret);
1830 }
1831
/*
 * Callback type used by osd_scan_dir(): called once per directory entry
 * with the object id of the directory being scanned and the iterator that
 * holds the current entry. A non-zero return value stops the scan and is
 * returned to the caller of osd_scan_dir().
 */
typedef int (*scan_dir_helper_t)(const struct lu_env *env,
				 struct osd_device *dev,
				 uint64_t parent_oid,
				 struct osd_zap_it *entry);
1835
1836 static int osd_scan_dir(const struct lu_env *env, struct osd_device *dev,
1837                         uint64_t id, scan_dir_helper_t cb)
1838 {
1839         struct osd_zap_it *it;
1840         struct luz_direntry *zde;
1841         zap_attribute_t *za;
1842         int rc;
1843
1844         ENTRY;
1845
1846         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
1847         if (it == NULL)
1848                 RETURN(-ENOMEM);
1849
1850         rc = osd_zap_cursor_init(&it->ozi_zc, dev->od_os, id, 0);
1851         if (rc != 0)
1852                 GOTO(out, rc);
1853
1854         za = &it->ozi_za;
1855         zde = &it->ozi_zde;
1856         while (1) {
1857                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1858                 if (unlikely(rc)) {
1859                         if (rc == -ENOENT)
1860                                 rc = 0;
1861
1862                         break;
1863                 }
1864
1865                 if (name_is_dot_or_dotdot(za->za_name, strlen(za->za_name))) {
1866                         zap_cursor_advance(it->ozi_zc);
1867                         continue;
1868                 }
1869
1870                 strncpy(it->ozi_name, za->za_name, sizeof(it->ozi_name));
1871                 if (za->za_integer_length != 8) {
1872                         rc = -EIO;
1873                         break;
1874                 }
1875
1876                 rc = osd_zap_lookup(dev, it->ozi_zc->zc_zapobj, NULL,
1877                                     za->za_name, za->za_integer_length,
1878                                     sizeof(*zde) / za->za_integer_length, zde);
1879                 if (rc)
1880                         break;
1881
1882                 rc = cb(env, dev, id, it);
1883                 if (rc)
1884                         break;
1885
1886                 zap_cursor_advance(it->ozi_zc);
1887         }
1888         osd_zap_cursor_fini(it->ozi_zc);
1889
1890 out:
1891         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
1892         RETURN(rc);
1893 }
1894
1895 static int osd_remove_ml_file(const struct lu_env *env, struct osd_device *dev,
1896                               uint64_t dir, uint64_t id, struct lu_fid *fid,
1897                               char *name)
1898 {
1899         struct osd_thread_info *info = osd_oti_get(env);
1900         struct dt_object *dt;
1901         struct osd_object *obj = NULL;
1902         dmu_tx_t *tx;
1903         sa_handle_t *hdl;
1904         uint64_t nlink;
1905         int rc;
1906
1907         rc = -sa_handle_get(dev->od_os, id, NULL, SA_HDL_PRIVATE, &hdl);
1908         if (rc)
1909                 RETURN(rc);
1910
1911         dt = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1912         if (IS_ERR(dt))
1913                 RETURN(PTR_ERR(dt));
1914
1915         if (dt) {
1916                 obj = osd_dt_obj(dt);
1917                 down_read(&obj->oo_guard);
1918         }
1919
1920         rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
1921         if (rc)
1922                 GOTO(out, rc);
1923
1924         if (nlink <= 1) {
1925                 CERROR("%s: multi-link file O/%s/%s/%s has nlink %llu\n",
1926                        osd_name(dev), info->oti_seq_name, info->oti_dir_name,
1927                        name, nlink);
1928                 GOTO(out, rc = 0);
1929         }
1930
1931         tx = dmu_tx_create(dev->od_os);
1932         if (!tx) {
1933                 CERROR("%s: fail to create tx to remove multi-link file!\n",
1934                        osd_name(dev));
1935                 GOTO(out, rc = -ENOMEM);
1936         }
1937
1938         dmu_tx_hold_zap(tx, dir, FALSE, NULL);
1939         rc = -dmu_tx_assign(tx, TXG_WAIT);
1940         if (rc)
1941                 GOTO(abort, rc);
1942
1943         nlink--;
1944         rc = -sa_update(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink), tx);
1945         if (rc)
1946                 GOTO(abort, rc);
1947
1948         rc = -zap_remove(dev->od_os, dir, name, tx);
1949         if (rc)
1950                 GOTO(abort, rc);
1951
1952         dmu_tx_commit(tx);
1953         GOTO(out, rc);
1954
1955 abort:
1956         dmu_tx_abort(tx);
1957
1958 out:
1959         if (dt) {
1960                 up_read(&obj->oo_guard);
1961                 dt_object_put_nocache(env, dt);
1962         }
1963
1964         sa_handle_destroy(hdl);
1965         RETURN(rc);
1966 }
1967
1968 static int osd_scan_ml_file(const struct lu_env *env, struct osd_device *dev,
1969                             uint64_t dir_oid, struct osd_zap_it *ozi)
1970 {
1971         struct osd_thread_info *info = osd_oti_get(env);
1972         struct lu_fid *fid = &info->oti_fid;
1973         struct ost_id *ostid = &info->oti_ostid;
1974         char name[32];
1975         u64 seq;
1976         int rc = 0;
1977
1978         ENTRY;
1979
1980         rc = osd_get_fid_by_oid(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode, fid);
1981         if (rc)
1982                 RETURN(rc);
1983
1984         seq = fid_seq(fid);
1985         fid_to_ostid(fid, ostid);
1986
1987         snprintf(name, sizeof(name), (fid_seq_is_rsvd(seq) ||
1988                                       fid_seq_is_mdt0(seq)) ? "%llu" : "%llx",
1989                                       fid_seq_is_idif(seq) ? 0 : seq);
1990         if (strcmp(info->oti_seq_name, name) != 0)
1991                 GOTO(fix, rc);
1992
1993         snprintf(name, sizeof(name), "d%d",
1994                 (int)ostid_id(ostid) % OSD_OST_MAP_SIZE);
1995         if (strcmp(info->oti_dir_name, name) != 0)
1996                 GOTO(fix, rc);
1997
1998         snprintf(name, sizeof(name), "%llu", ostid_id(ostid));
1999         if (strcmp(ozi->ozi_name, name) == 0)
2000                 RETURN(0);
2001
2002 fix:
2003         CDEBUG(D_LFSCK, "%s: the file O/%s/%s/%s is corrupted\n",
2004                osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2005                ozi->ozi_name);
2006
2007         rc = osd_remove_ml_file(env, dev, dir_oid,
2008                                 ozi->ozi_zde.lzd_reg.zde_dnode, fid,
2009                                 ozi->ozi_name);
2010         RETURN(rc);
2011 }
2012
2013 static int osd_scan_ml_file_dir(const struct lu_env *env,
2014                                 struct osd_device *dev, uint64_t dir_oid,
2015                                 struct osd_zap_it *ozi)
2016 {
2017         struct osd_thread_info *info = osd_oti_get(env);
2018
2019         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2020                 return 0;
2021
2022         info->oti_dir_name = ozi->ozi_name;
2023         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2024                             osd_scan_ml_file);
2025 }
2026
2027 static int osd_scan_ml_file_seq(const struct lu_env *env,
2028                                 struct osd_device *dev, uint64_t dir_oid,
2029                                 struct osd_zap_it *ozi)
2030 {
2031         struct osd_thread_info *info = osd_oti_get(env);
2032
2033         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2034                 return 0;
2035
2036         info->oti_seq_name = ozi->ozi_name;
2037         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2038                             osd_scan_ml_file_dir);
2039 }
2040
2041 static int osd_scan_ml_file_main(const struct lu_env *env,
2042                                  struct osd_device *dev)
2043 {
2044         return osd_scan_dir(env, dev, dev->od_O_id, osd_scan_ml_file_seq);
2045 }