Whamcloud - gitweb
LU-12780 scrub: all update to bitfields must be protected.
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding Object Index files when restores MDT from
31  * file-level backup.
32  *
33  * The otable based iterator scans ZFS objects to feed up layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
55 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH                 256
57
58 #define DTO_INDEX_INSERT                1
59 #define DTO_INDEX_DELETE                2
60 #define DTO_INDEX_UPDATE                3
61
62 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
63 {
64         return it->ooi_prefetched < OTABLE_PREFETCH;
65 }
66
67 /**
68  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
69  *
70  * \retval   1, changed nothing
71  * \retval   0, changed successfully
72  * \retval -ve, on error
73  */
74 static int osd_scrub_refresh_mapping(const struct lu_env *env,
75                                      struct osd_device *dev,
76                                      const struct lu_fid *fid,
77                                      uint64_t oid, int ops,
78                                      bool force, const char *name)
79 {
80         struct osd_thread_info *info = osd_oti_get(env);
81         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
82         char *buf = info->oti_str;
83         dmu_tx_t *tx = NULL;
84         dnode_t *dn = NULL;
85         uint64_t zapid;
86         int rc;
87         ENTRY;
88
89         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
90                 GOTO(log, rc = 0);
91
92         tx = dmu_tx_create(dev->od_os);
93         if (!tx)
94                 GOTO(log, rc = -ENOMEM);
95
96         zapid = osd_get_name_n_idx(env, dev, fid, buf,
97                                    sizeof(info->oti_str), &dn);
98         osd_tx_hold_zap(tx, zapid, dn,
99                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
100         rc = -dmu_tx_assign(tx, TXG_WAIT);
101         if (rc) {
102                 dmu_tx_abort(tx);
103                 GOTO(log, rc);
104         }
105
106         switch (ops) {
107         case DTO_INDEX_UPDATE:
108                 zde->zde_pad = 0;
109                 zde->zde_dnode = oid;
110                 zde->zde_type = 0; /* The type in OI mapping is useless. */
111                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
112                                  zde, tx);
113                 if (unlikely(rc == -ENOENT)) {
114                         /* Some unlink thread may removed the OI mapping. */
115                         rc = 1;
116                 }
117                 break;
118         case DTO_INDEX_INSERT:
119                 zde->zde_pad = 0;
120                 zde->zde_dnode = oid;
121                 zde->zde_type = 0; /* The type in OI mapping is useless. */
122                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
123                                  zde, tx);
124                 if (unlikely(rc == -EEXIST))
125                         rc = 1;
126                 break;
127         case DTO_INDEX_DELETE:
128                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
129                 if (rc == -ENOENT) {
130                         /* It is normal that the unlink thread has removed the
131                          * OI mapping already. */
132                         rc = 1;
133                 }
134                 break;
135         default:
136                 LASSERTF(0, "Unexpected ops %d\n", ops);
137                 rc = -EINVAL;
138                 break;
139         }
140
141         dmu_tx_commit(tx);
142         GOTO(log, rc);
143
144 log:
145         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
146                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
147                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
148
149         return rc;
150 }
151
152 static int
153 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
154                        const struct lu_fid *fid, uint64_t oid, int val)
155 {
156         struct lustre_scrub *scrub = &dev->od_scrub;
157         struct scrub_file *sf = &scrub->os_file;
158         struct osd_inconsistent_item *oii = NULL;
159         nvlist_t *nvbuf = NULL;
160         dnode_t *dn = NULL;
161         uint64_t oid2;
162         int ops = DTO_INDEX_UPDATE;
163         int rc;
164         ENTRY;
165
166         down_write(&scrub->os_rwsem);
167         scrub->os_new_checked++;
168         if (val < 0)
169                 GOTO(out, rc = val);
170
171         if (scrub->os_in_prior)
172                 oii = list_entry(scrub->os_inconsistent_items.next,
173                                  struct osd_inconsistent_item, oii_list);
174
175         if (oid < sf->sf_pos_latest_start && !oii)
176                 GOTO(out, rc = 0);
177
178         if (oii && oii->oii_insert) {
179                 ops = DTO_INDEX_INSERT;
180                 goto zget;
181         }
182
183         rc = osd_fid_lookup(env, dev, fid, &oid2);
184         if (rc) {
185                 if (rc != -ENOENT)
186                         GOTO(out, rc);
187
188                 ops = DTO_INDEX_INSERT;
189
190 zget:
191                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
192                 if (rc) {
193                         /* Someone removed the object by race. */
194                         if (rc == -ENOENT || rc == -EEXIST)
195                                 rc = 0;
196                         GOTO(out, rc);
197                 }
198
199                 spin_lock(&scrub->os_lock);
200                 scrub->os_full_speed = 1;
201                 spin_unlock(&scrub->os_lock);
202
203                 sf->sf_flags |= SF_INCONSISTENT;
204         } else if (oid == oid2) {
205                 GOTO(out, rc = 0);
206         } else {
207                 struct lustre_mdt_attrs *lma = NULL;
208                 int size;
209
210                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
211                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
212                         goto update;
213                 if (rc)
214                         GOTO(out, rc);
215
216                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
217                                                (uchar_t **)&lma, &size);
218                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
219                         goto update;
220                 if (rc)
221                         GOTO(out, rc);
222
223                 lustre_lma_swab(lma);
224                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
225                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
226                                "two objects: %llu and %llu (in OI)\n",
227                                osd_name(dev), PFID(fid), oid, oid2);
228
229                         GOTO(out, rc = -EEXIST);
230                 }
231
232 update:
233                 spin_lock(&scrub->os_lock);
234                 scrub->os_full_speed = 1;
235                 spin_unlock(&scrub->os_lock);
236                 sf->sf_flags |= SF_INCONSISTENT;
237         }
238
239         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
240         if (!rc) {
241                 if (scrub->os_in_prior)
242                         sf->sf_items_updated_prior++;
243                 else
244                         sf->sf_items_updated++;
245         }
246
247         GOTO(out, rc);
248
249 out:
250         if (nvbuf)
251                 nvlist_free(nvbuf);
252
253         if (rc < 0) {
254                 sf->sf_items_failed++;
255                 if (sf->sf_pos_first_inconsistent == 0 ||
256                     sf->sf_pos_first_inconsistent > oid)
257                         sf->sf_pos_first_inconsistent = oid;
258         } else {
259                 rc = 0;
260         }
261
262         /* There may be conflict unlink during the OI scrub,
263          * if happend, then remove the new added OI mapping. */
264         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
265                 osd_scrub_refresh_mapping(env, dev, fid, oid,
266                                           DTO_INDEX_DELETE, false, NULL);
267         up_write(&scrub->os_rwsem);
268
269         if (dn)
270                 osd_dnode_rele(dn);
271
272         if (oii) {
273                 spin_lock(&scrub->os_lock);
274                 if (likely(!list_empty(&oii->oii_list)))
275                         list_del(&oii->oii_list);
276                 spin_unlock(&scrub->os_lock);
277                 OBD_FREE_PTR(oii);
278         }
279
280         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
281 }
282
283 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
284 {
285         struct lustre_scrub *scrub = &dev->od_scrub;
286         struct ptlrpc_thread *thread = &scrub->os_thread;
287         struct scrub_file *sf = &scrub->os_file;
288         __u32 flags = scrub->os_start_flags;
289         int rc;
290         bool drop_dryrun = false;
291         ENTRY;
292
293         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
294                scrub->os_name, flags);
295
296         down_write(&scrub->os_rwsem);
297         if (flags & SS_SET_FAILOUT)
298                 sf->sf_param |= SP_FAILOUT;
299         else if (flags & SS_CLEAR_FAILOUT)
300                 sf->sf_param &= ~SP_FAILOUT;
301
302         if (flags & SS_SET_DRYRUN) {
303                 sf->sf_param |= SP_DRYRUN;
304         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
305                 sf->sf_param &= ~SP_DRYRUN;
306                 drop_dryrun = true;
307         }
308
309         if (flags & SS_RESET)
310                 scrub_file_reset(scrub, dev->od_uuid, 0);
311
312         spin_lock(&scrub->os_lock);
313         scrub->os_partial_scan = 0;
314         if (flags & SS_AUTO_FULL) {
315                 scrub->os_full_speed = 1;
316                 sf->sf_flags |= SF_AUTO;
317         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
318                                    SF_UPGRADE)) {
319                 scrub->os_full_speed = 1;
320         } else {
321                 scrub->os_full_speed = 0;
322         }
323
324         scrub->os_in_prior = 0;
325         scrub->os_waiting = 0;
326         scrub->os_paused = 0;
327         scrub->os_in_join = 0;
328         scrub->os_full_scrub = 0;
329         spin_unlock(&scrub->os_lock);
330         scrub->os_new_checked = 0;
331         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
332                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
333         else if (sf->sf_pos_last_checkpoint != 0)
334                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
335         else
336                 sf->sf_pos_latest_start = 1;
337
338         scrub->os_pos_current = sf->sf_pos_latest_start;
339         sf->sf_status = SS_SCANNING;
340         sf->sf_time_latest_start = ktime_get_real_seconds();
341         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
342         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
343         rc = scrub_file_store(env, scrub);
344         if (!rc) {
345                 spin_lock(&scrub->os_lock);
346                 thread_set_flags(thread, SVC_RUNNING);
347                 spin_unlock(&scrub->os_lock);
348                 wake_up_all(&thread->t_ctl_waitq);
349         }
350         up_write(&scrub->os_rwsem);
351
352         RETURN(rc);
353 }
354
355 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
356                           int result)
357 {
358         struct lustre_scrub *scrub = &dev->od_scrub;
359         struct scrub_file *sf = &scrub->os_file;
360         int rc;
361         ENTRY;
362
363         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
364                scrub->os_name, result);
365
366         down_write(&scrub->os_rwsem);
367         spin_lock(&scrub->os_lock);
368         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
369         spin_unlock(&scrub->os_lock);
370         if (scrub->os_new_checked > 0) {
371                 sf->sf_items_checked += scrub->os_new_checked;
372                 scrub->os_new_checked = 0;
373                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
374         }
375         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
376         if (result > 0) {
377                 sf->sf_status = SS_COMPLETED;
378                 if (!(sf->sf_param & SP_DRYRUN)) {
379                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
380                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
381                                           SF_UPGRADE | SF_AUTO);
382                 }
383                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
384                 sf->sf_success_count++;
385         } else if (result == 0) {
386                 if (scrub->os_paused)
387                         sf->sf_status = SS_PAUSED;
388                 else
389                         sf->sf_status = SS_STOPPED;
390         } else {
391                 sf->sf_status = SS_FAILED;
392         }
393         sf->sf_run_time += ktime_get_seconds() -
394                            scrub->os_time_last_checkpoint;
395
396         rc = scrub_file_store(env, scrub);
397         up_write(&scrub->os_rwsem);
398
399         RETURN(rc < 0 ? rc : result);
400 }
401
402 /* iteration engine */
403
404 static inline int
405 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
406 {
407         spin_lock(&scrub->os_lock);
408         if (osd_scrub_has_window(it) ||
409             !list_empty(&scrub->os_inconsistent_items) ||
410             it->ooi_waiting || !thread_is_running(&scrub->os_thread))
411                 scrub->os_waiting = 0;
412         else
413                 scrub->os_waiting = 1;
414         spin_unlock(&scrub->os_lock);
415
416         return !scrub->os_waiting;
417 }
418
419 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
420                           struct lu_fid *fid, uint64_t *oid)
421 {
422         struct lustre_scrub *scrub = &dev->od_scrub;
423         struct ptlrpc_thread *thread = &scrub->os_thread;
424         struct osd_otable_it *it = dev->od_otable_it;
425         struct lustre_mdt_attrs *lma = NULL;
426         nvlist_t *nvbuf = NULL;
427         int size = 0;
428         int rc = 0;
429         ENTRY;
430
431         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
432                 wait_event_idle_timeout(
433                         thread->t_ctl_waitq,
434                         !list_empty(&scrub->os_inconsistent_items) ||
435                         !thread_is_running(thread),
436                         cfs_time_seconds(cfs_fail_val));
437
438                 if (unlikely(!thread_is_running(thread)))
439                         RETURN(SCRUB_NEXT_EXIT);
440         }
441
442         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
443                 spin_lock(&scrub->os_lock);
444                 thread_set_flags(thread, SVC_STOPPING);
445                 spin_unlock(&scrub->os_lock);
446                 RETURN(SCRUB_NEXT_CRASH);
447         }
448
449         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
450                 RETURN(SCRUB_NEXT_FATAL);
451
452 again:
453         if (nvbuf) {
454                 nvlist_free(nvbuf);
455                 nvbuf = NULL;
456                 lma = NULL;
457         }
458
459         if (!list_empty(&scrub->os_inconsistent_items)) {
460                 spin_lock(&scrub->os_lock);
461                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
462                         struct osd_inconsistent_item *oii;
463
464                         oii = list_entry(scrub->os_inconsistent_items.next,
465                                 struct osd_inconsistent_item, oii_list);
466                         *fid = oii->oii_cache.oic_fid;
467                         *oid = oii->oii_cache.oic_dnode;
468                         scrub->os_in_prior = 1;
469                         spin_unlock(&scrub->os_lock);
470
471                         GOTO(out, rc = 0);
472                 }
473                 spin_unlock(&scrub->os_lock);
474         }
475
476         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
477                 wait_event_idle(thread->t_ctl_waitq,
478                                 osd_scrub_wakeup(scrub, it));
479
480         if (unlikely(!thread_is_running(thread)))
481                 GOTO(out, rc = SCRUB_NEXT_EXIT);
482
483         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
484         if (rc)
485                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
486
487         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
488         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
489                 goto again;
490
491         if (rc)
492                 GOTO(out, rc);
493
494         LASSERT(nvbuf != NULL);
495         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
496                                        (uchar_t **)&lma, &size);
497         if (!rc) {
498                 lustre_lma_swab(lma);
499                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
500                            !(lma->lma_incompat & LMAI_AGENT))) {
501                         *fid = lma->lma_self_fid;
502                         *oid = scrub->os_pos_current;
503
504                         GOTO(out, rc = 0);
505                 }
506         }
507
508         if (!scrub->os_full_speed) {
509                 spin_lock(&scrub->os_lock);
510                 it->ooi_prefetched++;
511                 if (it->ooi_waiting) {
512                         it->ooi_waiting = 0;
513                         wake_up_all(&thread->t_ctl_waitq);
514                 }
515                 spin_unlock(&scrub->os_lock);
516         }
517
518         goto again;
519
520 out:
521         if (nvbuf)
522                 nvlist_free(nvbuf);
523
524         return rc;
525 }
526
527 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
528                           const struct lu_fid *fid, uint64_t oid, int rc)
529 {
530         struct lustre_scrub *scrub = &dev->od_scrub;
531         struct ptlrpc_thread *thread = &scrub->os_thread;
532         struct osd_otable_it *it = dev->od_otable_it;
533
534         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
535         if (!scrub->os_in_prior) {
536                 if (!scrub->os_full_speed) {
537                         spin_lock(&scrub->os_lock);
538                         it->ooi_prefetched++;
539                         if (it->ooi_waiting) {
540                                 it->ooi_waiting = 0;
541                                 wake_up_all(&thread->t_ctl_waitq);
542                         }
543                         spin_unlock(&scrub->os_lock);
544                 }
545         } else {
546                 spin_lock(&scrub->os_lock);
547                 scrub->os_in_prior = 0;
548                 spin_unlock(&scrub->os_lock);
549         }
550
551         if (rc)
552                 return rc;
553
554         rc = scrub_checkpoint(env, scrub);
555         if (rc) {
556                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
557                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
558                 /* Continue, as long as the scrub itself can go ahead. */
559         }
560
561         return 0;
562 }
563
564 static int osd_scrub_main(void *args)
565 {
566         struct lu_env env;
567         struct osd_device *dev = (struct osd_device *)args;
568         struct lustre_scrub *scrub = &dev->od_scrub;
569         struct ptlrpc_thread *thread = &scrub->os_thread;
570         struct lu_fid *fid;
571         uint64_t oid;
572         int rc = 0;
573         ENTRY;
574
575         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
576         if (rc) {
577                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
578                        scrub->os_name, rc);
579                 GOTO(noenv, rc);
580         }
581
582         rc = osd_scrub_prep(&env, dev);
583         if (rc) {
584                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
585                        scrub->os_name, rc);
586                 GOTO(out, rc);
587         }
588
589         if (!scrub->os_full_speed) {
590                 struct osd_otable_it *it = dev->od_otable_it;
591
592                 wait_event_idle(thread->t_ctl_waitq,
593                                 it->ooi_user_ready ||
594                                 !thread_is_running(thread));
595
596                 if (unlikely(!thread_is_running(thread)))
597                         GOTO(post, rc = 0);
598
599                 scrub->os_pos_current = it->ooi_pos;
600         }
601
602         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
603                scrub->os_name, scrub->os_start_flags,
604                scrub->os_pos_current);
605
606         fid = &osd_oti_get(&env)->oti_fid;
607         while (!rc && thread_is_running(thread)) {
608                 rc = osd_scrub_next(&env, dev, fid, &oid);
609                 switch (rc) {
610                 case SCRUB_NEXT_EXIT:
611                         GOTO(post, rc = 0);
612                 case SCRUB_NEXT_CRASH:
613                         spin_lock(&scrub->os_lock);
614                         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
615                         spin_unlock(&scrub->os_lock);
616                         GOTO(out, rc = -EINVAL);
617                 case SCRUB_NEXT_FATAL:
618                         GOTO(post, rc = -EINVAL);
619                 case SCRUB_NEXT_BREAK:
620                         GOTO(post, rc = 1);
621                 }
622
623                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
624         }
625
626         GOTO(post, rc);
627
628 post:
629         rc = osd_scrub_post(&env, dev, rc);
630         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
631                scrub->os_name, scrub->os_pos_current, rc);
632
633 out:
634         while (!list_empty(&scrub->os_inconsistent_items)) {
635                 struct osd_inconsistent_item *oii;
636
637                 oii = list_entry(scrub->os_inconsistent_items.next,
638                                  struct osd_inconsistent_item, oii_list);
639                 list_del_init(&oii->oii_list);
640                 OBD_FREE_PTR(oii);
641         }
642
643         lu_env_fini(&env);
644
645 noenv:
646         spin_lock(&scrub->os_lock);
647         thread_set_flags(thread, SVC_STOPPED);
648         wake_up_all(&thread->t_ctl_waitq);
649         spin_unlock(&scrub->os_lock);
650         return rc;
651 }
652
653 /* initial OI scrub */
654
655 struct osd_lf_map;
656
657 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
658                                const char *, uint64_t, uint64_t,
659                                enum osd_lf_flags, bool);
660 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
661                              const char *, uint64_t, uint64_t,
662                              enum osd_lf_flags, bool);
663 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
664                           const char *, uint64_t, uint64_t,
665                           enum osd_lf_flags, bool);
666
667 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
668                           uint64_t, handle_dirent_t, enum osd_lf_flags);
669 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
670                               uint64_t, handle_dirent_t, enum osd_lf_flags);
671 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
672                            uint64_t, handle_dirent_t, enum osd_lf_flags);
673
674 struct osd_lf_map {
675         char                    *olm_name;
676         struct lu_fid            olm_fid;
677         enum osd_lf_flags        olm_flags;
678         scan_dir_t               olm_scan_dir;
679         handle_dirent_t          olm_handle_dirent;
680 };
681
682 /* Add the new introduced local files in the list in the future. */
683 static const struct osd_lf_map osd_lf_maps[] = {
684         /* CONFIGS */
685         {
686                 .olm_name               = MOUNT_CONFIGS_DIR,
687                 .olm_fid                = {
688                         .f_seq  = FID_SEQ_LOCAL_FILE,
689                         .f_oid  = MGS_CONFIGS_OID,
690                 },
691                 .olm_flags              = OLF_SCAN_SUBITEMS,
692                 .olm_scan_dir           = osd_ios_general_sd,
693                 .olm_handle_dirent      = osd_ios_varfid_hd,
694         },
695
696         /* NIDTBL_VERSIONS */
697         {
698                 .olm_name               = MGS_NIDTBL_DIR,
699                 .olm_flags              = OLF_SCAN_SUBITEMS,
700                 .olm_scan_dir           = osd_ios_general_sd,
701                 .olm_handle_dirent      = osd_ios_varfid_hd,
702         },
703
704         /* PENDING */
705         {
706                 .olm_name               = MDT_ORPHAN_DIR,
707         },
708
709         /* ROOT */
710         {
711                 .olm_name               = "ROOT",
712                 .olm_fid                = {
713                         .f_seq  = FID_SEQ_ROOT,
714                         .f_oid  = FID_OID_ROOT,
715                 },
716                 .olm_flags              = OLF_SCAN_SUBITEMS,
717                 .olm_scan_dir           = osd_ios_ROOT_sd,
718         },
719
720         /* fld */
721         {
722                 .olm_name               = "fld",
723                 .olm_fid                = {
724                         .f_seq  = FID_SEQ_LOCAL_FILE,
725                         .f_oid  = FLD_INDEX_OID,
726                 },
727         },
728
729         /* changelog_catalog */
730         {
731                 .olm_name               = CHANGELOG_CATALOG,
732         },
733
734         /* changelog_users */
735         {
736                 .olm_name               = CHANGELOG_USERS,
737         },
738
739         /* quota_master */
740         {
741                 .olm_name               = QMT_DIR,
742                 .olm_flags              = OLF_SCAN_SUBITEMS,
743                 .olm_scan_dir           = osd_ios_general_sd,
744                 .olm_handle_dirent      = osd_ios_varfid_hd,
745         },
746
747         /* quota_slave */
748         {
749                 .olm_name               = QSD_DIR,
750                 .olm_flags              = OLF_SCAN_SUBITEMS,
751                 .olm_scan_dir           = osd_ios_general_sd,
752                 .olm_handle_dirent      = osd_ios_varfid_hd,
753         },
754
755         /* LFSCK */
756         {
757                 .olm_name               = LFSCK_DIR,
758                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
759                 .olm_scan_dir           = osd_ios_general_sd,
760                 .olm_handle_dirent      = osd_ios_varfid_hd,
761         },
762
763         /* lfsck_bookmark */
764         {
765                 .olm_name               = LFSCK_BOOKMARK,
766         },
767
768         /* lfsck_layout */
769         {
770                 .olm_name               = LFSCK_LAYOUT,
771         },
772
773         /* lfsck_namespace */
774         {
775                 .olm_name               = LFSCK_NAMESPACE,
776         },
777
778         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
779          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
780          * for more details. */
781
782         /* update_log */
783         {
784                 .olm_name               = "update_log",
785                 .olm_fid                = {
786                         .f_seq  = FID_SEQ_UPDATE_LOG,
787                 },
788                 .olm_flags              = OLF_IDX_IN_FID,
789         },
790
791         /* update_log_dir */
792         {
793                 .olm_name               = "update_log_dir",
794                 .olm_fid        = {
795                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
796                 },
797                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
798                 .olm_scan_dir           = osd_ios_general_sd,
799                 .olm_handle_dirent      = osd_ios_uld_hd,
800         },
801
802         /* hsm_actions */
803         {
804                 .olm_name               = HSM_ACTIONS,
805         },
806
807         /* nodemap */
808         {
809                 .olm_name               = LUSTRE_NODEMAP_NAME,
810         },
811
812         /* index_backup */
813         {
814                 .olm_name               = INDEX_BACKUP_DIR,
815                 .olm_fid                = {
816                         .f_seq  = FID_SEQ_LOCAL_FILE,
817                         .f_oid  = INDEX_BACKUP_OID,
818                 },
819                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
820                 .olm_scan_dir           = osd_ios_general_sd,
821                 .olm_handle_dirent      = osd_ios_varfid_hd,
822         },
823
824         {
825                 .olm_name               = NULL
826         }
827 };
828
829 /* Add the new introduced files under .lustre/ in the list in the future. */
830 static const struct osd_lf_map osd_dl_maps[] = {
831         /* .lustre/fid */
832         {
833                 .olm_name               = "fid",
834                 .olm_fid                = {
835                         .f_seq  = FID_SEQ_DOT_LUSTRE,
836                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
837                 },
838         },
839
840         /* .lustre/lost+found */
841         {
842                 .olm_name               = "lost+found",
843                 .olm_fid                = {
844                         .f_seq  = FID_SEQ_DOT_LUSTRE,
845                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
846                 },
847         },
848
849         {
850                 .olm_name               = NULL
851         }
852 };
853
854 struct osd_ios_item {
855         struct list_head        oii_list;
856         uint64_t                oii_parent;
857         enum osd_lf_flags       oii_flags;
858         scan_dir_t              oii_scan_dir;
859         handle_dirent_t         oii_handle_dirent;
860 };
861
862 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
863                             enum osd_lf_flags flags, scan_dir_t scan_dir,
864                             handle_dirent_t handle_dirent)
865 {
866         struct osd_ios_item *item;
867
868         OBD_ALLOC_PTR(item);
869         if (!item) {
870                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
871                       osd_name(dev), parent);
872                 return -ENOMEM;
873         }
874
875         INIT_LIST_HEAD(&item->oii_list);
876         item->oii_parent = parent;
877         item->oii_flags = flags;
878         item->oii_scan_dir = scan_dir;
879         item->oii_handle_dirent = handle_dirent;
880         list_add_tail(&item->oii_list, &dev->od_ios_list);
881
882         return 0;
883 }
884
885 static bool osd_index_need_recreate(const struct lu_env *env,
886                                     struct osd_device *dev, uint64_t oid)
887 {
888         struct osd_thread_info *info = osd_oti_get(env);
889         zap_attribute_t *za = &info->oti_za2;
890         zap_cursor_t *zc = &info->oti_zc2;
891         int rc;
892         ENTRY;
893
894         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
895         rc = -zap_cursor_retrieve(zc, za);
896         zap_cursor_fini(zc);
897         if (rc && rc != -ENOENT)
898                 RETURN(true);
899
900         RETURN(false);
901 }
902
903 static void osd_ios_index_register(const struct lu_env *env,
904                                    struct osd_device *osd,
905                                    const struct lu_fid *fid, uint64_t oid)
906 {
907         struct osd_thread_info *info = osd_oti_get(env);
908         zap_attribute_t *za = &info->oti_za2;
909         zap_cursor_t *zc = &info->oti_zc2;
910         struct zap_leaf_entry *le;
911         dnode_t *dn = NULL;
912         sa_handle_t *hdl;
913         __u64 mode = 0;
914         __u32 keysize = 0;
915         __u32 recsize = 0;
916         int rc;
917         ENTRY;
918
919         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
920         if (rc == -EEXIST || rc == -ENOENT)
921                 RETURN_EXIT;
922
923         if (rc < 0)
924                 GOTO(log, rc);
925
926         if (!osd_object_is_zap(dn))
927                 GOTO(log, rc = 1);
928
929         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
930         if (rc)
931                 GOTO(log, rc);
932
933         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
934         sa_handle_destroy(hdl);
935         if (rc)
936                 GOTO(log, rc);
937
938         if (!S_ISREG(mode))
939                 GOTO(log, rc = 1);
940
941         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
942         rc = -zap_cursor_retrieve(zc, za);
943         if (rc)
944                 /* Skip empty index object */
945                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
946
947         if (zc->zc_zap->zap_ismicro ||
948             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
949                 GOTO(fini, rc = 1);
950
951         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
952         keysize = le->le_name_numints * 8;
953         recsize = za->za_integer_length * za->za_num_integers;
954         if (likely(keysize && recsize))
955                 rc = osd_index_register(osd, fid, keysize, recsize);
956
957         GOTO(fini, rc);
958
959 fini:
960         zap_cursor_fini(zc);
961
962 log:
963         if (dn)
964                 osd_dnode_rele(dn);
965         if (rc < 0)
966                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
967                       osd_name(osd), PFID(fid), keysize, recsize, rc);
968         else if (!rc)
969                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
970                        osd_name(osd), PFID(fid), keysize, recsize);
971 }
972
973 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
974                               struct lustre_index_restore_unit *liru, void *buf,
975                               int bufsize)
976 {
977         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
978         struct lu_fid *tgt_fid = &liru->liru_cfid;
979         struct lu_fid bak_fid;
980         int rc;
981         ENTRY;
982
983         lustre_fid2lbx(buf, tgt_fid, bufsize);
984         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
985                          sizeof(*zde) / 8, (void *)zde);
986         if (rc)
987                 GOTO(log, rc);
988
989         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
990         if (rc)
991                 GOTO(log, rc);
992
993         /* The OI mapping for index may be invalid, since it will be
994          * re-created, not update the OI mapping, just cache it in RAM. */
995         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
996                                             liru->liru_clid);
997         if (!rc)
998                 rc = lustre_index_restore(env, &dev->od_dt_dev,
999                                 &liru->liru_pfid, tgt_fid, &bak_fid,
1000                                 liru->liru_name, &dev->od_index_backup_list,
1001                                 &dev->od_lock, buf, bufsize);
1002         GOTO(log, rc);
1003
1004 log:
1005         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
1006                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
1007 }
1008
1009 /**
1010  * verify FID-in-LMA and OI entry for one object
1011  *
1012  * ios: Initial OI Scrub.
1013  */
1014 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
1015                             const struct lu_fid *fid, uint64_t parent,
1016                             uint64_t oid, const char *name,
1017                             enum osd_lf_flags flags)
1018 {
1019         struct lustre_scrub *scrub = &dev->od_scrub;
1020         struct scrub_file *sf = &scrub->os_file;
1021         struct lustre_mdt_attrs *lma = NULL;
1022         nvlist_t *nvbuf = NULL;
1023         struct lu_fid tfid;
1024         uint64_t oid2 = 0;
1025         __u64 flag = 0;
1026         int size = 0;
1027         int op = 0;
1028         int rc;
1029         ENTRY;
1030
1031         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
1032         if (unlikely(rc == -ENOENT || rc == -EEXIST))
1033                 RETURN(0);
1034
1035         if (rc && rc != -ENODATA) {
1036                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
1037                       "rc = %d\n", osd_name(dev), oid, rc);
1038
1039                 RETURN(rc);
1040         }
1041
1042         if (!rc) {
1043                 LASSERT(nvbuf != NULL);
1044                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1045                                                (uchar_t **)&lma, &size);
1046                 if (rc || size == 0) {
1047                         LASSERT(lma == NULL);
1048                         rc = -ENODATA;
1049                 } else {
1050                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1051                         lustre_lma_swab(lma);
1052                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
1053                                 nvlist_free(nvbuf);
1054                                 RETURN(0);
1055                         }
1056
1057                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
1058                             osd_index_need_recreate(env, dev, oid)) {
1059                                 if (parent == dev->od_root) {
1060                                         lu_local_obj_fid(&tfid,
1061                                                          OSD_FS_ROOT_OID);
1062                                 } else {
1063                                         rc = osd_get_fid_by_oid(env, dev,
1064                                                                 parent, &tfid);
1065                                         if (rc) {
1066                                                 nvlist_free(nvbuf);
1067                                                 RETURN(rc);
1068                                         }
1069                                 }
1070
1071                                 rc = lustre_liru_new(
1072                                                 &dev->od_index_restore_list,
1073                                                 &tfid, &lma->lma_self_fid, oid,
1074                                                 name, strlen(name));
1075                                 nvlist_free(nvbuf);
1076                                 RETURN(rc);
1077                         }
1078
1079                         tfid = lma->lma_self_fid;
1080                         if (!(flags & OLF_NOT_BACKUP))
1081                                 osd_ios_index_register(env, dev, &tfid, oid);
1082                 }
1083                 nvlist_free(nvbuf);
1084         }
1085
1086         if (rc == -ENODATA) {
1087                 if (!fid) {
1088                         /* Skip the object without FID-in-LMA */
1089                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1090                                osd_name(dev), oid);
1091
1092                         RETURN(0);
1093                 }
1094
1095                 LASSERT(!fid_is_zero(fid));
1096
1097                 tfid = *fid;
1098                 if (flags & OLF_IDX_IN_FID) {
1099                         LASSERT(dev->od_index >= 0);
1100
1101                         tfid.f_oid = dev->od_index;
1102                 }
1103         }
1104
1105         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1106         if (rc) {
1107                 if (rc != -ENOENT) {
1108                         CWARN("%s: initial OI scrub failed to lookup fid for "
1109                               DFID"=>%llu: rc = %d\n",
1110                               osd_name(dev), PFID(&tfid), oid, rc);
1111
1112                         RETURN(rc);
1113                 }
1114
1115                 flag = SF_RECREATED;
1116                 op = DTO_INDEX_INSERT;
1117         } else {
1118                 if (oid == oid2)
1119                         RETURN(0);
1120
1121                 flag = SF_INCONSISTENT;
1122                 op = DTO_INDEX_UPDATE;
1123         }
1124
1125         if (!(sf->sf_flags & flag)) {
1126                 scrub_file_reset(scrub, dev->od_uuid, flag);
1127                 rc = scrub_file_store(env, scrub);
1128                 if (rc)
1129                         RETURN(rc);
1130         }
1131
1132         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1133
1134         RETURN(rc > 0 ? 0 : rc);
1135 }
1136
1137 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1138                              const char *name, uint64_t parent, uint64_t oid,
1139                              enum osd_lf_flags flags, bool is_dir)
1140 {
1141         int rc;
1142         ENTRY;
1143
1144         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1145         if (!rc && is_dir)
1146                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1147                                       osd_ios_varfid_hd);
1148
1149         RETURN(rc);
1150 }
1151
1152 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1153                           const char *name, uint64_t parent, uint64_t oid,
1154                           enum osd_lf_flags flags, bool is_dir)
1155 {
1156         struct lu_fid tfid;
1157         int rc;
1158         ENTRY;
1159
1160         /* skip any non-DFID format name */
1161         if (name[0] != '[')
1162                 RETURN(0);
1163
1164         /* skip the start '[' */
1165         sscanf(&name[1], SFID, RFID(&tfid));
1166         if (fid_is_sane(&tfid))
1167                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1168         else
1169                 rc = -EIO;
1170
1171         RETURN(rc);
1172 }
1173
1174 /*
1175  * General scanner for the directories execpt /ROOT during initial OI scrub.
1176  * It scans the name entries under the given directory one by one. For each
1177  * entry, verifies its OI mapping via the given @handle_dirent.
1178  */
1179 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1180                               uint64_t parent, handle_dirent_t handle_dirent,
1181                               enum osd_lf_flags flags)
1182 {
1183         struct osd_thread_info *info = osd_oti_get(env);
1184         struct luz_direntry *zde = &info->oti_zde;
1185         zap_attribute_t *za = &info->oti_za;
1186         zap_cursor_t *zc = &info->oti_zc;
1187         int rc;
1188         ENTRY;
1189
1190         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1191         rc = -zap_cursor_retrieve(zc, za);
1192         if (rc == -ENOENT)
1193                 zap_cursor_advance(zc);
1194         else if (rc)
1195                 GOTO(log, rc);
1196
1197         while (1) {
1198                 rc = -zap_cursor_retrieve(zc, za);
1199                 if (rc)
1200                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1201
1202                 /* skip the entry started with '.' */
1203                 if (likely(za->za_name[0] != '.')) {
1204                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1205                                         za->za_integer_length,
1206                                         sizeof(*zde) / za->za_integer_length,
1207                                         (void *)zde);
1208                         if (rc) {
1209                                 CWARN("%s: initial OI scrub failed to lookup "
1210                                       "%s under %llu: rc = %d\n",
1211                                       osd_name(dev), za->za_name, parent, rc);
1212                                 continue;
1213                         }
1214
1215                         rc = handle_dirent(env, dev, za->za_name, parent,
1216                                         zde->lzd_reg.zde_dnode, flags,
1217                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1218                                         true : false);
1219                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1220                                "%llu: rc = %d\n",
1221                                osd_name(dev), za->za_name, parent, rc);
1222                 }
1223
1224                 zap_cursor_advance(zc);
1225         }
1226
1227 log:
1228         if (rc)
1229                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1230                       "rc = %d\n", osd_name(dev), parent, rc);
1231         zap_cursor_fini(zc);
1232
1233         return rc;
1234 }
1235
1236 /*
1237  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1238  * be scanned during the initial OI scrub, instead, only the .lustre and the
1239  * sub-items under .lustre will be handled.
1240  */
1241 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1242                            uint64_t parent, handle_dirent_t handle_dirent,
1243                            enum osd_lf_flags flags)
1244 {
1245         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1246         const struct osd_lf_map *map;
1247         uint64_t oid;
1248         int rc;
1249         int rc1 = 0;
1250         ENTRY;
1251
1252         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1253                             sizeof(*zde) / 8, (void *)zde);
1254         if (rc == -ENOENT) {
1255                 /* The .lustre directory is lost. That is not fatal. It can
1256                  * be re-created in the subsequent MDT start processing. */
1257                 RETURN(0);
1258         }
1259
1260         if (rc) {
1261                 CWARN("%s: initial OI scrub failed to find .lustre: "
1262                       "rc = %d\n", osd_name(dev), rc);
1263
1264                 RETURN(rc);
1265         }
1266
1267         oid = zde->lzd_reg.zde_dnode;
1268         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1269                               dot_lustre_name, 0);
1270         if (rc)
1271                 RETURN(rc);
1272
1273         for (map = osd_dl_maps; map->olm_name; map++) {
1274                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1275                                     sizeof(*zde) / 8, (void *)zde);
1276                 if (rc) {
1277                         if (rc != -ENOENT)
1278                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1279                                       osd_name(dev), map->olm_name, rc);
1280                         else if (!fid_is_zero(&map->olm_fid))
1281                                 /* Try to remove the stale OI mapping. */
1282                                 osd_scrub_refresh_mapping(env, dev,
1283                                                 &map->olm_fid, 0,
1284                                                 DTO_INDEX_DELETE, true,
1285                                                 map->olm_name);
1286                         continue;
1287                 }
1288
1289                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1290                                       zde->lzd_reg.zde_dnode, map->olm_name,
1291                                       map->olm_flags);
1292                 if (rc)
1293                         rc1 = rc;
1294         }
1295
1296         RETURN(rc1);
1297 }
1298
1299 static void osd_initial_OI_scrub(const struct lu_env *env,
1300                                  struct osd_device *dev)
1301 {
1302         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1303         const struct osd_lf_map *map;
1304         int rc;
1305         ENTRY;
1306
1307         for (map = osd_lf_maps; map->olm_name; map++) {
1308                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1309                                     sizeof(*zde) / 8, (void *)zde);
1310                 if (rc) {
1311                         if (rc != -ENOENT)
1312                                 CWARN("%s: initial OI scrub failed "
1313                                       "to find the entry %s: rc = %d\n",
1314                                       osd_name(dev), map->olm_name, rc);
1315                         else if (!fid_is_zero(&map->olm_fid))
1316                                 /* Try to remove the stale OI mapping. */
1317                                 osd_scrub_refresh_mapping(env, dev,
1318                                                 &map->olm_fid, 0,
1319                                                 DTO_INDEX_DELETE, true,
1320                                                 map->olm_name);
1321                         continue;
1322                 }
1323
1324                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1325                                       zde->lzd_reg.zde_dnode, map->olm_name,
1326                                       map->olm_flags);
1327                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1328                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1329                                          map->olm_flags, map->olm_scan_dir,
1330                                          map->olm_handle_dirent);
1331         }
1332
1333         while (!list_empty(&dev->od_ios_list)) {
1334                 struct osd_ios_item *item;
1335
1336                 item = list_entry(dev->od_ios_list.next,
1337                                   struct osd_ios_item, oii_list);
1338                 list_del_init(&item->oii_list);
1339                 item->oii_scan_dir(env, dev, item->oii_parent,
1340                                    item->oii_handle_dirent, item->oii_flags);
1341                 OBD_FREE_PTR(item);
1342         }
1343
1344         if (!list_empty(&dev->od_index_restore_list)) {
1345                 char *buf;
1346
1347                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1348                 if (!buf)
1349                         CERROR("%s: not enough RAM for rebuild index\n",
1350                                osd_name(dev));
1351
1352                 while (!list_empty(&dev->od_index_restore_list)) {
1353                         struct lustre_index_restore_unit *liru;
1354
1355                         liru = list_entry(dev->od_index_restore_list.next,
1356                                           struct lustre_index_restore_unit,
1357                                           liru_link);
1358                         list_del(&liru->liru_link);
1359                         if (buf)
1360                                 osd_index_restore(env, dev, liru, buf,
1361                                                   INDEX_BACKUP_BUFSIZE);
1362                         OBD_FREE(liru, liru->liru_len);
1363                 }
1364
1365                 if (buf)
1366                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1367         }
1368
1369         EXIT;
1370 }
1371
1372 /* OI scrub start/stop */
1373
1374 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1375                     __u32 flags)
1376 {
1377         int rc;
1378         ENTRY;
1379
1380         if (dev->od_dt_dev.dd_rdonly)
1381                 RETURN(-EROFS);
1382
1383         /* od_otable_sem: prevent concurrent start/stop */
1384         down(&dev->od_otable_sem);
1385         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1386         up(&dev->od_otable_sem);
1387
1388         RETURN(rc == -EALREADY ? 0 : rc);
1389 }
1390
1391 void osd_scrub_stop(struct osd_device *dev)
1392 {
1393         struct lustre_scrub *scrub = &dev->od_scrub;
1394         ENTRY;
1395
1396         /* od_otable_sem: prevent concurrent start/stop */
1397         down(&dev->od_otable_sem);
1398         spin_lock(&scrub->os_lock);
1399         scrub->os_paused = 1;
1400         spin_unlock(&scrub->os_lock);
1401         scrub_stop(scrub);
1402         up(&dev->od_otable_sem);
1403
1404         EXIT;
1405 }
1406
1407 /* OI scrub setup/cleanup */
1408
1409 static const char osd_scrub_name[] = "OI_scrub";
1410
1411 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev)
1412 {
1413         struct osd_thread_info *info = osd_oti_get(env);
1414         struct lustre_scrub *scrub = &dev->od_scrub;
1415         struct scrub_file *sf = &scrub->os_file;
1416         struct lu_fid *fid = &info->oti_fid;
1417         struct dt_object *obj;
1418         uint64_t oid;
1419         int rc = 0;
1420         bool dirty = false;
1421         ENTRY;
1422
1423         memcpy(dev->od_uuid.b,
1424                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1425                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1426         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1427         init_waitqueue_head(&scrub->os_thread.t_ctl_waitq);
1428         init_rwsem(&scrub->os_rwsem);
1429         spin_lock_init(&scrub->os_lock);
1430         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1431         scrub->os_name = osd_name(dev);
1432
1433         /* 'What the @fid is' is not imporatant, because the object
1434          * has no OI mapping, and only is visible inside the OSD.*/
1435         fid->f_seq = FID_SEQ_IGIF_MAX;
1436         if (dev->od_is_ost)
1437                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1438         else
1439                 fid->f_oid = dev->od_index + 1;
1440         fid->f_ver = 0;
1441         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1442                                     osd_scrub_name, &oid, fid, false);
1443         if (rc)
1444                 RETURN(rc);
1445
1446         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1447         if (rc)
1448                 RETURN(rc);
1449
1450         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1451         if (IS_ERR_OR_NULL(obj))
1452                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1453
1454         obj->do_body_ops = &osd_body_scrub_ops;
1455         scrub->os_obj = obj;
1456         rc = scrub_file_load(env, scrub);
1457         if (rc == -ENOENT || rc == -EFAULT) {
1458                 scrub_file_init(scrub, dev->od_uuid);
1459                 dirty = true;
1460         } else if (rc < 0) {
1461                 GOTO(cleanup_obj, rc);
1462         } else {
1463                 if (!uuid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1464                         CDEBUG(D_LFSCK,
1465                                "%s: UUID has been changed from %pU to %pU\n",
1466                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1467                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1468                         dirty = true;
1469                 } else if (sf->sf_status == SS_SCANNING) {
1470                         sf->sf_status = SS_CRASHED;
1471                         dirty = true;
1472                 }
1473
1474                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1475                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1476                                       osd_name(dev), sf->sf_oi_count,
1477                                       osd_oi_count);
1478                         sf->sf_oi_count = osd_oi_count;
1479                         dirty = true;
1480                 }
1481         }
1482
1483         if (sf->sf_pos_last_checkpoint != 0)
1484                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1485         else
1486                 scrub->os_pos_current = 1;
1487
1488         if (dirty) {
1489                 rc = scrub_file_store(env, scrub);
1490                 if (rc)
1491                         GOTO(cleanup_obj, rc);
1492         }
1493
1494         /* Initialize OI files. */
1495         rc = osd_oi_init(env, dev);
1496         if (rc < 0)
1497                 GOTO(cleanup_obj, rc);
1498
1499         if (!dev->od_dt_dev.dd_rdonly)
1500                 osd_initial_OI_scrub(env, dev);
1501
1502         if (!dev->od_dt_dev.dd_rdonly &&
1503             dev->od_auto_scrub_interval != AS_NEVER &&
1504             ((sf->sf_status == SS_PAUSED) ||
1505              (sf->sf_status == SS_CRASHED &&
1506               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1507                               SF_UPGRADE | SF_AUTO)) ||
1508              (sf->sf_status == SS_INIT &&
1509               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1510                               SF_UPGRADE))))
1511                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1512
1513         if (rc)
1514                 GOTO(cleanup_oi, rc);
1515
1516         RETURN(0);
1517
1518 cleanup_oi:
1519         osd_oi_fini(env, dev);
1520 cleanup_obj:
1521         dt_object_put_nocache(env, scrub->os_obj);
1522         scrub->os_obj = NULL;
1523
1524         return rc;
1525 }
1526
1527 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1528 {
1529         struct lustre_scrub *scrub = &dev->od_scrub;
1530
1531         LASSERT(!dev->od_otable_it);
1532
1533         if (scrub->os_obj) {
1534                 osd_scrub_stop(dev);
1535                 dt_object_put_nocache(env, scrub->os_obj);
1536                 scrub->os_obj = NULL;
1537         }
1538
1539         if (dev->od_oi_table)
1540                 osd_oi_fini(env, dev);
1541 }
1542
1543 /* object table based iteration APIs */
1544
1545 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1546                                        struct dt_object *dt, __u32 attr)
1547 {
1548         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1549         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1550         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1551         struct lustre_scrub *scrub = &dev->od_scrub;
1552         struct osd_otable_it *it;
1553         __u32 start = 0;
1554         int rc;
1555         ENTRY;
1556
1557         if (dev->od_dt_dev.dd_rdonly)
1558                 RETURN(ERR_PTR(-EROFS));
1559
1560         /* od_otable_sem: prevent concurrent init/fini */
1561         down(&dev->od_otable_sem);
1562         if (dev->od_otable_it)
1563                 GOTO(out, it = ERR_PTR(-EALREADY));
1564
1565         OBD_ALLOC_PTR(it);
1566         if (!it)
1567                 GOTO(out, it = ERR_PTR(-ENOMEM));
1568
1569         if (flags & DOIF_OUTUSED)
1570                 it->ooi_used_outside = 1;
1571
1572         if (flags & DOIF_RESET)
1573                 start |= SS_RESET;
1574
1575         if (valid & DOIV_ERROR_HANDLE) {
1576                 if (flags & DOIF_FAILOUT)
1577                         start |= SS_SET_FAILOUT;
1578                 else
1579                         start |= SS_CLEAR_FAILOUT;
1580         }
1581
1582         if (valid & DOIV_DRYRUN) {
1583                 if (flags & DOIF_DRYRUN)
1584                         start |= SS_SET_DRYRUN;
1585                 else
1586                         start |= SS_CLEAR_DRYRUN;
1587         }
1588
1589         /* XXX: dmu_object_next() does NOT find dnodes allocated
1590          *      in the current non-committed txg, so we force txg
1591          *      commit to find all existing dnodes ... */
1592         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1593
1594         dev->od_otable_it = it;
1595         it->ooi_dev = dev;
1596         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1597         if (rc == -EALREADY) {
1598                 it->ooi_pos = 1;
1599         } else if (rc < 0) {
1600                 dev->od_otable_it = NULL;
1601                 OBD_FREE_PTR(it);
1602                 it = ERR_PTR(rc);
1603         } else {
1604                 it->ooi_pos = scrub->os_pos_current;
1605         }
1606
1607         GOTO(out, it);
1608
1609 out:
1610         up(&dev->od_otable_sem);
1611         return (struct dt_it *)it;
1612 }
1613
1614 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1615 {
1616         struct osd_otable_it *it = (struct osd_otable_it *)di;
1617         struct osd_device *dev = it->ooi_dev;
1618
1619         /* od_otable_sem: prevent concurrent init/fini */
1620         down(&dev->od_otable_sem);
1621         scrub_stop(&dev->od_scrub);
1622         LASSERT(dev->od_otable_it == it);
1623
1624         dev->od_otable_it = NULL;
1625         up(&dev->od_otable_sem);
1626         OBD_FREE_PTR(it);
1627 }
1628
1629 static int osd_otable_it_get(const struct lu_env *env,
1630                              struct dt_it *di, const struct dt_key *key)
1631 {
1632         return 0;
1633 }
1634
1635 static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
1636 {
1637 }
1638
1639 static void osd_otable_it_preload(const struct lu_env *env,
1640                                   struct osd_otable_it *it)
1641 {
1642         struct osd_device *dev = it->ooi_dev;
1643         int rc;
1644
1645         /* can go negative on the very first access to the iterator
1646          * or if some non-Lustre objects were found */
1647         if (unlikely(it->ooi_prefetched < 0))
1648                 it->ooi_prefetched = 0;
1649
1650         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1651                 return;
1652
1653         if (it->ooi_prefetched_dnode == 0)
1654                 it->ooi_prefetched_dnode = it->ooi_pos;
1655
1656         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1657                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1658                                       B_FALSE, 0);
1659                 if (rc)
1660                         break;
1661
1662                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1663                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1664                 it->ooi_prefetched++;
1665         }
1666 }
1667
1668 static inline int
1669 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1670 {
1671         spin_lock(&scrub->os_lock);
1672         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1673             !thread_is_running(&scrub->os_thread))
1674                 it->ooi_waiting = 0;
1675         else
1676                 it->ooi_waiting = 1;
1677         spin_unlock(&scrub->os_lock);
1678
1679         return !it->ooi_waiting;
1680 }
1681
1682 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1683 {
1684         struct osd_otable_it *it = (struct osd_otable_it *)di;
1685         struct osd_device *dev = it->ooi_dev;
1686         struct lustre_scrub *scrub = &dev->od_scrub;
1687         struct ptlrpc_thread *thread = &scrub->os_thread;
1688         struct lustre_mdt_attrs *lma = NULL;
1689         nvlist_t *nvbuf = NULL;
1690         int rc, size = 0;
1691         bool locked;
1692         ENTRY;
1693
1694         LASSERT(it->ooi_user_ready);
1695         fid_zero(&it->ooi_fid);
1696
1697         if (unlikely(it->ooi_all_cached))
1698                 RETURN(1);
1699
1700 again:
1701         if (nvbuf) {
1702                 nvlist_free(nvbuf);
1703                 nvbuf = NULL;
1704                 lma = NULL;
1705                 size = 0;
1706         }
1707
1708         if (it->ooi_pos >= scrub->os_pos_current)
1709                 wait_event_idle(thread->t_ctl_waitq,
1710                                 osd_otable_it_wakeup(scrub, it));
1711
1712         if (!thread_is_running(thread) && !it->ooi_used_outside)
1713                 GOTO(out, rc = 1);
1714
1715         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1716         if (rc) {
1717                 if (unlikely(rc == -ESRCH)) {
1718                         it->ooi_all_cached = 1;
1719                         rc = 1;
1720                 }
1721
1722                 GOTO(out, rc);
1723         }
1724
1725         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1726
1727         locked = false;
1728         if (!scrub->os_full_speed) {
1729                 spin_lock(&scrub->os_lock);
1730                 locked = true;
1731         }
1732         it->ooi_prefetched--;
1733         if (!scrub->os_full_speed) {
1734                 if (scrub->os_waiting) {
1735                         scrub->os_waiting = 0;
1736                         wake_up_all(&thread->t_ctl_waitq);
1737                 }
1738         }
1739         if (locked)
1740                 spin_unlock(&scrub->os_lock);
1741
1742         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1743                 goto again;
1744
1745         if (rc)
1746                 GOTO(out, rc);
1747
1748         LASSERT(nvbuf != NULL);
1749         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1750                                        (uchar_t **)&lma, &size);
1751         if (rc || size == 0)
1752                 /* It is either non-Lustre object or OSD internal object,
1753                  * ignore it, go ahead */
1754                 goto again;
1755
1756         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1757         lustre_lma_swab(lma);
1758         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1759                      lma->lma_incompat & LMAI_AGENT))
1760                 goto again;
1761
1762         it->ooi_fid = lma->lma_self_fid;
1763
1764         GOTO(out, rc = 0);
1765
1766 out:
1767         if (nvbuf)
1768                 nvlist_free(nvbuf);
1769
1770         if (!rc && scrub->os_full_speed)
1771                 osd_otable_it_preload(env, it);
1772
1773         return rc;
1774 }
1775
1776 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
1777                                         const struct dt_it *di)
1778 {
1779         return NULL;
1780 }
1781
1782 static int osd_otable_it_key_size(const struct lu_env *env,
1783                                   const struct dt_it *di)
1784 {
1785         return sizeof(__u64);
1786 }
1787
1788 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1789                              struct dt_rec *rec, __u32 attr)
1790 {
1791         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1792         struct lu_fid *fid = (struct lu_fid *)rec;
1793
1794         *fid = it->ooi_fid;
1795         return 0;
1796 }
1797
1798 static __u64 osd_otable_it_store(const struct lu_env *env,
1799                                  const struct dt_it *di)
1800 {
1801         struct osd_otable_it *it = (struct osd_otable_it *)di;
1802
1803         return it->ooi_pos;
1804 }
1805
1806 /**
1807  * Set the OSD layer iteration start position as the specified hash.
1808  */
1809 static int osd_otable_it_load(const struct lu_env *env,
1810                               const struct dt_it *di, __u64 hash)
1811 {
1812         struct osd_otable_it *it = (struct osd_otable_it *)di;
1813         struct osd_device *dev = it->ooi_dev;
1814         struct lustre_scrub *scrub = &dev->od_scrub;
1815         int rc;
1816         ENTRY;
1817
1818         /* Forbid to set iteration position after iteration started. */
1819         if (it->ooi_user_ready)
1820                 RETURN(-EPERM);
1821
1822         if (hash > OSD_OTABLE_MAX_HASH)
1823                 hash = OSD_OTABLE_MAX_HASH;
1824
1825         /* The hash is the last checkpoint position,
1826          * we will start from the next one. */
1827         it->ooi_pos = hash + 1;
1828         it->ooi_prefetched = 0;
1829         it->ooi_prefetched_dnode = 0;
1830         it->ooi_user_ready = 1;
1831         if (!scrub->os_full_speed)
1832                 wake_up_all(&scrub->os_thread.t_ctl_waitq);
1833
1834         /* Unplug OSD layer iteration by the first next() call. */
1835         rc = osd_otable_it_next(env, (struct dt_it *)it);
1836
1837         RETURN(rc);
1838 }
1839
1840 static int osd_otable_it_key_rec(const struct lu_env *env,
1841                                  const struct dt_it *di, void *key_rec)
1842 {
1843         return 0;
1844 }
1845
1846 const struct dt_index_operations osd_otable_ops = {
1847         .dio_it = {
1848                 .init     = osd_otable_it_init,
1849                 .fini     = osd_otable_it_fini,
1850                 .get      = osd_otable_it_get,
1851                 .put      = osd_otable_it_put,
1852                 .next     = osd_otable_it_next,
1853                 .key      = osd_otable_it_key,
1854                 .key_size = osd_otable_it_key_size,
1855                 .rec      = osd_otable_it_rec,
1856                 .store    = osd_otable_it_store,
1857                 .load     = osd_otable_it_load,
1858                 .key_rec  = osd_otable_it_key_rec,
1859         }
1860 };
1861
1862 /* high priority inconsistent items list APIs */
1863
1864 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1865                    const struct lu_fid *fid, uint64_t oid, bool insert)
1866 {
1867         struct lustre_scrub *scrub = &dev->od_scrub;
1868         struct ptlrpc_thread *thread = &scrub->os_thread;
1869         struct osd_inconsistent_item *oii;
1870         bool wakeup = false;
1871         ENTRY;
1872
1873         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1874         OBD_ALLOC_PTR(oii);
1875         if (unlikely(!oii))
1876                 RETURN(-ENOMEM);
1877
1878         INIT_LIST_HEAD(&oii->oii_list);
1879         oii->oii_cache.oic_dev = dev;
1880         oii->oii_cache.oic_fid = *fid;
1881         oii->oii_cache.oic_dnode = oid;
1882         oii->oii_insert = insert;
1883
1884         spin_lock(&scrub->os_lock);
1885         if (unlikely(!thread_is_running(thread))) {
1886                 spin_unlock(&scrub->os_lock);
1887                 OBD_FREE_PTR(oii);
1888                 RETURN(-EAGAIN);
1889         }
1890
1891         if (list_empty(&scrub->os_inconsistent_items))
1892                 wakeup = true;
1893         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1894         spin_unlock(&scrub->os_lock);
1895
1896         if (wakeup)
1897                 wake_up_all(&thread->t_ctl_waitq);
1898
1899         RETURN(0);
1900 }
1901
1902 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1903                    uint64_t *oid)
1904 {
1905         struct lustre_scrub *scrub = &dev->od_scrub;
1906         struct osd_inconsistent_item *oii;
1907         int ret = -ENOENT;
1908         ENTRY;
1909
1910         spin_lock(&scrub->os_lock);
1911         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1912                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1913                         *oid = oii->oii_cache.oic_dnode;
1914                         ret = 0;
1915                         break;
1916                 }
1917         }
1918         spin_unlock(&scrub->os_lock);
1919
1920         RETURN(ret);
1921 }