Whamcloud - gitweb
dd0eddd1646500f86b5c3bf1278ac831628f8031
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding Object Index files when restoring the
31  * MDT from a file-level backup.
32  *
33  * The otable based iterator scans ZFS objects to feed up layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
/* 2^48 - 1: a value with the low 48 bits set. */
#define OSD_OTABLE_MAX_HASH             0xffffffffffffULL
/* Prefetch window size; see osd_scrub_has_window(). */
#define OTABLE_PREFETCH                 256

/* Operation selectors for the "ops" argument of osd_scrub_refresh_mapping(). */
#define DTO_INDEX_INSERT                1
#define DTO_INDEX_DELETE                2
#define DTO_INDEX_UPDATE                3
61
62 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
63 {
64         return it->ooi_prefetched < OTABLE_PREFETCH;
65 }
66
67 /**
68  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
69  *
70  * \retval   1, changed nothing
71  * \retval   0, changed successfully
72  * \retval -ve, on error
73  */
74 static int osd_scrub_refresh_mapping(const struct lu_env *env,
75                                      struct osd_device *dev,
76                                      const struct lu_fid *fid,
77                                      uint64_t oid, int ops,
78                                      bool force, const char *name)
79 {
80         struct osd_thread_info *info = osd_oti_get(env);
81         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
82         char *buf = info->oti_str;
83         dmu_tx_t *tx = NULL;
84         dnode_t *dn = NULL;
85         uint64_t zapid;
86         int rc;
87         ENTRY;
88
89         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
90                 GOTO(log, rc = 0);
91
92         tx = dmu_tx_create(dev->od_os);
93         if (!tx)
94                 GOTO(log, rc = -ENOMEM);
95
96         zapid = osd_get_name_n_idx(env, dev, fid, buf,
97                                    sizeof(info->oti_str), &dn);
98         osd_tx_hold_zap(tx, zapid, dn,
99                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
100         rc = -dmu_tx_assign(tx, TXG_WAIT);
101         if (rc) {
102                 dmu_tx_abort(tx);
103                 GOTO(log, rc);
104         }
105
106         switch (ops) {
107         case DTO_INDEX_UPDATE:
108                 zde->zde_pad = 0;
109                 zde->zde_dnode = oid;
110                 zde->zde_type = 0; /* The type in OI mapping is useless. */
111                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
112                                  zde, tx);
113                 if (unlikely(rc == -ENOENT)) {
114                         /* Some unlink thread may removed the OI mapping. */
115                         rc = 1;
116                 }
117                 break;
118         case DTO_INDEX_INSERT:
119                 zde->zde_pad = 0;
120                 zde->zde_dnode = oid;
121                 zde->zde_type = 0; /* The type in OI mapping is useless. */
122                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
123                                  zde, tx);
124                 if (unlikely(rc == -EEXIST))
125                         rc = 1;
126                 break;
127         case DTO_INDEX_DELETE:
128                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
129                 if (rc == -ENOENT) {
130                         /* It is normal that the unlink thread has removed the
131                          * OI mapping already. */
132                         rc = 1;
133                 }
134                 break;
135         default:
136                 LASSERTF(0, "Unexpected ops %d\n", ops);
137                 rc = -EINVAL;
138                 break;
139         }
140
141         dmu_tx_commit(tx);
142         GOTO(log, rc);
143
144 log:
145         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
146                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
147                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
148
149         return rc;
150 }
151
152 static int
153 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
154                        const struct lu_fid *fid, uint64_t oid, int val)
155 {
156         struct lustre_scrub *scrub = &dev->od_scrub;
157         struct scrub_file *sf = &scrub->os_file;
158         struct osd_inconsistent_item *oii = NULL;
159         nvlist_t *nvbuf = NULL;
160         dnode_t *dn = NULL;
161         uint64_t oid2;
162         int ops = DTO_INDEX_UPDATE;
163         int rc;
164         ENTRY;
165
166         down_write(&scrub->os_rwsem);
167         scrub->os_new_checked++;
168         if (val < 0)
169                 GOTO(out, rc = val);
170
171         if (scrub->os_in_prior)
172                 oii = list_entry(scrub->os_inconsistent_items.next,
173                                  struct osd_inconsistent_item, oii_list);
174
175         if (oid < sf->sf_pos_latest_start && !oii)
176                 GOTO(out, rc = 0);
177
178         if (oii && oii->oii_insert) {
179                 ops = DTO_INDEX_INSERT;
180                 goto zget;
181         }
182
183         rc = osd_fid_lookup(env, dev, fid, &oid2);
184         if (rc) {
185                 if (rc != -ENOENT)
186                         GOTO(out, rc);
187
188                 ops = DTO_INDEX_INSERT;
189
190 zget:
191                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
192                 if (rc) {
193                         /* Someone removed the object by race. */
194                         if (rc == -ENOENT || rc == -EEXIST)
195                                 rc = 0;
196                         GOTO(out, rc);
197                 }
198
199                 scrub->os_full_speed = 1;
200                 sf->sf_flags |= SF_INCONSISTENT;
201         } else if (oid == oid2) {
202                 GOTO(out, rc = 0);
203         } else {
204                 struct lustre_mdt_attrs *lma = NULL;
205                 int size;
206
207                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
208                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
209                         goto update;
210                 if (rc)
211                         GOTO(out, rc);
212
213                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
214                                                (uchar_t **)&lma, &size);
215                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
216                         goto update;
217                 if (rc)
218                         GOTO(out, rc);
219
220                 lustre_lma_swab(lma);
221                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
222                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
223                                "two objects: %llu and %llu (in OI)\n",
224                                osd_name(dev), PFID(fid), oid, oid2);
225
226                         GOTO(out, rc = -EEXIST);
227                 }
228
229 update:
230                 scrub->os_full_speed = 1;
231                 sf->sf_flags |= SF_INCONSISTENT;
232         }
233
234         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
235         if (!rc) {
236                 if (scrub->os_in_prior)
237                         sf->sf_items_updated_prior++;
238                 else
239                         sf->sf_items_updated++;
240         }
241
242         GOTO(out, rc);
243
244 out:
245         if (nvbuf)
246                 nvlist_free(nvbuf);
247
248         if (rc < 0) {
249                 sf->sf_items_failed++;
250                 if (sf->sf_pos_first_inconsistent == 0 ||
251                     sf->sf_pos_first_inconsistent > oid)
252                         sf->sf_pos_first_inconsistent = oid;
253         } else {
254                 rc = 0;
255         }
256
257         /* There may be conflict unlink during the OI scrub,
258          * if happend, then remove the new added OI mapping. */
259         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
260                 osd_scrub_refresh_mapping(env, dev, fid, oid,
261                                           DTO_INDEX_DELETE, false, NULL);
262         up_write(&scrub->os_rwsem);
263
264         if (dn)
265                 osd_dnode_rele(dn);
266
267         if (oii) {
268                 spin_lock(&scrub->os_lock);
269                 if (likely(!list_empty(&oii->oii_list)))
270                         list_del(&oii->oii_list);
271                 spin_unlock(&scrub->os_lock);
272                 OBD_FREE_PTR(oii);
273         }
274
275         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
276 }
277
278 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
279 {
280         struct lustre_scrub *scrub = &dev->od_scrub;
281         struct ptlrpc_thread *thread = &scrub->os_thread;
282         struct scrub_file *sf = &scrub->os_file;
283         __u32 flags = scrub->os_start_flags;
284         int rc;
285         bool drop_dryrun = false;
286         ENTRY;
287
288         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
289                scrub->os_name, flags);
290
291         down_write(&scrub->os_rwsem);
292         if (flags & SS_SET_FAILOUT)
293                 sf->sf_param |= SP_FAILOUT;
294         else if (flags & SS_CLEAR_FAILOUT)
295                 sf->sf_param &= ~SP_FAILOUT;
296
297         if (flags & SS_SET_DRYRUN) {
298                 sf->sf_param |= SP_DRYRUN;
299         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
300                 sf->sf_param &= ~SP_DRYRUN;
301                 drop_dryrun = true;
302         }
303
304         if (flags & SS_RESET)
305                 scrub_file_reset(scrub, dev->od_uuid, 0);
306
307         scrub->os_partial_scan = 0;
308         if (flags & SS_AUTO_FULL) {
309                 scrub->os_full_speed = 1;
310                 sf->sf_flags |= SF_AUTO;
311         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
312                                    SF_UPGRADE)) {
313                 scrub->os_full_speed = 1;
314         } else {
315                 scrub->os_full_speed = 0;
316         }
317
318         spin_lock(&scrub->os_lock);
319         scrub->os_in_prior = 0;
320         scrub->os_waiting = 0;
321         scrub->os_paused = 0;
322         scrub->os_in_join = 0;
323         scrub->os_full_scrub = 0;
324         spin_unlock(&scrub->os_lock);
325         scrub->os_new_checked = 0;
326         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
327                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
328         else if (sf->sf_pos_last_checkpoint != 0)
329                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
330         else
331                 sf->sf_pos_latest_start = 1;
332
333         scrub->os_pos_current = sf->sf_pos_latest_start;
334         sf->sf_status = SS_SCANNING;
335         sf->sf_time_latest_start = cfs_time_current_sec();
336         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
337         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
338         rc = scrub_file_store(env, scrub);
339         if (!rc) {
340                 spin_lock(&scrub->os_lock);
341                 thread_set_flags(thread, SVC_RUNNING);
342                 spin_unlock(&scrub->os_lock);
343                 wake_up_all(&thread->t_ctl_waitq);
344         }
345         up_write(&scrub->os_rwsem);
346
347         RETURN(rc);
348 }
349
350 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
351                           int result)
352 {
353         struct lustre_scrub *scrub = &dev->od_scrub;
354         struct scrub_file *sf = &scrub->os_file;
355         int rc;
356         ENTRY;
357
358         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
359                scrub->os_name, result);
360
361         down_write(&scrub->os_rwsem);
362         spin_lock(&scrub->os_lock);
363         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
364         spin_unlock(&scrub->os_lock);
365         if (scrub->os_new_checked > 0) {
366                 sf->sf_items_checked += scrub->os_new_checked;
367                 scrub->os_new_checked = 0;
368                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
369         }
370         sf->sf_time_last_checkpoint = cfs_time_current_sec();
371         if (result > 0) {
372                 sf->sf_status = SS_COMPLETED;
373                 if (!(sf->sf_param & SP_DRYRUN)) {
374                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
375                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
376                                           SF_UPGRADE | SF_AUTO);
377                 }
378                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
379                 sf->sf_success_count++;
380         } else if (result == 0) {
381                 if (scrub->os_paused)
382                         sf->sf_status = SS_PAUSED;
383                 else
384                         sf->sf_status = SS_STOPPED;
385         } else {
386                 sf->sf_status = SS_FAILED;
387         }
388         sf->sf_run_time += cfs_duration_sec(cfs_time_current() + HALF_SEC -
389                                             scrub->os_time_last_checkpoint);
390         rc = scrub_file_store(env, scrub);
391         up_write(&scrub->os_rwsem);
392
393         RETURN(rc < 0 ? rc : result);
394 }
395
396 /* iteration engine */
397
398 static inline int
399 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
400 {
401         spin_lock(&scrub->os_lock);
402         if (osd_scrub_has_window(it) ||
403             !list_empty(&scrub->os_inconsistent_items) ||
404             it->ooi_waiting || !thread_is_running(&scrub->os_thread))
405                 scrub->os_waiting = 0;
406         else
407                 scrub->os_waiting = 1;
408         spin_unlock(&scrub->os_lock);
409
410         return !scrub->os_waiting;
411 }
412
413 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
414                           struct lu_fid *fid, uint64_t *oid)
415 {
416         struct l_wait_info lwi = { 0 };
417         struct lustre_scrub *scrub = &dev->od_scrub;
418         struct ptlrpc_thread *thread = &scrub->os_thread;
419         struct osd_otable_it *it = dev->od_otable_it;
420         struct lustre_mdt_attrs *lma = NULL;
421         nvlist_t *nvbuf = NULL;
422         int size = 0;
423         int rc = 0;
424         ENTRY;
425
426         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
427                 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), NULL, NULL);
428                 if (likely(lwi.lwi_timeout > 0)) {
429                         l_wait_event(thread->t_ctl_waitq,
430                                 !list_empty(&scrub->os_inconsistent_items) ||
431                                 !thread_is_running(thread),
432                                 &lwi);
433                         if (unlikely(!thread_is_running(thread)))
434                                 RETURN(SCRUB_NEXT_EXIT);
435                 }
436         }
437
438         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
439                 spin_lock(&scrub->os_lock);
440                 thread_set_flags(thread, SVC_STOPPING);
441                 spin_unlock(&scrub->os_lock);
442                 RETURN(SCRUB_NEXT_CRASH);
443         }
444
445         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
446                 RETURN(SCRUB_NEXT_FATAL);
447
448 again:
449         if (nvbuf) {
450                 nvlist_free(nvbuf);
451                 nvbuf = NULL;
452                 lma = NULL;
453         }
454
455         if (!list_empty(&scrub->os_inconsistent_items)) {
456                 spin_lock(&scrub->os_lock);
457                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
458                         struct osd_inconsistent_item *oii;
459
460                         oii = list_entry(scrub->os_inconsistent_items.next,
461                                 struct osd_inconsistent_item, oii_list);
462                         *fid = oii->oii_cache.oic_fid;
463                         *oid = oii->oii_cache.oic_dnode;
464                         scrub->os_in_prior = 1;
465                         spin_unlock(&scrub->os_lock);
466
467                         GOTO(out, rc = 0);
468                 }
469                 spin_unlock(&scrub->os_lock);
470         }
471
472         if (!scrub->os_full_speed && !osd_scrub_has_window(it)) {
473                 memset(&lwi, 0, sizeof(lwi));
474                 l_wait_event(thread->t_ctl_waitq,
475                              osd_scrub_wakeup(scrub, it),
476                              &lwi);
477         }
478
479         if (unlikely(!thread_is_running(thread)))
480                 GOTO(out, rc = SCRUB_NEXT_EXIT);
481
482         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
483         if (rc)
484                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
485
486         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
487         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
488                 goto again;
489
490         if (rc)
491                 GOTO(out, rc);
492
493         LASSERT(nvbuf != NULL);
494         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
495                                        (uchar_t **)&lma, &size);
496         if (!rc) {
497                 lustre_lma_swab(lma);
498                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
499                            !(lma->lma_incompat & LMAI_AGENT))) {
500                         *fid = lma->lma_self_fid;
501                         *oid = scrub->os_pos_current;
502
503                         GOTO(out, rc = 0);
504                 }
505         }
506
507         if (!scrub->os_full_speed) {
508                 spin_lock(&scrub->os_lock);
509                 it->ooi_prefetched++;
510                 if (it->ooi_waiting) {
511                         it->ooi_waiting = 0;
512                         wake_up_all(&thread->t_ctl_waitq);
513                 }
514                 spin_unlock(&scrub->os_lock);
515         }
516
517         goto again;
518
519 out:
520         if (nvbuf)
521                 nvlist_free(nvbuf);
522
523         return rc;
524 }
525
526 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
527                           const struct lu_fid *fid, uint64_t oid, int rc)
528 {
529         struct lustre_scrub *scrub = &dev->od_scrub;
530         struct ptlrpc_thread *thread = &scrub->os_thread;
531         struct osd_otable_it *it = dev->od_otable_it;
532
533         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
534         if (!scrub->os_in_prior) {
535                 if (!scrub->os_full_speed) {
536                         spin_lock(&scrub->os_lock);
537                         it->ooi_prefetched++;
538                         if (it->ooi_waiting) {
539                                 it->ooi_waiting = 0;
540                                 wake_up_all(&thread->t_ctl_waitq);
541                         }
542                         spin_unlock(&scrub->os_lock);
543                 }
544         } else {
545                 scrub->os_in_prior = 0;
546         }
547
548         if (rc)
549                 return rc;
550
551         rc = scrub_checkpoint(env, scrub);
552         if (rc) {
553                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
554                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
555                 /* Continue, as long as the scrub itself can go ahead. */
556         }
557
558         return 0;
559 }
560
561 static int osd_scrub_main(void *args)
562 {
563         struct lu_env env;
564         struct osd_device *dev = (struct osd_device *)args;
565         struct lustre_scrub *scrub = &dev->od_scrub;
566         struct ptlrpc_thread *thread = &scrub->os_thread;
567         struct lu_fid *fid;
568         uint64_t oid;
569         int rc = 0;
570         ENTRY;
571
572         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
573         if (rc) {
574                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
575                        scrub->os_name, rc);
576                 GOTO(noenv, rc);
577         }
578
579         rc = osd_scrub_prep(&env, dev);
580         if (rc) {
581                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
582                        scrub->os_name, rc);
583                 GOTO(out, rc);
584         }
585
586         if (!scrub->os_full_speed) {
587                 struct l_wait_info lwi = { 0 };
588                 struct osd_otable_it *it = dev->od_otable_it;
589
590                 l_wait_event(thread->t_ctl_waitq,
591                              it->ooi_user_ready || !thread_is_running(thread),
592                              &lwi);
593                 if (unlikely(!thread_is_running(thread)))
594                         GOTO(post, rc = 0);
595
596                 scrub->os_pos_current = it->ooi_pos;
597         }
598
599         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
600                scrub->os_name, scrub->os_start_flags,
601                scrub->os_pos_current);
602
603         fid = &osd_oti_get(&env)->oti_fid;
604         while (!rc && thread_is_running(thread)) {
605                 rc = osd_scrub_next(&env, dev, fid, &oid);
606                 switch (rc) {
607                 case SCRUB_NEXT_EXIT:
608                         GOTO(post, rc = 0);
609                 case SCRUB_NEXT_CRASH:
610                         spin_lock(&scrub->os_lock);
611                         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
612                         spin_unlock(&scrub->os_lock);
613                         GOTO(out, rc = -EINVAL);
614                 case SCRUB_NEXT_FATAL:
615                         GOTO(post, rc = -EINVAL);
616                 case SCRUB_NEXT_BREAK:
617                         GOTO(post, rc = 1);
618                 }
619
620                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
621         }
622
623         GOTO(post, rc);
624
625 post:
626         rc = osd_scrub_post(&env, dev, rc);
627         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
628                scrub->os_name, scrub->os_pos_current, rc);
629
630 out:
631         while (!list_empty(&scrub->os_inconsistent_items)) {
632                 struct osd_inconsistent_item *oii;
633
634                 oii = list_entry(scrub->os_inconsistent_items.next,
635                                  struct osd_inconsistent_item, oii_list);
636                 list_del_init(&oii->oii_list);
637                 OBD_FREE_PTR(oii);
638         }
639
640         lu_env_fini(&env);
641
642 noenv:
643         spin_lock(&scrub->os_lock);
644         thread_set_flags(thread, SVC_STOPPED);
645         wake_up_all(&thread->t_ctl_waitq);
646         spin_unlock(&scrub->os_lock);
647         return rc;
648 }
649
650 /* initial OI scrub */
651
struct osd_lf_map;

/*
 * Directory entry handler; stored in osd_lf_map::olm_handle_dirent and
 * osd_ios_item::oii_handle_dirent and passed to the directory scanners.
 */
typedef int (*handle_dirent_t)(const struct lu_env *,
                               struct osd_device *,
                               const char *,
                               uint64_t,
                               uint64_t,
                               enum osd_lf_flags,
                               bool);

static int osd_ios_varfid_hd(const struct lu_env *,
                             struct osd_device *,
                             const char *,
                             uint64_t,
                             uint64_t,
                             enum osd_lf_flags,
                             bool);
static int osd_ios_uld_hd(const struct lu_env *,
                          struct osd_device *,
                          const char *,
                          uint64_t,
                          uint64_t,
                          enum osd_lf_flags,
                          bool);

/*
 * Directory scanner; stored in osd_lf_map::olm_scan_dir and
 * osd_ios_item::oii_scan_dir.  It takes the entry handler as an argument.
 */
typedef int (*scan_dir_t)(const struct lu_env *,
                          struct osd_device *,
                          uint64_t,
                          handle_dirent_t,
                          enum osd_lf_flags);

static int osd_ios_general_sd(const struct lu_env *,
                              struct osd_device *,
                              uint64_t,
                              handle_dirent_t,
                              enum osd_lf_flags);
static int osd_ios_ROOT_sd(const struct lu_env *,
                           struct osd_device *,
                           uint64_t,
                           handle_dirent_t,
                           enum osd_lf_flags);
670
671 struct osd_lf_map {
672         char                    *olm_name;
673         struct lu_fid            olm_fid;
674         enum osd_lf_flags        olm_flags;
675         scan_dir_t               olm_scan_dir;
676         handle_dirent_t          olm_handle_dirent;
677 };
678
679 /* Add the new introduced local files in the list in the future. */
680 static const struct osd_lf_map osd_lf_maps[] = {
681         /* CONFIGS */
682         {
683                 .olm_name               = MOUNT_CONFIGS_DIR,
684                 .olm_fid                = {
685                         .f_seq  = FID_SEQ_LOCAL_FILE,
686                         .f_oid  = MGS_CONFIGS_OID,
687                 },
688                 .olm_flags              = OLF_SCAN_SUBITEMS,
689                 .olm_scan_dir           = osd_ios_general_sd,
690                 .olm_handle_dirent      = osd_ios_varfid_hd,
691         },
692
693         /* NIDTBL_VERSIONS */
694         {
695                 .olm_name               = MGS_NIDTBL_DIR,
696                 .olm_flags              = OLF_SCAN_SUBITEMS,
697                 .olm_scan_dir           = osd_ios_general_sd,
698                 .olm_handle_dirent      = osd_ios_varfid_hd,
699         },
700
701         /* PENDING */
702         {
703                 .olm_name               = "PENDING",
704         },
705
706         /* ROOT */
707         {
708                 .olm_name               = "ROOT",
709                 .olm_fid                = {
710                         .f_seq  = FID_SEQ_ROOT,
711                         .f_oid  = FID_OID_ROOT,
712                 },
713                 .olm_flags              = OLF_SCAN_SUBITEMS,
714                 .olm_scan_dir           = osd_ios_ROOT_sd,
715         },
716
717         /* fld */
718         {
719                 .olm_name               = "fld",
720                 .olm_fid                = {
721                         .f_seq  = FID_SEQ_LOCAL_FILE,
722                         .f_oid  = FLD_INDEX_OID,
723                 },
724         },
725
726         /* changelog_catalog */
727         {
728                 .olm_name               = CHANGELOG_CATALOG,
729         },
730
731         /* changelog_users */
732         {
733                 .olm_name               = CHANGELOG_USERS,
734         },
735
736         /* quota_master */
737         {
738                 .olm_name               = QMT_DIR,
739                 .olm_flags              = OLF_SCAN_SUBITEMS,
740                 .olm_scan_dir           = osd_ios_general_sd,
741                 .olm_handle_dirent      = osd_ios_varfid_hd,
742         },
743
744         /* quota_slave */
745         {
746                 .olm_name               = QSD_DIR,
747                 .olm_flags              = OLF_SCAN_SUBITEMS,
748                 .olm_scan_dir           = osd_ios_general_sd,
749                 .olm_handle_dirent      = osd_ios_varfid_hd,
750         },
751
752         /* LFSCK */
753         {
754                 .olm_name               = LFSCK_DIR,
755                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
756                 .olm_scan_dir           = osd_ios_general_sd,
757                 .olm_handle_dirent      = osd_ios_varfid_hd,
758         },
759
760         /* lfsck_bookmark */
761         {
762                 .olm_name               = LFSCK_BOOKMARK,
763         },
764
765         /* lfsck_layout */
766         {
767                 .olm_name               = LFSCK_LAYOUT,
768         },
769
770         /* lfsck_namespace */
771         {
772                 .olm_name               = LFSCK_NAMESPACE,
773         },
774
775         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
776          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
777          * for more details. */
778
779         /* update_log */
780         {
781                 .olm_name               = "update_log",
782                 .olm_fid                = {
783                         .f_seq  = FID_SEQ_UPDATE_LOG,
784                 },
785                 .olm_flags              = OLF_IDX_IN_FID,
786         },
787
788         /* update_log_dir */
789         {
790                 .olm_name               = "update_log_dir",
791                 .olm_fid        = {
792                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
793                 },
794                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
795                 .olm_scan_dir           = osd_ios_general_sd,
796                 .olm_handle_dirent      = osd_ios_uld_hd,
797         },
798
799         /* hsm_actions */
800         {
801                 .olm_name               = HSM_ACTIONS,
802         },
803
804         /* nodemap */
805         {
806                 .olm_name               = LUSTRE_NODEMAP_NAME,
807         },
808
809         /* index_backup */
810         {
811                 .olm_name               = INDEX_BACKUP_DIR,
812                 .olm_fid                = {
813                         .f_seq  = FID_SEQ_LOCAL_FILE,
814                         .f_oid  = INDEX_BACKUP_OID,
815                 },
816                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
817                 .olm_scan_dir           = osd_ios_general_sd,
818                 .olm_handle_dirent      = osd_ios_varfid_hd,
819         },
820
821         {
822                 .olm_name               = NULL
823         }
824 };
825
826 /* Add the new introduced files under .lustre/ in the list in the future. */
827 static const struct osd_lf_map osd_dl_maps[] = {
828         /* .lustre/fid */
829         {
830                 .olm_name               = "fid",
831                 .olm_fid                = {
832                         .f_seq  = FID_SEQ_DOT_LUSTRE,
833                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
834                 },
835         },
836
837         /* .lustre/lost+found */
838         {
839                 .olm_name               = "lost+found",
840                 .olm_fid                = {
841                         .f_seq  = FID_SEQ_DOT_LUSTRE,
842                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
843                 },
844         },
845
846         {
847                 .olm_name               = NULL
848         }
849 };
850
851 struct osd_ios_item {
852         struct list_head        oii_list;
853         uint64_t                oii_parent;
854         enum osd_lf_flags       oii_flags;
855         scan_dir_t              oii_scan_dir;
856         handle_dirent_t         oii_handle_dirent;
857 };
858
859 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
860                             enum osd_lf_flags flags, scan_dir_t scan_dir,
861                             handle_dirent_t handle_dirent)
862 {
863         struct osd_ios_item *item;
864
865         OBD_ALLOC_PTR(item);
866         if (!item) {
867                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
868                       osd_name(dev), parent);
869                 return -ENOMEM;
870         }
871
872         INIT_LIST_HEAD(&item->oii_list);
873         item->oii_parent = parent;
874         item->oii_flags = flags;
875         item->oii_scan_dir = scan_dir;
876         item->oii_handle_dirent = handle_dirent;
877         list_add_tail(&item->oii_list, &dev->od_ios_list);
878
879         return 0;
880 }
881
882 static bool osd_index_need_recreate(const struct lu_env *env,
883                                     struct osd_device *dev, uint64_t oid)
884 {
885         struct osd_thread_info *info = osd_oti_get(env);
886         zap_attribute_t *za = &info->oti_za2;
887         zap_cursor_t *zc = &info->oti_zc2;
888         int rc;
889         ENTRY;
890
891         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
892         rc = -zap_cursor_retrieve(zc, za);
893         zap_cursor_fini(zc);
894         if (rc && rc != -ENOENT)
895                 RETURN(true);
896
897         RETURN(false);
898 }
899
900 static void osd_ios_index_register(const struct lu_env *env,
901                                    struct osd_device *osd,
902                                    const struct lu_fid *fid, uint64_t oid)
903 {
904         struct osd_thread_info *info = osd_oti_get(env);
905         zap_attribute_t *za = &info->oti_za2;
906         zap_cursor_t *zc = &info->oti_zc2;
907         struct zap_leaf_entry *le;
908         dnode_t *dn = NULL;
909         sa_handle_t *hdl;
910         __u64 mode = 0;
911         __u32 keysize = 0;
912         __u32 recsize = 0;
913         int rc;
914         ENTRY;
915
916         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
917         if (rc == -EEXIST || rc == -ENOENT)
918                 RETURN_EXIT;
919
920         if (rc < 0)
921                 GOTO(log, rc);
922
923         if (!osd_object_is_zap(dn))
924                 GOTO(log, rc = 1);
925
926         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
927         if (rc)
928                 GOTO(log, rc);
929
930         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
931         sa_handle_destroy(hdl);
932         if (rc)
933                 GOTO(log, rc);
934
935         if (!S_ISREG(mode))
936                 GOTO(log, rc = 1);
937
938         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
939         rc = -zap_cursor_retrieve(zc, za);
940         if (rc)
941                 /* Skip empty index object */
942                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
943
944         if (zc->zc_zap->zap_ismicro ||
945             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
946                 GOTO(fini, rc = 1);
947
948         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
949         keysize = le->le_name_numints * 8;
950         recsize = za->za_integer_length * za->za_num_integers;
951         if (likely(keysize && recsize))
952                 rc = osd_index_register(osd, fid, keysize, recsize);
953
954         GOTO(fini, rc);
955
956 fini:
957         zap_cursor_fini(zc);
958
959 log:
960         if (dn)
961                 osd_dnode_rele(dn);
962         if (rc < 0)
963                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
964                       osd_name(osd), PFID(fid), keysize, recsize, rc);
965         else if (!rc)
966                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
967                        osd_name(osd), PFID(fid), keysize, recsize);
968 }
969
970 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
971                               struct lustre_index_restore_unit *liru, void *buf,
972                               int bufsize)
973 {
974         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
975         struct lu_fid *tgt_fid = &liru->liru_cfid;
976         struct lu_fid bak_fid;
977         int rc;
978         ENTRY;
979
980         lustre_fid2lbx(buf, tgt_fid, bufsize);
981         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
982                          sizeof(*zde) / 8, (void *)zde);
983         if (rc)
984                 GOTO(log, rc);
985
986         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
987         if (rc)
988                 GOTO(log, rc);
989
990         /* The OI mapping for index may be invalid, since it will be
991          * re-created, not update the OI mapping, just cache it in RAM. */
992         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
993                                             liru->liru_clid);
994         if (!rc)
995                 rc = lustre_index_restore(env, &dev->od_dt_dev,
996                                 &liru->liru_pfid, tgt_fid, &bak_fid,
997                                 liru->liru_name, &dev->od_index_backup_list,
998                                 &dev->od_lock, buf, bufsize);
999         GOTO(log, rc);
1000
1001 log:
1002         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
1003                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
1004 }
1005
1006 /**
1007  * verify FID-in-LMA and OI entry for one object
1008  *
1009  * ios: Initial OI Scrub.
1010  */
1011 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
1012                             const struct lu_fid *fid, uint64_t parent,
1013                             uint64_t oid, const char *name,
1014                             enum osd_lf_flags flags)
1015 {
1016         struct lustre_scrub *scrub = &dev->od_scrub;
1017         struct scrub_file *sf = &scrub->os_file;
1018         struct lustre_mdt_attrs *lma = NULL;
1019         nvlist_t *nvbuf = NULL;
1020         struct lu_fid tfid;
1021         uint64_t oid2 = 0;
1022         __u64 flag = 0;
1023         int size = 0;
1024         int op = 0;
1025         int rc;
1026         ENTRY;
1027
1028         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
1029         if (unlikely(rc == -ENOENT || rc == -EEXIST))
1030                 RETURN(0);
1031
1032         if (rc && rc != -ENODATA) {
1033                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
1034                       "rc = %d\n", osd_name(dev), oid, rc);
1035
1036                 RETURN(rc);
1037         }
1038
1039         if (!rc) {
1040                 LASSERT(nvbuf != NULL);
1041                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1042                                                (uchar_t **)&lma, &size);
1043                 if (rc || size == 0) {
1044                         LASSERT(lma == NULL);
1045                         rc = -ENODATA;
1046                 } else {
1047                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1048                         lustre_lma_swab(lma);
1049                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
1050                                 nvlist_free(nvbuf);
1051                                 RETURN(0);
1052                         }
1053
1054                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
1055                             osd_index_need_recreate(env, dev, oid)) {
1056                                 if (parent == dev->od_root) {
1057                                         lu_local_obj_fid(&tfid,
1058                                                          OSD_FS_ROOT_OID);
1059                                 } else {
1060                                         rc = osd_get_fid_by_oid(env, dev,
1061                                                                 parent, &tfid);
1062                                         if (rc) {
1063                                                 nvlist_free(nvbuf);
1064                                                 RETURN(rc);
1065                                         }
1066                                 }
1067
1068                                 rc = lustre_liru_new(
1069                                                 &dev->od_index_restore_list,
1070                                                 &tfid, &lma->lma_self_fid, oid,
1071                                                 name, strlen(name));
1072                                 nvlist_free(nvbuf);
1073                                 RETURN(rc);
1074                         }
1075
1076                         tfid = lma->lma_self_fid;
1077                         if (!(flags & OLF_NOT_BACKUP))
1078                                 osd_ios_index_register(env, dev, &tfid, oid);
1079                 }
1080                 nvlist_free(nvbuf);
1081         }
1082
1083         if (rc == -ENODATA) {
1084                 if (!fid) {
1085                         /* Skip the object without FID-in-LMA */
1086                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1087                                osd_name(dev), oid);
1088
1089                         RETURN(0);
1090                 }
1091
1092                 LASSERT(!fid_is_zero(fid));
1093
1094                 tfid = *fid;
1095                 if (flags & OLF_IDX_IN_FID) {
1096                         LASSERT(dev->od_index >= 0);
1097
1098                         tfid.f_oid = dev->od_index;
1099                 }
1100         }
1101
1102         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1103         if (rc) {
1104                 if (rc != -ENOENT) {
1105                         CWARN("%s: initial OI scrub failed to lookup fid for "
1106                               DFID"=>%llu: rc = %d\n",
1107                               osd_name(dev), PFID(&tfid), oid, rc);
1108
1109                         RETURN(rc);
1110                 }
1111
1112                 flag = SF_RECREATED;
1113                 op = DTO_INDEX_INSERT;
1114         } else {
1115                 if (oid == oid2)
1116                         RETURN(0);
1117
1118                 flag = SF_INCONSISTENT;
1119                 op = DTO_INDEX_UPDATE;
1120         }
1121
1122         if (!(sf->sf_flags & flag)) {
1123                 scrub_file_reset(scrub, dev->od_uuid, flag);
1124                 rc = scrub_file_store(env, scrub);
1125                 if (rc)
1126                         RETURN(rc);
1127         }
1128
1129         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1130
1131         RETURN(rc > 0 ? 0 : rc);
1132 }
1133
1134 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1135                              const char *name, uint64_t parent, uint64_t oid,
1136                              enum osd_lf_flags flags, bool is_dir)
1137 {
1138         int rc;
1139         ENTRY;
1140
1141         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1142         if (!rc && is_dir)
1143                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1144                                       osd_ios_varfid_hd);
1145
1146         RETURN(rc);
1147 }
1148
1149 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1150                           const char *name, uint64_t parent, uint64_t oid,
1151                           enum osd_lf_flags flags, bool is_dir)
1152 {
1153         struct lu_fid tfid;
1154         int rc;
1155         ENTRY;
1156
1157         /* skip any non-DFID format name */
1158         if (name[0] != '[')
1159                 RETURN(0);
1160
1161         /* skip the start '[' */
1162         sscanf(&name[1], SFID, RFID(&tfid));
1163         if (fid_is_sane(&tfid))
1164                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1165         else
1166                 rc = -EIO;
1167
1168         RETURN(rc);
1169 }
1170
1171 /*
1172  * General scanner for the directories execpt /ROOT during initial OI scrub.
1173  * It scans the name entries under the given directory one by one. For each
1174  * entry, verifies its OI mapping via the given @handle_dirent.
1175  */
1176 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1177                               uint64_t parent, handle_dirent_t handle_dirent,
1178                               enum osd_lf_flags flags)
1179 {
1180         struct osd_thread_info *info = osd_oti_get(env);
1181         struct luz_direntry *zde = &info->oti_zde;
1182         zap_attribute_t *za = &info->oti_za;
1183         zap_cursor_t *zc = &info->oti_zc;
1184         int rc;
1185         ENTRY;
1186
1187         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1188         rc = -zap_cursor_retrieve(zc, za);
1189         if (rc == -ENOENT)
1190                 zap_cursor_advance(zc);
1191         else if (rc)
1192                 GOTO(log, rc);
1193
1194         while (1) {
1195                 rc = -zap_cursor_retrieve(zc, za);
1196                 if (rc)
1197                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1198
1199                 /* skip the entry started with '.' */
1200                 if (likely(za->za_name[0] != '.')) {
1201                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1202                                         za->za_integer_length,
1203                                         sizeof(*zde) / za->za_integer_length,
1204                                         (void *)zde);
1205                         if (rc) {
1206                                 CWARN("%s: initial OI scrub failed to lookup "
1207                                       "%s under %llu: rc = %d\n",
1208                                       osd_name(dev), za->za_name, parent, rc);
1209                                 continue;
1210                         }
1211
1212                         rc = handle_dirent(env, dev, za->za_name, parent,
1213                                         zde->lzd_reg.zde_dnode, flags,
1214                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1215                                         true : false);
1216                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1217                                "%llu: rc = %d\n",
1218                                osd_name(dev), za->za_name, parent, rc);
1219                 }
1220
1221                 zap_cursor_advance(zc);
1222         }
1223
1224 log:
1225         if (rc)
1226                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1227                       "rc = %d\n", osd_name(dev), parent, rc);
1228         zap_cursor_fini(zc);
1229
1230         return rc;
1231 }
1232
1233 /*
1234  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1235  * be scanned during the initial OI scrub, instead, only the .lustre and the
1236  * sub-items under .lustre will be handled.
1237  */
1238 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1239                            uint64_t parent, handle_dirent_t handle_dirent,
1240                            enum osd_lf_flags flags)
1241 {
1242         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1243         const struct osd_lf_map *map;
1244         uint64_t oid;
1245         int rc;
1246         int rc1 = 0;
1247         ENTRY;
1248
1249         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1250                             sizeof(*zde) / 8, (void *)zde);
1251         if (rc == -ENOENT) {
1252                 /* The .lustre directory is lost. That is not fatal. It can
1253                  * be re-created in the subsequent MDT start processing. */
1254                 RETURN(0);
1255         }
1256
1257         if (rc) {
1258                 CWARN("%s: initial OI scrub failed to find .lustre: "
1259                       "rc = %d\n", osd_name(dev), rc);
1260
1261                 RETURN(rc);
1262         }
1263
1264         oid = zde->lzd_reg.zde_dnode;
1265         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1266                               dot_lustre_name, 0);
1267         if (rc)
1268                 RETURN(rc);
1269
1270         for (map = osd_dl_maps; map->olm_name; map++) {
1271                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1272                                     sizeof(*zde) / 8, (void *)zde);
1273                 if (rc) {
1274                         if (rc != -ENOENT)
1275                                 CWARN("%s: initial OI scrub failed to find"
1276                                       "the entry %s under .lustre: rc = %d\n",
1277                                       osd_name(dev), map->olm_name, rc);
1278                         else if (!fid_is_zero(&map->olm_fid))
1279                                 /* Try to remove the stale OI mapping. */
1280                                 osd_scrub_refresh_mapping(env, dev,
1281                                                 &map->olm_fid, 0,
1282                                                 DTO_INDEX_DELETE, true,
1283                                                 map->olm_name);
1284                         continue;
1285                 }
1286
1287                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1288                                       zde->lzd_reg.zde_dnode, map->olm_name,
1289                                       map->olm_flags);
1290                 if (rc)
1291                         rc1 = rc;
1292         }
1293
1294         RETURN(rc1);
1295 }
1296
1297 static void osd_initial_OI_scrub(const struct lu_env *env,
1298                                  struct osd_device *dev)
1299 {
1300         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1301         const struct osd_lf_map *map;
1302         int rc;
1303         ENTRY;
1304
1305         for (map = osd_lf_maps; map->olm_name; map++) {
1306                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1307                                     sizeof(*zde) / 8, (void *)zde);
1308                 if (rc) {
1309                         if (rc != -ENOENT)
1310                                 CWARN("%s: initial OI scrub failed "
1311                                       "to find the entry %s: rc = %d\n",
1312                                       osd_name(dev), map->olm_name, rc);
1313                         else if (!fid_is_zero(&map->olm_fid))
1314                                 /* Try to remove the stale OI mapping. */
1315                                 osd_scrub_refresh_mapping(env, dev,
1316                                                 &map->olm_fid, 0,
1317                                                 DTO_INDEX_DELETE, true,
1318                                                 map->olm_name);
1319                         continue;
1320                 }
1321
1322                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1323                                       zde->lzd_reg.zde_dnode, map->olm_name,
1324                                       map->olm_flags);
1325                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1326                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1327                                          map->olm_flags, map->olm_scan_dir,
1328                                          map->olm_handle_dirent);
1329         }
1330
1331         while (!list_empty(&dev->od_ios_list)) {
1332                 struct osd_ios_item *item;
1333
1334                 item = list_entry(dev->od_ios_list.next,
1335                                   struct osd_ios_item, oii_list);
1336                 list_del_init(&item->oii_list);
1337                 item->oii_scan_dir(env, dev, item->oii_parent,
1338                                    item->oii_handle_dirent, item->oii_flags);
1339                 OBD_FREE_PTR(item);
1340         }
1341
1342         if (!list_empty(&dev->od_index_restore_list)) {
1343                 char *buf;
1344
1345                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1346                 if (!buf)
1347                         CERROR("%s: not enough RAM for rebuild index\n",
1348                                osd_name(dev));
1349
1350                 while (!list_empty(&dev->od_index_restore_list)) {
1351                         struct lustre_index_restore_unit *liru;
1352
1353                         liru = list_entry(dev->od_index_restore_list.next,
1354                                           struct lustre_index_restore_unit,
1355                                           liru_link);
1356                         list_del(&liru->liru_link);
1357                         if (buf)
1358                                 osd_index_restore(env, dev, liru, buf,
1359                                                   INDEX_BACKUP_BUFSIZE);
1360                         OBD_FREE(liru, liru->liru_len);
1361                 }
1362
1363                 if (buf)
1364                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1365         }
1366
1367         EXIT;
1368 }
1369
1370 /* OI scrub start/stop */
1371
1372 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1373                     __u32 flags)
1374 {
1375         int rc;
1376         ENTRY;
1377
1378         if (dev->od_dt_dev.dd_rdonly)
1379                 RETURN(-EROFS);
1380
1381         /* od_otable_sem: prevent concurrent start/stop */
1382         down(&dev->od_otable_sem);
1383         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1384         up(&dev->od_otable_sem);
1385
1386         RETURN(rc == -EALREADY ? 0 : rc);
1387 }
1388
1389 static void osd_scrub_stop(struct osd_device *dev)
1390 {
1391         struct lustre_scrub *scrub = &dev->od_scrub;
1392         ENTRY;
1393
1394         /* od_otable_sem: prevent concurrent start/stop */
1395         down(&dev->od_otable_sem);
1396         scrub->os_paused = 1;
1397         scrub_stop(scrub);
1398         up(&dev->od_otable_sem);
1399
1400         EXIT;
1401 }
1402
1403 /* OI scrub setup/cleanup */
1404
1405 static const char osd_scrub_name[] = "OI_scrub";
1406
1407 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev)
1408 {
1409         struct osd_thread_info *info = osd_oti_get(env);
1410         struct lustre_scrub *scrub = &dev->od_scrub;
1411         struct scrub_file *sf = &scrub->os_file;
1412         struct lu_fid *fid = &info->oti_fid;
1413         struct dt_object *obj;
1414         uint64_t oid;
1415         int rc = 0;
1416         bool dirty = false;
1417         ENTRY;
1418
1419         memcpy(dev->od_uuid,
1420                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1421                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1422         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1423         init_waitqueue_head(&scrub->os_thread.t_ctl_waitq);
1424         init_rwsem(&scrub->os_rwsem);
1425         spin_lock_init(&scrub->os_lock);
1426         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1427         scrub->os_name = osd_name(dev);
1428
1429         /* 'What the @fid is' is not imporatant, because the object
1430          * has no OI mapping, and only is visible inside the OSD.*/
1431         fid->f_seq = FID_SEQ_IGIF_MAX;
1432         if (dev->od_is_ost)
1433                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1434         else
1435                 fid->f_oid = dev->od_index + 1;
1436         fid->f_ver = 0;
1437         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1438                                     osd_scrub_name, &oid, fid, false);
1439         if (rc)
1440                 RETURN(rc);
1441
1442         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1443         if (rc)
1444                 RETURN(rc);
1445
1446         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1447         if (IS_ERR_OR_NULL(obj))
1448                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1449
1450         scrub->os_obj = obj;
1451         rc = scrub_file_load(env, scrub);
1452         if (rc == -ENOENT || rc == -EFAULT) {
1453                 scrub_file_init(scrub, dev->od_uuid);
1454                 dirty = true;
1455         } else if (rc < 0) {
1456                 GOTO(cleanup_obj, rc);
1457         } else {
1458                 if (memcmp(sf->sf_uuid, dev->od_uuid, 16) != 0) {
1459                         struct obd_uuid *old_uuid;
1460                         struct obd_uuid *new_uuid;
1461
1462                         OBD_ALLOC_PTR(old_uuid);
1463                         OBD_ALLOC_PTR(new_uuid);
1464                         if (!old_uuid || !new_uuid) {
1465                                 CERROR("%s: UUID has been changed, but"
1466                                        "failed to allocate RAM for report\n",
1467                                        osd_name(dev));
1468                         } else {
1469                                 class_uuid_unparse(sf->sf_uuid, old_uuid);
1470                                 class_uuid_unparse(dev->od_uuid, new_uuid);
1471                                 CDEBUG(D_LFSCK, "%s: UUID has been changed "
1472                                        "from %s to %s\n", osd_name(dev),
1473                                        old_uuid->uuid, new_uuid->uuid);
1474                         }
1475                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1476                         dirty = true;
1477                         if (old_uuid)
1478                                 OBD_FREE_PTR(old_uuid);
1479                         if (new_uuid)
1480                                 OBD_FREE_PTR(new_uuid);
1481                 } else if (sf->sf_status == SS_SCANNING) {
1482                         sf->sf_status = SS_CRASHED;
1483                         dirty = true;
1484                 }
1485
1486                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1487                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1488                                       osd_name(dev), sf->sf_oi_count,
1489                                       osd_oi_count);
1490                         sf->sf_oi_count = osd_oi_count;
1491                         dirty = true;
1492                 }
1493         }
1494
1495         if (sf->sf_pos_last_checkpoint != 0)
1496                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1497         else
1498                 scrub->os_pos_current = 1;
1499
1500         if (dirty) {
1501                 rc = scrub_file_store(env, scrub);
1502                 if (rc)
1503                         GOTO(cleanup_obj, rc);
1504         }
1505
1506         /* Initialize OI files. */
1507         rc = osd_oi_init(env, dev);
1508         if (rc < 0)
1509                 GOTO(cleanup_obj, rc);
1510
1511         if (!dev->od_dt_dev.dd_rdonly)
1512                 osd_initial_OI_scrub(env, dev);
1513
1514         if (!dev->od_dt_dev.dd_rdonly &&
1515             dev->od_auto_scrub_interval != AS_NEVER &&
1516             ((sf->sf_status == SS_PAUSED) ||
1517              (sf->sf_status == SS_CRASHED &&
1518               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1519                               SF_UPGRADE | SF_AUTO)) ||
1520              (sf->sf_status == SS_INIT &&
1521               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1522                               SF_UPGRADE))))
1523                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1524
1525         if (rc)
1526                 GOTO(cleanup_oi, rc);
1527
1528         RETURN(0);
1529
1530 cleanup_oi:
1531         osd_oi_fini(env, dev);
1532 cleanup_obj:
1533         dt_object_put_nocache(env, scrub->os_obj);
1534         scrub->os_obj = NULL;
1535
1536         return rc;
1537 }
1538
1539 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1540 {
1541         struct lustre_scrub *scrub = &dev->od_scrub;
1542
1543         LASSERT(!dev->od_otable_it);
1544
1545         if (scrub->os_obj) {
1546                 osd_scrub_stop(dev);
1547                 dt_object_put_nocache(env, scrub->os_obj);
1548                 scrub->os_obj = NULL;
1549         }
1550
1551         if (dev->od_oi_table)
1552                 osd_oi_fini(env, dev);
1553 }
1554
1555 /* object table based iteration APIs */
1556
1557 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1558                                        struct dt_object *dt, __u32 attr)
1559 {
1560         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1561         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1562         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1563         struct lustre_scrub *scrub = &dev->od_scrub;
1564         struct osd_otable_it *it;
1565         __u32 start = 0;
1566         int rc;
1567         ENTRY;
1568
1569         if (dev->od_dt_dev.dd_rdonly)
1570                 RETURN(ERR_PTR(-EROFS));
1571
1572         /* od_otable_sem: prevent concurrent init/fini */
1573         down(&dev->od_otable_sem);
1574         if (dev->od_otable_it)
1575                 GOTO(out, it = ERR_PTR(-EALREADY));
1576
1577         OBD_ALLOC_PTR(it);
1578         if (!it)
1579                 GOTO(out, it = ERR_PTR(-ENOMEM));
1580
1581         if (flags & DOIF_OUTUSED)
1582                 it->ooi_used_outside = 1;
1583
1584         if (flags & DOIF_RESET)
1585                 start |= SS_RESET;
1586
1587         if (valid & DOIV_ERROR_HANDLE) {
1588                 if (flags & DOIF_FAILOUT)
1589                         start |= SS_SET_FAILOUT;
1590                 else
1591                         start |= SS_CLEAR_FAILOUT;
1592         }
1593
1594         if (valid & DOIV_DRYRUN) {
1595                 if (flags & DOIF_DRYRUN)
1596                         start |= SS_SET_DRYRUN;
1597                 else
1598                         start |= SS_CLEAR_DRYRUN;
1599         }
1600
1601         /* XXX: dmu_object_next() does NOT find dnodes allocated
1602          *      in the current non-committed txg, so we force txg
1603          *      commit to find all existing dnodes ... */
1604         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1605
1606         dev->od_otable_it = it;
1607         it->ooi_dev = dev;
1608         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1609         if (rc == -EALREADY) {
1610                 it->ooi_pos = 1;
1611         } else if (rc < 0) {
1612                 dev->od_otable_it = NULL;
1613                 OBD_FREE_PTR(it);
1614                 it = ERR_PTR(rc);
1615         } else {
1616                 it->ooi_pos = scrub->os_pos_current;
1617         }
1618
1619         GOTO(out, it);
1620
1621 out:
1622         up(&dev->od_otable_sem);
1623         return (struct dt_it *)it;
1624 }
1625
1626 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1627 {
1628         struct osd_otable_it *it = (struct osd_otable_it *)di;
1629         struct osd_device *dev = it->ooi_dev;
1630
1631         /* od_otable_sem: prevent concurrent init/fini */
1632         down(&dev->od_otable_sem);
1633         scrub_stop(&dev->od_scrub);
1634         LASSERT(dev->od_otable_it == it);
1635
1636         dev->od_otable_it = NULL;
1637         up(&dev->od_otable_sem);
1638         OBD_FREE_PTR(it);
1639 }
1640
/**
 * Positioning hook of the otable-based iterator.
 *
 * This implementation ignores all of its arguments and does nothing.
 *
 * \retval 0	always
 */
static int
osd_otable_it_get(const struct lu_env *env, struct dt_it *di,
		  const struct dt_key *key)
{
	/* intentionally a no-op */
	return 0;
}
1646
/**
 * Counterpart of osd_otable_it_get(); this implementation has nothing
 * to release and therefore does nothing.
 */
static void
osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
	/* intentionally a no-op */
}
1650
1651 static void osd_otable_it_preload(const struct lu_env *env,
1652                                   struct osd_otable_it *it)
1653 {
1654         struct osd_device *dev = it->ooi_dev;
1655         int rc;
1656
1657         /* can go negative on the very first access to the iterator
1658          * or if some non-Lustre objects were found */
1659         if (unlikely(it->ooi_prefetched < 0))
1660                 it->ooi_prefetched = 0;
1661
1662         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1663                 return;
1664
1665         if (it->ooi_prefetched_dnode == 0)
1666                 it->ooi_prefetched_dnode = it->ooi_pos;
1667
1668         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1669                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1670                                       B_FALSE, 0);
1671                 if (rc)
1672                         break;
1673
1674                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1675                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1676                 it->ooi_prefetched++;
1677         }
1678 }
1679
1680 static inline int
1681 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1682 {
1683         spin_lock(&scrub->os_lock);
1684         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1685             !thread_is_running(&scrub->os_thread))
1686                 it->ooi_waiting = 0;
1687         else
1688                 it->ooi_waiting = 1;
1689         spin_unlock(&scrub->os_lock);
1690
1691         return !it->ooi_waiting;
1692 }
1693
1694 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1695 {
1696         struct osd_otable_it *it = (struct osd_otable_it *)di;
1697         struct osd_device *dev = it->ooi_dev;
1698         struct lustre_scrub *scrub = &dev->od_scrub;
1699         struct ptlrpc_thread *thread = &scrub->os_thread;
1700         struct l_wait_info lwi = { 0 };
1701         struct lustre_mdt_attrs *lma = NULL;
1702         nvlist_t *nvbuf = NULL;
1703         int size = 0;
1704         int rc;
1705         ENTRY;
1706
1707         LASSERT(it->ooi_user_ready);
1708         fid_zero(&it->ooi_fid);
1709
1710         if (unlikely(it->ooi_all_cached))
1711                 RETURN(1);
1712
1713 again:
1714         if (nvbuf) {
1715                 nvlist_free(nvbuf);
1716                 nvbuf = NULL;
1717                 lma = NULL;
1718                 size = 0;
1719         }
1720
1721         if (it->ooi_pos >= scrub->os_pos_current)
1722                 l_wait_event(thread->t_ctl_waitq,
1723                              osd_otable_it_wakeup(scrub, it),
1724                              &lwi);
1725
1726         if (!thread_is_running(thread) && !it->ooi_used_outside)
1727                 GOTO(out, rc = 1);
1728
1729         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1730         if (rc) {
1731                 if (unlikely(rc == -ESRCH)) {
1732                         it->ooi_all_cached = 1;
1733                         rc = 1;
1734                 }
1735
1736                 GOTO(out, rc);
1737         }
1738
1739         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1740
1741         if (!scrub->os_full_speed)
1742                 spin_lock(&scrub->os_lock);
1743         it->ooi_prefetched--;
1744         if (!scrub->os_full_speed) {
1745                 if (scrub->os_waiting) {
1746                         scrub->os_waiting = 0;
1747                         wake_up_all(&thread->t_ctl_waitq);
1748                 }
1749                 spin_unlock(&scrub->os_lock);
1750         }
1751
1752         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1753                 goto again;
1754
1755         if (rc)
1756                 GOTO(out, rc);
1757
1758         LASSERT(nvbuf != NULL);
1759         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1760                                        (uchar_t **)&lma, &size);
1761         if (rc || size == 0)
1762                 /* It is either non-Lustre object or OSD internal object,
1763                  * ignore it, go ahead */
1764                 goto again;
1765
1766         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1767         lustre_lma_swab(lma);
1768         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1769                      lma->lma_incompat & LMAI_AGENT))
1770                 goto again;
1771
1772         it->ooi_fid = lma->lma_self_fid;
1773
1774         GOTO(out, rc = 0);
1775
1776 out:
1777         if (nvbuf)
1778                 nvlist_free(nvbuf);
1779
1780         if (!rc && scrub->os_full_speed)
1781                 osd_otable_it_preload(env, it);
1782
1783         return rc;
1784 }
1785
/**
 * Key accessor of the otable-based iterator.
 *
 * This implementation ignores its arguments and exposes no key.
 *
 * \retval NULL	always
 */
static struct dt_key *
osd_otable_it_key(const struct lu_env *env, const struct dt_it *di)
{
	return NULL;
}
1791
1792 static int osd_otable_it_key_size(const struct lu_env *env,
1793                                   const struct dt_it *di)
1794 {
1795         return sizeof(__u64);
1796 }
1797
1798 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1799                              struct dt_rec *rec, __u32 attr)
1800 {
1801         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1802         struct lu_fid *fid = (struct lu_fid *)rec;
1803
1804         *fid = it->ooi_fid;
1805         return 0;
1806 }
1807
1808 static __u64 osd_otable_it_store(const struct lu_env *env,
1809                                  const struct dt_it *di)
1810 {
1811         struct osd_otable_it *it = (struct osd_otable_it *)di;
1812
1813         return it->ooi_pos;
1814 }
1815
1816 /**
1817  * Set the OSD layer iteration start position as the specified hash.
1818  */
1819 static int osd_otable_it_load(const struct lu_env *env,
1820                               const struct dt_it *di, __u64 hash)
1821 {
1822         struct osd_otable_it *it = (struct osd_otable_it *)di;
1823         struct osd_device *dev = it->ooi_dev;
1824         struct lustre_scrub *scrub = &dev->od_scrub;
1825         int rc;
1826         ENTRY;
1827
1828         /* Forbid to set iteration position after iteration started. */
1829         if (it->ooi_user_ready)
1830                 RETURN(-EPERM);
1831
1832         if (hash > OSD_OTABLE_MAX_HASH)
1833                 hash = OSD_OTABLE_MAX_HASH;
1834
1835         /* The hash is the last checkpoint position,
1836          * we will start from the next one. */
1837         it->ooi_pos = hash + 1;
1838         it->ooi_prefetched = 0;
1839         it->ooi_prefetched_dnode = 0;
1840         it->ooi_user_ready = 1;
1841         if (!scrub->os_full_speed)
1842                 wake_up_all(&scrub->os_thread.t_ctl_waitq);
1843
1844         /* Unplug OSD layer iteration by the first next() call. */
1845         rc = osd_otable_it_next(env, (struct dt_it *)it);
1846
1847         RETURN(rc);
1848 }
1849
/**
 * Key/record accessor of the otable-based iterator.
 *
 * This implementation ignores its arguments and leaves \a key_rec
 * untouched.
 *
 * \retval 0	always
 */
static int
osd_otable_it_key_rec(const struct lu_env *env, const struct dt_it *di,
		      void *key_rec)
{
	return 0;
}
1855
1856 const struct dt_index_operations osd_otable_ops = {
1857         .dio_it = {
1858                 .init     = osd_otable_it_init,
1859                 .fini     = osd_otable_it_fini,
1860                 .get      = osd_otable_it_get,
1861                 .put      = osd_otable_it_put,
1862                 .next     = osd_otable_it_next,
1863                 .key      = osd_otable_it_key,
1864                 .key_size = osd_otable_it_key_size,
1865                 .rec      = osd_otable_it_rec,
1866                 .store    = osd_otable_it_store,
1867                 .load     = osd_otable_it_load,
1868                 .key_rec  = osd_otable_it_key_rec,
1869         }
1870 };
1871
1872 /* high priority inconsistent items list APIs */
1873
1874 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1875                    const struct lu_fid *fid, uint64_t oid, bool insert)
1876 {
1877         struct lustre_scrub *scrub = &dev->od_scrub;
1878         struct ptlrpc_thread *thread = &scrub->os_thread;
1879         struct osd_inconsistent_item *oii;
1880         bool wakeup = false;
1881         ENTRY;
1882
1883         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1884         OBD_ALLOC_PTR(oii);
1885         if (unlikely(!oii))
1886                 RETURN(-ENOMEM);
1887
1888         INIT_LIST_HEAD(&oii->oii_list);
1889         oii->oii_cache.oic_dev = dev;
1890         oii->oii_cache.oic_fid = *fid;
1891         oii->oii_cache.oic_dnode = oid;
1892         oii->oii_insert = insert;
1893
1894         spin_lock(&scrub->os_lock);
1895         if (unlikely(!thread_is_running(thread))) {
1896                 spin_unlock(&scrub->os_lock);
1897                 OBD_FREE_PTR(oii);
1898                 RETURN(-EAGAIN);
1899         }
1900
1901         if (list_empty(&scrub->os_inconsistent_items))
1902                 wakeup = true;
1903         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1904         spin_unlock(&scrub->os_lock);
1905
1906         if (wakeup)
1907                 wake_up_all(&thread->t_ctl_waitq);
1908
1909         RETURN(0);
1910 }
1911
1912 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1913                    uint64_t *oid)
1914 {
1915         struct lustre_scrub *scrub = &dev->od_scrub;
1916         struct osd_inconsistent_item *oii;
1917         int ret = -ENOENT;
1918         ENTRY;
1919
1920         spin_lock(&scrub->os_lock);
1921         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1922                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1923                         *oid = oii->oii_cache.oic_dnode;
1924                         ret = 0;
1925                         break;
1926                 }
1927         }
1928         spin_unlock(&scrub->os_lock);
1929
1930         RETURN(ret);
1931 }