Whamcloud - gitweb
LU-14927 quota: move qsd_transfer to lquota module
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
 * The OI scrub is used for rebuilding the Object Index files when an MDT is
 * restored from a file-level backup.
32  *
 * The otable-based iterator scans ZFS objects to feed the upper-layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
/* All of the low 48 bits set.
 * NOTE(review): presumably the largest otable iteration cookie; no user is
 * visible in this part of the file -- confirm against the iterator code. */
#define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
/* Size of the prefetch window: osd_scrub_has_window() reports room while the
 * iterator's ooi_prefetched counter is below this value. */
#define OTABLE_PREFETCH                 256
57
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
59 {
60         return it->ooi_prefetched < OTABLE_PREFETCH;
61 }
62
63 /**
64  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
65  *
66  * \retval   1, changed nothing
67  * \retval   0, changed successfully
68  * \retval -ve, on error
69  */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71                               struct osd_device *dev,
72                               const struct lu_fid *fid,
73                               uint64_t oid, enum dt_txn_op ops,
74                               bool force, const char *name)
75 {
76         struct osd_thread_info *info = osd_oti_get(env);
77         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78         char *buf = info->oti_str;
79         dmu_tx_t *tx = NULL;
80         dnode_t *dn = NULL;
81         uint64_t zapid;
82         int rc;
83         ENTRY;
84
85         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
86                 GOTO(log, rc = 0);
87
88         tx = dmu_tx_create(dev->od_os);
89         if (!tx)
90                 GOTO(log, rc = -ENOMEM);
91
92         zapid = osd_get_name_n_idx(env, dev, fid, buf,
93                                    sizeof(info->oti_str), &dn);
94         osd_tx_hold_zap(tx, zapid, dn,
95                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96         rc = -dmu_tx_assign(tx, TXG_WAIT);
97         if (rc) {
98                 dmu_tx_abort(tx);
99                 GOTO(log, rc);
100         }
101
102         switch (ops) {
103         case DTO_INDEX_UPDATE:
104                 zde->zde_pad = 0;
105                 zde->zde_dnode = oid;
106                 zde->zde_type = 0; /* The type in OI mapping is useless. */
107                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
108                                  zde, tx);
109                 if (unlikely(rc == -ENOENT)) {
110                         /* Some unlink thread may removed the OI mapping. */
111                         rc = 1;
112                 }
113                 break;
114         case DTO_INDEX_INSERT:
115                 zde->zde_pad = 0;
116                 zde->zde_dnode = oid;
117                 zde->zde_type = 0; /* The type in OI mapping is useless. */
118                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
119                                  zde, tx);
120                 if (unlikely(rc == -EEXIST))
121                         rc = 1;
122                 break;
123         case DTO_INDEX_DELETE:
124                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
125                 if (rc == -ENOENT) {
126                         /* It is normal that the unlink thread has removed the
127                          * OI mapping already. */
128                         rc = 1;
129                 }
130                 break;
131         default:
132                 LASSERTF(0, "Unexpected ops %d\n", ops);
133                 rc = -EINVAL;
134                 break;
135         }
136
137         dmu_tx_commit(tx);
138         GOTO(log, rc);
139
140 log:
141         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
142                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
143                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
144
145         return rc;
146 }
147
148 static int
149 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
150                        const struct lu_fid *fid, uint64_t oid, int val)
151 {
152         struct lustre_scrub *scrub = &dev->od_scrub;
153         struct scrub_file *sf = &scrub->os_file;
154         struct osd_inconsistent_item *oii = NULL;
155         nvlist_t *nvbuf = NULL;
156         dnode_t *dn = NULL;
157         uint64_t oid2;
158         int ops = DTO_INDEX_UPDATE;
159         int rc;
160         ENTRY;
161
162         down_write(&scrub->os_rwsem);
163         scrub->os_new_checked++;
164         if (val < 0)
165                 GOTO(out, rc = val);
166
167         if (scrub->os_in_prior)
168                 oii = list_entry(scrub->os_inconsistent_items.next,
169                                  struct osd_inconsistent_item, oii_list);
170
171         if (oid < sf->sf_pos_latest_start && !oii)
172                 GOTO(out, rc = 0);
173
174         if (oii && oii->oii_insert) {
175                 ops = DTO_INDEX_INSERT;
176                 goto zget;
177         }
178
179         rc = osd_fid_lookup(env, dev, fid, &oid2);
180         if (rc) {
181                 if (rc != -ENOENT)
182                         GOTO(out, rc);
183
184                 ops = DTO_INDEX_INSERT;
185
186 zget:
187                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
188                 if (rc) {
189                         /* Someone removed the object by race. */
190                         if (rc == -ENOENT || rc == -EEXIST)
191                                 rc = 0;
192                         GOTO(out, rc);
193                 }
194
195                 spin_lock(&scrub->os_lock);
196                 scrub->os_full_speed = 1;
197                 spin_unlock(&scrub->os_lock);
198
199                 sf->sf_flags |= SF_INCONSISTENT;
200         } else if (oid == oid2) {
201                 GOTO(out, rc = 0);
202         } else {
203                 struct lustre_mdt_attrs *lma = NULL;
204                 int size;
205
206                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
207                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
208                         goto update;
209                 if (rc)
210                         GOTO(out, rc);
211
212                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
213                                                (uchar_t **)&lma, &size);
214                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
215                         goto update;
216                 if (rc)
217                         GOTO(out, rc);
218
219                 lustre_lma_swab(lma);
220                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
221                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
222                                "two objects: %llu and %llu (in OI)\n",
223                                osd_name(dev), PFID(fid), oid, oid2);
224
225                         GOTO(out, rc = -EEXIST);
226                 }
227
228 update:
229                 spin_lock(&scrub->os_lock);
230                 scrub->os_full_speed = 1;
231                 spin_unlock(&scrub->os_lock);
232                 sf->sf_flags |= SF_INCONSISTENT;
233         }
234
235         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
236         if (!rc) {
237                 if (scrub->os_in_prior)
238                         sf->sf_items_updated_prior++;
239                 else
240                         sf->sf_items_updated++;
241         }
242
243         GOTO(out, rc);
244
245 out:
246         if (dev->od_is_ost) {
247                 sa_handle_t *hdl;
248                 uint64_t nlink, mode;
249
250                 rc = -sa_handle_get(dev->od_os, oid, NULL, SA_HDL_PRIVATE,
251                                     &hdl);
252                 if (rc)
253                         GOTO(cleanup, rc);
254
255                 rc = -sa_lookup(hdl, SA_ZPL_MODE(dev), &mode, sizeof(mode));
256                 if (rc || !S_ISREG(mode)) {
257                         sa_handle_destroy(hdl);
258                         GOTO(cleanup, rc);
259                 }
260
261                 rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
262                 if (rc == 0 && nlink > 1)
263                         scrub->os_has_ml_file = 1;
264
265                 sa_handle_destroy(hdl);
266         }
267
268 cleanup:
269         if (nvbuf)
270                 nvlist_free(nvbuf);
271
272         if (rc < 0) {
273                 sf->sf_items_failed++;
274                 if (sf->sf_pos_first_inconsistent == 0 ||
275                     sf->sf_pos_first_inconsistent > oid)
276                         sf->sf_pos_first_inconsistent = oid;
277         } else {
278                 rc = 0;
279         }
280
281         /* There may be conflict unlink during the OI scrub,
282          * if happend, then remove the new added OI mapping. */
283         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
284                 osd_scrub_refresh_mapping(env, dev, fid, oid,
285                                           DTO_INDEX_DELETE, false, NULL);
286         up_write(&scrub->os_rwsem);
287
288         if (dn)
289                 osd_dnode_rele(dn);
290
291         if (oii) {
292                 spin_lock(&scrub->os_lock);
293                 if (likely(!list_empty(&oii->oii_list)))
294                         list_del(&oii->oii_list);
295                 spin_unlock(&scrub->os_lock);
296                 OBD_FREE_PTR(oii);
297         }
298
299         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
300 }
301
302 /* iteration engine */
303
304 static inline int
305 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
306 {
307         spin_lock(&scrub->os_lock);
308         if (osd_scrub_has_window(it) ||
309             !list_empty(&scrub->os_inconsistent_items) ||
310             it->ooi_waiting || kthread_should_stop())
311                 scrub->os_waiting = 0;
312         else
313                 scrub->os_waiting = 1;
314         spin_unlock(&scrub->os_lock);
315
316         return !scrub->os_waiting;
317 }
318
319 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
320                           struct lu_fid *fid, uint64_t *oid)
321 {
322         struct lustre_scrub *scrub = &dev->od_scrub;
323         struct osd_otable_it *it = dev->od_otable_it;
324         struct lustre_mdt_attrs *lma = NULL;
325         nvlist_t *nvbuf = NULL;
326         int size = 0;
327         int rc = 0;
328         ENTRY;
329
330         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
331                 wait_var_event_timeout(
332                         scrub,
333                         !list_empty(&scrub->os_inconsistent_items) ||
334                         kthread_should_stop(),
335                         cfs_time_seconds(cfs_fail_val));
336
337                 if (kthread_should_stop())
338                         RETURN(SCRUB_NEXT_EXIT);
339         }
340
341         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
342                 spin_lock(&scrub->os_lock);
343                 scrub->os_running = 0;
344                 spin_unlock(&scrub->os_lock);
345                 RETURN(SCRUB_NEXT_CRASH);
346         }
347
348         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
349                 RETURN(SCRUB_NEXT_FATAL);
350
351 again:
352         if (nvbuf) {
353                 nvlist_free(nvbuf);
354                 nvbuf = NULL;
355                 lma = NULL;
356         }
357
358         if (!list_empty(&scrub->os_inconsistent_items)) {
359                 spin_lock(&scrub->os_lock);
360                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
361                         struct osd_inconsistent_item *oii;
362
363                         oii = list_entry(scrub->os_inconsistent_items.next,
364                                 struct osd_inconsistent_item, oii_list);
365                         *fid = oii->oii_cache.oic_fid;
366                         *oid = oii->oii_cache.oic_dnode;
367                         scrub->os_in_prior = 1;
368                         spin_unlock(&scrub->os_lock);
369
370                         GOTO(out, rc = 0);
371                 }
372                 spin_unlock(&scrub->os_lock);
373         }
374
375         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
376                 wait_var_event(scrub, osd_scrub_wakeup(scrub, it));
377
378         if (kthread_should_stop())
379                 GOTO(out, rc = SCRUB_NEXT_EXIT);
380
381         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
382         if (rc)
383                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
384
385         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
386         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
387                 goto again;
388
389         if (rc)
390                 GOTO(out, rc);
391
392         LASSERT(nvbuf != NULL);
393         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
394                                        (uchar_t **)&lma, &size);
395         if (!rc) {
396                 lustre_lma_swab(lma);
397                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
398                            !(lma->lma_incompat & LMAI_AGENT))) {
399                         *fid = lma->lma_self_fid;
400                         *oid = scrub->os_pos_current;
401
402                         GOTO(out, rc = 0);
403                 }
404         }
405
406         if (!scrub->os_full_speed) {
407                 spin_lock(&scrub->os_lock);
408                 it->ooi_prefetched++;
409                 if (it->ooi_waiting) {
410                         it->ooi_waiting = 0;
411                         wake_up_var(scrub);
412                 }
413                 spin_unlock(&scrub->os_lock);
414         }
415
416         goto again;
417
418 out:
419         if (nvbuf)
420                 nvlist_free(nvbuf);
421
422         return rc;
423 }
424
425 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
426                           const struct lu_fid *fid, uint64_t oid, int rc)
427 {
428         struct lustre_scrub *scrub = &dev->od_scrub;
429         struct osd_otable_it *it = dev->od_otable_it;
430
431         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
432         if (!scrub->os_in_prior) {
433                 if (!scrub->os_full_speed) {
434                         spin_lock(&scrub->os_lock);
435                         it->ooi_prefetched++;
436                         if (it->ooi_waiting) {
437                                 it->ooi_waiting = 0;
438                                 wake_up_var(scrub);
439                         }
440                         spin_unlock(&scrub->os_lock);
441                 }
442         } else {
443                 spin_lock(&scrub->os_lock);
444                 scrub->os_in_prior = 0;
445                 spin_unlock(&scrub->os_lock);
446         }
447
448         if (rc)
449                 return rc;
450
451         rc = scrub_checkpoint(env, scrub);
452         if (rc) {
453                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
454                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
455                 /* Continue, as long as the scrub itself can go ahead. */
456         }
457
458         return 0;
459 }
460
/* Forward declaration: osd_scrub_main() calls this after the scan when
 * os_has_ml_file is set; the definition is not in this part of the file. */
static int osd_scan_ml_file_main(const struct lu_env *env,
                                 struct osd_device *dev);
463
464 static int osd_scrub_main(void *args)
465 {
466         struct lu_env env;
467         struct osd_device *dev = (struct osd_device *)args;
468         struct lustre_scrub *scrub = &dev->od_scrub;
469         struct lu_fid *fid;
470         uint64_t oid;
471         int rc = 0, ret;
472         ENTRY;
473
474         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
475         if (rc) {
476                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
477                        scrub->os_name, rc);
478                 GOTO(noenv, rc);
479         }
480
481         rc = scrub_thread_prep(&env, scrub, dev->od_uuid, 1);
482         if (rc) {
483                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
484                        scrub->os_name, rc);
485                 GOTO(out, rc);
486         }
487
488         if (!scrub->os_full_speed) {
489                 struct osd_otable_it *it = dev->od_otable_it;
490
491                 wait_var_event(scrub,
492                                it->ooi_user_ready ||
493                                kthread_should_stop());
494
495                 if (kthread_should_stop())
496                         GOTO(post, rc = 0);
497
498                 scrub->os_pos_current = it->ooi_pos;
499         }
500
501         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
502                scrub->os_name, scrub->os_start_flags,
503                scrub->os_pos_current);
504
505         fid = &osd_oti_get(&env)->oti_fid;
506         while (!rc && !kthread_should_stop()) {
507                 rc = osd_scrub_next(&env, dev, fid, &oid);
508                 switch (rc) {
509                 case SCRUB_NEXT_EXIT:
510                         GOTO(post, rc = 0);
511                 case SCRUB_NEXT_CRASH:
512                         spin_lock(&scrub->os_lock);
513                         scrub->os_running = 0;
514                         spin_unlock(&scrub->os_lock);
515                         GOTO(out, rc = -EINVAL);
516                 case SCRUB_NEXT_FATAL:
517                         GOTO(post, rc = -EINVAL);
518                 case SCRUB_NEXT_BREAK:
519                         GOTO(post, rc = 1);
520                 }
521
522                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
523         }
524
525         GOTO(post, rc);
526
527 post:
528         if (scrub->os_has_ml_file) {
529                 ret = osd_scan_ml_file_main(&env, dev);
530                 if (ret != 0)
531                         rc = ret;
532         }
533
534         rc = scrub_thread_post(&env, &dev->od_scrub, rc);
535         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
536                scrub->os_name, scrub->os_pos_current, rc);
537
538 out:
539         while (!list_empty(&scrub->os_inconsistent_items)) {
540                 struct osd_inconsistent_item *oii;
541
542                 oii = list_entry(scrub->os_inconsistent_items.next,
543                                  struct osd_inconsistent_item, oii_list);
544                 list_del_init(&oii->oii_list);
545                 OBD_FREE_PTR(oii);
546         }
547
548         lu_env_fini(&env);
549
550 noenv:
551         spin_lock(&scrub->os_lock);
552         scrub->os_running = 0;
553         spin_unlock(&scrub->os_lock);
554         if (xchg(&scrub->os_task, NULL) == NULL)
555                 /* scrub_stop is waiting, we need to synchronize */
556                 wait_var_event(scrub, kthread_should_stop());
557         wake_up_var(scrub);
558         return rc;
559 }
560
561 /* initial OI scrub */
562
struct osd_lf_map;

/* Callback type stored in olm_handle_dirent / oii_handle_dirent.  It takes
 * the environment, the device, a name, two 64-bit ids, the osd_lf_flags and
 * a bool.
 * NOTE(review): the parameters are unnamed here; presumably the ids are the
 * parent and the entry's object -- confirm against the handler definitions. */
typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
                               const char *, uint64_t, uint64_t,
                               enum osd_lf_flags, bool);
/* Entry handlers referenced by the tables below; defined later in the file. */
static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
                             const char *, uint64_t, uint64_t,
                             enum osd_lf_flags, bool);
static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
                          const char *, uint64_t, uint64_t,
                          enum osd_lf_flags, bool);

/* Callback type stored in olm_scan_dir / oii_scan_dir.  It takes the
 * environment, the device, one 64-bit id, the per-entry handler to apply
 * and the osd_lf_flags. */
typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
                          uint64_t, handle_dirent_t, enum osd_lf_flags);
/* Directory scanners referenced by the tables below; defined later in the
 * file. */
static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
                              uint64_t, handle_dirent_t, enum osd_lf_flags);
static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
                           uint64_t, handle_dirent_t, enum osd_lf_flags);
/* One row of the local-file tables below (osd_lf_maps, osd_dl_maps). */
struct osd_lf_map {
        /* entry name; a NULL name terminates a table */
        char                    *olm_name;
        /* FID of the entry; left zeroed by rows that do not set it */
        struct lu_fid            olm_fid;
        /* OLF_* flags, e.g. OLF_SCAN_SUBITEMS */
        enum osd_lf_flags        olm_flags;
        /* directory scanner for this entry, if any */
        scan_dir_t               olm_scan_dir;
        /* per-entry handler passed to the scanner, if any */
        handle_dirent_t          olm_handle_dirent;
};
589
/* Table of the known local files and directories, terminated by an entry
 * whose olm_name is NULL.  Newly introduced local files should be added to
 * this list. */
static const struct osd_lf_map osd_lf_maps[] = {
        /* CONFIGS */
        {
                .olm_name               = MOUNT_CONFIGS_DIR,
                .olm_fid                = {
                        .f_seq  = FID_SEQ_LOCAL_FILE,
                        .f_oid  = MGS_CONFIGS_OID,
                },
                .olm_flags              = OLF_SCAN_SUBITEMS,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* NIDTBL_VERSIONS */
        {
                .olm_name               = MGS_NIDTBL_DIR,
                .olm_flags              = OLF_SCAN_SUBITEMS,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* PENDING */
        {
                .olm_name               = MDT_ORPHAN_DIR,
        },

        /* ROOT */
        {
                .olm_name               = "ROOT",
                .olm_fid                = {
                        .f_seq  = FID_SEQ_ROOT,
                        .f_oid  = FID_OID_ROOT,
                },
                .olm_flags              = OLF_SCAN_SUBITEMS,
                .olm_scan_dir           = osd_ios_ROOT_sd,
        },

        /* fld */
        {
                .olm_name               = "fld",
                .olm_fid                = {
                        .f_seq  = FID_SEQ_LOCAL_FILE,
                        .f_oid  = FLD_INDEX_OID,
                },
        },

        /* changelog_catalog */
        {
                .olm_name               = CHANGELOG_CATALOG,
        },

        /* changelog_users */
        {
                .olm_name               = CHANGELOG_USERS,
        },

        /* quota_master */
        {
                .olm_name               = QMT_DIR,
                .olm_flags              = OLF_SCAN_SUBITEMS,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* quota_slave */
        {
                .olm_name               = QSD_DIR,
                .olm_flags              = OLF_SCAN_SUBITEMS,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* LFSCK */
        {
                .olm_name               = LFSCK_DIR,
                .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* lfsck_bookmark */
        {
                .olm_name               = LFSCK_BOOKMARK,
        },

        /* lfsck_layout */
        {
                .olm_name               = LFSCK_LAYOUT,
        },

        /* lfsck_namespace */
        {
                .olm_name               = LFSCK_NAMESPACE,
        },

        /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
         * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
         * for more details. */

        /* update_log */
        {
                .olm_name               = "update_log",
                .olm_fid                = {
                        .f_seq  = FID_SEQ_UPDATE_LOG,
                },
                .olm_flags              = OLF_IDX_IN_FID,
        },

        /* update_log_dir */
        {
                .olm_name               = "update_log_dir",
                .olm_fid        = {
                        .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
                },
                .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_uld_hd,
        },

        /* hsm_actions */
        {
                .olm_name               = HSM_ACTIONS,
        },

        /* nodemap */
        {
                .olm_name               = LUSTRE_NODEMAP_NAME,
        },

        /* index_backup */
        {
                .olm_name               = INDEX_BACKUP_DIR,
                .olm_fid                = {
                        .f_seq  = FID_SEQ_LOCAL_FILE,
                        .f_oid  = INDEX_BACKUP_OID,
                },
                .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
                .olm_scan_dir           = osd_ios_general_sd,
                .olm_handle_dirent      = osd_ios_varfid_hd,
        },

        /* end-of-table marker */
        {
                .olm_name               = NULL
        }
};
736
/* Table of the known entries under .lustre/, terminated by an entry whose
 * olm_name is NULL.  Newly introduced files under .lustre/ should be added
 * to this list. */
static const struct osd_lf_map osd_dl_maps[] = {
        /* .lustre/fid */
        {
                .olm_name               = "fid",
                .olm_fid                = {
                        .f_seq  = FID_SEQ_DOT_LUSTRE,
                        .f_oid  = FID_OID_DOT_LUSTRE_OBF,
                },
        },

        /* .lustre/lost+found */
        {
                .olm_name               = "lost+found",
                .olm_fid                = {
                        .f_seq  = FID_SEQ_DOT_LUSTRE,
                        .f_oid  = FID_OID_DOT_LUSTRE_LPF,
                },
        },

        /* end-of-table marker */
        {
                .olm_name               = NULL
        }
};
761
/* A pending directory for the initial OI scrub, queued on the device's
 * od_ios_list by osd_ios_new_item(). */
struct osd_ios_item {
        /* linkage into dev->od_ios_list */
        struct list_head        oii_list;
        /* id of the directory to scan, as passed to osd_ios_new_item() */
        uint64_t                oii_parent;
        /* OLF_* flags recorded for the scan */
        enum osd_lf_flags       oii_flags;
        /* scanner recorded for this directory */
        scan_dir_t              oii_scan_dir;
        /* per-entry handler recorded for this directory */
        handle_dirent_t         oii_handle_dirent;
};
769
770 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
771                             enum osd_lf_flags flags, scan_dir_t scan_dir,
772                             handle_dirent_t handle_dirent)
773 {
774         struct osd_ios_item *item;
775
776         OBD_ALLOC_PTR(item);
777         if (!item) {
778                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
779                       osd_name(dev), parent);
780                 return -ENOMEM;
781         }
782
783         INIT_LIST_HEAD(&item->oii_list);
784         item->oii_parent = parent;
785         item->oii_flags = flags;
786         item->oii_scan_dir = scan_dir;
787         item->oii_handle_dirent = handle_dirent;
788         list_add_tail(&item->oii_list, &dev->od_ios_list);
789
790         return 0;
791 }
792
793 static bool osd_index_need_recreate(const struct lu_env *env,
794                                     struct osd_device *dev, uint64_t oid)
795 {
796         struct osd_thread_info *info = osd_oti_get(env);
797         zap_attribute_t *za = &info->oti_za2;
798         zap_cursor_t *zc = &info->oti_zc2;
799         int rc;
800         ENTRY;
801
802         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
803         rc = -zap_cursor_retrieve(zc, za);
804         zap_cursor_fini(zc);
805         if (rc && rc != -ENOENT)
806                 RETURN(true);
807
808         RETURN(false);
809 }
810
811 static void osd_ios_index_register(const struct lu_env *env,
812                                    struct osd_device *osd,
813                                    const struct lu_fid *fid, uint64_t oid)
814 {
815         struct osd_thread_info *info = osd_oti_get(env);
816         zap_attribute_t *za = &info->oti_za2;
817         zap_cursor_t *zc = &info->oti_zc2;
818         struct zap_leaf_entry *le;
819         dnode_t *dn = NULL;
820         sa_handle_t *hdl;
821         __u64 mode = 0;
822         __u32 keysize = 0;
823         __u32 recsize = 0;
824         int rc;
825         ENTRY;
826
827         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
828         if (rc == -EEXIST || rc == -ENOENT)
829                 RETURN_EXIT;
830
831         if (rc < 0)
832                 GOTO(log, rc);
833
834         if (!osd_object_is_zap(dn))
835                 GOTO(log, rc = 1);
836
837         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
838         if (rc)
839                 GOTO(log, rc);
840
841         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
842         sa_handle_destroy(hdl);
843         if (rc)
844                 GOTO(log, rc);
845
846         if (!S_ISREG(mode))
847                 GOTO(log, rc = 1);
848
849         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
850         rc = -zap_cursor_retrieve(zc, za);
851         if (rc)
852                 /* Skip empty index object */
853                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
854
855         if (zc->zc_zap->zap_ismicro ||
856             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
857                 GOTO(fini, rc = 1);
858
859         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
860         keysize = le->le_name_numints * 8;
861         recsize = za->za_integer_length * za->za_num_integers;
862         if (likely(keysize && recsize))
863                 rc = osd_index_register(osd, fid, keysize, recsize);
864
865         GOTO(fini, rc);
866
867 fini:
868         zap_cursor_fini(zc);
869
870 log:
871         if (dn)
872                 osd_dnode_rele(dn);
873         if (rc < 0)
874                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
875                       osd_name(osd), PFID(fid), keysize, recsize, rc);
876         else if (!rc)
877                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
878                        osd_name(osd), PFID(fid), keysize, recsize);
879 }
880
881 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
882                               struct lustre_index_restore_unit *liru, void *buf,
883                               int bufsize)
884 {
885         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
886         struct lu_fid *tgt_fid = &liru->liru_cfid;
887         struct lu_fid bak_fid;
888         int rc;
889         ENTRY;
890
891         lustre_fid2lbx(buf, tgt_fid, bufsize);
892         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
893                          sizeof(*zde) / 8, (void *)zde);
894         if (rc)
895                 GOTO(log, rc);
896
897         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
898         if (rc)
899                 GOTO(log, rc);
900
901         /* The OI mapping for index may be invalid, since it will be
902          * re-created, not update the OI mapping, just cache it in RAM. */
903         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
904                                             liru->liru_clid);
905         if (!rc)
906                 rc = lustre_index_restore(env, &dev->od_dt_dev,
907                                 &liru->liru_pfid, tgt_fid, &bak_fid,
908                                 liru->liru_name, &dev->od_index_backup_list,
909                                 &dev->od_lock, buf, bufsize);
910         GOTO(log, rc);
911
912 log:
913         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
914                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
915 }
916
917 /**
918  * verify FID-in-LMA and OI entry for one object
919  *
920  * ios: Initial OI Scrub.
921  */
922 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
923                             const struct lu_fid *fid, uint64_t parent,
924                             uint64_t oid, const char *name,
925                             enum osd_lf_flags flags)
926 {
927         struct lustre_scrub *scrub = &dev->od_scrub;
928         struct scrub_file *sf = &scrub->os_file;
929         struct lustre_mdt_attrs *lma = NULL;
930         nvlist_t *nvbuf = NULL;
931         struct lu_fid tfid;
932         uint64_t oid2 = 0;
933         __u64 flag = 0;
934         int size = 0;
935         int op = 0;
936         int rc;
937         ENTRY;
938
939         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
940         if (unlikely(rc == -ENOENT || rc == -EEXIST))
941                 RETURN(0);
942
943         if (rc && rc != -ENODATA) {
944                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
945                       "rc = %d\n", osd_name(dev), oid, rc);
946
947                 RETURN(rc);
948         }
949
950         if (!rc) {
951                 LASSERT(nvbuf != NULL);
952                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
953                                                (uchar_t **)&lma, &size);
954                 if (rc || size == 0) {
955                         LASSERT(lma == NULL);
956                         rc = -ENODATA;
957                 } else {
958                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
959                         lustre_lma_swab(lma);
960                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
961                                 nvlist_free(nvbuf);
962                                 RETURN(0);
963                         }
964
965                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
966                             osd_index_need_recreate(env, dev, oid)) {
967                                 if (parent == dev->od_root) {
968                                         lu_local_obj_fid(&tfid,
969                                                          OSD_FS_ROOT_OID);
970                                 } else {
971                                         rc = osd_get_fid_by_oid(env, dev,
972                                                                 parent, &tfid);
973                                         if (rc) {
974                                                 nvlist_free(nvbuf);
975                                                 RETURN(rc);
976                                         }
977                                 }
978
979                                 rc = lustre_liru_new(
980                                                 &dev->od_index_restore_list,
981                                                 &tfid, &lma->lma_self_fid, oid,
982                                                 name, strlen(name));
983                                 nvlist_free(nvbuf);
984                                 RETURN(rc);
985                         }
986
987                         tfid = lma->lma_self_fid;
988                         if (!(flags & OLF_NOT_BACKUP))
989                                 osd_ios_index_register(env, dev, &tfid, oid);
990                 }
991                 nvlist_free(nvbuf);
992         }
993
994         if (rc == -ENODATA) {
995                 if (!fid) {
996                         /* Skip the object without FID-in-LMA */
997                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
998                                osd_name(dev), oid);
999
1000                         RETURN(0);
1001                 }
1002
1003                 LASSERT(!fid_is_zero(fid));
1004
1005                 tfid = *fid;
1006                 if (flags & OLF_IDX_IN_FID) {
1007                         LASSERT(dev->od_index >= 0);
1008
1009                         tfid.f_oid = dev->od_index;
1010                 }
1011         }
1012
1013         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1014         if (rc) {
1015                 if (rc != -ENOENT) {
1016                         CWARN("%s: initial OI scrub failed to lookup fid for "
1017                               DFID"=>%llu: rc = %d\n",
1018                               osd_name(dev), PFID(&tfid), oid, rc);
1019
1020                         RETURN(rc);
1021                 }
1022
1023                 flag = SF_RECREATED;
1024                 op = DTO_INDEX_INSERT;
1025         } else {
1026                 if (oid == oid2)
1027                         RETURN(0);
1028
1029                 flag = SF_INCONSISTENT;
1030                 op = DTO_INDEX_UPDATE;
1031         }
1032
1033         if (!(sf->sf_flags & flag)) {
1034                 scrub_file_reset(scrub, dev->od_uuid, flag);
1035                 rc = scrub_file_store(env, scrub);
1036                 if (rc)
1037                         RETURN(rc);
1038         }
1039
1040         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1041
1042         RETURN(rc > 0 ? 0 : rc);
1043 }
1044
1045 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1046                              const char *name, uint64_t parent, uint64_t oid,
1047                              enum osd_lf_flags flags, bool is_dir)
1048 {
1049         int rc;
1050         ENTRY;
1051
1052         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1053         if (!rc && is_dir)
1054                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1055                                       osd_ios_varfid_hd);
1056
1057         RETURN(rc);
1058 }
1059
1060 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1061                           const char *name, uint64_t parent, uint64_t oid,
1062                           enum osd_lf_flags flags, bool is_dir)
1063 {
1064         struct lu_fid tfid;
1065         int rc;
1066         ENTRY;
1067
1068         /* skip any non-DFID format name */
1069         if (name[0] != '[')
1070                 RETURN(0);
1071
1072         /* skip the start '[' */
1073         sscanf(&name[1], SFID, RFID(&tfid));
1074         if (fid_is_sane(&tfid))
1075                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1076         else
1077                 rc = -EIO;
1078
1079         RETURN(rc);
1080 }
1081
1082 /*
1083  * General scanner for the directories execpt /ROOT during initial OI scrub.
1084  * It scans the name entries under the given directory one by one. For each
1085  * entry, verifies its OI mapping via the given @handle_dirent.
1086  */
1087 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1088                               uint64_t parent, handle_dirent_t handle_dirent,
1089                               enum osd_lf_flags flags)
1090 {
1091         struct osd_thread_info *info = osd_oti_get(env);
1092         struct luz_direntry *zde = &info->oti_zde;
1093         zap_attribute_t *za = &info->oti_za;
1094         zap_cursor_t *zc = &info->oti_zc;
1095         int rc;
1096         ENTRY;
1097
1098         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1099         rc = -zap_cursor_retrieve(zc, za);
1100         if (rc == -ENOENT)
1101                 zap_cursor_advance(zc);
1102         else if (rc)
1103                 GOTO(log, rc);
1104
1105         while (1) {
1106                 rc = -zap_cursor_retrieve(zc, za);
1107                 if (rc)
1108                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1109
1110                 /* skip the entry started with '.' */
1111                 if (likely(za->za_name[0] != '.')) {
1112                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1113                                         za->za_integer_length,
1114                                         sizeof(*zde) / za->za_integer_length,
1115                                         (void *)zde);
1116                         if (rc) {
1117                                 CWARN("%s: initial OI scrub failed to lookup "
1118                                       "%s under %llu: rc = %d\n",
1119                                       osd_name(dev), za->za_name, parent, rc);
1120                                 continue;
1121                         }
1122
1123                         rc = handle_dirent(env, dev, za->za_name, parent,
1124                                         zde->lzd_reg.zde_dnode, flags,
1125                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1126                                         true : false);
1127                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1128                                "%llu: rc = %d\n",
1129                                osd_name(dev), za->za_name, parent, rc);
1130                 }
1131
1132                 zap_cursor_advance(zc);
1133         }
1134
1135 log:
1136         if (rc)
1137                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1138                       "rc = %d\n", osd_name(dev), parent, rc);
1139         zap_cursor_fini(zc);
1140
1141         return rc;
1142 }
1143
1144 /*
1145  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1146  * be scanned during the initial OI scrub, instead, only the .lustre and the
1147  * sub-items under .lustre will be handled.
1148  */
1149 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1150                            uint64_t parent, handle_dirent_t handle_dirent,
1151                            enum osd_lf_flags flags)
1152 {
1153         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1154         const struct osd_lf_map *map;
1155         uint64_t oid;
1156         int rc;
1157         int rc1 = 0;
1158         ENTRY;
1159
1160         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1161                             sizeof(*zde) / 8, (void *)zde);
1162         if (rc == -ENOENT) {
1163                 /* The .lustre directory is lost. That is not fatal. It can
1164                  * be re-created in the subsequent MDT start processing. */
1165                 RETURN(0);
1166         }
1167
1168         if (rc) {
1169                 CWARN("%s: initial OI scrub failed to find .lustre: "
1170                       "rc = %d\n", osd_name(dev), rc);
1171
1172                 RETURN(rc);
1173         }
1174
1175         oid = zde->lzd_reg.zde_dnode;
1176         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1177                               dot_lustre_name, 0);
1178         if (rc)
1179                 RETURN(rc);
1180
1181         for (map = osd_dl_maps; map->olm_name; map++) {
1182                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1183                                     sizeof(*zde) / 8, (void *)zde);
1184                 if (rc) {
1185                         if (rc != -ENOENT)
1186                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1187                                       osd_name(dev), map->olm_name, rc);
1188                         else if (!fid_is_zero(&map->olm_fid))
1189                                 /* Try to remove the stale OI mapping. */
1190                                 osd_scrub_refresh_mapping(env, dev,
1191                                                 &map->olm_fid, 0,
1192                                                 DTO_INDEX_DELETE, true,
1193                                                 map->olm_name);
1194                         continue;
1195                 }
1196
1197                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1198                                       zde->lzd_reg.zde_dnode, map->olm_name,
1199                                       map->olm_flags);
1200                 if (rc)
1201                         rc1 = rc;
1202         }
1203
1204         RETURN(rc1);
1205 }
1206
1207 static void osd_initial_OI_scrub(const struct lu_env *env,
1208                                  struct osd_device *dev)
1209 {
1210         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1211         const struct osd_lf_map *map;
1212         int rc;
1213         ENTRY;
1214
1215         for (map = osd_lf_maps; map->olm_name; map++) {
1216                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1217                                     sizeof(*zde) / 8, (void *)zde);
1218                 if (rc) {
1219                         if (rc != -ENOENT)
1220                                 CWARN("%s: initial OI scrub failed "
1221                                       "to find the entry %s: rc = %d\n",
1222                                       osd_name(dev), map->olm_name, rc);
1223                         else if (!fid_is_zero(&map->olm_fid))
1224                                 /* Try to remove the stale OI mapping. */
1225                                 osd_scrub_refresh_mapping(env, dev,
1226                                                 &map->olm_fid, 0,
1227                                                 DTO_INDEX_DELETE, true,
1228                                                 map->olm_name);
1229                         continue;
1230                 }
1231
1232                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1233                                       zde->lzd_reg.zde_dnode, map->olm_name,
1234                                       map->olm_flags);
1235                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1236                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1237                                          map->olm_flags, map->olm_scan_dir,
1238                                          map->olm_handle_dirent);
1239         }
1240
1241         while (!list_empty(&dev->od_ios_list)) {
1242                 struct osd_ios_item *item;
1243
1244                 item = list_entry(dev->od_ios_list.next,
1245                                   struct osd_ios_item, oii_list);
1246                 list_del_init(&item->oii_list);
1247                 item->oii_scan_dir(env, dev, item->oii_parent,
1248                                    item->oii_handle_dirent, item->oii_flags);
1249                 OBD_FREE_PTR(item);
1250         }
1251
1252         if (!list_empty(&dev->od_index_restore_list)) {
1253                 char *buf;
1254
1255                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1256                 if (!buf)
1257                         CERROR("%s: not enough RAM for rebuild index\n",
1258                                osd_name(dev));
1259
1260                 while (!list_empty(&dev->od_index_restore_list)) {
1261                         struct lustre_index_restore_unit *liru;
1262
1263                         liru = list_entry(dev->od_index_restore_list.next,
1264                                           struct lustre_index_restore_unit,
1265                                           liru_link);
1266                         list_del(&liru->liru_link);
1267                         if (buf)
1268                                 osd_index_restore(env, dev, liru, buf,
1269                                                   INDEX_BACKUP_BUFSIZE);
1270                         OBD_FREE(liru, liru->liru_len);
1271                 }
1272
1273                 if (buf)
1274                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1275         }
1276
1277         EXIT;
1278 }
1279
1280 /* OI scrub start/stop */
1281
1282 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1283                     __u32 flags)
1284 {
1285         int rc;
1286         ENTRY;
1287
1288         if (dev->od_dt_dev.dd_rdonly)
1289                 RETURN(-EROFS);
1290
1291         /* od_otable_sem: prevent concurrent start/stop */
1292         down(&dev->od_otable_sem);
1293         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1294         up(&dev->od_otable_sem);
1295
1296         RETURN(rc == -EALREADY ? 0 : rc);
1297 }
1298
1299 void osd_scrub_stop(struct osd_device *dev)
1300 {
1301         struct lustre_scrub *scrub = &dev->od_scrub;
1302         ENTRY;
1303
1304         /* od_otable_sem: prevent concurrent start/stop */
1305         down(&dev->od_otable_sem);
1306         spin_lock(&scrub->os_lock);
1307         scrub->os_paused = 1;
1308         spin_unlock(&scrub->os_lock);
1309         scrub_stop(scrub);
1310         up(&dev->od_otable_sem);
1311
1312         EXIT;
1313 }
1314
1315 /* OI scrub setup/cleanup */
1316
1317 static const char osd_scrub_name[] = "OI_scrub";
1318
1319 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1320                     time64_t interval, bool resetoi)
1321 {
1322         struct osd_thread_info *info = osd_oti_get(env);
1323         struct lustre_scrub *scrub = &dev->od_scrub;
1324         struct scrub_file *sf = &scrub->os_file;
1325         struct lu_fid *fid = &info->oti_fid;
1326         struct dt_object *obj;
1327         uint64_t oid;
1328         int rc = 0;
1329         bool dirty = false;
1330         ENTRY;
1331
1332         memcpy(dev->od_uuid.b,
1333                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1334                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1335         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1336         init_rwsem(&scrub->os_rwsem);
1337         spin_lock_init(&scrub->os_lock);
1338         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1339         scrub->os_name = osd_name(dev);
1340         scrub->os_auto_scrub_interval = interval;
1341
1342         /* 'What the @fid is' is not imporatant, because the object
1343          * has no OI mapping, and only is visible inside the OSD.*/
1344         fid->f_seq = FID_SEQ_IGIF_MAX;
1345         if (dev->od_is_ost)
1346                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1347         else
1348                 fid->f_oid = dev->od_index + 1;
1349         fid->f_ver = 0;
1350         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1351                                     osd_scrub_name, &oid, fid, false);
1352         if (rc)
1353                 RETURN(rc);
1354
1355         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1356         if (rc)
1357                 RETURN(rc);
1358
1359         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1360         if (IS_ERR_OR_NULL(obj))
1361                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1362
1363         obj->do_body_ops = &osd_body_scrub_ops;
1364         scrub->os_obj = obj;
1365         rc = scrub_file_load(env, scrub);
1366         if (rc == -ENOENT || rc == -EFAULT) {
1367                 scrub_file_init(scrub, dev->od_uuid);
1368                 dirty = true;
1369         } else if (rc < 0) {
1370                 GOTO(cleanup_obj, rc);
1371         } else {
1372                 if (!uuid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1373                         CDEBUG(D_LFSCK,
1374                                "%s: UUID has been changed from %pU to %pU\n",
1375                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1376                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1377                         dirty = true;
1378                 } else if (sf->sf_status == SS_SCANNING) {
1379                         sf->sf_status = SS_CRASHED;
1380                         dirty = true;
1381                 }
1382
1383                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1384                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1385                                       osd_name(dev), sf->sf_oi_count,
1386                                       osd_oi_count);
1387                         sf->sf_oi_count = osd_oi_count;
1388                         dirty = true;
1389                 }
1390         }
1391
1392         if (sf->sf_pos_last_checkpoint != 0)
1393                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1394         else
1395                 scrub->os_pos_current = 1;
1396
1397         if (dirty) {
1398                 rc = scrub_file_store(env, scrub);
1399                 if (rc)
1400                         GOTO(cleanup_obj, rc);
1401         }
1402
1403         /* Initialize OI files. */
1404         rc = osd_oi_init(env, dev, resetoi);
1405         if (rc < 0)
1406                 GOTO(cleanup_obj, rc);
1407
1408         if (!dev->od_dt_dev.dd_rdonly)
1409                 osd_initial_OI_scrub(env, dev);
1410
1411         if (!dev->od_dt_dev.dd_rdonly &&
1412             scrub->os_auto_scrub_interval != AS_NEVER &&
1413             ((sf->sf_status == SS_PAUSED) ||
1414              (sf->sf_status == SS_CRASHED &&
1415               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1416                               SF_UPGRADE | SF_AUTO)) ||
1417              (sf->sf_status == SS_INIT &&
1418               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1419                               SF_UPGRADE))))
1420                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1421
1422         if (rc)
1423                 GOTO(cleanup_oi, rc);
1424
1425         RETURN(0);
1426
1427 cleanup_oi:
1428         osd_oi_fini(env, dev);
1429 cleanup_obj:
1430         dt_object_put_nocache(env, scrub->os_obj);
1431         scrub->os_obj = NULL;
1432
1433         return rc;
1434 }
1435
1436 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1437 {
1438         struct lustre_scrub *scrub = &dev->od_scrub;
1439
1440         LASSERT(!dev->od_otable_it);
1441
1442         if (scrub->os_obj) {
1443                 osd_scrub_stop(dev);
1444                 dt_object_put_nocache(env, scrub->os_obj);
1445                 scrub->os_obj = NULL;
1446         }
1447
1448         if (dev->od_oi_table)
1449                 osd_oi_fini(env, dev);
1450 }
1451
1452 /* object table based iteration APIs */
1453
1454 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1455                                        struct dt_object *dt, __u32 attr)
1456 {
1457         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1458         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1459         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1460         struct lustre_scrub *scrub = &dev->od_scrub;
1461         struct osd_otable_it *it;
1462         __u32 start = 0;
1463         int rc;
1464         ENTRY;
1465
1466         if (dev->od_dt_dev.dd_rdonly)
1467                 RETURN(ERR_PTR(-EROFS));
1468
1469         /* od_otable_sem: prevent concurrent init/fini */
1470         down(&dev->od_otable_sem);
1471         if (dev->od_otable_it)
1472                 GOTO(out, it = ERR_PTR(-EALREADY));
1473
1474         OBD_ALLOC_PTR(it);
1475         if (!it)
1476                 GOTO(out, it = ERR_PTR(-ENOMEM));
1477
1478         if (flags & DOIF_OUTUSED)
1479                 it->ooi_used_outside = 1;
1480
1481         if (flags & DOIF_RESET)
1482                 start |= SS_RESET;
1483
1484         if (valid & DOIV_ERROR_HANDLE) {
1485                 if (flags & DOIF_FAILOUT)
1486                         start |= SS_SET_FAILOUT;
1487                 else
1488                         start |= SS_CLEAR_FAILOUT;
1489         }
1490
1491         if (valid & DOIV_DRYRUN) {
1492                 if (flags & DOIF_DRYRUN)
1493                         start |= SS_SET_DRYRUN;
1494                 else
1495                         start |= SS_CLEAR_DRYRUN;
1496         }
1497
1498         /* XXX: dmu_object_next() does NOT find dnodes allocated
1499          *      in the current non-committed txg, so we force txg
1500          *      commit to find all existing dnodes ... */
1501         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1502
1503         dev->od_otable_it = it;
1504         it->ooi_dev = dev;
1505         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1506         if (rc == -EALREADY) {
1507                 it->ooi_pos = 1;
1508         } else if (rc < 0) {
1509                 dev->od_otable_it = NULL;
1510                 OBD_FREE_PTR(it);
1511                 it = ERR_PTR(rc);
1512         } else {
1513                 it->ooi_pos = scrub->os_pos_current;
1514         }
1515
1516         GOTO(out, it);
1517
1518 out:
1519         up(&dev->od_otable_sem);
1520         return (struct dt_it *)it;
1521 }
1522
1523 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1524 {
1525         struct osd_otable_it *it = (struct osd_otable_it *)di;
1526         struct osd_device *dev = it->ooi_dev;
1527
1528         /* od_otable_sem: prevent concurrent init/fini */
1529         down(&dev->od_otable_sem);
1530         scrub_stop(&dev->od_scrub);
1531         LASSERT(dev->od_otable_it == it);
1532
1533         dev->od_otable_it = NULL;
1534         up(&dev->od_otable_sem);
1535         OBD_FREE_PTR(it);
1536 }
1537
/*
 * Iterator 'get' method: nothing to do here, @key is ignored.
 *
 * \retval 0    always
 */
static int osd_otable_it_get(const struct lu_env *env,
                             struct dt_it *di, const struct dt_key *key)
{
        return 0;
}
1543
/* Iterator 'put' method: intentionally empty. */
static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
}
1547
1548 static void osd_otable_it_preload(const struct lu_env *env,
1549                                   struct osd_otable_it *it)
1550 {
1551         struct osd_device *dev = it->ooi_dev;
1552         int rc;
1553
1554         /* can go negative on the very first access to the iterator
1555          * or if some non-Lustre objects were found */
1556         if (unlikely(it->ooi_prefetched < 0))
1557                 it->ooi_prefetched = 0;
1558
1559         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1560                 return;
1561
1562         if (it->ooi_prefetched_dnode == 0)
1563                 it->ooi_prefetched_dnode = it->ooi_pos;
1564
1565         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1566                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1567                                       B_FALSE, 0);
1568                 if (rc)
1569                         break;
1570
1571                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1572                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1573                 it->ooi_prefetched++;
1574         }
1575 }
1576
1577 static inline int
1578 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1579 {
1580         spin_lock(&scrub->os_lock);
1581         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1582             !scrub->os_running)
1583                 it->ooi_waiting = 0;
1584         else
1585                 it->ooi_waiting = 1;
1586         spin_unlock(&scrub->os_lock);
1587
1588         return !it->ooi_waiting;
1589 }
1590
1591 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1592 {
1593         struct osd_otable_it *it = (struct osd_otable_it *)di;
1594         struct osd_device *dev = it->ooi_dev;
1595         struct lustre_scrub *scrub = &dev->od_scrub;
1596         struct lustre_mdt_attrs *lma = NULL;
1597         nvlist_t *nvbuf = NULL;
1598         int rc, size = 0;
1599         bool locked;
1600         ENTRY;
1601
1602         LASSERT(it->ooi_user_ready);
1603         fid_zero(&it->ooi_fid);
1604
1605         if (unlikely(it->ooi_all_cached))
1606                 RETURN(1);
1607
1608 again:
1609         if (nvbuf) {
1610                 nvlist_free(nvbuf);
1611                 nvbuf = NULL;
1612                 lma = NULL;
1613                 size = 0;
1614         }
1615
1616         if (it->ooi_pos >= scrub->os_pos_current)
1617                 wait_var_event(scrub,
1618                                osd_otable_it_wakeup(scrub, it));
1619
1620         if (!scrub->os_running && !it->ooi_used_outside)
1621                 GOTO(out, rc = 1);
1622
1623         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1624         if (rc) {
1625                 if (unlikely(rc == -ESRCH)) {
1626                         it->ooi_all_cached = 1;
1627                         rc = 1;
1628                 }
1629
1630                 GOTO(out, rc);
1631         }
1632
1633         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1634
1635         locked = false;
1636         if (!scrub->os_full_speed) {
1637                 spin_lock(&scrub->os_lock);
1638                 locked = true;
1639         }
1640         it->ooi_prefetched--;
1641         if (!scrub->os_full_speed) {
1642                 if (scrub->os_waiting) {
1643                         scrub->os_waiting = 0;
1644                         wake_up_var(scrub);
1645                 }
1646         }
1647         if (locked)
1648                 spin_unlock(&scrub->os_lock);
1649
1650         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1651                 goto again;
1652
1653         if (rc)
1654                 GOTO(out, rc);
1655
1656         LASSERT(nvbuf != NULL);
1657         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1658                                        (uchar_t **)&lma, &size);
1659         if (rc || size == 0)
1660                 /* It is either non-Lustre object or OSD internal object,
1661                  * ignore it, go ahead */
1662                 goto again;
1663
1664         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1665         lustre_lma_swab(lma);
1666         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1667                      lma->lma_incompat & LMAI_AGENT))
1668                 goto again;
1669
1670         it->ooi_fid = lma->lma_self_fid;
1671
1672         GOTO(out, rc = 0);
1673
1674 out:
1675         if (nvbuf)
1676                 nvlist_free(nvbuf);
1677
1678         if (!rc && scrub->os_full_speed)
1679                 osd_otable_it_preload(env, it);
1680
1681         return rc;
1682 }
1683
/**
 * Key accessor of the otable iterator.
 *
 * Neither argument is used; the function unconditionally reports that
 * there is no key object by returning NULL.
 */
static struct dt_key *osd_otable_it_key(const struct lu_env *env,
                                        const struct dt_it *di)
{
        struct dt_key *no_key = NULL;

        return no_key;
}
1689
1690 static int osd_otable_it_key_size(const struct lu_env *env,
1691                                   const struct dt_it *di)
1692 {
1693         return sizeof(__u64);
1694 }
1695
1696 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1697                              struct dt_rec *rec, __u32 attr)
1698 {
1699         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1700         struct lu_fid *fid = (struct lu_fid *)rec;
1701
1702         *fid = it->ooi_fid;
1703         return 0;
1704 }
1705
1706 static __u64 osd_otable_it_store(const struct lu_env *env,
1707                                  const struct dt_it *di)
1708 {
1709         struct osd_otable_it *it = (struct osd_otable_it *)di;
1710
1711         return it->ooi_pos;
1712 }
1713
1714 /**
1715  * Set the OSD layer iteration start position as the specified hash.
1716  */
1717 static int osd_otable_it_load(const struct lu_env *env,
1718                               const struct dt_it *di, __u64 hash)
1719 {
1720         struct osd_otable_it *it = (struct osd_otable_it *)di;
1721         struct osd_device *dev = it->ooi_dev;
1722         struct lustre_scrub *scrub = &dev->od_scrub;
1723         int rc;
1724         ENTRY;
1725
1726         /* Forbid to set iteration position after iteration started. */
1727         if (it->ooi_user_ready)
1728                 RETURN(-EPERM);
1729
1730         if (hash > OSD_OTABLE_MAX_HASH)
1731                 hash = OSD_OTABLE_MAX_HASH;
1732
1733         /* The hash is the last checkpoint position,
1734          * we will start from the next one. */
1735         it->ooi_pos = hash + 1;
1736         it->ooi_prefetched = 0;
1737         it->ooi_prefetched_dnode = 0;
1738         it->ooi_user_ready = 1;
1739         if (!scrub->os_full_speed)
1740                 wake_up_var(scrub);
1741
1742         /* Unplug OSD layer iteration by the first next() call. */
1743         rc = osd_otable_it_next(env, (struct dt_it *)it);
1744
1745         RETURN(rc);
1746 }
1747
1748 static int osd_otable_it_key_rec(const struct lu_env *env,
1749                                  const struct dt_it *di, void *key_rec)
1750 {
1751         return 0;
1752 }
1753
1754 const struct dt_index_operations osd_otable_ops = {
1755         .dio_it = {
1756                 .init     = osd_otable_it_init,
1757                 .fini     = osd_otable_it_fini,
1758                 .get      = osd_otable_it_get,
1759                 .put      = osd_otable_it_put,
1760                 .next     = osd_otable_it_next,
1761                 .key      = osd_otable_it_key,
1762                 .key_size = osd_otable_it_key_size,
1763                 .rec      = osd_otable_it_rec,
1764                 .store    = osd_otable_it_store,
1765                 .load     = osd_otable_it_load,
1766                 .key_rec  = osd_otable_it_key_rec,
1767         }
1768 };
1769
1770 /* high priority inconsistent items list APIs */
1771
1772 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1773                    const struct lu_fid *fid, uint64_t oid, bool insert)
1774 {
1775         struct lustre_scrub *scrub = &dev->od_scrub;
1776         struct osd_inconsistent_item *oii;
1777         bool wakeup = false;
1778         ENTRY;
1779
1780         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1781         OBD_ALLOC_PTR(oii);
1782         if (unlikely(!oii))
1783                 RETURN(-ENOMEM);
1784
1785         INIT_LIST_HEAD(&oii->oii_list);
1786         oii->oii_cache.oic_dev = dev;
1787         oii->oii_cache.oic_fid = *fid;
1788         oii->oii_cache.oic_dnode = oid;
1789         oii->oii_insert = insert;
1790
1791         spin_lock(&scrub->os_lock);
1792         if (!scrub->os_running) {
1793                 spin_unlock(&scrub->os_lock);
1794                 OBD_FREE_PTR(oii);
1795                 RETURN(-EAGAIN);
1796         }
1797
1798         if (list_empty(&scrub->os_inconsistent_items))
1799                 wakeup = true;
1800         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1801         spin_unlock(&scrub->os_lock);
1802
1803         if (wakeup)
1804                 wake_up_var(scrub);
1805
1806         RETURN(0);
1807 }
1808
1809 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1810                    uint64_t *oid)
1811 {
1812         struct lustre_scrub *scrub = &dev->od_scrub;
1813         struct osd_inconsistent_item *oii;
1814         int ret = -ENOENT;
1815         ENTRY;
1816
1817         spin_lock(&scrub->os_lock);
1818         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1819                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1820                         *oid = oii->oii_cache.oic_dnode;
1821                         ret = 0;
1822                         break;
1823                 }
1824         }
1825         spin_unlock(&scrub->os_lock);
1826
1827         RETURN(ret);
1828 }
1829
1830 typedef int (*scan_dir_helper_t)(const struct lu_env *env,
1831                                  struct osd_device *dev, uint64_t dir_oid,
1832                                  struct osd_zap_it *ozi);
1833
1834 static int osd_scan_dir(const struct lu_env *env, struct osd_device *dev,
1835                         uint64_t id, scan_dir_helper_t cb)
1836 {
1837         struct osd_zap_it *it;
1838         struct luz_direntry *zde;
1839         zap_attribute_t *za;
1840         int rc;
1841
1842         ENTRY;
1843
1844         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
1845         if (it == NULL)
1846                 RETURN(-ENOMEM);
1847
1848         rc = osd_zap_cursor_init(&it->ozi_zc, dev->od_os, id, 0);
1849         if (rc != 0)
1850                 GOTO(out, rc);
1851
1852         za = &it->ozi_za;
1853         zde = &it->ozi_zde;
1854         while (1) {
1855                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1856                 if (unlikely(rc)) {
1857                         if (rc == -ENOENT)
1858                                 rc = 0;
1859
1860                         break;
1861                 }
1862
1863                 if (name_is_dot_or_dotdot(za->za_name, strlen(za->za_name))) {
1864                         zap_cursor_advance(it->ozi_zc);
1865                         continue;
1866                 }
1867
1868                 strncpy(it->ozi_name, za->za_name, sizeof(it->ozi_name));
1869                 if (za->za_integer_length != 8) {
1870                         rc = -EIO;
1871                         break;
1872                 }
1873
1874                 rc = osd_zap_lookup(dev, it->ozi_zc->zc_zapobj, NULL,
1875                                     za->za_name, za->za_integer_length,
1876                                     sizeof(*zde) / za->za_integer_length, zde);
1877                 if (rc)
1878                         break;
1879
1880                 rc = cb(env, dev, id, it);
1881                 if (rc)
1882                         break;
1883
1884                 zap_cursor_advance(it->ozi_zc);
1885         }
1886         osd_zap_cursor_fini(it->ozi_zc);
1887
1888 out:
1889         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
1890         RETURN(rc);
1891 }
1892
1893 static int osd_remove_ml_file(const struct lu_env *env, struct osd_device *dev,
1894                               uint64_t dir, uint64_t id, struct lu_fid *fid,
1895                               char *name)
1896 {
1897         struct osd_thread_info *info = osd_oti_get(env);
1898         struct dt_object *dt;
1899         struct osd_object *obj = NULL;
1900         dmu_tx_t *tx;
1901         sa_handle_t *hdl;
1902         uint64_t nlink;
1903         int rc;
1904
1905         rc = -sa_handle_get(dev->od_os, id, NULL, SA_HDL_PRIVATE, &hdl);
1906         if (rc)
1907                 RETURN(rc);
1908
1909         dt = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1910         if (IS_ERR(dt))
1911                 RETURN(PTR_ERR(dt));
1912
1913         if (dt) {
1914                 obj = osd_dt_obj(dt);
1915                 down_read(&obj->oo_guard);
1916         }
1917
1918         rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
1919         if (rc)
1920                 GOTO(out, rc);
1921
1922         if (nlink <= 1) {
1923                 CERROR("%s: multi-link file O/%s/%s/%s has nlink %llu\n",
1924                        osd_name(dev), info->oti_seq_name, info->oti_dir_name,
1925                        name, nlink);
1926                 GOTO(out, rc = 0);
1927         }
1928
1929         tx = dmu_tx_create(dev->od_os);
1930         if (!tx) {
1931                 CERROR("%s: fail to create tx to remove multi-link file!\n",
1932                        osd_name(dev));
1933                 GOTO(out, rc = -ENOMEM);
1934         }
1935
1936         dmu_tx_hold_zap(tx, dir, FALSE, NULL);
1937         rc = -dmu_tx_assign(tx, TXG_WAIT);
1938         if (rc)
1939                 GOTO(abort, rc);
1940
1941         nlink--;
1942         rc = -sa_update(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink), tx);
1943         if (rc)
1944                 GOTO(abort, rc);
1945
1946         rc = -zap_remove(dev->od_os, dir, name, tx);
1947         if (rc)
1948                 GOTO(abort, rc);
1949
1950         dmu_tx_commit(tx);
1951         GOTO(out, rc);
1952
1953 abort:
1954         dmu_tx_abort(tx);
1955
1956 out:
1957         if (dt) {
1958                 up_read(&obj->oo_guard);
1959                 dt_object_put_nocache(env, dt);
1960         }
1961
1962         sa_handle_destroy(hdl);
1963         RETURN(rc);
1964 }
1965
1966 static int osd_scan_ml_file(const struct lu_env *env, struct osd_device *dev,
1967                             uint64_t dir_oid, struct osd_zap_it *ozi)
1968 {
1969         struct osd_thread_info *info = osd_oti_get(env);
1970         struct lu_fid *fid = &info->oti_fid;
1971         struct ost_id *ostid = &info->oti_ostid;
1972         char name[32];
1973         u64 seq;
1974         int rc = 0;
1975
1976         ENTRY;
1977
1978         rc = osd_get_fid_by_oid(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode, fid);
1979         if (rc)
1980                 RETURN(rc);
1981
1982         seq = fid_seq(fid);
1983         fid_to_ostid(fid, ostid);
1984
1985         snprintf(name, sizeof(name), (fid_seq_is_rsvd(seq) ||
1986                                       fid_seq_is_mdt0(seq)) ? "%llu" : "%llx",
1987                                       fid_seq_is_idif(seq) ? 0 : seq);
1988         if (strcmp(info->oti_seq_name, name) != 0)
1989                 GOTO(fix, rc);
1990
1991         snprintf(name, sizeof(name), "d%d",
1992                 (int)ostid_id(ostid) % OSD_OST_MAP_SIZE);
1993         if (strcmp(info->oti_dir_name, name) != 0)
1994                 GOTO(fix, rc);
1995
1996         snprintf(name, sizeof(name), "%llu", ostid_id(ostid));
1997         if (strcmp(ozi->ozi_name, name) == 0)
1998                 RETURN(0);
1999
2000 fix:
2001         CDEBUG(D_LFSCK, "%s: the file O/%s/%s/%s is corrupted\n",
2002                osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2003                ozi->ozi_name);
2004
2005         rc = osd_remove_ml_file(env, dev, dir_oid,
2006                                 ozi->ozi_zde.lzd_reg.zde_dnode, fid,
2007                                 ozi->ozi_name);
2008         RETURN(rc);
2009 }
2010
2011 static int osd_scan_ml_file_dir(const struct lu_env *env,
2012                                 struct osd_device *dev, uint64_t dir_oid,
2013                                 struct osd_zap_it *ozi)
2014 {
2015         struct osd_thread_info *info = osd_oti_get(env);
2016
2017         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2018                 return 0;
2019
2020         info->oti_dir_name = ozi->ozi_name;
2021         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2022                             osd_scan_ml_file);
2023 }
2024
2025 static int osd_scan_ml_file_seq(const struct lu_env *env,
2026                                 struct osd_device *dev, uint64_t dir_oid,
2027                                 struct osd_zap_it *ozi)
2028 {
2029         struct osd_thread_info *info = osd_oti_get(env);
2030
2031         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2032                 return 0;
2033
2034         info->oti_seq_name = ozi->ozi_name;
2035         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2036                             osd_scan_ml_file_dir);
2037 }
2038
2039 static int osd_scan_ml_file_main(const struct lu_env *env,
2040                                  struct osd_device *dev)
2041 {
2042         return osd_scan_dir(env, dev, dev->od_O_id, osd_scan_ml_file_seq);
2043 }