Whamcloud - gitweb
LU-14543 target: prevent overflowing of tgd->tgd_tot_granted
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding the Object Index files when an MDT is
31  * restored from a file-level backup.
32  *
33  * The otable-based iterator scans ZFS objects to feed the upper-layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
/* All-ones mask of the low 48 bits, i.e. 2^48 - 1. */
#define OSD_OTABLE_MAX_HASH             0x0000FFFFFFFFFFFFULL
/* Bound that it->ooi_prefetched is compared against to decide whether the
 * scrub still has a prefetch window. */
#define OTABLE_PREFETCH                 256
57
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
59 {
60         return it->ooi_prefetched < OTABLE_PREFETCH;
61 }
62
63 /**
64  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
65  *
66  * \retval   1, changed nothing
67  * \retval   0, changed successfully
68  * \retval -ve, on error
69  */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71                               struct osd_device *dev,
72                               const struct lu_fid *fid,
73                               uint64_t oid, enum dt_txn_op ops,
74                               bool force, const char *name)
75 {
76         struct osd_thread_info *info = osd_oti_get(env);
77         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78         char *buf = info->oti_str;
79         dmu_tx_t *tx = NULL;
80         dnode_t *dn = NULL;
81         uint64_t zapid;
82         int rc;
83         ENTRY;
84
85         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
86                 GOTO(log, rc = 0);
87
88         tx = dmu_tx_create(dev->od_os);
89         if (!tx)
90                 GOTO(log, rc = -ENOMEM);
91
92         zapid = osd_get_name_n_idx(env, dev, fid, buf,
93                                    sizeof(info->oti_str), &dn);
94         osd_tx_hold_zap(tx, zapid, dn,
95                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96         rc = -dmu_tx_assign(tx, TXG_WAIT);
97         if (rc) {
98                 dmu_tx_abort(tx);
99                 GOTO(log, rc);
100         }
101
102         switch (ops) {
103         case DTO_INDEX_UPDATE:
104                 zde->zde_pad = 0;
105                 zde->zde_dnode = oid;
106                 zde->zde_type = 0; /* The type in OI mapping is useless. */
107                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
108                                  zde, tx);
109                 if (unlikely(rc == -ENOENT)) {
110                         /* Some unlink thread may removed the OI mapping. */
111                         rc = 1;
112                 }
113                 break;
114         case DTO_INDEX_INSERT:
115                 zde->zde_pad = 0;
116                 zde->zde_dnode = oid;
117                 zde->zde_type = 0; /* The type in OI mapping is useless. */
118                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
119                                  zde, tx);
120                 if (unlikely(rc == -EEXIST))
121                         rc = 1;
122                 break;
123         case DTO_INDEX_DELETE:
124                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
125                 if (rc == -ENOENT) {
126                         /* It is normal that the unlink thread has removed the
127                          * OI mapping already. */
128                         rc = 1;
129                 }
130                 break;
131         default:
132                 LASSERTF(0, "Unexpected ops %d\n", ops);
133                 rc = -EINVAL;
134                 break;
135         }
136
137         dmu_tx_commit(tx);
138         GOTO(log, rc);
139
140 log:
141         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
142                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
143                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
144
145         return rc;
146 }
147
/*
 * Verify the OI mapping of one object handed over by the scrub engine and
 * repair it if necessary.
 *
 * The OI is looked up by @fid.  If no entry exists the mapping is inserted;
 * if the entry points at another dnode (oid2) the mapping is updated to
 * @oid, unless the LMA of oid2 carries the very same FID, which is reported
 * as -EEXIST.  Counters in the scrub file are updated under os_rwsem.  When
 * a prior item is being handled (os_in_prior), the head of
 * os_inconsistent_items is unlinked and freed before returning.
 *
 * @val is the result of the preceding osd_scrub_next() step; a negative
 * value is accounted as a failed item without touching the OI.
 *
 * Returns the negative error only if SP_FAILOUT is set in sf_param,
 * otherwise 0.
 */
148 static int
149 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
150                        const struct lu_fid *fid, uint64_t oid, int val)
151 {
152         struct lustre_scrub *scrub = &dev->od_scrub;
153         struct scrub_file *sf = &scrub->os_file;
154         struct osd_inconsistent_item *oii = NULL;
155         nvlist_t *nvbuf = NULL;
156         dnode_t *dn = NULL;
157         uint64_t oid2;
158         int ops = DTO_INDEX_UPDATE;
159         int rc;
160         ENTRY;
161
162         down_write(&scrub->os_rwsem);
163         scrub->os_new_checked++;
164         if (val < 0)
165                 GOTO(out, rc = val);
166
            /* A prior item is the head of the inconsistent items list. */
167         if (scrub->os_in_prior)
168                 oii = list_entry(scrub->os_inconsistent_items.next,
169                                  struct osd_inconsistent_item, oii_list);
170
            /* Skip objects located before the latest start position, unless
             * this is a queued prior item. */
171         if (oid < sf->sf_pos_latest_start && !oii)
172                 GOTO(out, rc = 0);
173
            /* The queued item explicitly asks for an insert: no lookup. */
174         if (oii && oii->oii_insert) {
175                 ops = DTO_INDEX_INSERT;
176                 goto zget;
177         }
178
179         rc = osd_fid_lookup(env, dev, fid, &oid2);
180         if (rc) {
181                 if (rc != -ENOENT)
182                         GOTO(out, rc);
183
                    /* No OI entry for this FID: it has to be inserted. */
184                 ops = DTO_INDEX_INSERT;
185
            /* Reached by fall-through (lookup returned -ENOENT) or by the jump
             * from the oii_insert case above.  The dnode hold taken here is
             * what allows dn_free_txg to be checked at "out". */
186 zget:
187                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
188                 if (rc) {
189                         /* Someone removed the object by race. */
190                         if (rc == -ENOENT || rc == -EEXIST)
191                                 rc = 0;
192                         GOTO(out, rc);
193                 }
194
195                 scrub->os_full_speed = 1;
196                 sf->sf_flags |= SF_INCONSISTENT;
197         } else if (oid == oid2) {
                    /* The OI mapping is already correct. */
198                 GOTO(out, rc = 0);
199         } else {
                    /* The OI maps the FID to another dnode (oid2): check whether
                     * that dnode claims the same FID in its LMA. */
200                 struct lustre_mdt_attrs *lma = NULL;
201                 int size;
202
203                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
204                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
205                         goto update;
206                 if (rc)
207                         GOTO(out, rc);
208
209                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
210                                                (uchar_t **)&lma, &size);
211                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
212                         goto update;
213                 if (rc)
214                         GOTO(out, rc);
215
                    /* NOTE(review): size is not compared with sizeof(*lma)
                     * before the LMA is swabbed and read; confirm that a
                     * truncated LMA cannot reach this point. */
216                 lustre_lma_swab(lma);
217                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
218                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
219                                "two objects: %llu and %llu (in OI)\n",
220                                osd_name(dev), PFID(fid), oid, oid2);
221
222                         GOTO(out, rc = -EEXIST);
223                 }
224
225 update:
226                 scrub->os_full_speed = 1;
227                 sf->sf_flags |= SF_INCONSISTENT;
228         }
229
230         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
231         if (!rc) {
232                 if (scrub->os_in_prior)
233                         sf->sf_items_updated_prior++;
234                 else
235                         sf->sf_items_updated++;
236         }
237
238         GOTO(out, rc);
239
240 out:
241         if (nvbuf)
242                 nvlist_free(nvbuf);
243
            /* Only negative values are failures; a positive rc (mapping left
             * unchanged) is normalized to 0. */
244         if (rc < 0) {
245                 sf->sf_items_failed++;
246                 if (sf->sf_pos_first_inconsistent == 0 ||
247                     sf->sf_pos_first_inconsistent > oid)
248                         sf->sf_pos_first_inconsistent = oid;
249         } else {
250                 rc = 0;
251         }
252
253         /* There may be a conflicting unlink during the OI scrub;
254          * if that happened, remove the newly added OI mapping. */
255         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
256                 osd_scrub_refresh_mapping(env, dev, fid, oid,
257                                           DTO_INDEX_DELETE, false, NULL);
258         up_write(&scrub->os_rwsem);
259
260         if (dn)
261                 osd_dnode_rele(dn);
262
            /* The prior item has been handled: unlink it (if still linked)
             * under os_lock and free it. */
263         if (oii) {
264                 spin_lock(&scrub->os_lock);
265                 if (likely(!list_empty(&oii->oii_list)))
266                         list_del(&oii->oii_list);
267                 spin_unlock(&scrub->os_lock);
268                 OBD_FREE_PTR(oii);
269         }
270
271         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
272 }
273
274 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
275 {
276         struct lustre_scrub *scrub = &dev->od_scrub;
277         struct ptlrpc_thread *thread = &scrub->os_thread;
278         struct scrub_file *sf = &scrub->os_file;
279         __u32 flags = scrub->os_start_flags;
280         int rc;
281         bool drop_dryrun = false;
282         ENTRY;
283
284         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
285                scrub->os_name, flags);
286
287         down_write(&scrub->os_rwsem);
288         if (flags & SS_SET_FAILOUT)
289                 sf->sf_param |= SP_FAILOUT;
290         else if (flags & SS_CLEAR_FAILOUT)
291                 sf->sf_param &= ~SP_FAILOUT;
292
293         if (flags & SS_SET_DRYRUN) {
294                 sf->sf_param |= SP_DRYRUN;
295         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
296                 sf->sf_param &= ~SP_DRYRUN;
297                 drop_dryrun = true;
298         }
299
300         if (flags & SS_RESET)
301                 scrub_file_reset(scrub, dev->od_uuid, 0);
302
303         scrub->os_partial_scan = 0;
304         if (flags & SS_AUTO_FULL) {
305                 scrub->os_full_speed = 1;
306                 sf->sf_flags |= SF_AUTO;
307         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
308                                    SF_UPGRADE)) {
309                 scrub->os_full_speed = 1;
310         } else {
311                 scrub->os_full_speed = 0;
312         }
313
314         spin_lock(&scrub->os_lock);
315         scrub->os_in_prior = 0;
316         scrub->os_waiting = 0;
317         scrub->os_paused = 0;
318         scrub->os_in_join = 0;
319         scrub->os_full_scrub = 0;
320         spin_unlock(&scrub->os_lock);
321         scrub->os_new_checked = 0;
322         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
323                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
324         else if (sf->sf_pos_last_checkpoint != 0)
325                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
326         else
327                 sf->sf_pos_latest_start = 1;
328
329         scrub->os_pos_current = sf->sf_pos_latest_start;
330         sf->sf_status = SS_SCANNING;
331         sf->sf_time_latest_start = ktime_get_real_seconds();
332         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
333         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
334         rc = scrub_file_store(env, scrub);
335         if (!rc) {
336                 spin_lock(&scrub->os_lock);
337                 thread_set_flags(thread, SVC_RUNNING);
338                 spin_unlock(&scrub->os_lock);
339                 wake_up_all(&thread->t_ctl_waitq);
340         }
341         up_write(&scrub->os_rwsem);
342
343         RETURN(rc);
344 }
345
346 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
347                           int result)
348 {
349         struct lustre_scrub *scrub = &dev->od_scrub;
350         struct scrub_file *sf = &scrub->os_file;
351         int rc;
352         ENTRY;
353
354         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
355                scrub->os_name, result);
356
357         down_write(&scrub->os_rwsem);
358         spin_lock(&scrub->os_lock);
359         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
360         spin_unlock(&scrub->os_lock);
361         if (scrub->os_new_checked > 0) {
362                 sf->sf_items_checked += scrub->os_new_checked;
363                 scrub->os_new_checked = 0;
364                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
365         }
366         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
367         if (result > 0) {
368                 sf->sf_status = SS_COMPLETED;
369                 if (!(sf->sf_param & SP_DRYRUN)) {
370                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
371                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
372                                           SF_UPGRADE | SF_AUTO);
373                 }
374                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
375                 sf->sf_success_count++;
376         } else if (result == 0) {
377                 if (scrub->os_paused)
378                         sf->sf_status = SS_PAUSED;
379                 else
380                         sf->sf_status = SS_STOPPED;
381         } else {
382                 sf->sf_status = SS_FAILED;
383         }
384         sf->sf_run_time += ktime_get_seconds() -
385                            scrub->os_time_last_checkpoint;
386
387         rc = scrub_file_store(env, scrub);
388         up_write(&scrub->os_rwsem);
389
390         RETURN(rc < 0 ? rc : result);
391 }
392
393 /* iteration engine */
394
395 static inline int
396 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
397 {
398         spin_lock(&scrub->os_lock);
399         if (osd_scrub_has_window(it) ||
400             !list_empty(&scrub->os_inconsistent_items) ||
401             it->ooi_waiting || !thread_is_running(&scrub->os_thread))
402                 scrub->os_waiting = 0;
403         else
404                 scrub->os_waiting = 1;
405         spin_unlock(&scrub->os_lock);
406
407         return !scrub->os_waiting;
408 }
409
/*
 * Pick the next object for the OI scrub to verify.
 *
 * Queued inconsistent items take precedence: the head of
 * os_inconsistent_items is reported and os_in_prior is set.  Otherwise the
 * dataset is walked with dmu_object_next() from os_pos_current, and the
 * first object that has an LMA xattr without LMAC_NOT_IN_OI and LMAI_AGENT
 * is reported.
 *
 * Returns 0 with *fid and *oid filled in, SCRUB_NEXT_EXIT if the thread is
 * no longer running, SCRUB_NEXT_BREAK when dmu_object_next() reports -ESRCH,
 * SCRUB_NEXT_CRASH / SCRUB_NEXT_FATAL for the corresponding fault
 * injections, or a negative errno.  *fid and *oid are written only when 0
 * is returned.
 */
410 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
411                           struct lu_fid *fid, uint64_t *oid)
412 {
413         struct l_wait_info lwi = { 0 };
414         struct lustre_scrub *scrub = &dev->od_scrub;
415         struct ptlrpc_thread *thread = &scrub->os_thread;
416         struct osd_otable_it *it = dev->od_otable_it;
417         struct lustre_mdt_attrs *lma = NULL;
418         nvlist_t *nvbuf = NULL;
419         int size = 0;
420         int rc = 0;
421         ENTRY;
422
            /* Fault injection: delay by cfs_fail_val seconds, ending early if
             * an inconsistent item is queued or the thread stops running. */
423         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
424                 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), NULL, NULL);
425                 if (likely(lwi.lwi_timeout > 0)) {
426                         l_wait_event(thread->t_ctl_waitq,
427                                 !list_empty(&scrub->os_inconsistent_items) ||
428                                 !thread_is_running(thread),
429                                 &lwi);
430                         if (unlikely(!thread_is_running(thread)))
431                                 RETURN(SCRUB_NEXT_EXIT);
432                 }
433         }
434
            /* Fault injection: mark the thread as stopping and report a crash. */
435         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
436                 spin_lock(&scrub->os_lock);
437                 thread_set_flags(thread, SVC_STOPPING);
438                 spin_unlock(&scrub->os_lock);
439                 RETURN(SCRUB_NEXT_CRASH);
440         }
441
442         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
443                 RETURN(SCRUB_NEXT_FATAL);
444
445 again:
            /* Release the xattrs of the object examined in the previous round;
             * lma pointed into that nvlist. */
446         if (nvbuf) {
447                 nvlist_free(nvbuf);
448                 nvbuf = NULL;
449                 lma = NULL;
450         }
451
            /* Unlocked pre-check, repeated under os_lock before the head of
             * the list is used. */
452         if (!list_empty(&scrub->os_inconsistent_items)) {
453                 spin_lock(&scrub->os_lock);
454                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
455                         struct osd_inconsistent_item *oii;
456
457                         oii = list_entry(scrub->os_inconsistent_items.next,
458                                 struct osd_inconsistent_item, oii_list);
459                         *fid = oii->oii_cache.oic_fid;
460                         *oid = oii->oii_cache.oic_dnode;
461                         scrub->os_in_prior = 1;
462                         spin_unlock(&scrub->os_lock);
463
464                         GOTO(out, rc = 0);
465                 }
466                 spin_unlock(&scrub->os_lock);
467         }
468
            /* Not at full speed and the prefetch window is used up: sleep
             * until osd_scrub_wakeup() allows the scrub to continue. */
469         if (!scrub->os_full_speed && !osd_scrub_has_window(it)) {
470                 memset(&lwi, 0, sizeof(lwi));
471                 l_wait_event(thread->t_ctl_waitq,
472                              osd_scrub_wakeup(scrub, it),
473                              &lwi);
474         }
475
476         if (unlikely(!thread_is_running(thread)))
477                 GOTO(out, rc = SCRUB_NEXT_EXIT);
478
            /* Advance os_pos_current to the next object; -ESRCH is reported
             * to the caller as SCRUB_NEXT_BREAK. */
479         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
480         if (rc)
481                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
482
            /* Objects whose xattrs cannot be found are skipped. */
483         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
484         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
485                 goto again;
486
487         if (rc)
488                 GOTO(out, rc);
489
490         LASSERT(nvbuf != NULL);
            /* NOTE(review): size is not compared with sizeof(*lma) before the
             * LMA is swabbed and read; confirm that a truncated LMA cannot
             * reach this point. */
491         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
492                                        (uchar_t **)&lma, &size);
493         if (!rc) {
494                 lustre_lma_swab(lma);
495                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
496                            !(lma->lma_incompat & LMAI_AGENT))) {
497                         *fid = lma->lma_self_fid;
498                         *oid = scrub->os_pos_current;
499
500                         GOTO(out, rc = 0);
501                 }
502         }
503
            /* The object is skipped (LMA lookup failed, or LMAC_NOT_IN_OI /
             * LMAI_AGENT is set): still count it in the prefetch counter and
             * wake the iterator if it is waiting. */
504         if (!scrub->os_full_speed) {
505                 spin_lock(&scrub->os_lock);
506                 it->ooi_prefetched++;
507                 if (it->ooi_waiting) {
508                         it->ooi_waiting = 0;
509                         wake_up_all(&thread->t_ctl_waitq);
510                 }
511                 spin_unlock(&scrub->os_lock);
512         }
513
514         goto again;
515
516 out:
517         if (nvbuf)
518                 nvlist_free(nvbuf);
519
520         return rc;
521 }
522
523 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
524                           const struct lu_fid *fid, uint64_t oid, int rc)
525 {
526         struct lustre_scrub *scrub = &dev->od_scrub;
527         struct ptlrpc_thread *thread = &scrub->os_thread;
528         struct osd_otable_it *it = dev->od_otable_it;
529
530         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
531         if (!scrub->os_in_prior) {
532                 if (!scrub->os_full_speed) {
533                         spin_lock(&scrub->os_lock);
534                         it->ooi_prefetched++;
535                         if (it->ooi_waiting) {
536                                 it->ooi_waiting = 0;
537                                 wake_up_all(&thread->t_ctl_waitq);
538                         }
539                         spin_unlock(&scrub->os_lock);
540                 }
541         } else {
542                 scrub->os_in_prior = 0;
543         }
544
545         if (rc)
546                 return rc;
547
548         rc = scrub_checkpoint(env, scrub);
549         if (rc) {
550                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
551                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
552                 /* Continue, as long as the scrub itself can go ahead. */
553         }
554
555         return 0;
556 }
557
558 static int osd_scrub_main(void *args)
559 {
560         struct lu_env env;
561         struct osd_device *dev = (struct osd_device *)args;
562         struct lustre_scrub *scrub = &dev->od_scrub;
563         struct ptlrpc_thread *thread = &scrub->os_thread;
564         struct lu_fid *fid;
565         uint64_t oid;
566         int rc = 0;
567         ENTRY;
568
569         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
570         if (rc) {
571                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
572                        scrub->os_name, rc);
573                 GOTO(noenv, rc);
574         }
575
576         rc = osd_scrub_prep(&env, dev);
577         if (rc) {
578                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
579                        scrub->os_name, rc);
580                 GOTO(out, rc);
581         }
582
583         if (!scrub->os_full_speed) {
584                 struct l_wait_info lwi = { 0 };
585                 struct osd_otable_it *it = dev->od_otable_it;
586
587                 l_wait_event(thread->t_ctl_waitq,
588                              it->ooi_user_ready || !thread_is_running(thread),
589                              &lwi);
590                 if (unlikely(!thread_is_running(thread)))
591                         GOTO(post, rc = 0);
592
593                 scrub->os_pos_current = it->ooi_pos;
594         }
595
596         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
597                scrub->os_name, scrub->os_start_flags,
598                scrub->os_pos_current);
599
600         fid = &osd_oti_get(&env)->oti_fid;
601         while (!rc && thread_is_running(thread)) {
602                 rc = osd_scrub_next(&env, dev, fid, &oid);
603                 switch (rc) {
604                 case SCRUB_NEXT_EXIT:
605                         GOTO(post, rc = 0);
606                 case SCRUB_NEXT_CRASH:
607                         spin_lock(&scrub->os_lock);
608                         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
609                         spin_unlock(&scrub->os_lock);
610                         GOTO(out, rc = -EINVAL);
611                 case SCRUB_NEXT_FATAL:
612                         GOTO(post, rc = -EINVAL);
613                 case SCRUB_NEXT_BREAK:
614                         GOTO(post, rc = 1);
615                 }
616
617                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
618         }
619
620         GOTO(post, rc);
621
622 post:
623         rc = osd_scrub_post(&env, dev, rc);
624         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
625                scrub->os_name, scrub->os_pos_current, rc);
626
627 out:
628         while (!list_empty(&scrub->os_inconsistent_items)) {
629                 struct osd_inconsistent_item *oii;
630
631                 oii = list_entry(scrub->os_inconsistent_items.next,
632                                  struct osd_inconsistent_item, oii_list);
633                 list_del_init(&oii->oii_list);
634                 OBD_FREE_PTR(oii);
635         }
636
637         lu_env_fini(&env);
638
639 noenv:
640         spin_lock(&scrub->os_lock);
641         thread_set_flags(thread, SVC_STOPPED);
642         wake_up_all(&thread->t_ctl_waitq);
643         spin_unlock(&scrub->os_lock);
644         return rc;
645 }
646
647 /* initial OI scrub */
648
649 struct osd_lf_map;
650
/*
 * Callback type stored in olm_handle_dirent / oii_handle_dirent.
 * NOTE(review): the parameter meanings are not visible in the prototype;
 * presumably the string is an entry name and the two uint64_t values are
 * parent and child object ids - confirm against the implementations.
 */
651 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
652                                const char *, uint64_t, uint64_t,
653                                enum osd_lf_flags, bool);
/* handle_dirent_t implementations referenced by the tables below. */
654 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
655                              const char *, uint64_t, uint64_t,
656                              enum osd_lf_flags, bool);
657 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
658                           const char *, uint64_t, uint64_t,
659                           enum osd_lf_flags, bool);
660
/*
 * Callback type stored in olm_scan_dir / oii_scan_dir; it receives a
 * handle_dirent_t to apply.
 * NOTE(review): the uint64_t is presumably the id of the directory to scan
 * (cf. oii_parent) - confirm against the implementations.
 */
661 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
662                           uint64_t, handle_dirent_t, enum osd_lf_flags);
/* scan_dir_t implementations referenced by the tables below. */
663 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
664                               uint64_t, handle_dirent_t, enum osd_lf_flags);
665 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
666                            uint64_t, handle_dirent_t, enum osd_lf_flags);
667
/*
 * Description of one known local file or directory, as listed in the
 * osd_lf_maps[] and osd_dl_maps[] tables.
 */
668 struct osd_lf_map {
669         char                    *olm_name;          /* entry name; NULL ends a table */
670         struct lu_fid            olm_fid;           /* FID given for the entry; zero where a table gives none */
671         enum osd_lf_flags        olm_flags;         /* OLF_* flags */
672         scan_dir_t               olm_scan_dir;      /* directory scan callback, if any */
673         handle_dirent_t          olm_handle_dirent; /* per-entry callback, if any */
674 };
675
676 /* Add the new introduced local files in the list in the future. */
677 static const struct osd_lf_map osd_lf_maps[] = {
678         /* CONFIGS */
679         {
680                 .olm_name               = MOUNT_CONFIGS_DIR,
681                 .olm_fid                = {
682                         .f_seq  = FID_SEQ_LOCAL_FILE,
683                         .f_oid  = MGS_CONFIGS_OID,
684                 },
685                 .olm_flags              = OLF_SCAN_SUBITEMS,
686                 .olm_scan_dir           = osd_ios_general_sd,
687                 .olm_handle_dirent      = osd_ios_varfid_hd,
688         },
689
690         /* NIDTBL_VERSIONS */
691         {
692                 .olm_name               = MGS_NIDTBL_DIR,
693                 .olm_flags              = OLF_SCAN_SUBITEMS,
694                 .olm_scan_dir           = osd_ios_general_sd,
695                 .olm_handle_dirent      = osd_ios_varfid_hd,
696         },
697
698         /* PENDING */
699         {
700                 .olm_name               = MDT_ORPHAN_DIR,
701         },
702
703         /* ROOT */
704         {
705                 .olm_name               = "ROOT",
706                 .olm_fid                = {
707                         .f_seq  = FID_SEQ_ROOT,
708                         .f_oid  = FID_OID_ROOT,
709                 },
710                 .olm_flags              = OLF_SCAN_SUBITEMS,
711                 .olm_scan_dir           = osd_ios_ROOT_sd,
712         },
713
714         /* fld */
715         {
716                 .olm_name               = "fld",
717                 .olm_fid                = {
718                         .f_seq  = FID_SEQ_LOCAL_FILE,
719                         .f_oid  = FLD_INDEX_OID,
720                 },
721         },
722
723         /* changelog_catalog */
724         {
725                 .olm_name               = CHANGELOG_CATALOG,
726         },
727
728         /* changelog_users */
729         {
730                 .olm_name               = CHANGELOG_USERS,
731         },
732
733         /* quota_master */
734         {
735                 .olm_name               = QMT_DIR,
736                 .olm_flags              = OLF_SCAN_SUBITEMS,
737                 .olm_scan_dir           = osd_ios_general_sd,
738                 .olm_handle_dirent      = osd_ios_varfid_hd,
739         },
740
741         /* quota_slave */
742         {
743                 .olm_name               = QSD_DIR,
744                 .olm_flags              = OLF_SCAN_SUBITEMS,
745                 .olm_scan_dir           = osd_ios_general_sd,
746                 .olm_handle_dirent      = osd_ios_varfid_hd,
747         },
748
749         /* LFSCK */
750         {
751                 .olm_name               = LFSCK_DIR,
752                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
753                 .olm_scan_dir           = osd_ios_general_sd,
754                 .olm_handle_dirent      = osd_ios_varfid_hd,
755         },
756
757         /* lfsck_bookmark */
758         {
759                 .olm_name               = LFSCK_BOOKMARK,
760         },
761
762         /* lfsck_layout */
763         {
764                 .olm_name               = LFSCK_LAYOUT,
765         },
766
767         /* lfsck_namespace */
768         {
769                 .olm_name               = LFSCK_NAMESPACE,
770         },
771
772         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
773          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
774          * for more details. */
775
776         /* update_log */
777         {
778                 .olm_name               = "update_log",
779                 .olm_fid                = {
780                         .f_seq  = FID_SEQ_UPDATE_LOG,
781                 },
782                 .olm_flags              = OLF_IDX_IN_FID,
783         },
784
785         /* update_log_dir */
786         {
787                 .olm_name               = "update_log_dir",
788                 .olm_fid        = {
789                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
790                 },
791                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
792                 .olm_scan_dir           = osd_ios_general_sd,
793                 .olm_handle_dirent      = osd_ios_uld_hd,
794         },
795
796         /* hsm_actions */
797         {
798                 .olm_name               = HSM_ACTIONS,
799         },
800
801         /* nodemap */
802         {
803                 .olm_name               = LUSTRE_NODEMAP_NAME,
804         },
805
806         /* index_backup */
807         {
808                 .olm_name               = INDEX_BACKUP_DIR,
809                 .olm_fid                = {
810                         .f_seq  = FID_SEQ_LOCAL_FILE,
811                         .f_oid  = INDEX_BACKUP_OID,
812                 },
813                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
814                 .olm_scan_dir           = osd_ios_general_sd,
815                 .olm_handle_dirent      = osd_ios_varfid_hd,
816         },
817
818         {
819                 .olm_name               = NULL
820         }
821 };
822
823 /* Add the new introduced files under .lustre/ in the list in the future. */
824 static const struct osd_lf_map osd_dl_maps[] = {
825         /* .lustre/fid */
826         {
827                 .olm_name               = "fid",
828                 .olm_fid                = {
829                         .f_seq  = FID_SEQ_DOT_LUSTRE,
830                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
831                 },
832         },
833
834         /* .lustre/lost+found */
835         {
836                 .olm_name               = "lost+found",
837                 .olm_fid                = {
838                         .f_seq  = FID_SEQ_DOT_LUSTRE,
839                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
840                 },
841         },
842
843         {
844                 .olm_name               = NULL
845         }
846 };
847
848 struct osd_ios_item {
849         struct list_head        oii_list;
850         uint64_t                oii_parent;
851         enum osd_lf_flags       oii_flags;
852         scan_dir_t              oii_scan_dir;
853         handle_dirent_t         oii_handle_dirent;
854 };
855
856 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
857                             enum osd_lf_flags flags, scan_dir_t scan_dir,
858                             handle_dirent_t handle_dirent)
859 {
860         struct osd_ios_item *item;
861
862         OBD_ALLOC_PTR(item);
863         if (!item) {
864                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
865                       osd_name(dev), parent);
866                 return -ENOMEM;
867         }
868
869         INIT_LIST_HEAD(&item->oii_list);
870         item->oii_parent = parent;
871         item->oii_flags = flags;
872         item->oii_scan_dir = scan_dir;
873         item->oii_handle_dirent = handle_dirent;
874         list_add_tail(&item->oii_list, &dev->od_ios_list);
875
876         return 0;
877 }
878
879 static bool osd_index_need_recreate(const struct lu_env *env,
880                                     struct osd_device *dev, uint64_t oid)
881 {
882         struct osd_thread_info *info = osd_oti_get(env);
883         zap_attribute_t *za = &info->oti_za2;
884         zap_cursor_t *zc = &info->oti_zc2;
885         int rc;
886         ENTRY;
887
888         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
889         rc = -zap_cursor_retrieve(zc, za);
890         zap_cursor_fini(zc);
891         if (rc && rc != -ENOENT)
892                 RETURN(true);
893
894         RETURN(false);
895 }
896
897 static void osd_ios_index_register(const struct lu_env *env,
898                                    struct osd_device *osd,
899                                    const struct lu_fid *fid, uint64_t oid)
900 {
901         struct osd_thread_info *info = osd_oti_get(env);
902         zap_attribute_t *za = &info->oti_za2;
903         zap_cursor_t *zc = &info->oti_zc2;
904         struct zap_leaf_entry *le;
905         dnode_t *dn = NULL;
906         sa_handle_t *hdl;
907         __u64 mode = 0;
908         __u32 keysize = 0;
909         __u32 recsize = 0;
910         int rc;
911         ENTRY;
912
913         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
914         if (rc == -EEXIST || rc == -ENOENT)
915                 RETURN_EXIT;
916
917         if (rc < 0)
918                 GOTO(log, rc);
919
920         if (!osd_object_is_zap(dn))
921                 GOTO(log, rc = 1);
922
923         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
924         if (rc)
925                 GOTO(log, rc);
926
927         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
928         sa_handle_destroy(hdl);
929         if (rc)
930                 GOTO(log, rc);
931
932         if (!S_ISREG(mode))
933                 GOTO(log, rc = 1);
934
935         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
936         rc = -zap_cursor_retrieve(zc, za);
937         if (rc)
938                 /* Skip empty index object */
939                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
940
941         if (zc->zc_zap->zap_ismicro ||
942             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
943                 GOTO(fini, rc = 1);
944
945         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
946         keysize = le->le_name_numints * 8;
947         recsize = za->za_integer_length * za->za_num_integers;
948         if (likely(keysize && recsize))
949                 rc = osd_index_register(osd, fid, keysize, recsize);
950
951         GOTO(fini, rc);
952
953 fini:
954         zap_cursor_fini(zc);
955
956 log:
957         if (dn)
958                 osd_dnode_rele(dn);
959         if (rc < 0)
960                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
961                       osd_name(osd), PFID(fid), keysize, recsize, rc);
962         else if (!rc)
963                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
964                        osd_name(osd), PFID(fid), keysize, recsize);
965 }
966
967 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
968                               struct lustre_index_restore_unit *liru, void *buf,
969                               int bufsize)
970 {
971         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
972         struct lu_fid *tgt_fid = &liru->liru_cfid;
973         struct lu_fid bak_fid;
974         int rc;
975         ENTRY;
976
977         lustre_fid2lbx(buf, tgt_fid, bufsize);
978         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
979                          sizeof(*zde) / 8, (void *)zde);
980         if (rc)
981                 GOTO(log, rc);
982
983         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
984         if (rc)
985                 GOTO(log, rc);
986
987         /* The OI mapping for index may be invalid, since it will be
988          * re-created, not update the OI mapping, just cache it in RAM. */
989         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
990                                             liru->liru_clid);
991         if (!rc)
992                 rc = lustre_index_restore(env, &dev->od_dt_dev,
993                                 &liru->liru_pfid, tgt_fid, &bak_fid,
994                                 liru->liru_name, &dev->od_index_backup_list,
995                                 &dev->od_lock, buf, bufsize);
996         GOTO(log, rc);
997
998 log:
999         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
1000                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
1001 }
1002
1003 /**
1004  * verify FID-in-LMA and OI entry for one object
1005  *
1006  * ios: Initial OI Scrub.
1007  */
1008 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
1009                             const struct lu_fid *fid, uint64_t parent,
1010                             uint64_t oid, const char *name,
1011                             enum osd_lf_flags flags)
1012 {
1013         struct lustre_scrub *scrub = &dev->od_scrub;
1014         struct scrub_file *sf = &scrub->os_file;
1015         struct lustre_mdt_attrs *lma = NULL;
1016         nvlist_t *nvbuf = NULL;
1017         struct lu_fid tfid;
1018         uint64_t oid2 = 0;
1019         __u64 flag = 0;
1020         int size = 0;
1021         int op = 0;
1022         int rc;
1023         ENTRY;
1024
1025         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
1026         if (unlikely(rc == -ENOENT || rc == -EEXIST))
1027                 RETURN(0);
1028
1029         if (rc && rc != -ENODATA) {
1030                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
1031                       "rc = %d\n", osd_name(dev), oid, rc);
1032
1033                 RETURN(rc);
1034         }
1035
1036         if (!rc) {
1037                 LASSERT(nvbuf != NULL);
1038                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1039                                                (uchar_t **)&lma, &size);
1040                 if (rc || size == 0) {
1041                         LASSERT(lma == NULL);
1042                         rc = -ENODATA;
1043                 } else {
1044                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1045                         lustre_lma_swab(lma);
1046                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
1047                                 nvlist_free(nvbuf);
1048                                 RETURN(0);
1049                         }
1050
1051                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
1052                             osd_index_need_recreate(env, dev, oid)) {
1053                                 if (parent == dev->od_root) {
1054                                         lu_local_obj_fid(&tfid,
1055                                                          OSD_FS_ROOT_OID);
1056                                 } else {
1057                                         rc = osd_get_fid_by_oid(env, dev,
1058                                                                 parent, &tfid);
1059                                         if (rc) {
1060                                                 nvlist_free(nvbuf);
1061                                                 RETURN(rc);
1062                                         }
1063                                 }
1064
1065                                 rc = lustre_liru_new(
1066                                                 &dev->od_index_restore_list,
1067                                                 &tfid, &lma->lma_self_fid, oid,
1068                                                 name, strlen(name));
1069                                 nvlist_free(nvbuf);
1070                                 RETURN(rc);
1071                         }
1072
1073                         tfid = lma->lma_self_fid;
1074                         if (!(flags & OLF_NOT_BACKUP))
1075                                 osd_ios_index_register(env, dev, &tfid, oid);
1076                 }
1077                 nvlist_free(nvbuf);
1078         }
1079
1080         if (rc == -ENODATA) {
1081                 if (!fid) {
1082                         /* Skip the object without FID-in-LMA */
1083                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1084                                osd_name(dev), oid);
1085
1086                         RETURN(0);
1087                 }
1088
1089                 LASSERT(!fid_is_zero(fid));
1090
1091                 tfid = *fid;
1092                 if (flags & OLF_IDX_IN_FID) {
1093                         LASSERT(dev->od_index >= 0);
1094
1095                         tfid.f_oid = dev->od_index;
1096                 }
1097         }
1098
1099         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1100         if (rc) {
1101                 if (rc != -ENOENT) {
1102                         CWARN("%s: initial OI scrub failed to lookup fid for "
1103                               DFID"=>%llu: rc = %d\n",
1104                               osd_name(dev), PFID(&tfid), oid, rc);
1105
1106                         RETURN(rc);
1107                 }
1108
1109                 flag = SF_RECREATED;
1110                 op = DTO_INDEX_INSERT;
1111         } else {
1112                 if (oid == oid2)
1113                         RETURN(0);
1114
1115                 flag = SF_INCONSISTENT;
1116                 op = DTO_INDEX_UPDATE;
1117         }
1118
1119         if (!(sf->sf_flags & flag)) {
1120                 scrub_file_reset(scrub, dev->od_uuid, flag);
1121                 rc = scrub_file_store(env, scrub);
1122                 if (rc)
1123                         RETURN(rc);
1124         }
1125
1126         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1127
1128         RETURN(rc > 0 ? 0 : rc);
1129 }
1130
1131 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1132                              const char *name, uint64_t parent, uint64_t oid,
1133                              enum osd_lf_flags flags, bool is_dir)
1134 {
1135         int rc;
1136         ENTRY;
1137
1138         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1139         if (!rc && is_dir)
1140                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1141                                       osd_ios_varfid_hd);
1142
1143         RETURN(rc);
1144 }
1145
1146 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1147                           const char *name, uint64_t parent, uint64_t oid,
1148                           enum osd_lf_flags flags, bool is_dir)
1149 {
1150         struct lu_fid tfid;
1151         int rc;
1152         ENTRY;
1153
1154         /* skip any non-DFID format name */
1155         if (name[0] != '[')
1156                 RETURN(0);
1157
1158         /* skip the start '[' */
1159         sscanf(&name[1], SFID, RFID(&tfid));
1160         if (fid_is_sane(&tfid))
1161                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1162         else
1163                 rc = -EIO;
1164
1165         RETURN(rc);
1166 }
1167
1168 /*
1169  * General scanner for the directories execpt /ROOT during initial OI scrub.
1170  * It scans the name entries under the given directory one by one. For each
1171  * entry, verifies its OI mapping via the given @handle_dirent.
1172  */
1173 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1174                               uint64_t parent, handle_dirent_t handle_dirent,
1175                               enum osd_lf_flags flags)
1176 {
1177         struct osd_thread_info *info = osd_oti_get(env);
1178         struct luz_direntry *zde = &info->oti_zde;
1179         zap_attribute_t *za = &info->oti_za;
1180         zap_cursor_t *zc = &info->oti_zc;
1181         int rc;
1182         ENTRY;
1183
1184         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1185         rc = -zap_cursor_retrieve(zc, za);
1186         if (rc == -ENOENT)
1187                 zap_cursor_advance(zc);
1188         else if (rc)
1189                 GOTO(log, rc);
1190
1191         while (1) {
1192                 rc = -zap_cursor_retrieve(zc, za);
1193                 if (rc)
1194                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1195
1196                 /* skip the entry started with '.' */
1197                 if (likely(za->za_name[0] != '.')) {
1198                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1199                                         za->za_integer_length,
1200                                         sizeof(*zde) / za->za_integer_length,
1201                                         (void *)zde);
1202                         if (rc) {
1203                                 CWARN("%s: initial OI scrub failed to lookup "
1204                                       "%s under %llu: rc = %d\n",
1205                                       osd_name(dev), za->za_name, parent, rc);
1206                                 continue;
1207                         }
1208
1209                         rc = handle_dirent(env, dev, za->za_name, parent,
1210                                         zde->lzd_reg.zde_dnode, flags,
1211                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1212                                         true : false);
1213                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1214                                "%llu: rc = %d\n",
1215                                osd_name(dev), za->za_name, parent, rc);
1216                 }
1217
1218                 zap_cursor_advance(zc);
1219         }
1220
1221 log:
1222         if (rc)
1223                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1224                       "rc = %d\n", osd_name(dev), parent, rc);
1225         zap_cursor_fini(zc);
1226
1227         return rc;
1228 }
1229
1230 /*
1231  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1232  * be scanned during the initial OI scrub, instead, only the .lustre and the
1233  * sub-items under .lustre will be handled.
1234  */
1235 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1236                            uint64_t parent, handle_dirent_t handle_dirent,
1237                            enum osd_lf_flags flags)
1238 {
1239         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1240         const struct osd_lf_map *map;
1241         uint64_t oid;
1242         int rc;
1243         int rc1 = 0;
1244         ENTRY;
1245
1246         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1247                             sizeof(*zde) / 8, (void *)zde);
1248         if (rc == -ENOENT) {
1249                 /* The .lustre directory is lost. That is not fatal. It can
1250                  * be re-created in the subsequent MDT start processing. */
1251                 RETURN(0);
1252         }
1253
1254         if (rc) {
1255                 CWARN("%s: initial OI scrub failed to find .lustre: "
1256                       "rc = %d\n", osd_name(dev), rc);
1257
1258                 RETURN(rc);
1259         }
1260
1261         oid = zde->lzd_reg.zde_dnode;
1262         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1263                               dot_lustre_name, 0);
1264         if (rc)
1265                 RETURN(rc);
1266
1267         for (map = osd_dl_maps; map->olm_name; map++) {
1268                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1269                                     sizeof(*zde) / 8, (void *)zde);
1270                 if (rc) {
1271                         if (rc != -ENOENT)
1272                                 CWARN("%s: initial OI scrub failed to find"
1273                                       "the entry %s under .lustre: rc = %d\n",
1274                                       osd_name(dev), map->olm_name, rc);
1275                         else if (!fid_is_zero(&map->olm_fid))
1276                                 /* Try to remove the stale OI mapping. */
1277                                 osd_scrub_refresh_mapping(env, dev,
1278                                                 &map->olm_fid, 0,
1279                                                 DTO_INDEX_DELETE, true,
1280                                                 map->olm_name);
1281                         continue;
1282                 }
1283
1284                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1285                                       zde->lzd_reg.zde_dnode, map->olm_name,
1286                                       map->olm_flags);
1287                 if (rc)
1288                         rc1 = rc;
1289         }
1290
1291         RETURN(rc1);
1292 }
1293
1294 static void osd_initial_OI_scrub(const struct lu_env *env,
1295                                  struct osd_device *dev)
1296 {
1297         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1298         const struct osd_lf_map *map;
1299         int rc;
1300         ENTRY;
1301
1302         for (map = osd_lf_maps; map->olm_name; map++) {
1303                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1304                                     sizeof(*zde) / 8, (void *)zde);
1305                 if (rc) {
1306                         if (rc != -ENOENT)
1307                                 CWARN("%s: initial OI scrub failed "
1308                                       "to find the entry %s: rc = %d\n",
1309                                       osd_name(dev), map->olm_name, rc);
1310                         else if (!fid_is_zero(&map->olm_fid))
1311                                 /* Try to remove the stale OI mapping. */
1312                                 osd_scrub_refresh_mapping(env, dev,
1313                                                 &map->olm_fid, 0,
1314                                                 DTO_INDEX_DELETE, true,
1315                                                 map->olm_name);
1316                         continue;
1317                 }
1318
1319                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1320                                       zde->lzd_reg.zde_dnode, map->olm_name,
1321                                       map->olm_flags);
1322                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1323                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1324                                          map->olm_flags, map->olm_scan_dir,
1325                                          map->olm_handle_dirent);
1326         }
1327
1328         while (!list_empty(&dev->od_ios_list)) {
1329                 struct osd_ios_item *item;
1330
1331                 item = list_entry(dev->od_ios_list.next,
1332                                   struct osd_ios_item, oii_list);
1333                 list_del_init(&item->oii_list);
1334                 item->oii_scan_dir(env, dev, item->oii_parent,
1335                                    item->oii_handle_dirent, item->oii_flags);
1336                 OBD_FREE_PTR(item);
1337         }
1338
1339         if (!list_empty(&dev->od_index_restore_list)) {
1340                 char *buf;
1341
1342                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1343                 if (!buf)
1344                         CERROR("%s: not enough RAM for rebuild index\n",
1345                                osd_name(dev));
1346
1347                 while (!list_empty(&dev->od_index_restore_list)) {
1348                         struct lustre_index_restore_unit *liru;
1349
1350                         liru = list_entry(dev->od_index_restore_list.next,
1351                                           struct lustre_index_restore_unit,
1352                                           liru_link);
1353                         list_del(&liru->liru_link);
1354                         if (buf)
1355                                 osd_index_restore(env, dev, liru, buf,
1356                                                   INDEX_BACKUP_BUFSIZE);
1357                         OBD_FREE(liru, liru->liru_len);
1358                 }
1359
1360                 if (buf)
1361                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1362         }
1363
1364         EXIT;
1365 }
1366
1367 /* OI scrub start/stop */
1368
1369 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1370                     __u32 flags)
1371 {
1372         int rc;
1373         ENTRY;
1374
1375         if (dev->od_dt_dev.dd_rdonly)
1376                 RETURN(-EROFS);
1377
1378         /* od_otable_sem: prevent concurrent start/stop */
1379         down(&dev->od_otable_sem);
1380         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1381         up(&dev->od_otable_sem);
1382
1383         RETURN(rc == -EALREADY ? 0 : rc);
1384 }
1385
1386 void osd_scrub_stop(struct osd_device *dev)
1387 {
1388         struct lustre_scrub *scrub = &dev->od_scrub;
1389         ENTRY;
1390
1391         /* od_otable_sem: prevent concurrent start/stop */
1392         down(&dev->od_otable_sem);
1393         scrub->os_paused = 1;
1394         scrub_stop(scrub);
1395         up(&dev->od_otable_sem);
1396
1397         EXIT;
1398 }
1399
1400 /* OI scrub setup/cleanup */
1401
1402 static const char osd_scrub_name[] = "OI_scrub";
1403
1404 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1405                     bool resetoi)
1406 {
1407         struct osd_thread_info *info = osd_oti_get(env);
1408         struct lustre_scrub *scrub = &dev->od_scrub;
1409         struct scrub_file *sf = &scrub->os_file;
1410         struct lu_fid *fid = &info->oti_fid;
1411         struct dt_object *obj;
1412         uint64_t oid;
1413         int rc = 0;
1414         bool dirty = false;
1415         ENTRY;
1416
1417         memcpy(dev->od_uuid,
1418                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1419                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1420         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1421         init_waitqueue_head(&scrub->os_thread.t_ctl_waitq);
1422         init_rwsem(&scrub->os_rwsem);
1423         spin_lock_init(&scrub->os_lock);
1424         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1425         scrub->os_name = osd_name(dev);
1426
1427         /* 'What the @fid is' is not imporatant, because the object
1428          * has no OI mapping, and only is visible inside the OSD.*/
1429         fid->f_seq = FID_SEQ_IGIF_MAX;
1430         if (dev->od_is_ost)
1431                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1432         else
1433                 fid->f_oid = dev->od_index + 1;
1434         fid->f_ver = 0;
1435         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1436                                     osd_scrub_name, &oid, fid, false);
1437         if (rc)
1438                 RETURN(rc);
1439
1440         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1441         if (rc)
1442                 RETURN(rc);
1443
1444         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1445         if (IS_ERR_OR_NULL(obj))
1446                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1447
1448         obj->do_body_ops = &osd_body_scrub_ops;
1449         scrub->os_obj = obj;
1450         rc = scrub_file_load(env, scrub);
1451         if (rc == -ENOENT || rc == -EFAULT) {
1452                 scrub_file_init(scrub, dev->od_uuid);
1453                 dirty = true;
1454         } else if (rc < 0) {
1455                 GOTO(cleanup_obj, rc);
1456         } else {
1457                 if (memcmp(sf->sf_uuid, dev->od_uuid, 16) != 0) {
1458                         struct obd_uuid *old_uuid;
1459                         struct obd_uuid *new_uuid;
1460
1461                         OBD_ALLOC_PTR(old_uuid);
1462                         OBD_ALLOC_PTR(new_uuid);
1463                         if (!old_uuid || !new_uuid) {
1464                                 CERROR("%s: UUID has been changed, but"
1465                                        "failed to allocate RAM for report\n",
1466                                        osd_name(dev));
1467                         } else {
1468                                 class_uuid_unparse(sf->sf_uuid, old_uuid);
1469                                 class_uuid_unparse(dev->od_uuid, new_uuid);
1470                                 CDEBUG(D_LFSCK, "%s: UUID has been changed "
1471                                        "from %s to %s\n", osd_name(dev),
1472                                        old_uuid->uuid, new_uuid->uuid);
1473                         }
1474                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1475                         dirty = true;
1476                         if (old_uuid)
1477                                 OBD_FREE_PTR(old_uuid);
1478                         if (new_uuid)
1479                                 OBD_FREE_PTR(new_uuid);
1480                 } else if (sf->sf_status == SS_SCANNING) {
1481                         sf->sf_status = SS_CRASHED;
1482                         dirty = true;
1483                 }
1484
1485                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1486                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1487                                       osd_name(dev), sf->sf_oi_count,
1488                                       osd_oi_count);
1489                         sf->sf_oi_count = osd_oi_count;
1490                         dirty = true;
1491                 }
1492         }
1493
1494         if (sf->sf_pos_last_checkpoint != 0)
1495                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1496         else
1497                 scrub->os_pos_current = 1;
1498
1499         if (dirty) {
1500                 rc = scrub_file_store(env, scrub);
1501                 if (rc)
1502                         GOTO(cleanup_obj, rc);
1503         }
1504
1505         /* Initialize OI files. */
1506         rc = osd_oi_init(env, dev, resetoi);
1507         if (rc < 0)
1508                 GOTO(cleanup_obj, rc);
1509
1510         if (!dev->od_dt_dev.dd_rdonly)
1511                 osd_initial_OI_scrub(env, dev);
1512
1513         if (!dev->od_dt_dev.dd_rdonly &&
1514             dev->od_auto_scrub_interval != AS_NEVER &&
1515             ((sf->sf_status == SS_PAUSED) ||
1516              (sf->sf_status == SS_CRASHED &&
1517               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1518                               SF_UPGRADE | SF_AUTO)) ||
1519              (sf->sf_status == SS_INIT &&
1520               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1521                               SF_UPGRADE))))
1522                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1523
1524         if (rc)
1525                 GOTO(cleanup_oi, rc);
1526
1527         RETURN(0);
1528
1529 cleanup_oi:
1530         osd_oi_fini(env, dev);
1531 cleanup_obj:
1532         dt_object_put_nocache(env, scrub->os_obj);
1533         scrub->os_obj = NULL;
1534
1535         return rc;
1536 }
1537
1538 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1539 {
1540         struct lustre_scrub *scrub = &dev->od_scrub;
1541
1542         LASSERT(!dev->od_otable_it);
1543
1544         if (scrub->os_obj) {
1545                 osd_scrub_stop(dev);
1546                 dt_object_put_nocache(env, scrub->os_obj);
1547                 scrub->os_obj = NULL;
1548         }
1549
1550         if (dev->od_oi_table)
1551                 osd_oi_fini(env, dev);
1552 }
1553
1554 /* object table based iteration APIs */
1555
1556 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1557                                        struct dt_object *dt, __u32 attr)
1558 {
1559         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1560         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1561         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1562         struct lustre_scrub *scrub = &dev->od_scrub;
1563         struct osd_otable_it *it;
1564         __u32 start = 0;
1565         int rc;
1566         ENTRY;
1567
1568         if (dev->od_dt_dev.dd_rdonly)
1569                 RETURN(ERR_PTR(-EROFS));
1570
1571         /* od_otable_sem: prevent concurrent init/fini */
1572         down(&dev->od_otable_sem);
1573         if (dev->od_otable_it)
1574                 GOTO(out, it = ERR_PTR(-EALREADY));
1575
1576         OBD_ALLOC_PTR(it);
1577         if (!it)
1578                 GOTO(out, it = ERR_PTR(-ENOMEM));
1579
1580         if (flags & DOIF_OUTUSED)
1581                 it->ooi_used_outside = 1;
1582
1583         if (flags & DOIF_RESET)
1584                 start |= SS_RESET;
1585
1586         if (valid & DOIV_ERROR_HANDLE) {
1587                 if (flags & DOIF_FAILOUT)
1588                         start |= SS_SET_FAILOUT;
1589                 else
1590                         start |= SS_CLEAR_FAILOUT;
1591         }
1592
1593         if (valid & DOIV_DRYRUN) {
1594                 if (flags & DOIF_DRYRUN)
1595                         start |= SS_SET_DRYRUN;
1596                 else
1597                         start |= SS_CLEAR_DRYRUN;
1598         }
1599
1600         /* XXX: dmu_object_next() does NOT find dnodes allocated
1601          *      in the current non-committed txg, so we force txg
1602          *      commit to find all existing dnodes ... */
1603         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1604
1605         dev->od_otable_it = it;
1606         it->ooi_dev = dev;
1607         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1608         if (rc == -EALREADY) {
1609                 it->ooi_pos = 1;
1610         } else if (rc < 0) {
1611                 dev->od_otable_it = NULL;
1612                 OBD_FREE_PTR(it);
1613                 it = ERR_PTR(rc);
1614         } else {
1615                 it->ooi_pos = scrub->os_pos_current;
1616         }
1617
1618         GOTO(out, it);
1619
1620 out:
1621         up(&dev->od_otable_sem);
1622         return (struct dt_it *)it;
1623 }
1624
1625 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1626 {
1627         struct osd_otable_it *it = (struct osd_otable_it *)di;
1628         struct osd_device *dev = it->ooi_dev;
1629
1630         /* od_otable_sem: prevent concurrent init/fini */
1631         down(&dev->od_otable_sem);
1632         scrub_stop(&dev->od_scrub);
1633         LASSERT(dev->od_otable_it == it);
1634
1635         dev->od_otable_it = NULL;
1636         up(&dev->od_otable_sem);
1637         OBD_FREE_PTR(it);
1638 }
1639
/**
 * "get" method of the otable iterator.
 *
 * Nothing is done with the arguments: the function is a stub.
 *
 * \retval 0    always
 */
static int
osd_otable_it_get(const struct lu_env *env, struct dt_it *di,
                  const struct dt_key *key)
{
        return 0;
}
1645
/**
 * "put" method of the otable iterator.
 *
 * Intentionally empty: osd_otable_it_get() acquires nothing, so there is
 * nothing to release here.
 */
static void
osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
}
1649
1650 static void osd_otable_it_preload(const struct lu_env *env,
1651                                   struct osd_otable_it *it)
1652 {
1653         struct osd_device *dev = it->ooi_dev;
1654         int rc;
1655
1656         /* can go negative on the very first access to the iterator
1657          * or if some non-Lustre objects were found */
1658         if (unlikely(it->ooi_prefetched < 0))
1659                 it->ooi_prefetched = 0;
1660
1661         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1662                 return;
1663
1664         if (it->ooi_prefetched_dnode == 0)
1665                 it->ooi_prefetched_dnode = it->ooi_pos;
1666
1667         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1668                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1669                                       B_FALSE, 0);
1670                 if (rc)
1671                         break;
1672
1673                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1674                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1675                 it->ooi_prefetched++;
1676         }
1677 }
1678
1679 static inline int
1680 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1681 {
1682         spin_lock(&scrub->os_lock);
1683         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1684             !thread_is_running(&scrub->os_thread))
1685                 it->ooi_waiting = 0;
1686         else
1687                 it->ooi_waiting = 1;
1688         spin_unlock(&scrub->os_lock);
1689
1690         return !it->ooi_waiting;
1691 }
1692
1693 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1694 {
1695         struct osd_otable_it *it = (struct osd_otable_it *)di;
1696         struct osd_device *dev = it->ooi_dev;
1697         struct lustre_scrub *scrub = &dev->od_scrub;
1698         struct ptlrpc_thread *thread = &scrub->os_thread;
1699         struct l_wait_info lwi = { 0 };
1700         struct lustre_mdt_attrs *lma = NULL;
1701         nvlist_t *nvbuf = NULL;
1702         int size = 0;
1703         int rc;
1704         ENTRY;
1705
1706         LASSERT(it->ooi_user_ready);
1707         fid_zero(&it->ooi_fid);
1708
1709         if (unlikely(it->ooi_all_cached))
1710                 RETURN(1);
1711
1712 again:
1713         if (nvbuf) {
1714                 nvlist_free(nvbuf);
1715                 nvbuf = NULL;
1716                 lma = NULL;
1717                 size = 0;
1718         }
1719
1720         if (it->ooi_pos >= scrub->os_pos_current)
1721                 l_wait_event(thread->t_ctl_waitq,
1722                              osd_otable_it_wakeup(scrub, it),
1723                              &lwi);
1724
1725         if (!thread_is_running(thread) && !it->ooi_used_outside)
1726                 GOTO(out, rc = 1);
1727
1728         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1729         if (rc) {
1730                 if (unlikely(rc == -ESRCH)) {
1731                         it->ooi_all_cached = 1;
1732                         rc = 1;
1733                 }
1734
1735                 GOTO(out, rc);
1736         }
1737
1738         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1739
1740         if (!scrub->os_full_speed)
1741                 spin_lock(&scrub->os_lock);
1742         it->ooi_prefetched--;
1743         if (!scrub->os_full_speed) {
1744                 if (scrub->os_waiting) {
1745                         scrub->os_waiting = 0;
1746                         wake_up_all(&thread->t_ctl_waitq);
1747                 }
1748                 spin_unlock(&scrub->os_lock);
1749         }
1750
1751         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1752                 goto again;
1753
1754         if (rc)
1755                 GOTO(out, rc);
1756
1757         LASSERT(nvbuf != NULL);
1758         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1759                                        (uchar_t **)&lma, &size);
1760         if (rc || size == 0)
1761                 /* It is either non-Lustre object or OSD internal object,
1762                  * ignore it, go ahead */
1763                 goto again;
1764
1765         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1766         lustre_lma_swab(lma);
1767         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1768                      lma->lma_incompat & LMAI_AGENT))
1769                 goto again;
1770
1771         it->ooi_fid = lma->lma_self_fid;
1772
1773         GOTO(out, rc = 0);
1774
1775 out:
1776         if (nvbuf)
1777                 nvlist_free(nvbuf);
1778
1779         if (!rc && scrub->os_full_speed)
1780                 osd_otable_it_preload(env, it);
1781
1782         return rc;
1783 }
1784
/**
 * "key" method of the otable iterator.
 *
 * No key object is exposed by this iterator.
 *
 * \retval NULL always
 */
static struct dt_key *
osd_otable_it_key(const struct lu_env *env, const struct dt_it *di)
{
        return NULL;
}
1790
1791 static int osd_otable_it_key_size(const struct lu_env *env,
1792                                   const struct dt_it *di)
1793 {
1794         return sizeof(__u64);
1795 }
1796
1797 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1798                              struct dt_rec *rec, __u32 attr)
1799 {
1800         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1801         struct lu_fid *fid = (struct lu_fid *)rec;
1802
1803         *fid = it->ooi_fid;
1804         return 0;
1805 }
1806
1807 static __u64 osd_otable_it_store(const struct lu_env *env,
1808                                  const struct dt_it *di)
1809 {
1810         struct osd_otable_it *it = (struct osd_otable_it *)di;
1811
1812         return it->ooi_pos;
1813 }
1814
1815 /**
1816  * Set the OSD layer iteration start position as the specified hash.
1817  */
1818 static int osd_otable_it_load(const struct lu_env *env,
1819                               const struct dt_it *di, __u64 hash)
1820 {
1821         struct osd_otable_it *it = (struct osd_otable_it *)di;
1822         struct osd_device *dev = it->ooi_dev;
1823         struct lustre_scrub *scrub = &dev->od_scrub;
1824         int rc;
1825         ENTRY;
1826
1827         /* Forbid to set iteration position after iteration started. */
1828         if (it->ooi_user_ready)
1829                 RETURN(-EPERM);
1830
1831         if (hash > OSD_OTABLE_MAX_HASH)
1832                 hash = OSD_OTABLE_MAX_HASH;
1833
1834         /* The hash is the last checkpoint position,
1835          * we will start from the next one. */
1836         it->ooi_pos = hash + 1;
1837         it->ooi_prefetched = 0;
1838         it->ooi_prefetched_dnode = 0;
1839         it->ooi_user_ready = 1;
1840         if (!scrub->os_full_speed)
1841                 wake_up_all(&scrub->os_thread.t_ctl_waitq);
1842
1843         /* Unplug OSD layer iteration by the first next() call. */
1844         rc = osd_otable_it_next(env, (struct dt_it *)it);
1845
1846         RETURN(rc);
1847 }
1848
/**
 * "key_rec" method of the otable iterator.
 *
 * Stub: \a key_rec is left untouched.
 *
 * \retval 0    always
 */
static int
osd_otable_it_key_rec(const struct lu_env *env, const struct dt_it *di,
                      void *key_rec)
{
        return 0;
}
1854
1855 const struct dt_index_operations osd_otable_ops = {
1856         .dio_it = {
1857                 .init     = osd_otable_it_init,
1858                 .fini     = osd_otable_it_fini,
1859                 .get      = osd_otable_it_get,
1860                 .put      = osd_otable_it_put,
1861                 .next     = osd_otable_it_next,
1862                 .key      = osd_otable_it_key,
1863                 .key_size = osd_otable_it_key_size,
1864                 .rec      = osd_otable_it_rec,
1865                 .store    = osd_otable_it_store,
1866                 .load     = osd_otable_it_load,
1867                 .key_rec  = osd_otable_it_key_rec,
1868         }
1869 };
1870
1871 /* high priority inconsistent items list APIs */
1872
1873 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1874                    const struct lu_fid *fid, uint64_t oid, bool insert)
1875 {
1876         struct lustre_scrub *scrub = &dev->od_scrub;
1877         struct ptlrpc_thread *thread = &scrub->os_thread;
1878         struct osd_inconsistent_item *oii;
1879         bool wakeup = false;
1880         ENTRY;
1881
1882         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1883         OBD_ALLOC_PTR(oii);
1884         if (unlikely(!oii))
1885                 RETURN(-ENOMEM);
1886
1887         INIT_LIST_HEAD(&oii->oii_list);
1888         oii->oii_cache.oic_dev = dev;
1889         oii->oii_cache.oic_fid = *fid;
1890         oii->oii_cache.oic_dnode = oid;
1891         oii->oii_insert = insert;
1892
1893         spin_lock(&scrub->os_lock);
1894         if (unlikely(!thread_is_running(thread))) {
1895                 spin_unlock(&scrub->os_lock);
1896                 OBD_FREE_PTR(oii);
1897                 RETURN(-EAGAIN);
1898         }
1899
1900         if (list_empty(&scrub->os_inconsistent_items))
1901                 wakeup = true;
1902         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1903         spin_unlock(&scrub->os_lock);
1904
1905         if (wakeup)
1906                 wake_up_all(&thread->t_ctl_waitq);
1907
1908         RETURN(0);
1909 }
1910
1911 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1912                    uint64_t *oid)
1913 {
1914         struct lustre_scrub *scrub = &dev->od_scrub;
1915         struct osd_inconsistent_item *oii;
1916         int ret = -ENOENT;
1917         ENTRY;
1918
1919         spin_lock(&scrub->os_lock);
1920         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1921                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1922                         *oid = oii->oii_cache.oic_dnode;
1923                         ret = 0;
1924                         break;
1925                 }
1926         }
1927         spin_unlock(&scrub->os_lock);
1928
1929         RETURN(ret);
1930 }