Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub rebuilds the Object Index files when an MDT is restored from
31  * a file-level backup.
32  *
33  * The otable-based iterator scans ZFS objects to feed the upper-layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
55 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH                 256
57
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
59 {
60         return it->ooi_prefetched < OTABLE_PREFETCH;
61 }
62
63 /**
64  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
65  *
66  * \retval   1, changed nothing
67  * \retval   0, changed successfully
68  * \retval -ve, on error
69  */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71                               struct osd_device *dev,
72                               const struct lu_fid *fid,
73                               uint64_t oid, enum dt_txn_op ops,
74                               bool force, const char *name)
75 {
76         struct osd_thread_info *info = osd_oti_get(env);
77         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78         char *buf = info->oti_str;
79         dmu_tx_t *tx = NULL;
80         dnode_t *dn = NULL;
81         uint64_t zapid;
82         int rc;
83
84         ENTRY;
85         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
86                 GOTO(log, rc = 0);
87
88         tx = dmu_tx_create(dev->od_os);
89         if (!tx)
90                 GOTO(log, rc = -ENOMEM);
91
92         zapid = osd_get_name_n_idx(env, dev, fid, buf,
93                                    sizeof(info->oti_str), &dn);
94         osd_tx_hold_zap(tx, zapid, dn,
95                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96         rc = -dmu_tx_assign(tx, TXG_WAIT);
97         if (rc) {
98                 dmu_tx_abort(tx);
99                 GOTO(log, rc);
100         }
101
102         switch (ops) {
103         case DTO_INDEX_UPDATE:
104                 zde->zde_pad = 0;
105                 zde->zde_dnode = oid;
106                 zde->zde_type = 0; /* The type in OI mapping is useless. */
107                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
108                                  zde, tx);
109                 if (unlikely(rc == -ENOENT)) {
110                         /* Some unlink thread may removed the OI mapping. */
111                         rc = 1;
112                 }
113                 break;
114         case DTO_INDEX_INSERT:
115                 zde->zde_pad = 0;
116                 zde->zde_dnode = oid;
117                 zde->zde_type = 0; /* The type in OI mapping is useless. */
118                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
119                                  zde, tx);
120                 if (unlikely(rc == -EEXIST))
121                         rc = 1;
122                 break;
123         case DTO_INDEX_DELETE:
124                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
125                 if (rc == -ENOENT) {
126                         /* It is normal that the unlink thread has removed the
127                          * OI mapping already.
128                          */
129                         rc = 1;
130                 }
131                 break;
132         default:
133                 LASSERTF(0, "Unexpected ops %d\n", ops);
134                 rc = -EINVAL;
135                 break;
136         }
137
138         dmu_tx_commit(tx);
139         GOTO(log, rc);
140
141 log:
142         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
143                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
144                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
145
146         return rc;
147 }
148
149 static int
150 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
151                        const struct lu_fid *fid, uint64_t oid, int val)
152 {
153         struct lustre_scrub *scrub = &dev->od_scrub;
154         struct scrub_file *sf = &scrub->os_file;
155         struct osd_inconsistent_item *oii = NULL;
156         nvlist_t *nvbuf = NULL;
157         dnode_t *dn = NULL;
158         uint64_t oid2;
159         int ops = DTO_INDEX_UPDATE;
160         int rc;
161
162         ENTRY;
163         down_write(&scrub->os_rwsem);
164         scrub->os_new_checked++;
165         if (val < 0)
166                 GOTO(out, rc = val);
167
168         if (scrub->os_in_prior)
169                 oii = list_first_entry(&scrub->os_inconsistent_items,
170                                        struct osd_inconsistent_item, oii_list);
171
172         if (oid < sf->sf_pos_latest_start && !oii)
173                 GOTO(out, rc = 0);
174
175         if (oii && oii->oii_insert) {
176                 ops = DTO_INDEX_INSERT;
177                 goto zget;
178         }
179
180         rc = osd_fid_lookup(env, dev, fid, &oid2);
181         if (rc) {
182                 if (rc != -ENOENT)
183                         GOTO(out, rc);
184
185                 ops = DTO_INDEX_INSERT;
186
187 zget:
188                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
189                 if (rc) {
190                         /* Someone removed the object by race. */
191                         if (rc == -ENOENT || rc == -EEXIST)
192                                 rc = 0;
193                         GOTO(out, rc);
194                 }
195
196                 spin_lock(&scrub->os_lock);
197                 scrub->os_full_speed = 1;
198                 spin_unlock(&scrub->os_lock);
199
200                 sf->sf_flags |= SF_INCONSISTENT;
201         } else if (oid == oid2) {
202                 GOTO(out, rc = 0);
203         } else {
204                 struct lustre_mdt_attrs *lma = NULL;
205                 int size;
206
207                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
208                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
209                         goto update;
210                 if (rc)
211                         GOTO(out, rc);
212
213                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
214                                                (uchar_t **)&lma, &size);
215                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
216                         goto update;
217                 if (rc)
218                         GOTO(out, rc);
219
220                 lustre_lma_swab(lma);
221                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
222                         rc = -EEXIST;
223                         CDEBUG(D_LFSCK,
224                                "%s: the FID "DFID" is used by two objects: %llu and %llu (in OI): rc = %d\n",
225                                osd_name(dev), PFID(fid), oid, oid2, rc);
226
227                         GOTO(out, rc);
228                 }
229
230 update:
231                 spin_lock(&scrub->os_lock);
232                 scrub->os_full_speed = 1;
233                 spin_unlock(&scrub->os_lock);
234                 sf->sf_flags |= SF_INCONSISTENT;
235         }
236
237         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
238         if (!rc) {
239                 if (scrub->os_in_prior)
240                         sf->sf_items_updated_prior++;
241                 else
242                         sf->sf_items_updated++;
243         }
244
245         GOTO(out, rc);
246
247 out:
248         if (dev->od_is_ost) {
249                 sa_handle_t *hdl;
250                 uint64_t nlink, mode;
251
252                 rc = -sa_handle_get(dev->od_os, oid, NULL, SA_HDL_PRIVATE,
253                                     &hdl);
254                 if (rc)
255                         GOTO(cleanup, rc);
256
257                 rc = -sa_lookup(hdl, SA_ZPL_MODE(dev), &mode, sizeof(mode));
258                 if (rc || !S_ISREG(mode)) {
259                         sa_handle_destroy(hdl);
260                         GOTO(cleanup, rc);
261                 }
262
263                 rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
264                 if (rc == 0 && nlink > 1)
265                         scrub->os_has_ml_file = 1;
266
267                 sa_handle_destroy(hdl);
268         }
269
270 cleanup:
271         if (nvbuf)
272                 nvlist_free(nvbuf);
273
274         if (rc < 0) {
275                 sf->sf_items_failed++;
276                 if (sf->sf_pos_first_inconsistent == 0 ||
277                     sf->sf_pos_first_inconsistent > oid)
278                         sf->sf_pos_first_inconsistent = oid;
279         } else {
280                 rc = 0;
281         }
282
283         /* There may be conflict unlink during the OI scrub,
284          * if happend, then remove the new added OI mapping.
285          */
286         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
287                 osd_scrub_refresh_mapping(env, dev, fid, oid,
288                                           DTO_INDEX_DELETE, false, NULL);
289         up_write(&scrub->os_rwsem);
290
291         if (dn)
292                 osd_dnode_rele(dn);
293
294         if (oii) {
295                 spin_lock(&scrub->os_lock);
296                 if (likely(!list_empty(&oii->oii_list)))
297                         list_del(&oii->oii_list);
298                 spin_unlock(&scrub->os_lock);
299                 OBD_FREE_PTR(oii);
300         }
301
302         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
303 }
304
305 /* iteration engine */
306
307 static inline int
308 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
309 {
310         spin_lock(&scrub->os_lock);
311         if (osd_scrub_has_window(it) ||
312             !list_empty(&scrub->os_inconsistent_items) ||
313             it->ooi_waiting || kthread_should_stop())
314                 scrub->os_waiting = 0;
315         else
316                 scrub->os_waiting = 1;
317         spin_unlock(&scrub->os_lock);
318
319         return !scrub->os_waiting;
320 }
321
322 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
323                           struct lu_fid *fid, uint64_t *oid)
324 {
325         struct lustre_scrub *scrub = &dev->od_scrub;
326         struct osd_otable_it *it = dev->od_otable_it;
327         struct lustre_mdt_attrs *lma = NULL;
328         nvlist_t *nvbuf = NULL;
329         int size = 0;
330         int rc = 0;
331
332         ENTRY;
333         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
334                 wait_var_event_timeout(
335                         scrub,
336                         !list_empty(&scrub->os_inconsistent_items) ||
337                         kthread_should_stop(),
338                         cfs_time_seconds(cfs_fail_val));
339
340                 if (kthread_should_stop())
341                         RETURN(SCRUB_NEXT_EXIT);
342         }
343
344         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
345                 spin_lock(&scrub->os_lock);
346                 scrub->os_running = 0;
347                 spin_unlock(&scrub->os_lock);
348                 RETURN(SCRUB_NEXT_CRASH);
349         }
350
351         if (CFS_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
352                 RETURN(SCRUB_NEXT_FATAL);
353
354 again:
355         if (nvbuf) {
356                 nvlist_free(nvbuf);
357                 nvbuf = NULL;
358                 lma = NULL;
359         }
360
361         if (!list_empty(&scrub->os_inconsistent_items)) {
362                 spin_lock(&scrub->os_lock);
363                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
364                         struct osd_inconsistent_item *oii;
365
366                         oii = list_first_entry(&scrub->os_inconsistent_items,
367                                                struct osd_inconsistent_item,
368                                                oii_list);
369                         *fid = oii->oii_cache.oic_fid;
370                         *oid = oii->oii_cache.oic_dnode;
371                         scrub->os_in_prior = 1;
372                         spin_unlock(&scrub->os_lock);
373
374                         GOTO(out, rc = 0);
375                 }
376                 spin_unlock(&scrub->os_lock);
377         }
378
379         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
380                 wait_var_event(scrub, osd_scrub_wakeup(scrub, it));
381
382         if (kthread_should_stop())
383                 GOTO(out, rc = SCRUB_NEXT_EXIT);
384
385         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
386         if (rc)
387                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
388
389         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
390         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
391                 goto again;
392
393         if (rc)
394                 GOTO(out, rc);
395
396         LASSERT(nvbuf != NULL);
397         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
398                                        (uchar_t **)&lma, &size);
399         if (!rc) {
400                 lustre_lma_swab(lma);
401                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
402                            !(lma->lma_incompat & LMAI_AGENT))) {
403                         *fid = lma->lma_self_fid;
404                         *oid = scrub->os_pos_current;
405
406                         GOTO(out, rc = 0);
407                 }
408         }
409
410         if (!scrub->os_full_speed) {
411                 spin_lock(&scrub->os_lock);
412                 it->ooi_prefetched++;
413                 if (it->ooi_waiting) {
414                         it->ooi_waiting = 0;
415                         wake_up_var(scrub);
416                 }
417                 spin_unlock(&scrub->os_lock);
418         }
419
420         goto again;
421
422 out:
423         if (nvbuf)
424                 nvlist_free(nvbuf);
425
426         return rc;
427 }
428
429 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
430                           const struct lu_fid *fid, uint64_t oid, int rc)
431 {
432         struct lustre_scrub *scrub = &dev->od_scrub;
433         struct osd_otable_it *it = dev->od_otable_it;
434
435         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
436         if (!scrub->os_in_prior) {
437                 if (!scrub->os_full_speed) {
438                         spin_lock(&scrub->os_lock);
439                         it->ooi_prefetched++;
440                         if (it->ooi_waiting) {
441                                 it->ooi_waiting = 0;
442                                 wake_up_var(scrub);
443                         }
444                         spin_unlock(&scrub->os_lock);
445                 }
446         } else {
447                 spin_lock(&scrub->os_lock);
448                 scrub->os_in_prior = 0;
449                 spin_unlock(&scrub->os_lock);
450         }
451
452         if (rc)
453                 return rc;
454
455         rc = scrub_checkpoint(env, scrub);
456         if (rc) {
457                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: rc = %d\n",
458                        scrub->os_name, scrub->os_pos_current, rc);
459                 /* Continue, as long as the scrub itself can go ahead. */
460         }
461
462         return 0;
463 }
464
465 static int osd_scan_ml_file_main(const struct lu_env *env,
466                                  struct osd_device *dev);
467
468 static int osd_scrub_main(void *args)
469 {
470         struct lu_env env;
471         struct osd_device *dev = (struct osd_device *)args;
472         struct lustre_scrub *scrub = &dev->od_scrub;
473         struct lu_fid *fid;
474         uint64_t oid;
475         int rc = 0, ret;
476
477         ENTRY;
478         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
479         if (rc) {
480                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
481                        scrub->os_name, rc);
482                 GOTO(noenv, rc);
483         }
484
485         rc = scrub_thread_prep(&env, scrub, dev->od_uuid, 1);
486         if (rc) {
487                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
488                        scrub->os_name, rc);
489                 GOTO(out, rc);
490         }
491
492         if (!scrub->os_full_speed) {
493                 struct osd_otable_it *it = dev->od_otable_it;
494
495                 wait_var_event(scrub,
496                                it->ooi_user_ready ||
497                                kthread_should_stop());
498
499                 if (kthread_should_stop())
500                         GOTO(post, rc = 0);
501
502                 scrub->os_pos_current = it->ooi_pos;
503         }
504
505         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
506                scrub->os_name, scrub->os_start_flags,
507                scrub->os_pos_current);
508
509         fid = &osd_oti_get(&env)->oti_fid;
510         while (!rc && !kthread_should_stop()) {
511                 rc = osd_scrub_next(&env, dev, fid, &oid);
512                 switch (rc) {
513                 case SCRUB_NEXT_EXIT:
514                         GOTO(post, rc = 0);
515                 case SCRUB_NEXT_CRASH:
516                         spin_lock(&scrub->os_lock);
517                         scrub->os_running = 0;
518                         spin_unlock(&scrub->os_lock);
519                         GOTO(out, rc = -EINVAL);
520                 case SCRUB_NEXT_FATAL:
521                         GOTO(post, rc = -EINVAL);
522                 case SCRUB_NEXT_BREAK:
523                         GOTO(post, rc = 1);
524                 }
525
526                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
527         }
528
529         GOTO(post, rc);
530
531 post:
532         if (scrub->os_has_ml_file) {
533                 ret = osd_scan_ml_file_main(&env, dev);
534                 if (ret != 0)
535                         rc = ret;
536         }
537
538         rc = scrub_thread_post(&env, &dev->od_scrub, rc);
539         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
540                scrub->os_name, scrub->os_pos_current, rc);
541
542 out:
543         while (!list_empty(&scrub->os_inconsistent_items)) {
544                 struct osd_inconsistent_item *oii;
545
546                 oii = list_first_entry(&scrub->os_inconsistent_items,
547                                        struct osd_inconsistent_item, oii_list);
548                 list_del_init(&oii->oii_list);
549                 OBD_FREE_PTR(oii);
550         }
551
552         lu_env_fini(&env);
553
554 noenv:
555         spin_lock(&scrub->os_lock);
556         scrub->os_running = 0;
557         spin_unlock(&scrub->os_lock);
558         if (xchg(&scrub->os_task, NULL) == NULL)
559                 /* scrub_stop is waiting, we need to synchronize */
560                 wait_var_event(scrub, kthread_should_stop());
561         wake_up_var(scrub);
562         return rc;
563 }
564
565 /* initial OI scrub */
566
567 struct osd_lf_map;
568
569 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
570                                const char *, uint64_t, uint64_t,
571                                enum osd_lf_flags, bool);
572 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
573                              const char *, uint64_t, uint64_t,
574                              enum osd_lf_flags, bool);
575 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
576                           const char *, uint64_t, uint64_t,
577                           enum osd_lf_flags, bool);
578
579 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
580                           uint64_t, handle_dirent_t, enum osd_lf_flags);
581 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
582                               uint64_t, handle_dirent_t, enum osd_lf_flags);
583 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
584                            uint64_t, handle_dirent_t, enum osd_lf_flags);
585
586 struct osd_lf_map {
587         char                    *olm_name;
588         struct lu_fid            olm_fid;
589         enum osd_lf_flags        olm_flags;
590         scan_dir_t               olm_scan_dir;
591         handle_dirent_t          olm_handle_dirent;
592 };
593
594 /* Add the new introduced local files in the list in the future. */
595 static const struct osd_lf_map osd_lf_maps[] = {
596         /* CONFIGS */
597         {
598                 .olm_name               = MOUNT_CONFIGS_DIR,
599                 .olm_fid                = {
600                         .f_seq  = FID_SEQ_LOCAL_FILE,
601                         .f_oid  = MGS_CONFIGS_OID,
602                 },
603                 .olm_flags              = OLF_SCAN_SUBITEMS,
604                 .olm_scan_dir           = osd_ios_general_sd,
605                 .olm_handle_dirent      = osd_ios_varfid_hd,
606         },
607
608         /* NIDTBL_VERSIONS */
609         {
610                 .olm_name               = MGS_NIDTBL_DIR,
611                 .olm_flags              = OLF_SCAN_SUBITEMS,
612                 .olm_scan_dir           = osd_ios_general_sd,
613                 .olm_handle_dirent      = osd_ios_varfid_hd,
614         },
615
616         /* PENDING */
617         {
618                 .olm_name               = MDT_ORPHAN_DIR,
619         },
620
621         /* ROOT */
622         {
623                 .olm_name               = "ROOT",
624                 .olm_fid                = {
625                         .f_seq  = FID_SEQ_ROOT,
626                         .f_oid  = FID_OID_ROOT,
627                 },
628                 .olm_flags              = OLF_SCAN_SUBITEMS,
629                 .olm_scan_dir           = osd_ios_ROOT_sd,
630         },
631
632         /* fld */
633         {
634                 .olm_name               = "fld",
635                 .olm_fid                = {
636                         .f_seq  = FID_SEQ_LOCAL_FILE,
637                         .f_oid  = FLD_INDEX_OID,
638                 },
639         },
640
641         /* changelog_catalog */
642         {
643                 .olm_name               = CHANGELOG_CATALOG,
644         },
645
646         /* changelog_users */
647         {
648                 .olm_name               = CHANGELOG_USERS,
649         },
650
651         /* quota_master */
652         {
653                 .olm_name               = QMT_DIR,
654                 .olm_flags              = OLF_SCAN_SUBITEMS,
655                 .olm_scan_dir           = osd_ios_general_sd,
656                 .olm_handle_dirent      = osd_ios_varfid_hd,
657         },
658
659         /* quota_slave */
660         {
661                 .olm_name               = QSD_DIR,
662                 .olm_flags              = OLF_SCAN_SUBITEMS,
663                 .olm_scan_dir           = osd_ios_general_sd,
664                 .olm_handle_dirent      = osd_ios_varfid_hd,
665         },
666
667         /* LFSCK */
668         {
669                 .olm_name               = LFSCK_DIR,
670                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
671                 .olm_scan_dir           = osd_ios_general_sd,
672                 .olm_handle_dirent      = osd_ios_varfid_hd,
673         },
674
675         /* lfsck_bookmark */
676         {
677                 .olm_name               = LFSCK_BOOKMARK,
678         },
679
680         /* lfsck_layout */
681         {
682                 .olm_name               = LFSCK_LAYOUT,
683         },
684
685         /* lfsck_namespace */
686         {
687                 .olm_name               = LFSCK_NAMESPACE,
688         },
689
690         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
691          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
692          * for more details.
693          */
694
695         /* update_log */
696         {
697                 .olm_name               = "update_log",
698                 .olm_fid                = {
699                         .f_seq  = FID_SEQ_UPDATE_LOG,
700                 },
701                 .olm_flags              = OLF_IDX_IN_FID,
702         },
703
704         /* update_log_dir */
705         {
706                 .olm_name               = "update_log_dir",
707                 .olm_fid        = {
708                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
709                 },
710                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
711                 .olm_scan_dir           = osd_ios_general_sd,
712                 .olm_handle_dirent      = osd_ios_uld_hd,
713         },
714
715         /* hsm_actions */
716         {
717                 .olm_name               = HSM_ACTIONS,
718         },
719
720         /* nodemap */
721         {
722                 .olm_name               = LUSTRE_NODEMAP_NAME,
723         },
724
725         /* index_backup */
726         {
727                 .olm_name               = INDEX_BACKUP_DIR,
728                 .olm_fid                = {
729                         .f_seq  = FID_SEQ_LOCAL_FILE,
730                         .f_oid  = INDEX_BACKUP_OID,
731                 },
732                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
733                 .olm_scan_dir           = osd_ios_general_sd,
734                 .olm_handle_dirent      = osd_ios_varfid_hd,
735         },
736
737         {
738                 .olm_name               = NULL
739         }
740 };
741
742 /* Add the new introduced files under .lustre/ in the list in the future. */
743 static const struct osd_lf_map osd_dl_maps[] = {
744         /* .lustre/fid */
745         {
746                 .olm_name               = "fid",
747                 .olm_fid                = {
748                         .f_seq  = FID_SEQ_DOT_LUSTRE,
749                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
750                 },
751         },
752
753         /* .lustre/lost+found */
754         {
755                 .olm_name               = "lost+found",
756                 .olm_fid                = {
757                         .f_seq  = FID_SEQ_DOT_LUSTRE,
758                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
759                 },
760         },
761
762         {
763                 .olm_name               = NULL
764         }
765 };
766
767 struct osd_ios_item {
768         struct list_head        oii_list;
769         uint64_t                oii_parent;
770         enum osd_lf_flags       oii_flags;
771         scan_dir_t              oii_scan_dir;
772         handle_dirent_t         oii_handle_dirent;
773 };
774
775 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
776                             enum osd_lf_flags flags, scan_dir_t scan_dir,
777                             handle_dirent_t handle_dirent)
778 {
779         struct osd_ios_item *item;
780         int rc = 0;
781
782         OBD_ALLOC_PTR(item);
783         if (!item) {
784                 rc = -ENOMEM;
785                 CWARN("%s: initial OI scrub failed to add item for %llu: rc = %d\n",
786                       osd_name(dev), parent, rc);
787                 return rc;
788         }
789
790         INIT_LIST_HEAD(&item->oii_list);
791         item->oii_parent = parent;
792         item->oii_flags = flags;
793         item->oii_scan_dir = scan_dir;
794         item->oii_handle_dirent = handle_dirent;
795         list_add_tail(&item->oii_list, &dev->od_ios_list);
796
797         return rc;
798 }
799
800 static bool osd_index_need_recreate(const struct lu_env *env,
801                                     struct osd_device *dev, uint64_t oid)
802 {
803         struct osd_thread_info *info = osd_oti_get(env);
804         zap_attribute_t *za = &info->oti_za2;
805         zap_cursor_t *zc = &info->oti_zc2;
806         int rc;
807
808         ENTRY;
809         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
810         rc = -zap_cursor_retrieve(zc, za);
811         zap_cursor_fini(zc);
812         if (rc && rc != -ENOENT)
813                 RETURN(true);
814
815         RETURN(false);
816 }
817
818 static void osd_ios_index_register(const struct lu_env *env,
819                                    struct osd_device *osd,
820                                    const struct lu_fid *fid, uint64_t oid)
821 {
822         struct osd_thread_info *info = osd_oti_get(env);
823         zap_attribute_t *za = &info->oti_za2;
824         zap_cursor_t *zc = &info->oti_zc2;
825         struct zap_leaf_entry *le;
826         dnode_t *dn = NULL;
827         sa_handle_t *hdl;
828         __u64 mode = 0;
829         __u32 keysize = 0;
830         __u32 recsize = 0;
831         int rc;
832
833         ENTRY;
834         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
835         if (rc == -EEXIST || rc == -ENOENT)
836                 RETURN_EXIT;
837
838         if (rc < 0)
839                 GOTO(log, rc);
840
841         if (!osd_object_is_zap(dn))
842                 GOTO(log, rc = 1);
843
844         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
845         if (rc)
846                 GOTO(log, rc);
847
848         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
849         sa_handle_destroy(hdl);
850         if (rc)
851                 GOTO(log, rc);
852
853         if (!S_ISREG(mode))
854                 GOTO(log, rc = 1);
855
856         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
857         rc = -zap_cursor_retrieve(zc, za);
858         if (rc)
859                 /* Skip empty index object */
860                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
861
862         if (zc->zc_zap->zap_ismicro ||
863             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
864                 GOTO(fini, rc = 1);
865
866         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
867         keysize = le->le_name_numints * 8;
868         recsize = za->za_integer_length * za->za_num_integers;
869         if (likely(keysize && recsize))
870                 rc = osd_index_register(osd, fid, keysize, recsize);
871
872         GOTO(fini, rc);
873
874 fini:
875         zap_cursor_fini(zc);
876
877 log:
878         if (dn)
879                 osd_dnode_rele(dn);
880         if (rc < 0)
881                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
882                       osd_name(osd), PFID(fid), keysize, recsize, rc);
883         else if (!rc)
884                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
885                        osd_name(osd), PFID(fid), keysize, recsize);
886 }
887
888 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
889                               struct lustre_index_restore_unit *liru, void *buf,
890                               int bufsize)
891 {
892         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
893         struct lu_fid *tgt_fid = &liru->liru_cfid;
894         struct lu_fid bak_fid;
895         int rc;
896
897         ENTRY;
898         lustre_fid2lbx(buf, tgt_fid, bufsize);
899         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
900                          sizeof(*zde) / 8, (void *)zde);
901         if (rc)
902                 GOTO(log, rc);
903
904         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
905         if (rc)
906                 GOTO(log, rc);
907
908         /* The OI mapping for index may be invalid, since it will be
909          * re-created, not update the OI mapping, just cache it in RAM.
910          */
911         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
912                                             liru->liru_clid);
913         if (!rc)
914                 rc = lustre_index_restore(env, &dev->od_dt_dev,
915                                 &liru->liru_pfid, tgt_fid, &bak_fid,
916                                 liru->liru_name, &dev->od_index_backup_list,
917                                 &dev->od_lock, buf, bufsize);
918         GOTO(log, rc);
919
920 log:
921         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
922                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
923 }
924
925 /**
926  * verify FID-in-LMA and OI entry for one object
927  *
928  * ios: Initial OI Scrub.
929  */
930 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
931                             const struct lu_fid *fid, uint64_t parent,
932                             uint64_t oid, const char *name,
933                             enum osd_lf_flags flags)
934 {
935         struct lustre_scrub *scrub = &dev->od_scrub;
936         struct scrub_file *sf = &scrub->os_file;
937         struct lustre_mdt_attrs *lma = NULL;
938         nvlist_t *nvbuf = NULL;
939         struct lu_fid tfid;
940         uint64_t oid2 = 0;
941         __u64 flag = 0;
942         int size = 0;
943         int op = 0;
944         int rc;
945
946         ENTRY;
947         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
948         if (unlikely(rc == -ENOENT || rc == -EEXIST))
949                 RETURN(0);
950
951         if (rc && rc != -ENODATA) {
952                 CWARN("%s: initial OI scrub failed to get lma for %llu: rc = %d\n",
953                       osd_name(dev), oid, rc);
954
955                 RETURN(rc);
956         }
957
958         if (!rc) {
959                 LASSERT(nvbuf != NULL);
960                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
961                                                (uchar_t **)&lma, &size);
962                 if (rc || size == 0) {
963                         LASSERT(lma == NULL);
964                         rc = -ENODATA;
965                 } else {
966                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
967                         lustre_lma_swab(lma);
968                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
969                                 nvlist_free(nvbuf);
970                                 RETURN(0);
971                         }
972
973                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
974                             osd_index_need_recreate(env, dev, oid)) {
975                                 if (parent == dev->od_root) {
976                                         lu_local_obj_fid(&tfid,
977                                                          OSD_FS_ROOT_OID);
978                                 } else {
979                                         rc = osd_get_fid_by_oid(env, dev,
980                                                                 parent, &tfid);
981                                         if (rc) {
982                                                 nvlist_free(nvbuf);
983                                                 RETURN(rc);
984                                         }
985                                 }
986
987                                 rc = lustre_liru_new(
988                                                 &dev->od_index_restore_list,
989                                                 &tfid, &lma->lma_self_fid, oid,
990                                                 name, strlen(name));
991                                 nvlist_free(nvbuf);
992                                 RETURN(rc);
993                         }
994
995                         tfid = lma->lma_self_fid;
996                         if (!(flags & OLF_NOT_BACKUP))
997                                 osd_ios_index_register(env, dev, &tfid, oid);
998                 }
999                 nvlist_free(nvbuf);
1000         }
1001
1002         if (rc == -ENODATA) {
1003                 if (!fid) {
1004                         /* Skip the object without FID-in-LMA */
1005                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1006                                osd_name(dev), oid);
1007
1008                         RETURN(0);
1009                 }
1010
1011                 LASSERT(!fid_is_zero(fid));
1012
1013                 tfid = *fid;
1014                 if (flags & OLF_IDX_IN_FID) {
1015                         LASSERT(dev->od_index >= 0);
1016
1017                         tfid.f_oid = dev->od_index;
1018                 }
1019         }
1020
1021         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1022         if (rc) {
1023                 if (rc != -ENOENT) {
1024                         CWARN("%s: initial OI scrub failed to lookup fid for "DFID"=>%llu: rc = %d\n",
1025                               osd_name(dev), PFID(&tfid), oid, rc);
1026                         RETURN(rc);
1027                 }
1028
1029                 flag = SF_RECREATED;
1030                 op = DTO_INDEX_INSERT;
1031         } else {
1032                 if (oid == oid2)
1033                         RETURN(0);
1034
1035                 flag = SF_INCONSISTENT;
1036                 op = DTO_INDEX_UPDATE;
1037         }
1038
1039         if (!(sf->sf_flags & flag)) {
1040                 scrub_file_reset(scrub, dev->od_uuid, flag);
1041                 rc = scrub_file_store(env, scrub);
1042                 if (rc)
1043                         RETURN(rc);
1044         }
1045
1046         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1047
1048         RETURN(rc > 0 ? 0 : rc);
1049 }
1050
1051 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1052                              const char *name, uint64_t parent, uint64_t oid,
1053                              enum osd_lf_flags flags, bool is_dir)
1054 {
1055         int rc;
1056
1057         ENTRY;
1058         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1059         if (!rc && is_dir)
1060                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1061                                       osd_ios_varfid_hd);
1062
1063         RETURN(rc);
1064 }
1065
1066 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1067                           const char *name, uint64_t parent, uint64_t oid,
1068                           enum osd_lf_flags flags, bool is_dir)
1069 {
1070         struct lu_fid tfid;
1071         int rc;
1072
1073         ENTRY;
1074         /* skip any non-DFID format name */
1075         if (name[0] != '[')
1076                 RETURN(0);
1077
1078         /* skip the start '[' */
1079         sscanf(&name[1], SFID, RFID(&tfid));
1080         if (fid_is_sane(&tfid))
1081                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1082         else
1083                 rc = -EIO;
1084
1085         RETURN(rc);
1086 }
1087
1088 /*
1089  * General scanner for the directories execpt /ROOT during initial OI scrub.
1090  * It scans the name entries under the given directory one by one. For each
1091  * entry, verifies its OI mapping via the given @handle_dirent.
1092  */
1093 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1094                               uint64_t parent, handle_dirent_t handle_dirent,
1095                               enum osd_lf_flags flags)
1096 {
1097         struct osd_thread_info *info = osd_oti_get(env);
1098         struct luz_direntry *zde = &info->oti_zde;
1099         zap_attribute_t *za = &info->oti_za;
1100         zap_cursor_t *zc = &info->oti_zc;
1101         int rc;
1102
1103         ENTRY;
1104         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1105         rc = -zap_cursor_retrieve(zc, za);
1106         if (rc == -ENOENT)
1107                 zap_cursor_advance(zc);
1108         else if (rc)
1109                 GOTO(log, rc);
1110
1111         while (1) {
1112                 rc = -zap_cursor_retrieve(zc, za);
1113                 if (rc)
1114                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1115
1116                 /* skip the entry started with '.' */
1117                 if (likely(za->za_name[0] != '.')) {
1118                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1119                                         za->za_integer_length,
1120                                         sizeof(*zde) / za->za_integer_length,
1121                                         (void *)zde);
1122                         if (rc) {
1123                                 CWARN("%s: initial OI scrub failed to lookup %s under %llu: rc = %d\n",
1124                                       osd_name(dev), za->za_name, parent, rc);
1125                                 continue;
1126                         }
1127
1128                         rc = handle_dirent(env, dev, za->za_name, parent,
1129                                         zde->lzd_reg.zde_dnode, flags,
1130                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1131                                         true : false);
1132                         CDEBUG(D_LFSCK,
1133                                "%s: initial OI scrub handled %s under %llu: rc = %d\n",
1134                                osd_name(dev), za->za_name, parent, rc);
1135                 }
1136
1137                 zap_cursor_advance(zc);
1138         }
1139
1140 log:
1141         if (rc)
1142                 CWARN("%s: initial OI scrub failed to scan the directory %llu: rc = %d\n",
1143                       osd_name(dev), parent, rc);
1144         zap_cursor_fini(zc);
1145
1146         return rc;
1147 }
1148
1149 /*
1150  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1151  * be scanned during the initial OI scrub, instead, only the .lustre and the
1152  * sub-items under .lustre will be handled.
1153  */
1154 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1155                            uint64_t parent, handle_dirent_t handle_dirent,
1156                            enum osd_lf_flags flags)
1157 {
1158         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1159         const struct osd_lf_map *map;
1160         uint64_t oid;
1161         int rc;
1162         int rc1 = 0;
1163
1164         ENTRY;
1165         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1166                             sizeof(*zde) / 8, (void *)zde);
1167         if (rc == -ENOENT) {
1168                 /* The .lustre directory is lost. That is not fatal. It can
1169                  * be re-created in the subsequent MDT start processing.
1170                  */
1171                 RETURN(0);
1172         }
1173
1174         if (rc) {
1175                 CWARN("%s: initial OI scrub failed to find .lustre: rc = %d\n",
1176                       osd_name(dev), rc);
1177
1178                 RETURN(rc);
1179         }
1180
1181         oid = zde->lzd_reg.zde_dnode;
1182         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1183                               dot_lustre_name, 0);
1184         if (rc)
1185                 RETURN(rc);
1186
1187         for (map = osd_dl_maps; map->olm_name; map++) {
1188                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1189                                     sizeof(*zde) / 8, (void *)zde);
1190                 if (rc) {
1191                         if (rc != -ENOENT)
1192                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1193                                       osd_name(dev), map->olm_name, rc);
1194                         else if (!fid_is_zero(&map->olm_fid))
1195                                 /* Try to remove the stale OI mapping. */
1196                                 osd_scrub_refresh_mapping(env, dev,
1197                                                 &map->olm_fid, 0,
1198                                                 DTO_INDEX_DELETE, true,
1199                                                 map->olm_name);
1200                         continue;
1201                 }
1202
1203                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1204                                       zde->lzd_reg.zde_dnode, map->olm_name,
1205                                       map->olm_flags);
1206                 if (rc)
1207                         rc1 = rc;
1208         }
1209
1210         RETURN(rc1);
1211 }
1212
1213 static void osd_initial_OI_scrub(const struct lu_env *env,
1214                                  struct osd_device *dev)
1215 {
1216         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1217         const struct osd_lf_map *map;
1218         int rc;
1219
1220         ENTRY;
1221         for (map = osd_lf_maps; map->olm_name; map++) {
1222                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1223                                     sizeof(*zde) / 8, (void *)zde);
1224                 if (rc) {
1225                         if (rc != -ENOENT)
1226                                 CWARN("%s: initial OI scrub failed to find the entry %s: rc = %d\n",
1227                                       osd_name(dev), map->olm_name, rc);
1228                         else if (!fid_is_zero(&map->olm_fid))
1229                                 /* Try to remove the stale OI mapping. */
1230                                 osd_scrub_refresh_mapping(env, dev,
1231                                                 &map->olm_fid, 0,
1232                                                 DTO_INDEX_DELETE, true,
1233                                                 map->olm_name);
1234                         continue;
1235                 }
1236
1237                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1238                                       zde->lzd_reg.zde_dnode, map->olm_name,
1239                                       map->olm_flags);
1240                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1241                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1242                                          map->olm_flags, map->olm_scan_dir,
1243                                          map->olm_handle_dirent);
1244         }
1245
1246         while (!list_empty(&dev->od_ios_list)) {
1247                 struct osd_ios_item *item;
1248
1249                 item = list_first_entry(&dev->od_ios_list,
1250                                         struct osd_ios_item, oii_list);
1251                 list_del_init(&item->oii_list);
1252                 item->oii_scan_dir(env, dev, item->oii_parent,
1253                                    item->oii_handle_dirent, item->oii_flags);
1254                 OBD_FREE_PTR(item);
1255         }
1256
1257         if (!list_empty(&dev->od_index_restore_list)) {
1258                 char *buf;
1259
1260                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1261                 if (!buf)
1262                         CERROR("%s: not enough RAM for rebuild index: rc = %d\n",
1263                                osd_name(dev), -ENOMEM);
1264
1265                 while (!list_empty(&dev->od_index_restore_list)) {
1266                         struct lustre_index_restore_unit *liru;
1267
1268                         liru = list_first_entry(&dev->od_index_restore_list,
1269                                                 struct lustre_index_restore_unit,
1270                                                 liru_link);
1271                         list_del(&liru->liru_link);
1272                         if (buf)
1273                                 osd_index_restore(env, dev, liru, buf,
1274                                                   INDEX_BACKUP_BUFSIZE);
1275                         OBD_FREE(liru, liru->liru_len);
1276                 }
1277
1278                 if (buf)
1279                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1280         }
1281
1282         EXIT;
1283 }
1284
1285 /* OI scrub start/stop */
1286
1287 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1288                     __u32 flags)
1289 {
1290         int rc;
1291
1292         ENTRY;
1293         if (dev->od_dt_dev.dd_rdonly)
1294                 RETURN(-EROFS);
1295
1296         /* od_otable_sem: prevent concurrent start/stop */
1297         down(&dev->od_otable_sem);
1298         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1299         up(&dev->od_otable_sem);
1300
1301         RETURN(rc == -EALREADY ? 0 : rc);
1302 }
1303
1304 void osd_scrub_stop(struct osd_device *dev)
1305 {
1306         struct lustre_scrub *scrub = &dev->od_scrub;
1307
1308         ENTRY;
1309         /* od_otable_sem: prevent concurrent start/stop */
1310         down(&dev->od_otable_sem);
1311         spin_lock(&scrub->os_lock);
1312         scrub->os_paused = 1;
1313         spin_unlock(&scrub->os_lock);
1314         scrub_stop(scrub);
1315         up(&dev->od_otable_sem);
1316
1317         EXIT;
1318 }
1319
1320 /* OI scrub setup/cleanup */
1321
1322 static const char osd_scrub_name[] = "OI_scrub";
1323
1324 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1325                     time64_t interval, bool resetoi)
1326 {
1327         struct osd_thread_info *info = osd_oti_get(env);
1328         struct lustre_scrub *scrub = &dev->od_scrub;
1329         struct scrub_file *sf = &scrub->os_file;
1330         struct lu_fid *fid = &info->oti_fid;
1331         struct dt_object *obj;
1332         uint64_t oid;
1333         int rc = 0;
1334         bool dirty = false;
1335
1336         ENTRY;
1337         memcpy(dev->od_uuid.b,
1338                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1339                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1340         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1341         init_rwsem(&scrub->os_rwsem);
1342         spin_lock_init(&scrub->os_lock);
1343         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1344         scrub->os_name = osd_name(dev);
1345         scrub->os_auto_scrub_interval = interval;
1346
1347         /* 'What the @fid is' is not imporatant, because the object
1348          * has no OI mapping, and only is visible inside the OSD.
1349          */
1350         fid->f_seq = FID_SEQ_IGIF_MAX;
1351         if (dev->od_is_ost)
1352                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1353         else
1354                 fid->f_oid = dev->od_index + 1;
1355         fid->f_ver = 0;
1356         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1357                                     osd_scrub_name, &oid, fid, false);
1358         if (rc)
1359                 RETURN(rc);
1360
1361         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1362         if (rc)
1363                 RETURN(rc);
1364
1365         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1366         if (IS_ERR_OR_NULL(obj))
1367                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1368
1369         obj->do_body_ops = &osd_body_scrub_ops;
1370         scrub->os_obj = obj;
1371         rc = scrub_file_load(env, scrub);
1372         if (rc == -ENOENT || rc == -EFAULT) {
1373                 scrub_file_init(scrub, dev->od_uuid);
1374                 dirty = true;
1375         } else if (rc < 0) {
1376                 GOTO(cleanup_obj, rc);
1377         } else {
1378                 if (!guid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1379                         CDEBUG(D_LFSCK,
1380                                "%s: UUID has been changed from %pU to %pU\n",
1381                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1382                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1383                         dirty = true;
1384                 } else if (sf->sf_status == SS_SCANNING) {
1385                         sf->sf_status = SS_CRASHED;
1386                         dirty = true;
1387                 }
1388
1389                 if (unlikely((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0 ||
1390                              sf->sf_oi_count > OSD_OI_FID_NR_MAX)) {
1391                         LCONSOLE_WARN("%s: invalid OI count %u, reset to %u\n",
1392                                       osd_name(dev), sf->sf_oi_count,
1393                                       osd_oi_count);
1394                         sf->sf_oi_count = osd_oi_count;
1395                         dirty = true;
1396                 }
1397         }
1398
1399         if (sf->sf_pos_last_checkpoint != 0)
1400                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1401         else
1402                 scrub->os_pos_current = 1;
1403
1404         if (dirty) {
1405                 rc = scrub_file_store(env, scrub);
1406                 if (rc)
1407                         GOTO(cleanup_obj, rc);
1408         }
1409
1410         /* Initialize OI files. */
1411         rc = osd_oi_init(env, dev, resetoi);
1412         if (rc < 0)
1413                 GOTO(cleanup_obj, rc);
1414
1415         if (!dev->od_dt_dev.dd_rdonly)
1416                 osd_initial_OI_scrub(env, dev);
1417
1418         if (!dev->od_dt_dev.dd_rdonly &&
1419             scrub->os_auto_scrub_interval != AS_NEVER &&
1420             ((sf->sf_status == SS_PAUSED) ||
1421              (sf->sf_status == SS_CRASHED &&
1422               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1423                               SF_UPGRADE | SF_AUTO)) ||
1424              (sf->sf_status == SS_INIT &&
1425               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1426                               SF_UPGRADE))))
1427                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1428
1429         if (rc)
1430                 GOTO(cleanup_oi, rc);
1431
1432         RETURN(0);
1433
1434 cleanup_oi:
1435         osd_oi_fini(env, dev);
1436 cleanup_obj:
1437         dt_object_put_nocache(env, scrub->os_obj);
1438         scrub->os_obj = NULL;
1439
1440         return rc;
1441 }
1442
1443 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1444 {
1445         struct lustre_scrub *scrub = &dev->od_scrub;
1446
1447         LASSERT(!dev->od_otable_it);
1448
1449         if (scrub->os_obj) {
1450                 osd_scrub_stop(dev);
1451                 dt_object_put_nocache(env, scrub->os_obj);
1452                 scrub->os_obj = NULL;
1453         }
1454
1455         if (dev->od_oi_table)
1456                 osd_oi_fini(env, dev);
1457 }
1458
1459 /* object table based iteration APIs */
1460
1461 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1462                                        struct dt_object *dt, __u32 attr)
1463 {
1464         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1465         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1466         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1467         struct lustre_scrub *scrub = &dev->od_scrub;
1468         struct osd_otable_it *it;
1469         __u32 start = 0;
1470         int rc;
1471
1472         ENTRY;
1473         if (dev->od_dt_dev.dd_rdonly)
1474                 RETURN(ERR_PTR(-EROFS));
1475
1476         /* od_otable_sem: prevent concurrent init/fini */
1477         down(&dev->od_otable_sem);
1478         if (dev->od_otable_it)
1479                 GOTO(out, it = ERR_PTR(-EALREADY));
1480
1481         OBD_ALLOC_PTR(it);
1482         if (!it)
1483                 GOTO(out, it = ERR_PTR(-ENOMEM));
1484
1485         if (flags & DOIF_OUTUSED)
1486                 it->ooi_used_outside = 1;
1487
1488         if (flags & DOIF_RESET)
1489                 start |= SS_RESET;
1490
1491         if (valid & DOIV_ERROR_HANDLE) {
1492                 if (flags & DOIF_FAILOUT)
1493                         start |= SS_SET_FAILOUT;
1494                 else
1495                         start |= SS_CLEAR_FAILOUT;
1496         }
1497
1498         if (valid & DOIV_DRYRUN) {
1499                 if (flags & DOIF_DRYRUN)
1500                         start |= SS_SET_DRYRUN;
1501                 else
1502                         start |= SS_CLEAR_DRYRUN;
1503         }
1504
1505         /* XXX: dmu_object_next() does NOT find dnodes allocated
1506          *      in the current non-committed txg, so we force txg
1507          *      commit to find all existing dnodes ...
1508          */
1509         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1510
1511         dev->od_otable_it = it;
1512         it->ooi_dev = dev;
1513         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1514         if (rc == -EALREADY) {
1515                 it->ooi_pos = 1;
1516         } else if (rc < 0) {
1517                 dev->od_otable_it = NULL;
1518                 OBD_FREE_PTR(it);
1519                 it = ERR_PTR(rc);
1520         } else {
1521                 it->ooi_pos = scrub->os_pos_current;
1522         }
1523
1524         GOTO(out, it);
1525
1526 out:
1527         up(&dev->od_otable_sem);
1528         return (struct dt_it *)it;
1529 }
1530
1531 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1532 {
1533         struct osd_otable_it *it = (struct osd_otable_it *)di;
1534         struct osd_device *dev = it->ooi_dev;
1535
1536         /* od_otable_sem: prevent concurrent init/fini */
1537         down(&dev->od_otable_sem);
1538         scrub_stop(&dev->od_scrub);
1539         LASSERT(dev->od_otable_it == it);
1540
1541         dev->od_otable_it = NULL;
1542         up(&dev->od_otable_sem);
1543         OBD_FREE_PTR(it);
1544 }
1545
/* Iterator "get" method: does nothing and always returns 0. */
static int osd_otable_it_get(const struct lu_env *env,
			     struct dt_it *di, const struct dt_key *key)
{
	return 0;
}
1551
/* Iterator "put" method: intentionally empty. */
static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
}
1555
1556 static void osd_otable_it_preload(const struct lu_env *env,
1557                                   struct osd_otable_it *it)
1558 {
1559         struct osd_device *dev = it->ooi_dev;
1560         int rc;
1561
1562         /* can go negative on the very first access to the iterator
1563          * or if some non-Lustre objects were found
1564          */
1565         if (unlikely(it->ooi_prefetched < 0))
1566                 it->ooi_prefetched = 0;
1567
1568         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1569                 return;
1570
1571         if (it->ooi_prefetched_dnode == 0)
1572                 it->ooi_prefetched_dnode = it->ooi_pos;
1573
1574         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1575                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1576                                       B_FALSE, 0);
1577                 if (rc)
1578                         break;
1579
1580                 dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1581                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1582                 it->ooi_prefetched++;
1583         }
1584 }
1585
1586 static inline int
1587 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1588 {
1589         spin_lock(&scrub->os_lock);
1590         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1591             !scrub->os_running)
1592                 it->ooi_waiting = 0;
1593         else
1594                 it->ooi_waiting = 1;
1595         spin_unlock(&scrub->os_lock);
1596
1597         return !it->ooi_waiting;
1598 }
1599
1600 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1601 {
1602         struct osd_otable_it *it = (struct osd_otable_it *)di;
1603         struct osd_device *dev = it->ooi_dev;
1604         struct lustre_scrub *scrub = &dev->od_scrub;
1605         struct lustre_mdt_attrs *lma = NULL;
1606         nvlist_t *nvbuf = NULL;
1607         int rc, size = 0;
1608         bool locked;
1609
1610         ENTRY;
1611         LASSERT(it->ooi_user_ready);
1612         fid_zero(&it->ooi_fid);
1613
1614         if (unlikely(it->ooi_all_cached))
1615                 RETURN(1);
1616
1617 again:
1618         if (nvbuf) {
1619                 nvlist_free(nvbuf);
1620                 nvbuf = NULL;
1621                 lma = NULL;
1622                 size = 0;
1623         }
1624
1625         if (it->ooi_pos >= scrub->os_pos_current)
1626                 wait_var_event(scrub,
1627                                osd_otable_it_wakeup(scrub, it));
1628
1629         if (!scrub->os_running && !it->ooi_used_outside)
1630                 GOTO(out, rc = 1);
1631
1632         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1633         if (rc) {
1634                 if (unlikely(rc == -ESRCH)) {
1635                         it->ooi_all_cached = 1;
1636                         rc = 1;
1637                 }
1638
1639                 GOTO(out, rc);
1640         }
1641
1642         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1643
1644         locked = false;
1645         if (!scrub->os_full_speed) {
1646                 spin_lock(&scrub->os_lock);
1647                 locked = true;
1648         }
1649         it->ooi_prefetched--;
1650         if (!scrub->os_full_speed) {
1651                 if (scrub->os_waiting) {
1652                         scrub->os_waiting = 0;
1653                         wake_up_var(scrub);
1654                 }
1655         }
1656         if (locked)
1657                 spin_unlock(&scrub->os_lock);
1658
1659         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1660                 goto again;
1661
1662         if (rc)
1663                 GOTO(out, rc);
1664
1665         LASSERT(nvbuf != NULL);
1666         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1667                                        (uchar_t **)&lma, &size);
1668         if (rc || size == 0)
1669                 /* It is either non-Lustre object or OSD internal object,
1670                  * ignore it, go ahead
1671                  */
1672                 goto again;
1673
1674         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1675         lustre_lma_swab(lma);
1676         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1677                      lma->lma_incompat & LMAI_AGENT))
1678                 goto again;
1679
1680         it->ooi_fid = lma->lma_self_fid;
1681
1682         GOTO(out, rc = 0);
1683
1684 out:
1685         if (nvbuf)
1686                 nvlist_free(nvbuf);
1687
1688         if (!rc && scrub->os_full_speed)
1689                 osd_otable_it_preload(env, it);
1690
1691         return rc;
1692 }
1693
/**
 * Iterator ->key() method of the otable iterator.
 *
 * No key object is exposed through this method: it returns NULL
 * unconditionally and uses neither \a env nor \a di.
 */
static struct dt_key *
osd_otable_it_key(const struct lu_env *env, const struct dt_it *di)
{
	return NULL;
}
1699
1700 static int osd_otable_it_key_size(const struct lu_env *env,
1701                                   const struct dt_it *di)
1702 {
1703         return sizeof(__u64);
1704 }
1705
1706 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1707                              struct dt_rec *rec, __u32 attr)
1708 {
1709         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1710         struct lu_fid *fid = (struct lu_fid *)rec;
1711
1712         *fid = it->ooi_fid;
1713         return 0;
1714 }
1715
1716 static __u64 osd_otable_it_store(const struct lu_env *env,
1717                                  const struct dt_it *di)
1718 {
1719         struct osd_otable_it *it = (struct osd_otable_it *)di;
1720
1721         return it->ooi_pos;
1722 }
1723
1724 /**
1725  * Set the OSD layer iteration start position as the specified hash.
1726  */
1727 static int osd_otable_it_load(const struct lu_env *env,
1728                               const struct dt_it *di, __u64 hash)
1729 {
1730         struct osd_otable_it *it = (struct osd_otable_it *)di;
1731         struct osd_device *dev = it->ooi_dev;
1732         struct lustre_scrub *scrub = &dev->od_scrub;
1733         int rc;
1734
1735         ENTRY;
1736         /* Forbid to set iteration position after iteration started. */
1737         if (it->ooi_user_ready)
1738                 RETURN(-EPERM);
1739
1740         if (hash > OSD_OTABLE_MAX_HASH)
1741                 hash = OSD_OTABLE_MAX_HASH;
1742
1743         /* The hash is the last checkpoint position, start from the next one. */
1744         it->ooi_pos = hash + 1;
1745         it->ooi_prefetched = 0;
1746         it->ooi_prefetched_dnode = 0;
1747         it->ooi_user_ready = 1;
1748         if (!scrub->os_full_speed)
1749                 wake_up_var(scrub);
1750
1751         /* Unplug OSD layer iteration by the first next() call. */
1752         rc = osd_otable_it_next(env, (struct dt_it *)it);
1753
1754         RETURN(rc);
1755 }
1756
/**
 * Iterator ->key_rec() method of the otable iterator.
 *
 * Nothing is written to \a key_rec; the method is a no-op.
 *
 * \retval 0 always
 */
static int
osd_otable_it_key_rec(const struct lu_env *env, const struct dt_it *di,
		      void *key_rec)
{
	return 0;
}
1762
1763 const struct dt_index_operations osd_otable_ops = {
1764         .dio_it = {
1765                 .init     = osd_otable_it_init,
1766                 .fini     = osd_otable_it_fini,
1767                 .get      = osd_otable_it_get,
1768                 .put      = osd_otable_it_put,
1769                 .next     = osd_otable_it_next,
1770                 .key      = osd_otable_it_key,
1771                 .key_size = osd_otable_it_key_size,
1772                 .rec      = osd_otable_it_rec,
1773                 .store    = osd_otable_it_store,
1774                 .load     = osd_otable_it_load,
1775                 .key_rec  = osd_otable_it_key_rec,
1776         }
1777 };
1778
/* APIs for the list of high-priority inconsistent items */
1780
1781 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1782                    const struct lu_fid *fid, uint64_t oid, bool insert)
1783 {
1784         struct lustre_scrub *scrub = &dev->od_scrub;
1785         struct osd_inconsistent_item *oii;
1786         bool wakeup = false;
1787
1788         ENTRY;
1789         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1790         OBD_ALLOC_PTR(oii);
1791         if (unlikely(!oii))
1792                 RETURN(-ENOMEM);
1793
1794         INIT_LIST_HEAD(&oii->oii_list);
1795         oii->oii_cache.oic_dev = dev;
1796         oii->oii_cache.oic_fid = *fid;
1797         oii->oii_cache.oic_dnode = oid;
1798         oii->oii_insert = insert;
1799
1800         spin_lock(&scrub->os_lock);
1801         if (!scrub->os_running) {
1802                 spin_unlock(&scrub->os_lock);
1803                 OBD_FREE_PTR(oii);
1804                 RETURN(-EAGAIN);
1805         }
1806
1807         if (list_empty(&scrub->os_inconsistent_items))
1808                 wakeup = true;
1809         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1810         spin_unlock(&scrub->os_lock);
1811
1812         if (wakeup)
1813                 wake_up_var(scrub);
1814
1815         RETURN(0);
1816 }
1817
1818 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1819                    uint64_t *oid)
1820 {
1821         struct lustre_scrub *scrub = &dev->od_scrub;
1822         struct osd_inconsistent_item *oii;
1823         int ret = -ENOENT;
1824
1825         ENTRY;
1826         spin_lock(&scrub->os_lock);
1827         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1828                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1829                         *oid = oii->oii_cache.oic_dnode;
1830                         ret = 0;
1831                         break;
1832                 }
1833         }
1834         spin_unlock(&scrub->os_lock);
1835
1836         RETURN(ret);
1837 }
1838
/**
 * Per-entry callback invoked by osd_scan_dir().
 *
 * \param dir_oid	object id of the directory being scanned
 * \param ozi		ZAP iterator holding the current entry (its name and
 *			direntry)
 *
 * A non-zero return value stops the scan and is returned by osd_scan_dir().
 */
typedef int (*scan_dir_helper_t)(const struct lu_env *env,
				 struct osd_device *dev,
				 uint64_t dir_oid,
				 struct osd_zap_it *ozi);
1842
1843 static int osd_scan_dir(const struct lu_env *env, struct osd_device *dev,
1844                         uint64_t id, scan_dir_helper_t cb)
1845 {
1846         struct osd_zap_it *it;
1847         struct luz_direntry *zde;
1848         zap_attribute_t *za;
1849         int rc;
1850
1851         ENTRY;
1852
1853         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
1854         if (it == NULL)
1855                 RETURN(-ENOMEM);
1856
1857         rc = osd_zap_cursor_init(&it->ozi_zc, dev->od_os, id, 0);
1858         if (rc != 0)
1859                 GOTO(out, rc);
1860
1861         za = &it->ozi_za;
1862         zde = &it->ozi_zde;
1863         while (1) {
1864                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1865                 if (unlikely(rc)) {
1866                         if (rc == -ENOENT)
1867                                 rc = 0;
1868
1869                         break;
1870                 }
1871
1872                 if (name_is_dot_or_dotdot(za->za_name, strlen(za->za_name))) {
1873                         zap_cursor_advance(it->ozi_zc);
1874                         continue;
1875                 }
1876
1877                 strncpy(it->ozi_name, za->za_name, sizeof(it->ozi_name));
1878                 if (za->za_integer_length != 8) {
1879                         rc = -EIO;
1880                         break;
1881                 }
1882
1883                 rc = osd_zap_lookup(dev, it->ozi_zc->zc_zapobj, NULL,
1884                                     za->za_name, za->za_integer_length,
1885                                     sizeof(*zde) / za->za_integer_length, zde);
1886                 if (rc)
1887                         break;
1888
1889                 rc = cb(env, dev, id, it);
1890                 if (rc)
1891                         break;
1892
1893                 zap_cursor_advance(it->ozi_zc);
1894         }
1895         osd_zap_cursor_fini(it->ozi_zc);
1896
1897 out:
1898         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
1899         RETURN(rc);
1900 }
1901
1902 static int osd_remove_ml_file(const struct lu_env *env, struct osd_device *dev,
1903                               uint64_t dir, uint64_t id, struct lu_fid *fid,
1904                               char *name)
1905 {
1906         struct osd_thread_info *info = osd_oti_get(env);
1907         struct dt_object *dt;
1908         struct osd_object *obj = NULL;
1909         dmu_tx_t *tx;
1910         sa_handle_t *hdl;
1911         uint64_t nlink;
1912         int rc;
1913
1914         rc = -sa_handle_get(dev->od_os, id, NULL, SA_HDL_PRIVATE, &hdl);
1915         if (rc)
1916                 RETURN(rc);
1917
1918         dt = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1919         if (IS_ERR(dt))
1920                 RETURN(PTR_ERR(dt));
1921
1922         if (dt) {
1923                 obj = osd_dt_obj(dt);
1924                 down_read(&obj->oo_guard);
1925         }
1926
1927         rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
1928         if (rc)
1929                 GOTO(out, rc);
1930
1931         if (nlink <= 1) {
1932                 rc = 0;
1933                 CERROR("%s: multi-link file O/%s/%s/%s has nlink %llu: rc = %d\n",
1934                        osd_name(dev), info->oti_seq_name, info->oti_dir_name,
1935                        name, nlink, rc);
1936                 GOTO(out, rc);
1937         }
1938
1939         tx = dmu_tx_create(dev->od_os);
1940         if (!tx) {
1941                 rc = -ENOMEM;
1942                 CERROR("%s: fail to create tx to remove multi-link file!: rc = %d\n",
1943                        osd_name(dev), rc);
1944                 GOTO(out, rc);
1945         }
1946
1947         dmu_tx_hold_zap(tx, dir, FALSE, NULL);
1948         rc = -dmu_tx_assign(tx, TXG_WAIT);
1949         if (rc)
1950                 GOTO(abort, rc);
1951
1952         nlink--;
1953         rc = -sa_update(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink), tx);
1954         if (rc)
1955                 GOTO(abort, rc);
1956
1957         rc = -zap_remove(dev->od_os, dir, name, tx);
1958         if (rc)
1959                 GOTO(abort, rc);
1960
1961         dmu_tx_commit(tx);
1962         GOTO(out, rc);
1963
1964 abort:
1965         dmu_tx_abort(tx);
1966
1967 out:
1968         if (dt) {
1969                 up_read(&obj->oo_guard);
1970                 dt_object_put_nocache(env, dt);
1971         }
1972
1973         sa_handle_destroy(hdl);
1974         RETURN(rc);
1975 }
1976
1977 static int osd_scan_ml_file(const struct lu_env *env, struct osd_device *dev,
1978                             uint64_t dir_oid, struct osd_zap_it *ozi)
1979 {
1980         struct osd_thread_info *info = osd_oti_get(env);
1981         struct lu_fid *fid = &info->oti_fid;
1982         struct ost_id *ostid = &info->oti_ostid;
1983         char name[32];
1984         u64 seq;
1985         int rc = 0;
1986
1987         ENTRY;
1988
1989         rc = osd_get_fid_by_oid(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode, fid);
1990         if (rc)
1991                 RETURN(rc);
1992
1993         seq = fid_seq(fid);
1994         fid_to_ostid(fid, ostid);
1995
1996         snprintf(name, sizeof(name), (fid_seq_is_rsvd(seq) ||
1997                                       fid_seq_is_mdt0(seq)) ? "%llu" : "%llx",
1998                                       fid_seq_is_idif(seq) ? 0 : seq);
1999         if (strcmp(info->oti_seq_name, name) != 0)
2000                 GOTO(fix, rc);
2001
2002         snprintf(name, sizeof(name), "d%d",
2003                 (int)ostid_id(ostid) % OSD_OST_MAP_SIZE);
2004         if (strcmp(info->oti_dir_name, name) != 0)
2005                 GOTO(fix, rc);
2006
2007         snprintf(name, sizeof(name), "%llu", ostid_id(ostid));
2008         if (strcmp(ozi->ozi_name, name) == 0)
2009                 RETURN(0);
2010
2011 fix:
2012         CDEBUG(D_LFSCK, "%s: the file O/%s/%s/%s is corrupted\n",
2013                osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2014                ozi->ozi_name);
2015
2016         rc = osd_remove_ml_file(env, dev, dir_oid,
2017                                 ozi->ozi_zde.lzd_reg.zde_dnode, fid,
2018                                 ozi->ozi_name);
2019         RETURN(rc);
2020 }
2021
2022 static int osd_scan_ml_file_dir(const struct lu_env *env,
2023                                 struct osd_device *dev, uint64_t dir_oid,
2024                                 struct osd_zap_it *ozi)
2025 {
2026         struct osd_thread_info *info = osd_oti_get(env);
2027
2028         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2029                 return 0;
2030
2031         info->oti_dir_name = ozi->ozi_name;
2032         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2033                             osd_scan_ml_file);
2034 }
2035
2036 static int osd_scan_ml_file_seq(const struct lu_env *env,
2037                                 struct osd_device *dev, uint64_t dir_oid,
2038                                 struct osd_zap_it *ozi)
2039 {
2040         struct osd_thread_info *info = osd_oti_get(env);
2041
2042         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2043                 return 0;
2044
2045         info->oti_seq_name = ozi->ozi_name;
2046         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2047                             osd_scan_ml_file_dir);
2048 }
2049
2050 static int osd_scan_ml_file_main(const struct lu_env *env,
2051                                  struct osd_device *dev)
2052 {
2053         return osd_scan_dir(env, dev, dev->od_O_id, osd_scan_ml_file_seq);
2054 }